Overview
The database utilities provide comprehensive data persistence for prediction market agents, including SQL storage, vector embeddings with Pinecone, and long-term memory management.
SQL Handler
The SQLHandler is a generic utility for managing SQLModel tables with built-in CRUD operations.
from prediction_market_agent.db.sql_handler import SQLHandler
from prediction_market_agent.db.models import LongTermMemories
# Initialize handler
handler = SQLHandler(
model = LongTermMemories,
sqlalchemy_db_url = "postgresql://user:pass@localhost/db" # Optional
)
# Get all records
all_memories = handler.get_all()
# Save multiple items
handler.save_multiple( items = [memory1, memory2, memory3])
# Remove by ID
handler.remove_by_id( item_id = 123 )
Key Methods
get_all() — Retrieves all records from the table
save_multiple(items) — Saves multiple items to the database in a single transaction
Deletes multiple items from the database
remove_by_id(item_id) — Deletes a single item by its ID
get_with_filter_and_order(...)
Advanced query with filters, ordering, pagination
count(...) — Counts records matching the given filters
Advanced Queries
from sqlmodel import col
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent.db.sql_handler import SQLHandler
handler = SQLHandler( model = EvaluatedGoalModel)
# Filter and order results
recent_goals = handler.get_with_filter_and_order(
query_filters = [
col(EvaluatedGoalModel.agent_id) == "my_agent" ,
col(EvaluatedGoalModel.is_complete) == True
],
order_by_column_name = "datetime_" ,
order_desc = True ,
offset = 0 ,
limit = 10
)
# Count matching records
total_complete = handler.count(
query_filters = [
col(EvaluatedGoalModel.is_complete) == True
]
)
class SQLHandler:
    """Generic CRUD helper bound to a single SQLModel table.

    Wraps a ``DBManager`` session factory and exposes filtered, ordered and
    paginated queries against the given model class. The target table is
    created on construction if it does not already exist.
    """

    def __init__(
        self,
        model: t.Type[SQLModelType],
        sqlalchemy_db_url: str | None = None,
    ):
        """
        Args:
            model: The SQLModel table class this handler manages.
            sqlalchemy_db_url: Optional database URL. When ``None``,
                ``DBManager`` falls back to its own default (typically a
                local SQLite database).
        """
        self.db_manager = DBManager(sqlalchemy_db_url)
        self.table = model
        self._init_table_if_not_exists()

    def get_with_filter_and_order(
        self,
        query_filters: t.Sequence[ColumnElement[bool] | BinaryExpression[bool]] = (),
        order_by_column_name: str | None = None,
        order_desc: bool = True,
        offset: int = 0,
        limit: int | None = None,
    ) -> list[SQLModelType]:
        """Query the table with optional filters, ordering and pagination.

        Args:
            query_filters: SQLAlchemy boolean expressions combined with AND.
            order_by_column_name: Column name to order by; no ordering when
                ``None``.
            order_desc: Sort descending when True, ascending otherwise.
            offset: Number of rows to skip.
            limit: Maximum number of rows to return; ``None`` means no limit.

        Returns:
            Matching model instances, at most ``limit`` of them.
        """
        with self.db_manager.get_session() as session:
            query = session.query(self.table)
            for exp in query_filters:
                query = query.where(exp)
            if order_by_column_name:
                query = query.order_by(
                    desc(order_by_column_name)
                    if order_desc
                    else asc(order_by_column_name)
                )
            if offset:
                query = query.offset(offset)
            # `limit` may legitimately be 0 (caller wants zero rows); check
            # against None explicitly so a zero limit is not silently dropped.
            if limit is not None:
                query = query.limit(limit)
            results = query.all()
        return results
