
Overview

The BaseStructure class provides the foundational infrastructure for all swarm structures and agent systems in the Swarms framework. It offers essential utilities for state management, file I/O, metadata tracking, error logging, and resource monitoring.

Import

from swarms.structs import BaseStructure

Key Features

  • File Operations: Save and load data in JSON, YAML, and TOML formats
  • State Management: Automatic metadata and artifact tracking
  • Error Logging: Centralized error logging to files
  • Event Tracking: Log events with timestamps and severity levels
  • Async Support: Full async/await support for I/O operations
  • Thread Support: Run operations in separate threads
  • Batch Processing: Process multiple tasks concurrently
  • Data Compression: Built-in gzip compression/decompression
  • Resource Monitoring: Track CPU and memory usage
  • Serialization: Convert structures to dict/JSON/YAML/TOML

Initialization

  • `name` (str): Name of the structure for identification and file naming
  • `description` (str): Description of the structure’s purpose
  • `save_metadata_on` (bool, default: True): Enable automatic metadata saving
  • `save_artifact_path` (str, default: "./artifacts"): Directory path for saving artifacts
  • `save_metadata_path` (str, default: "./metadata"): Directory path for saving metadata
  • `save_error_path` (str, default: "./errors"): Directory path for saving error logs

Core Methods

run

Execute the structure’s main operation (abstract method to be implemented by subclasses).
def run(
    *args,
    **kwargs
) -> Any
  • Returns (Any): Implementation-specific return value

File Operations

save_to_file

Save data to a JSON file.
def save_to_file(
    data: Any,
    file_path: str
) -> None
  • `data` (Any): Data to save (must be JSON serializable)
  • `file_path` (str): Path to save the file

load_from_file

Load data from a JSON file.
def load_from_file(
    file_path: str
) -> Any
  • `file_path` (str): Path to the file to load
  • Returns (Any): Loaded data from the JSON file
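The save/load pair wraps standard JSON serialization. A minimal stdlib sketch of the equivalent round trip (illustrative only, assuming `json.dump`/`json.load` internals; not the swarms implementation):

```python
import json
import os
import tempfile

def save_to_file(data, file_path):
    # Serialize to JSON and write to disk (assumes JSON-serializable data)
    with open(file_path, "w") as f:
        json.dump(data, f)

def load_from_file(file_path):
    # Read the file back and deserialize the JSON payload
    with open(file_path) as f:
        return json.load(f)

path = os.path.join(tempfile.mkdtemp(), "state.json")
save_to_file({"task": "analyze", "status": "done"}, path)
print(load_from_file(path))  # {'task': 'analyze', 'status': 'done'}
```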

asave_to_file

Asynchronously save data to a file.
async def asave_to_file(
    data: Any,
    file: str,
    *args,
    **kwargs
) -> None

aload_from_file

Asynchronously load data from a file.
async def aload_from_file(
    file: str
) -> Any
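Both coroutines are awaited from async code. The real implementation's I/O mechanism is an assumption here; this stdlib sketch (using `asyncio.to_thread` and JSON files) only shows the calling pattern:

```python
import asyncio
import json
import os
import tempfile

def _write(data, file):
    with open(file, "w") as f:
        json.dump(data, f)

def _read(file):
    with open(file) as f:
        return json.load(f)

async def asave_to_file(data, file):
    # Run the blocking write in a worker thread so the event loop stays free
    await asyncio.to_thread(_write, data, file)

async def aload_from_file(file):
    return await asyncio.to_thread(_read, file)

async def main():
    path = os.path.join(tempfile.mkdtemp(), "async_state.json")
    await asave_to_file({"status": "ok"}, path)
    return await aload_from_file(path)

print(asyncio.run(main()))  # {'status': 'ok'}
```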

Metadata Management

save_metadata

Save metadata to a JSON file.
def save_metadata(
    metadata: Dict[str, Any]
) -> None
  • `metadata` (Dict[str, Any]): Metadata dictionary to save

load_metadata

Load metadata from a JSON file.
def load_metadata() -> Dict[str, Any]
  • Returns (Dict[str, Any]): Loaded metadata dictionary

save_metadata_async

Asynchronously save metadata.
async def save_metadata_async(
    metadata: Dict[str, Any]
) -> None

load_metadata_async

Asynchronously load metadata.
async def load_metadata_async() -> Dict[str, Any]

Artifact Management

save_artifact

Save an artifact to a JSON file.
def save_artifact(
    artifact: Any,
    artifact_name: str
) -> None
  • `artifact` (Any): Artifact data to save
  • `artifact_name` (str): Name for the artifact file (without extension)

load_artifact

Load an artifact from a JSON file.
def load_artifact(
    artifact_name: str
) -> Any
  • `artifact_name` (str): Name of the artifact to load
  • Returns (Any): Loaded artifact data

save_artifact_async

Asynchronously save an artifact.
async def save_artifact_async(
    artifact: Any,
    artifact_name: str
) -> None

load_artifact_async

Asynchronously load an artifact.
async def load_artifact_async(
    artifact_name: str
) -> Any

Error and Event Logging

log_error

Log an error message to file.
def log_error(
    error_message: str
) -> None
  • `error_message` (str): Error message to log

log_error_async

Asynchronously log an error.
async def log_error_async(
    error_message: str
) -> None

log_event

Log an event with timestamp and type.
def log_event(
    event: str,
    event_type: str = "INFO"
) -> None
  • `event` (str): Event description
  • `event_type` (str, default: "INFO"): Event type (INFO, WARNING, ERROR, etc.)

log_event_async

Asynchronously log an event.
async def log_event_async(
    event: str,
    event_type: str = "INFO"
) -> None
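The event log is an append-only text file keyed by timestamp and severity. The exact line format is an implementation detail of the library; the format below is illustrative only:

```python
import datetime
import os
import tempfile

def log_event(event, event_type="INFO", log_path=None):
    # Append one "[timestamp] [TYPE] message" line (format is illustrative)
    stamp = datetime.datetime.now().isoformat(timespec="seconds")
    with open(log_path, "a") as f:
        f.write(f"[{stamp}] [{event_type}] {event}\n")

log_file = os.path.join(tempfile.mkdtemp(), "events.log")
log_event("Task started", log_path=log_file)
log_event("Low disk space", event_type="WARNING", log_path=log_file)
```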

Async Operations

run_async

Run the structure asynchronously.
async def run_async(
    *args,
    **kwargs
) -> Any

run_concurrent

Run the structure concurrently using asyncio.
def run_concurrent(
    *args,
    **kwargs
) -> Any

Thread Operations

run_in_thread

Run the structure in a separate thread.
def run_in_thread(
    *args,
    **kwargs
) -> concurrent.futures.Future
  • Returns (concurrent.futures.Future): Future object representing the thread execution

save_metadata_in_thread

Save metadata in a separate thread.
def save_metadata_in_thread(
    metadata: Dict[str, Any]
) -> concurrent.futures.Future
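Both helpers return a `concurrent.futures.Future`, so the caller blocks only when it asks for the result. A minimal stdlib equivalent of the pattern (the `run` function here is a stand-in, not the library's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def run(task):
    # Stand-in for a structure's run() method
    return f"Processed: {task}"

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(run, "analyze data")  # returns immediately

# ... do other work while the task runs ...

result = future.result()  # blocks until the thread finishes
print(result)  # Processed: analyze data
executor.shutdown()
```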

Batch Processing

run_batched

Run multiple tasks in batches using ThreadPoolExecutor.
def run_batched(
    batched_data: List[Any],
    batch_size: int = 10,
    *args,
    **kwargs
) -> List[Any]
  • `batched_data` (List[Any]): List of data items to process
  • `batch_size` (int, default: 10): Number of concurrent workers
  • Returns (List[Any]): List of results for each batch item

run_with_resources_batched

Run batched data with resource monitoring.
def run_with_resources_batched(
    batched_data: List[Any],
    batch_size: int = 10,
    *args,
    **kwargs
) -> List[Any]

Data Compression

compress_data

Compress data using gzip.
def compress_data(
    data: Any
) -> bytes
  • `data` (Any): Data to compress (must be JSON serializable)
  • Returns (bytes): Compressed data as bytes

decompres_data

Decompress gzip-compressed data. (The method name is spelled `decompres_data` in the current API.)
def decompres_data(
    data: bytes
) -> Any
  • `data` (bytes): Compressed data to decompress
  • Returns (Any): Decompressed data
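Since the input must be JSON serializable, the pair presumably JSON-encodes the payload before gzipping it (an assumption about the internals). A stdlib sketch of that round trip:

```python
import gzip
import json

def compress_data(data):
    # JSON-encode, then gzip the UTF-8 bytes (assumed internals)
    return gzip.compress(json.dumps(data).encode("utf-8"))

def decompres_data(data):  # spelling matches the documented method name
    # Reverse the pipeline: gunzip, decode, parse JSON
    return json.loads(gzip.decompress(data).decode("utf-8"))

payload = {"rows": list(range(1000))}
blob = compress_data(payload)
print(len(json.dumps(payload)), "->", len(blob))  # compressed blob is much smaller
```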

Resource Monitoring

monitor_resources

Monitor CPU and memory usage.
def monitor_resources() -> None
Logs current CPU and memory usage to the event log.
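Resource figures of this kind typically come from psutil (an assumed third-party dependency; the metric names actually written to the event log may differ):

```python
import psutil  # assumed third-party dependency

cpu_percent = psutil.cpu_percent(interval=0.1)  # system-wide CPU usage over 100 ms
mem = psutil.virtual_memory()                   # system memory statistics
print(f"CPU: {cpu_percent}%  Memory: {mem.percent}%")
```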

run_with_resources

Run the structure with resource monitoring.
def run_with_resources(
    *args,
    **kwargs
) -> Any

Configuration

load_config

Load configuration from a file.
def load_config(
    config: str = None,
    *args,
    **kwargs
) -> Dict[str, Any]
  • `config` (str, default: None): Path to the configuration file
  • Returns (Dict[str, Any]): Configuration dictionary

backup_data

Back up data with a timestamp.
def backup_data(
    data: Any,
    backup_path: str = None,
    *args,
    **kwargs
) -> None
  • `data` (Any): Data to back up
  • `backup_path` (str, default: None): Path to the backup directory

Serialization

to_dict

Convert structure to dictionary.
def to_dict() -> Dict[str, Any]
  • Returns (Dict[str, Any]): Dictionary representation of the structure

to_json

Convert structure to JSON string.
def to_json(
    indent: int = 4,
    *args,
    **kwargs
) -> str
  • `indent` (int, default: 4): Number of spaces for indentation
  • Returns (str): JSON string representation

to_yaml

Convert structure to YAML string.
def to_yaml(
    indent: int = 4,
    *args,
    **kwargs
) -> str
  • `indent` (int, default: 4): Number of spaces for indentation
  • Returns (str): YAML string representation

to_toml

Convert structure to TOML string.
def to_toml(
    *args,
    **kwargs
) -> str
  • Returns (str): TOML string representation

Examples

Basic Structure Implementation

from swarms.structs import BaseStructure

class MyStructure(BaseStructure):
    def __init__(self, name: str):
        super().__init__(
            name=name,
            description="Custom structure",
            save_metadata_on=True,
            save_artifact_path="./my_artifacts",
            save_metadata_path="./my_metadata"
        )
    
    def run(self, task: str):
        # Custom logic
        result = f"Processed: {task}"
        
        # Log event
        self.log_event(f"Executed task: {task}")
        
        # Save artifact
        self.save_artifact(result, "task_result")
        
        return result

# Use the structure
struct = MyStructure(name="processor")
result = struct.run("analyze data")

Async Structure

import asyncio
from swarms.structs import BaseStructure

class AsyncStructure(BaseStructure):
    async def run(self, task: str):
        # Async processing
        await asyncio.sleep(1)
        result = f"Async result: {task}"
        
        # Save async
        await self.save_artifact_async(result, "async_result")
        await self.log_event_async("Async task completed")
        
        return result

struct = AsyncStructure(name="async_processor")
result = asyncio.run(struct.run_async("process async"))

Batch Processing Structure

class BatchStructure(BaseStructure):
    def run(self, data: str):
        # Process single item
        return f"Processed: {data}"

struct = BatchStructure(name="batch_processor")

# Process multiple items
data_items = ["item1", "item2", "item3", "item4", "item5"]
results = struct.run_batched(
    batched_data=data_items,
    batch_size=3  # Process 3 items concurrently
)

print(results)
# ['Processed: item1', 'Processed: item2', ...]

Structure with Resource Monitoring

class MonitoredStructure(BaseStructure):
    def run(self, task: str):
        # Heavy processing
        result = self.process_heavy_task(task)
        return result

struct = MonitoredStructure(name="monitored")

# Run with automatic resource monitoring
result = struct.run_with_resources("heavy task")
# Logs CPU and memory usage automatically

Structure with Error Handling

class RobustStructure(BaseStructure):
    def run(self, task: str):
        try:
            # Processing logic
            result = self.process(task)
            self.log_event("Task completed successfully")
            return result
        except Exception as e:
            # Log error
            self.log_error(f"Error processing task: {str(e)}")
            raise

struct = RobustStructure(name="robust")
try:
    result = struct.run("task")
except Exception as e:
    # Error is logged to ./errors/robust_errors.log
    pass

Data Compression Example

class CompressedStructure(BaseStructure):
    def run(self, large_data: dict):
        # Compress large data
        compressed = self.compress_data(large_data)
        
        # Save compressed data
        with open("compressed_data.gz", "wb") as f:
            f.write(compressed)
        
        # Later, decompress
        with open("compressed_data.gz", "rb") as f:
            compressed = f.read()
        
        decompressed = self.decompres_data(compressed)
        return decompressed

struct = CompressedStructure(name="compressor")
large_data = {"key": "value" * 1000}
result = struct.run(large_data)

Serialization Example

class ConfigStructure(BaseStructure):
    def __init__(self, name: str, setting: str):
        super().__init__(name=name)
        self.setting = setting

struct = ConfigStructure(
    name="config",
    setting="production"
)

# Export to different formats
json_str = struct.to_json()
yaml_str = struct.to_yaml()
toml_str = struct.to_toml()
dict_repr = struct.to_dict()

print(json_str)
# {
#   "name": "config",
#   "setting": "production",
#   ...
# }

Properties

BaseStructure provides the following properties:
  • workspace_dir: Path to the workspace directory (from environment variable)
  • name: Name of the structure
  • description: Description of the structure
  • save_metadata_on: Whether metadata saving is enabled
  • save_artifact_path: Path for artifacts
  • save_metadata_path: Path for metadata
  • save_error_path: Path for error logs

Best Practices

  1. Always call super().__init__(): When extending BaseStructure, call the parent constructor
  2. Implement the run method: Override the abstract run() method with your logic
  3. Use logging methods: Leverage log_event and log_error for tracking
  4. Save artifacts: Use save_artifact for important outputs
  5. Handle errors gracefully: Wrap operations in try/except and use log_error
  6. Use async for I/O: Leverage async methods for file operations
  7. Monitor resources: Use run_with_resources for resource-intensive tasks
  8. Batch when possible: Use run_batched for processing multiple items
  9. Compress large data: Use compress_data for large datasets
  10. Enable metadata: Set save_metadata_on=True for production systems
