Skip to main content

Overview

The database utilities provide comprehensive data persistence for prediction market agents, including SQL storage, vector embeddings with Pinecone, and long-term memory management.

SQL Handler

The SQLHandler is a generic utility for managing SQLModel tables with built-in CRUD operations.
from prediction_market_agent.db.sql_handler import SQLHandler
from prediction_market_agent.db.models import LongTermMemories

# Initialize handler
handler = SQLHandler(
    model=LongTermMemories,
    sqlalchemy_db_url="postgresql://user:pass@localhost/db"  # Optional
)

# Get all records
all_memories = handler.get_all()

# Save multiple items
handler.save_multiple(items=[memory1, memory2, memory3])

# Remove by ID
handler.remove_by_id(item_id=123)

Key Methods

get_all()
Sequence[SQLModelType]
Retrieves all records from the table
save_multiple(items)
None
Saves multiple items to the database in a single transaction
remove_multiple(items)
None
Deletes multiple items from the database
remove_by_id(item_id)
None
Deletes a single item by its ID
get_with_filter_and_order(...)
list[SQLModelType]
Advanced query with filters, ordering, pagination
count(query_filters)
int
Counts records matching the given filters

Advanced Queries

from sqlmodel import col
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent.db.sql_handler import SQLHandler

handler = SQLHandler(model=EvaluatedGoalModel)

# Filter and order results
recent_goals = handler.get_with_filter_and_order(
    query_filters=[
        col(EvaluatedGoalModel.agent_id) == "my_agent",
        col(EvaluatedGoalModel.is_complete) == True
    ],
    order_by_column_name="datetime_",
    order_desc=True,
    offset=0,
    limit=10
)

# Count matching records
total_complete = handler.count(
    query_filters=[
        col(EvaluatedGoalModel.is_complete) == True
    ]
)
class SQLHandler:
    """Generic CRUD helper bound to a single SQLModel table.

    Sessions are obtained from ``DBManager`` (which falls back to its own
    default URL when ``sqlalchemy_db_url`` is None) and the backing table
    is created on construction if it does not already exist.
    """

    def __init__(
        self,
        model: t.Type[SQLModelType],
        sqlalchemy_db_url: str | None = None,
    ):
        """Bind the handler to `model` and ensure its table exists.

        Args:
            model: The SQLModel table class this handler operates on.
            sqlalchemy_db_url: Optional database URL; when None, DBManager's
                default connection is used.
        """
        self.db_manager = DBManager(sqlalchemy_db_url)
        self.table = model
        self._init_table_if_not_exists()

    def get_with_filter_and_order(
        self,
        query_filters: t.Sequence[ColumnElement[bool] | BinaryExpression[bool]] = (),
        order_by_column_name: str | None = None,
        order_desc: bool = True,
        offset: int = 0,
        limit: int | None = None,
    ) -> list[SQLModelType]:
        """Query the table with optional filters, ordering and pagination.

        Args:
            query_filters: SQLAlchemy boolean expressions ANDed together.
            order_by_column_name: Column name to sort by; no ordering if None.
            order_desc: Sort descending when True, ascending otherwise.
            offset: Number of leading rows to skip.
            limit: Maximum rows to return; None means unlimited.

        Returns:
            Matching rows, fully materialized before the session closes.
        """
        with self.db_manager.get_session() as session:
            query = session.query(self.table)
            for exp in query_filters:
                query = query.where(exp)

            if order_by_column_name:
                query = query.order_by(
                    desc(order_by_column_name)
                    if order_desc
                    else asc(order_by_column_name)
                )
            if offset:
                query = query.offset(offset)
            # Compare against None rather than truthiness: a caller passing
            # limit=0 should get an empty result, not the whole table.
            if limit is not None:
                query = query.limit(limit)
            # Materialize inside the session so rows are usable after exit.
            results = query.all()
        return results

Database Models

The system includes several pre-defined models for agent operations:

LongTermMemories

from prediction_market_agent.db.models import LongTermMemories
from prediction_market_agent_tooling.tools.utils import utcnow
import json

# Create memory
memory = LongTermMemories(
    task_description="trading_agent_v1",
    metadata_=json.dumps({"action": "buy", "market_id": "0x123"}),
    datetime_=utcnow()
)

# Access metadata as dict
metadata_dict = memory.metadata_dict
id
int
Auto-generated primary key
task_description
str
required
Identifier for the task or agent
metadata_
str
JSON-serialized metadata
datetime_
DatetimeUTC
required
Timestamp of the memory

EvaluatedGoalModel

Tracks agent goals and completion status:
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent_tooling.tools.utils import utcnow

