Overview
DBOS stores workflow inputs, outputs, and intermediate state in the system database. The serialization API controls how Python objects are converted to and from strings for database storage.
DBOS provides two built-in serializers and supports custom serializers for advanced use cases.
Built-in Serializers
DefaultSerializer (Pickle)
The default serializer uses Python’s pickle module with base64 encoding.
from dbos import DefaultSerializer
serializer = DefaultSerializer()
# Serialization
data = {"key": "value", "number": 42}
serialized = serializer.serialize(data)
# Returns: base64-encoded pickle string
# Deserialization
deserialized = serializer.deserialize(serialized)
print(serializer.name()) # "py_pickle"
Characteristics:
- Format: Base64-encoded pickle
- Name: py_pickle
- Compatibility: Python-only, not portable across languages
- Use case: Default for Python-only applications
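To make the format concrete, here is a minimal standard-library sketch of what "base64-encoded pickle" means. The helper names encode_pickle and decode_pickle are made up for illustration; this is not DBOS's internal implementation, only the same general technique.

```python
import base64
import pickle
from typing import Any


def encode_pickle(obj: Any) -> str:
    """Pickle an object, then base64-encode the bytes into an ASCII string."""
    return base64.b64encode(pickle.dumps(obj)).decode("utf-8")


def decode_pickle(text: str) -> Any:
    """Reverse encode_pickle: base64-decode, then unpickle."""
    return pickle.loads(base64.b64decode(text))


data = {"key": "value", "number": 42}
encoded = encode_pickle(data)
restored = decode_pickle(encoded)
print(restored == data)  # True
```

Because unpickling can execute arbitrary code, pickled data should only ever be loaded from a trusted database.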
DBOSPortableJSON
A portable JSON serializer for cross-language compatibility.
from datetime import datetime
from dbos import DBOSPortableJSON
serializer = DBOSPortableJSON
# Serialization
data = {"timestamp": datetime.now(), "count": 100}
serialized = serializer.serialize(data)
# Returns: compact JSON string
print(serializer.name()) # "portable_json"
Characteristics:
- Format: Compact JSON (no whitespace)
- Name: portable_json
- Compatibility: Portable across TypeScript, Python, and other languages
- Type conversion: Automatically converts datetime, date, Decimal, etc.
- Use case: Multi-language workflows, cross-platform compatibility
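"Compact JSON" here means JSON without the optional whitespace after separators. The sketch below uses only the standard json module to show the difference; whether DBOS produces its output exactly this way is an assumption.

```python
import json

data = {"count": 100, "tags": ["a", "b"]}

# Default separators insert a space after "," and ":"
default_form = json.dumps(data)
# Explicit separators drop that whitespace
compact_form = json.dumps(data, separators=(",", ":"))

print(default_form)  # {"count": 100, "tags": ["a", "b"]}
print(compact_form)  # {"count":100,"tags":["a","b"]}
```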
The WorkflowSerializationFormat enum specifies which serializer to use for a workflow.
from dbos import WorkflowSerializationFormat
class WorkflowSerializationFormat(str, Enum):
    PORTABLE = "portable"  # Use DBOSPortableJSON
    NATIVE = "native"  # Use DefaultSerializer (pickle)
    DEFAULT = None  # Use the configured serializer
Usage with Workflows
from dbos import DBOS, WorkflowSerializationFormat

@DBOS.workflow(serialization_format=WorkflowSerializationFormat.PORTABLE)
def portable_workflow(data: dict) -> dict:
    # This workflow uses portable JSON serialization
    return {"result": "success"}

@DBOS.workflow(serialization_format=WorkflowSerializationFormat.NATIVE)
def native_workflow(data: dict) -> dict:
    # This workflow uses pickle serialization
    return {"result": "success"}

@DBOS.workflow() # Uses DEFAULT
def default_workflow(data: dict) -> dict:
    # Uses the serializer configured in DBOSConfig
    return {"result": "success"}
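The selection rule implied by the three formats can be sketched as a small function. This is a hypothetical model, not DBOS code: resolve_serializer_name is an invented helper, formats are modeled as plain strings (or None for DEFAULT), and only the serializer names "portable_json" and "py_pickle" come from this page.

```python
from typing import Optional

# Serializer names taken from this page; the function itself is hypothetical.
PORTABLE_NAME = "portable_json"
NATIVE_NAME = "py_pickle"


def resolve_serializer_name(fmt: Optional[str], configured_name: str) -> str:
    """Pick a serializer name from a per-workflow format and the global config."""
    if fmt == "portable":
        return PORTABLE_NAME
    if fmt == "native":
        return NATIVE_NAME
    if fmt is None:  # DEFAULT: fall back to the configured serializer
        return configured_name
    raise ValueError(f"Unknown serialization format: {fmt!r}")


print(resolve_serializer_name("portable", "msgpack"))  # portable_json
print(resolve_serializer_name("native", "msgpack"))    # py_pickle
print(resolve_serializer_name(None, "msgpack"))        # msgpack
```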
Serializer Base Class
Implement a custom serializer by extending the Serializer abstract class.
from dbos import Serializer
from abc import ABC, abstractmethod
from typing import Any

class Serializer(ABC):
    @abstractmethod
    def serialize(self, data: Any) -> str:
        """Convert Python object to string."""
        pass

    @abstractmethod
    def deserialize(self, serialized_data: str) -> Any:
        """Convert string back to Python object."""
        pass

    def name(self) -> str:
        """Return serializer name (stored in database)."""
        return "custom_serializer"
Method Reference
serialize
(data: Any) -> str
required
Serialize a Python object to a string.
Parameters:
data: Any Python object to serialize
Returns: String representation suitable for database storage
Raises: Should raise an exception if serialization fails
deserialize
(serialized_data: str) -> Any
required
Deserialize a string back to a Python object.
Parameters:
serialized_data: String representation from the database
Returns: Python object
Raises: Should raise an exception if deserialization fails
name
() -> str
Return the serializer name. This name is stored in the database and used to identify which serializer to use for deserialization.
Default: "custom_serializer"
Returns: String identifier for this serializer
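A quick way to check a custom serializer against this contract is a round-trip test. The sketch below defines a local stand-in for the Serializer interface (so it runs without DBOS installed) plus a hypothetical PlainJSONSerializer and a round_trips helper; none of these names come from the DBOS package.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class Serializer(ABC):
    """Local stand-in mirroring the interface shown above (not imported from dbos)."""

    @abstractmethod
    def serialize(self, data: Any) -> str: ...

    @abstractmethod
    def deserialize(self, serialized_data: str) -> Any: ...

    def name(self) -> str:
        return "custom_serializer"


class PlainJSONSerializer(Serializer):
    """Hypothetical example serializer built on the json module."""

    def serialize(self, data: Any) -> str:
        return json.dumps(data)

    def deserialize(self, serialized_data: str) -> Any:
        return json.loads(serialized_data)

    def name(self) -> str:
        return "plain_json"


def round_trips(serializer: Serializer, value: Any) -> bool:
    """Return True if value survives serialize -> deserialize unchanged."""
    text = serializer.serialize(value)
    return isinstance(text, str) and serializer.deserialize(text) == value


s = PlainJSONSerializer()
print(round_trips(s, {"a": [1, 2, 3]}))  # True
print(round_trips(s, (1, 2)))            # False: the tuple comes back as a list
```

Running such a check over representative workflow inputs and outputs is a cheap way to find types that a serializer silently changes.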
Custom Serializer Example
Base64-Encoded msgpack
import base64
from typing import Any

import msgpack
from dbos import Serializer, DBOS, DBOSConfig

class MsgPackSerializer(Serializer):
    """High-performance binary serializer using msgpack."""

    def serialize(self, data: Any) -> str:
        try:
            # Serialize to bytes, then encode as base64 for string storage
            packed = msgpack.packb(data, use_bin_type=True)
            return base64.b64encode(packed).decode("utf-8")
        except Exception as e:
            raise ValueError(f"Failed to serialize: {e}") from e

    def deserialize(self, serialized_data: str) -> Any:
        # Reverse the steps: base64-decode, then unpack
        packed = base64.b64decode(serialized_data)
        return msgpack.unpackb(packed, raw=False)

    def name(self) -> str:
        return "msgpack"

# Configure DBOS to use the custom serializer
config: DBOSConfig = {
    "name": "my-app",
    "serializer": MsgPackSerializer(),
}
dbos = DBOS(config=config)
Compressed JSON
import base64
import json
import zlib
from typing import Any

from dbos import Serializer

class CompressedJSONSerializer(Serializer):
    """JSON serializer with zlib (DEFLATE) compression."""

    def serialize(self, data: Any) -> str:
        # Convert to JSON
        json_str = json.dumps(data)
        # Compress
        compressed = zlib.compress(json_str.encode("utf-8"))
        # Encode as base64
        return base64.b64encode(compressed).decode("utf-8")

    def deserialize(self, serialized_data: str) -> Any:
        # Decode from base64
        compressed = base64.b64decode(serialized_data)
        # Decompress
        json_str = zlib.decompress(compressed).decode("utf-8")
        # Parse JSON
        return json.loads(json_str)

    def name(self) -> str:
        return "compressed_json"
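Compression only pays off above a certain payload size, because zlib adds a fixed header and base64 inflates the result by roughly a third. The standalone sketch below (helper names are illustrative, and it does not depend on DBOS) compares the stored length with and without compression.

```python
import base64
import json
import zlib
from typing import Any


def compressed_size(data: Any) -> int:
    """Length of the zlib-compressed, base64-encoded JSON string."""
    raw = json.dumps(data).encode("utf-8")
    return len(base64.b64encode(zlib.compress(raw)))


def plain_size(data: Any) -> int:
    """Length of the uncompressed JSON string."""
    return len(json.dumps(data))


large = {"rows": [{"status": "ok", "retries": 0}] * 1000}
small = {"a": 1}

print(compressed_size(large) < plain_size(large))  # True: repetitive data shrinks
print(compressed_size(small) > plain_size(small))  # True: tiny payloads grow
```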
Portable JSON Type Conversions
The DBOSPortableJSON serializer automatically converts Python types to JSON-compatible values:
Supported Type Conversions
| Python Type | JSON Representation | Example |
|---|---|---|
| datetime | RFC 3339 UTC string | "2024-03-15T10:30:00.000Z" |
| date | ISO 8601 date string | "2024-03-15" |
| Decimal | String | "123.45" |
| tuple | Array | [1, 2, 3] |
| set | Array | [1, 2, 3] |
| dict | Object | {"key": "value"} |
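The conversions in the table can be approximated with the default hook of json.dumps. This is a sketch of the general technique, not the DBOSPortableJSON implementation: to_json_compatible is a hypothetical name, and treating naive datetimes as UTC is an assumption made here for illustration.

```python
import json
from datetime import date, datetime, timezone
from decimal import Decimal
from typing import Any


def to_json_compatible(obj: Any) -> Any:
    """Fallback for json.dumps; called only for types json cannot encode natively."""
    if isinstance(obj, datetime):  # check before date: datetime subclasses date
        if obj.tzinfo is None:
            obj = obj.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        utc = obj.astimezone(timezone.utc)
        return utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{utc.microsecond // 1000:03d}Z"
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return str(obj)
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    raise TypeError(f"Cannot encode {type(obj).__name__}")


data = {
    "timestamp": datetime(2024, 3, 15, 10, 30, 0),
    "birthdate": date(1990, 1, 1),
    "price": Decimal("99.99"),
    "point": (1, 2, 3),  # tuples are encoded as arrays by json itself
    "tags": {"dbos"},
}
encoded = json.dumps(data, default=to_json_compatible, separators=(",", ":"))
print(encoded)
# {"timestamp":"2024-03-15T10:30:00.000Z","birthdate":"1990-01-01","price":"99.99","point":[1,2,3],"tags":["dbos"]}
```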
Example
from datetime import datetime, date
from decimal import Decimal
from dbos import DBOSPortableJSON
data = {
    "timestamp": datetime(2024, 3, 15, 10, 30, 0),
    "birthdate": date(1990, 1, 1),
    "price": Decimal("99.99"),
    "tags": {"python", "dbos", "workflow"},
}
serialized = DBOSPortableJSON.serialize(data)
print(serialized)
# {"timestamp":"2024-03-15T10:30:00.000Z","birthdate":"1990-01-01","price":"99.99","tags":["python","dbos","workflow"]}
# (sets are unordered, so the order of the "tags" elements may differ)
deserialized = DBOSPortableJSON.deserialize(serialized)
# Note: datetime, date, and Decimal come back as strings; set comes back as a list
When using portable JSON, be aware that type information is lost:
- datetime objects become strings
- Decimal objects become strings
- set objects become lists
- tuple objects become lists
You may need to convert types back after deserialization.
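Converting back is straightforward when the expected type of each field is known. The sketch below starts from a JSON string of the same shape as the example output above and restores the original Python types using only the standard library; the field names are the ones from that example.

```python
import json
from datetime import date, datetime
from decimal import Decimal

serialized = (
    '{"timestamp":"2024-03-15T10:30:00.000Z","birthdate":"1990-01-01",'
    '"price":"99.99","tags":["python","dbos","workflow"]}'
)

raw = json.loads(serialized)

restored = {
    # Replace the trailing "Z" so fromisoformat also works before Python 3.11
    "timestamp": datetime.fromisoformat(raw["timestamp"].replace("Z", "+00:00")),
    "birthdate": date.fromisoformat(raw["birthdate"]),
    "price": Decimal(raw["price"]),
    "tags": set(raw["tags"]),
}

print(restored["timestamp"].year)  # 2024
print(restored["price"] + 1)       # 100.99
```

Note that the restored datetime is timezone-aware (UTC), even if the value originally serialized was naive.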
Portable Workflow Errors
DBOS provides PortableWorkflowError for portable error handling across languages.
from dbos import PortableWorkflowError, JsonWorkflowErrorData
@dataclass
class PortableWorkflowError(Exception):
    message: str
    name: str
    code: int | str | None = None
    data: JsonValue | None = None
Usage
from dbos import DBOS, PortableWorkflowError
@DBOS.workflow()
def my_workflow():
    # Raise a portable error
    raise PortableWorkflowError(
        message="Invalid input provided",
        name="ValidationError",
        code=400,
        data={"field": "email", "reason": "invalid format"},
    )
Error Data Structure
class JsonWorkflowErrorData(TypedDict, total=False):
    name: str  # Error type/name
    message: str  # Human-readable message
    code: int | str | None  # Application error code
    data: JsonValue | None  # Structured error details
Converting Exceptions
DBOS automatically converts Python exceptions to portable format:
from dbos._serialization import exception_to_workflow_error_data
try:
    raise ValueError("Something went wrong")
except Exception as e:
    error_data = exception_to_workflow_error_data(e)
    # {"name": "ValueError", "message": "Something went wrong", "code": None}
The converter attempts to extract:
- name: From exception class name
- message: From str(exception)
- code: From attributes: code, error_code, errno, status, status_code
- data: From attributes: data, details, payload, extra, meta, metadata
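The attribute lookup described above can be sketched as follows. This is an illustrative reimplementation, not the body of exception_to_workflow_error_data: the helper names are invented, and taking the first attribute that exists and is not None is an assumption about how candidates are prioritized.

```python
from typing import Any, Dict, Optional, Tuple

CODE_ATTRS = ("code", "error_code", "errno", "status", "status_code")
DATA_ATTRS = ("data", "details", "payload", "extra", "meta", "metadata")


def first_attr(exc: BaseException, names: Tuple[str, ...]) -> Optional[Any]:
    """Return the first attribute in names that exists and is not None."""
    for attr in names:
        value = getattr(exc, attr, None)
        if value is not None:
            return value
    return None


def to_error_data(exc: BaseException) -> Dict[str, Any]:
    """Build a dict with the four portable error fields."""
    return {
        "name": type(exc).__name__,
        "message": str(exc),
        "code": first_attr(exc, CODE_ATTRS),
        "data": first_attr(exc, DATA_ATTRS),
    }


class HttpError(Exception):
    """Hypothetical application exception carrying extra attributes."""

    def __init__(self, message: str, status_code: int, details: dict):
        super().__init__(message)
        self.status_code = status_code
        self.details = details


print(to_error_data(HttpError("Not found", 404, {"path": "/users/7"})))
# {'name': 'HttpError', 'message': 'Not found', 'code': 404, 'data': {'path': '/users/7'}}
```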
Internal Serialization Functions
These functions are used internally by DBOS but can be useful for advanced use cases.
serialize_value
def serialize_value(
    value: Optional[Any],
    serialization_type: Optional[WorkflowSerializationFormat],
    serializer: Serializer,
) -> tuple[Optional[str], str]:
    """Serialize a value using the specified format."""
Returns a tuple of (serialized_data, serializer_name).
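Storing the serializer name next to the data is what lets a reader pick the matching decoder later. The sketch below models that idea with a hypothetical name-to-decoder registry; the DECODERS table and decode_stored_value are invented for illustration and do not reflect how DBOS dispatches internally.

```python
import base64
import json
import pickle
from typing import Any, Callable, Dict, Optional

# Hypothetical registry mapping a stored serializer name to a decode function.
DECODERS: Dict[str, Callable[[str], Any]] = {
    "portable_json": json.loads,
    "py_pickle": lambda text: pickle.loads(base64.b64decode(text)),
}


def decode_stored_value(serialized_value: Optional[str], serialization: str) -> Any:
    """Decode a stored value using the serializer name recorded next to it."""
    if serialized_value is None:
        return None
    try:
        decoder = DECODERS[serialization]
    except KeyError:
        raise ValueError(f"No serializer registered for {serialization!r}") from None
    return decoder(serialized_value)


pickled = base64.b64encode(pickle.dumps({"n": 1})).decode("utf-8")
print(decode_stored_value('{"n":1}', "portable_json"))  # {'n': 1}
print(decode_stored_value(pickled, "py_pickle"))        # {'n': 1}
print(decode_stored_value(None, "py_pickle"))           # None
```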
deserialize_value
def deserialize_value(
    serialized_value: Optional[str],
    serialization: Optional[str],
    serializer: Serializer,
) -> Optional[Any]:
    """Deserialize a value using the specified serializer."""
serialize_args
def serialize_args(
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    serialization_type: Optional[WorkflowSerializationFormat],
    serializer: Serializer,
) -> tuple[str, str]:
    """Serialize workflow arguments."""
deserialize_args
def deserialize_args(
    serialized_value: str,
    serialization: Optional[str],
    serializer: Serializer,
) -> WorkflowInputs:
    """Deserialize workflow arguments."""
Returns a WorkflowInputs dictionary:
class WorkflowInputs(TypedDict):
    args: Tuple[Any, ...]  # Positional arguments
    kwargs: Dict[str, Any]  # Keyword arguments
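The args/kwargs split makes it possible to replay a call exactly as it was made. The following sketch stores inputs in a WorkflowInputs-shaped dictionary and uses them to re-invoke a function; pack_inputs, unpack_inputs, and greet are hypothetical names, and base64-encoded pickle is used here only as a convenient stand-in format.

```python
import base64
import pickle
from typing import Any, Dict, Tuple, TypedDict


class WorkflowInputs(TypedDict):
    """Local copy of the shape shown above."""

    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]


def pack_inputs(*args: Any, **kwargs: Any) -> str:
    """Capture positional and keyword arguments as one serialized string."""
    inputs: WorkflowInputs = {"args": args, "kwargs": kwargs}
    return base64.b64encode(pickle.dumps(inputs)).decode("utf-8")


def unpack_inputs(text: str) -> WorkflowInputs:
    """Recover the WorkflowInputs dictionary from its serialized form."""
    return pickle.loads(base64.b64decode(text))


def greet(name: str, punctuation: str = ".") -> str:
    return f"Hello, {name}{punctuation}"


stored = pack_inputs("Ada", punctuation="!")
inputs = unpack_inputs(stored)
print(greet(*inputs["args"], **inputs["kwargs"]))  # Hello, Ada!
```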
Best Practices
Choosing a Serializer
Use DefaultSerializer (pickle) when:
- Your application is Python-only
- You need to serialize complex Python objects
- Performance is critical
- You don’t need cross-language compatibility
Use DBOSPortableJSON when:
- You need cross-language workflow compatibility
- You’re integrating with TypeScript DBOS applications
- You want human-readable serialized data
- You’re working with simple data types (dicts, lists, primitives)
Use a custom serializer when:
- You have specific performance requirements
- You need specialized data formats (protobuf, msgpack, etc.)
- You want compression for large payloads
- You have compliance requirements for data storage
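The trade-off between the two built-in approaches shows up as soon as a value contains non-JSON types. The sketch below uses the standard pickle and json modules directly (not the DBOS serializers) to show that pickle preserves such types while plain JSON rejects them unless a conversion step is added.

```python
import json
import pickle
from datetime import datetime

value = {"when": datetime(2024, 3, 15, 10, 30), "ids": {1, 2, 3}}

# pickle round-trips the value with its original types intact
restored = pickle.loads(pickle.dumps(value))
print(restored == value)               # True
print(type(restored["ids"]).__name__)  # set

# Plain json.dumps rejects the same value without a conversion hook
try:
    json.dumps(value)
    json_ok = True
except TypeError:
    json_ok = False
print(json_ok)  # False
```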
Configuration
from dbos import DBOS, DBOSConfig, DBOSPortableJSON

# Global configuration (affects all workflows)
config: DBOSConfig = {
    "name": "my-app",
    "serializer": DBOSPortableJSON,  # Use portable JSON by default
}
dbos = DBOS(config=config)

# Per-workflow override
from dbos import WorkflowSerializationFormat

@DBOS.workflow(serialization_format=WorkflowSerializationFormat.NATIVE)
def special_workflow():
    # Uses pickle despite global portable JSON config
    pass
Error Handling
import json
import logging
from typing import Any

from dbos import Serializer

class SafeSerializer(Serializer):
    """Serializer with error handling and logging."""

    def serialize(self, data: Any) -> str:
        try:
            return json.dumps(data)
        except Exception as e:
            logging.error(f"Serialization failed: {e}")
            # Re-raise with context; the original exception is chained
            raise ValueError(f"Cannot serialize {type(data).__name__}: {e}") from e

    def deserialize(self, serialized_data: str) -> Any:
        try:
            return json.loads(serialized_data)
        except json.JSONDecodeError as e:
            logging.error(f"Deserialization failed: {e}")
            raise ValueError(f"Invalid JSON data: {e}") from e