Database Models
The system includes several pre-defined models for agent operations:
LongTermMemories
from prediction_market_agent.db.models import LongTermMemories
from prediction_market_agent_tooling.tools.utils import utcnow
import json
# Create memory
memory = LongTermMemories(
task_description = "trading_agent_v1" ,
metadata_ = json.dumps({ "action" : "buy" , "market_id" : "0x123" }),
datetime_ = utcnow()
)
# Access metadata as dict
metadata_dict = memory.metadata_dict
Auto-generated primary key
task_description — Identifier for the task or agent
EvaluatedGoalModel
Tracks agent goals and completion status:
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent_tooling.tools.utils import utcnow
goal = EvaluatedGoalModel(
agent_id = "trader_001" ,
goal = "Maximize profit on Omen markets" ,
motivation = "Earn 100 xDai" ,
completion_criteria = "Portfolio value > 1000 xDai" ,
is_complete = False ,
reasoning = "Still building positions" ,
output = None ,
datetime_ = utcnow()
)
Prompt
Checkpoints for agent prompts:
from prediction_market_agent.db.models import Prompt
from prediction_market_agent_tooling.tools.utils import utcnow
prompt = Prompt(
prompt = "Act as a prediction market trader..." ,
session_identifier = "session_123" ,
datetime_ = utcnow()
)
BlockchainMessage
Messages sent via blockchain transactions:
from prediction_market_agent.db.models import BlockchainMessage
message = BlockchainMessage(
consumer_address = "0xConsumer..." ,
sender_address = "0xSender..." ,
transaction_hash = "0xabcd1234..." ,
block = "12345678" ,
value_wei = "1000000000000000000" , # 1 xDai
data_field = "0x48656c6c6f"
)
# Access parsed values
block_number = message.block_parsed # int
value = message.value_wei_parsed # xDaiWei
Long-Term Memory Handler
Manage agent memories with automatic serialization:
from prediction_market_agent.db.long_term_memory_table_handler import LongTermMemoryTableHandler
from prediction_market_agent_tooling.tools.utils import utcnow
from datetime import timedelta
# Initialize handler
memory_handler = LongTermMemoryTableHandler(
task_description = "my_agent_v1"
)
# Save history
memory_handler.save_history([
{ "role" : "system" , "content" : "You are a trader" },
{ "role" : "user" , "content" : "What markets should I trade?" }
])
# Search memories
recent_memories = memory_handler.search(
from_ = utcnow() - timedelta( days = 7 ),
to_ = utcnow(),
limit = 50
)
# Count total memories
total = memory_handler.count()
The memory handler works seamlessly with ChatHistory:
from prediction_market_agent.agents.microchain_agent.memory import ChatHistory, DatedChatHistory
# Create chat history
chat = ChatHistory.from_list_of_dicts([
{ "role" : "system" , "content" : "System prompt" },
{ "role" : "user" , "content" : "User message" },
{ "role" : "assistant" , "content" : "Assistant response" }
])
# Save to long-term memory
chat.save_to(memory_handler)
# Load from memory
loaded_chat = DatedChatHistory.from_long_term_memory(
memory_handler,
from_ = utcnow() - timedelta( days = 1 )
)
# Cluster by session
sessions = loaded_chat.cluster_by_session()
Prompt Table Handler
Manage agent prompts with session tracking:
from prediction_market_agent.db.prompt_table_handler import PromptTableHandler
from prediction_market_agent.agents.identifiers import AgentIdentifier
# Initialize from agent identifier
handler = PromptTableHandler.from_agent_identifier(
identifier = AgentIdentifier. TRADING_AGENT
)
# Save prompt
handler.save_prompt(
prompt = "Act as a prediction market trader. Research markets..."
)
# Fetch latest
latest_prompt = handler.fetch_latest_prompt()
if latest_prompt:
print ( f "Last updated: { latest_prompt.datetime_ } " )
print ( f "Prompt: { latest_prompt.prompt } " )
Pinecone Vector Database
The PineconeHandler provides vector embeddings for market similarity search:
from prediction_market_agent.db.pinecone_handler import PineconeHandler
from prediction_market_agent_tooling.tools.datetime_utc import DatetimeUTC
# Initialize handler
pinecone = PineconeHandler( model = "text-embedding-3-large" )
# Insert Omen markets
pinecone.insert_all_omen_markets_if_not_exists(
created_after = DatetimeUTC( 2024 , 1 , 1 )
)
# Find similar markets
similar = pinecone.find_nearest_questions_with_threshold(
text = "Will Bitcoin reach $100k?" ,
limit = 10 ,
threshold = 0.25 ,
filter_on_metadata = {
"category" : { "$eq" : "Cryptocurrency" }
}
)
for market in similar:
print ( f "Market: { market.market_title } " )
print ( f "URL: { market.market_url } " )
Automatic deduplication : Filters out duplicate markets
Batch insertion : Processes markets in chunks of 100
Similarity search : Finds related markets using embeddings
Metadata filtering : Filter by category, date, volume, etc.
SHA-256 IDs : Uses deterministic IDs based on question titles
def insert_all_omen_markets_if_not_exists(
    self, created_after: DatetimeUTC | None = None
) -> None:
    """Fetch all Omen markets and index the ones not yet in Pinecone.

    Markets are fetched from the Omen subgraph (newest first), deduplicated
    by title, filtered against what the index already contains, and inserted
    in batches.

    Args:
        created_after: When given, only markets created after this timestamp
            are fetched from the subgraph.
    """
    # Fetch markets from Omen subgraph
    markets = subgraph_handler.get_omen_markets_simple(
        limit=sys.maxsize,  # no practical limit — fetch everything available
        filter_by=FilterBy.NONE,
        sort_by=SortBy.NEWEST,
        created_after=created_after,
    )
    # Deduplicate by title
    markets_without_duplicates = self.deduplicate_markets(markets)
    # Filter out existing markets
    missing_markets = self.filter_markets_already_in_index(
        markets=markets_without_duplicates
    )
    # Insert in batches
    # NOTE(review): this is an abridged excerpt — the initialization of
    # `texts`/`metadatas` and the chunking step producing `chunked_texts`/
    # `chunked_metadatas` are omitted here; see the full implementation.
    for market in missing_markets:
        texts.append(market.question_title)
        metadatas.append(PineconeMetadata.from_omen_market(market).model_dump())
    # Batch insert
    for text_chunk, metadata_chunk in zip(chunked_texts, chunked_metadatas):
        # Deterministic IDs derived from the question text (hash-based),
        # so re-inserting the same question is idempotent.
        ids_chunk = [self.encode_text(text) for text in text_chunk]
        self.insert_texts(
            ids=ids_chunk,
            texts=text_chunk,
            metadatas=metadata_chunk
        )