goal = EvaluatedGoalModel(
    agent_id="trader_001",
    goal="Maximize profit on Omen markets",
    motivation="Earn 100 xDai",
    completion_criteria="Portfolio value > 1000 xDai",
    is_complete=False,
    reasoning="Still building positions",
    output=None,
    datetime_=utcnow()
)

Prompt

Checkpoints for agent prompts:
from prediction_market_agent.db.models import Prompt
from prediction_market_agent_tooling.tools.utils import utcnow

prompt = Prompt(
    prompt="Act as a prediction market trader...",
    session_identifier="session_123",
    datetime_=utcnow()
)

BlockchainMessage

Messages sent via blockchain transactions:
from prediction_market_agent.db.models import BlockchainMessage

message = BlockchainMessage(
    consumer_address="0xConsumer...",
    sender_address="0xSender...",
    transaction_hash="0xabcd1234...",
    block="12345678",
    value_wei="1000000000000000000",  # 1 xDai
    data_field="0x48656c6c6f"
)

# Access parsed values
block_number = message.block_parsed  # int
value = message.value_wei_parsed  # xDaiWei

Long-Term Memory Handler

Manage agent memories with automatic serialization:
from prediction_market_agent.db.long_term_memory_table_handler import LongTermMemoryTableHandler
from prediction_market_agent_tooling.tools.utils import utcnow
from datetime import timedelta

# Initialize handler
memory_handler = LongTermMemoryTableHandler(
    task_description="my_agent_v1"
)

# Save history
memory_handler.save_history([
    {"role": "system", "content": "You are a trader"},
    {"role": "user", "content": "What markets should I trade?"}
])

# Search memories
recent_memories = memory_handler.search(
    from_=utcnow() - timedelta(days=7),
    to_=utcnow(),
    limit=50
)

# Count total memories
total = memory_handler.count()
The memory handler works seamlessly with ChatHistory:
from prediction_market_agent.agents.microchain_agent.memory import ChatHistory, DatedChatHistory

# Create chat history
chat = ChatHistory.from_list_of_dicts([
    {"role": "system", "content": "System prompt"},
    {"role": "user", "content": "User message"},
    {"role": "assistant", "content": "Assistant response"}
])

# Save to long-term memory
chat.save_to(memory_handler)

# Load from memory
loaded_chat = DatedChatHistory.from_long_term_memory(
    memory_handler,
    from_=utcnow() - timedelta(days=1)
)

# Cluster by session
sessions = loaded_chat.cluster_by_session()

Prompt Table Handler

Manage agent prompts with session tracking:
from prediction_market_agent.db.prompt_table_handler import PromptTableHandler
from prediction_market_agent.agents.identifiers import AgentIdentifier

# Initialize from agent identifier
handler = PromptTableHandler.from_agent_identifier(
    identifier=AgentIdentifier.TRADING_AGENT
)

# Save prompt
handler.save_prompt(
    prompt="Act as a prediction market trader. Research markets..."
)

# Fetch latest
latest_prompt = handler.fetch_latest_prompt()
if latest_prompt:
    print(f"Last updated: {latest_prompt.datetime_}")
    print(f"Prompt: {latest_prompt.prompt}")

Pinecone Vector Database

The PineconeHandler provides vector embeddings for market similarity search:
from prediction_market_agent.db.pinecone_handler import PineconeHandler
from prediction_market_agent_tooling.tools.datetime_utc import DatetimeUTC

# Initialize handler
pinecone = PineconeHandler(model="text-embedding-3-large")

# Insert Omen markets
pinecone.insert_all_omen_markets_if_not_exists(
    created_after=DatetimeUTC(2024, 1, 1)
)

# Find similar markets
similar = pinecone.find_nearest_questions_with_threshold(
    text="Will Bitcoin reach $100k?",
    limit=10,
    threshold=0.25,
    filter_on_metadata={
        "category": {"$eq": "Cryptocurrency"}
    }
)

for market in similar:
    print(f"Market: {market.market_title}")
    print(f"URL: {market.market_url}")
  • Automatic deduplication: Filters out duplicate markets
  • Batch insertion: Processes markets in chunks of 100
  • Similarity search: Finds related markets using embeddings
  • Metadata filtering: Filter by category, date, volume, etc.
  • SHA-256 IDs: Uses deterministic IDs based on question titles
def insert_all_omen_markets_if_not_exists(
    self, created_after: DatetimeUTC | None = None
) -> None:
    """Embed and index every Omen market not yet present in Pinecone.

    Fetches all markets from the Omen subgraph (optionally restricted to
    those created after `created_after`), deduplicates them by question
    title, drops markets already indexed, and upserts the remainder in
    batches of 100 using deterministic IDs derived from the titles.

    Args:
        created_after: Only consider markets created after this timestamp;
            None means fetch the full history.
    """
    # Fetch markets from the Omen subgraph, newest first.
    markets = subgraph_handler.get_omen_markets_simple(
        limit=sys.maxsize,
        filter_by=FilterBy.NONE,
        sort_by=SortBy.NEWEST,
        created_after=created_after,
    )

    # Drop markets whose question title repeats an earlier one.
    markets_without_duplicates = self.deduplicate_markets(markets)

    # Skip markets whose IDs already exist in the Pinecone index.
    missing_markets = self.filter_markets_already_in_index(
        markets=markets_without_duplicates
    )

    # Fix: the original appended to `texts`/`metadatas` without ever
    # initializing them, and referenced undefined `chunked_*` names.
    texts: list[str] = []
    metadatas: list[dict] = []
    for market in missing_markets:
        texts.append(market.question_title)
        metadatas.append(PineconeMetadata.from_omen_market(market).model_dump())

    # Upsert in batches of 100 to keep each request to Pinecone small.
    batch_size = 100
    for start in range(0, len(texts), batch_size):
        text_chunk = texts[start : start + batch_size]
        metadata_chunk = metadatas[start : start + batch_size]
        # IDs are deterministic hashes of the question text, so re-running
        # this method never creates duplicate vectors.
        ids_chunk = [self.encode_text(text) for text in text_chunk]
        self.insert_texts(
            ids=ids_chunk,
            texts=text_chunk,
            metadatas=metadata_chunk,
        )

Evaluated Goal Handler

Track and retrieve agent goals:
from prediction_market_agent.db.evaluated_goal_table_handler import EvaluatedGoalTableHandler
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent_tooling.tools.utils import utcnow

# Initialize
goal_handler = EvaluatedGoalTableHandler(
    agent_id="my_trading_agent"
)

# Save goal
goal = EvaluatedGoalModel(
    agent_id="my_trading_agent",
    goal="Maximize trading profit",
    motivation="Build sustainable income",
    completion_criteria="Portfolio > 1000 xDai",
    is_complete=False,
    reasoning="Starting with research phase",
    output=None,
    datetime_=utcnow()
)
goal_handler.save_evaluated_goal(goal)

# Retrieve latest goals
recent_goals = goal_handler.get_latest_evaluated_goals(limit=10)

Usage in Agents

Think Thoroughly Agent

The Think Thoroughly Agent uses multiple database components:
from prediction_market_agent.agents.think_thoroughly_agent.think_thoroughly_agent import ThinkThoroughlyBase
from prediction_market_agent.db.long_term_memory_table_handler import LongTermMemoryTableHandler
from prediction_market_agent.db.pinecone_handler import PineconeHandler

class ThinkThoroughlyAgent(ThinkThoroughlyBase):
    def __init__(self, enable_langfuse: bool, memory: bool = True):
        super().__init__(enable_langfuse, memory)
        
        # Initialize database handlers
        self.pinecone_handler = PineconeHandler()
        
        if memory:
            self._long_term_memory = LongTermMemoryTableHandler(
                task_description=str(self.identifier)
            )
    
    def find_related_markets(self, question: str):
        # Use Pinecone for similarity search
        return self.pinecone_handler.find_nearest_questions_with_threshold(
            text=question,
            limit=10,
            threshold=0.3
        )

Best Practices

Use Transactions

The SQLHandler uses database sessions with automatic commit/rollback. Always use save_multiple for batch operations.

Index Strategy

Add database indexes on frequently queried columns like agent_id, datetime_, and is_complete.

Memory Limits

Implement pagination when fetching large result sets. Use offset and limit parameters.

Cleanup

Regularly archive or delete old records to maintain performance. Set up retention policies.

Configuration

# Database URL (defaults to local SQLite if not set)
DATABASE_URL=postgresql://user:password@localhost:5432/prediction_market

# Pinecone
PINECONE_API_KEY=your_pinecone_key

# OpenAI for embeddings
OPENAI_API_KEY=your_openai_key
If sqlalchemy_db_url is not provided to SQLHandler, it will use the default from prediction_market_agent_tooling.tools.db.db_manager.DBManager, which typically creates a local SQLite database.

Dependencies

pip install sqlmodel sqlalchemy psycopg2-binary pinecone-client langchain-pinecone langchain-openai

Migration Tips

1

Define Models

Create SQLModel classes with table=True
2

Initialize Handler

The handler automatically creates tables if they don’t exist
3

Add Indexes

Use SQLAlchemy’s Index in your model definitions
4

Test Migrations

Use Alembic for complex schema changes in production

Build docs developers (and LLMs) love