The handler uses cosine similarity with configurable thresholds:
# Find very similar markets (high threshold)
exact_matches = pinecone.find_nearest_questions_with_threshold(
text = "Who will win the 2024 US Presidential election?" ,
limit = 5 ,
threshold = 0.8 # Very high similarity
)
# Find loosely related markets (low threshold)
related = pinecone.find_nearest_questions_with_threshold(
text = "US politics" ,
limit = 20 ,
threshold = 0.25 # Broader search
)
# Filter by metadata
recent_crypto = pinecone.find_nearest_questions_with_threshold(
text = "Bitcoin price" ,
limit = 10 ,
threshold = 0.3 ,
filter_on_metadata = {
"created_at" : { "$gte" : "2024-01-01" },
"category" : { "$eq" : "Cryptocurrency" }
}
)
Evaluated Goal Handler
Track and retrieve agent goals:
from prediction_market_agent.db.evaluated_goal_table_handler import EvaluatedGoalTableHandler
from prediction_market_agent.db.models import EvaluatedGoalModel
from prediction_market_agent_tooling.tools.utils import utcnow
# Initialize
goal_handler = EvaluatedGoalTableHandler(
agent_id = "my_trading_agent"
)
# Save goal
goal = EvaluatedGoalModel(
agent_id = "my_trading_agent" ,
goal = "Maximize trading profit" ,
motivation = "Build sustainable income" ,
completion_criteria = "Portfolio > 1000 xDai" ,
is_complete = False ,
reasoning = "Starting with research phase" ,
output = None ,
datetime_ = utcnow()
)
goal_handler.save_evaluated_goal(goal)
# Retrieve latest goals
recent_goals = goal_handler.get_latest_evaluated_goals( limit = 10 )
Usage in Agents
Think Thoroughly Agent
The Think Thoroughly Agent uses multiple database components:
think_thoroughly_usage.py
from prediction_market_agent.agents.think_thoroughly_agent.think_thoroughly_agent import ThinkThoroughlyBase
from prediction_market_agent.db.long_term_memory_table_handler import LongTermMemoryTableHandler
from prediction_market_agent.db.pinecone_handler import PineconeHandler
class ThinkThoroughlyAgent ( ThinkThoroughlyBase ):
def __init__ ( self , enable_langfuse : bool , memory : bool = True ):
super (). __init__ (enable_langfuse, memory)
# Initialize database handlers
self .pinecone_handler = PineconeHandler()
if memory:
self ._long_term_memory = LongTermMemoryTableHandler(
task_description = str ( self .identifier)
)
def find_related_markets ( self , question : str ):
# Use Pinecone for similarity search
return self .pinecone_handler.find_nearest_questions_with_threshold(
text = question,
limit = 10 ,
threshold = 0.3
)
Best Practices
Use Transactions The SQLHandler uses database sessions with automatic commit/rollback. Always use save_multiple for batch operations.
Index Strategy Add database indexes on frequently queried columns like agent_id, datetime_, and is_complete.
Memory Limits Implement pagination when fetching large result sets. Use offset and limit parameters.
Cleanup Regularly archive or delete old records to maintain performance. Set up retention policies.
Configuration
# Database URL (defaults to local SQLite if not set)
DATABASE_URL = postgresql://user:password@localhost:5432/prediction_market
# Pinecone
PINECONE_API_KEY = your_pinecone_key
# OpenAI for embeddings
OPENAI_API_KEY = your_openai_key
If sqlalchemy_db_url is not provided to SQLHandler, it will use the default from prediction_market_agent_tooling.tools.db.db_manager.DBManager, which typically creates a local SQLite database.
Dependencies
pip install sqlmodel sqlalchemy psycopg2-binary pinecone-client langchain-pinecone langchain-openai
Migration Tips
Define Models
Create SQLModel classes with table=True
Initialize Handler
The handler automatically creates tables if they don’t exist
Add Indexes
Use SQLAlchemy’s Index in your model definitions
Test Migrations
Use Alembic for complex schema changes in production