BinaryDB provides a comprehensive exception hierarchy to help you handle errors gracefully. All custom exceptions inherit from a base DatabaseError class.
Exception Hierarchy
All BinaryDB exceptions are defined in errors.py and follow this structure:
DatabaseError (base exception)
├── DatabaseIOError
├── DatabaseCorruptedError
├── KeyValidationError
├── RecordTypeError
├── TransactionError
└── ConcurrencyError
Base Exception
class DatabaseError ( Exception ):
"""Base exception for all database-related errors."""
Catch this to handle any BinaryDB error:
from binarydb.database import Database
from binarydb.errors import DatabaseError
db = Database( "mydata.pkl" )
try :
db.load()
db.set( "key" , "value" )
db.commit()
except DatabaseError as e:
print ( f "Database error occurred: { e } " )
# Handle any database-related error
DatabaseIOError
Raised when disk I/O operations fail.
class DatabaseIOError ( DatabaseError ):
"""Raised when disk I/O operations fail."""
When It’s Raised
commit() - Failed to write database to disk (database.py:168)
Common Causes
Insufficient disk space
Permission denied (read-only filesystem)
Disk hardware failure
Path doesn’t exist or isn’t writable
Example Usage
from binarydb.database import Database
from binarydb.errors import DatabaseIOError
db = Database( "/read-only/path/mydata.pkl" )
db.set( "key" , "value" )
try :
db.commit()
except DatabaseIOError as e:
print ( f "Failed to save database: { e } " )
print ( f "Underlying cause: { e.__cause__ } " )
# Fallback strategies:
# 1. Retry with exponential backoff
# 2. Save to alternative location
# 3. Keep in memory only
# 4. Alert user/admin
Implementation Detail
try :
with tmp.open( "wb" ) as f:
pickle.dump( self ._data, f, protocol = pickle. HIGHEST_PROTOCOL )
tmp.replace( self ._path)
except OSError as exc:
raise DatabaseIOError( "Failed to write database to disk" ) from exc
The original OSError is preserved using from exc, allowing you to inspect the underlying system error:
try :
db.commit()
except DatabaseIOError as e:
if isinstance (e.__cause__, PermissionError ):
print ( "Permission denied - check file permissions" )
elif isinstance (e.__cause__, FileNotFoundError ):
print ( "Directory doesn't exist" )
elif isinstance (e.__cause__, OSError ):
print ( f "System error: { e.__cause__.errno } " )
DatabaseCorruptedError
Raised when the database file format is invalid or corrupted.
class DatabaseCorruptedError ( DatabaseError ):
"""Raised when the database file format is invalid or corrupted."""
When It’s Raised
load() - Failed to deserialize pickle file (database.py:186-189)
load() - File doesn’t contain a dictionary (database.py:191-192)
Common Causes
File was manually edited or corrupted
Incomplete write from previous crash
Pickle version incompatibility
File contains valid pickle data but wrong format (not a dict)
Example Usage
from binarydb.database import Database
from binarydb.errors import DatabaseCorruptedError
import shutil
from pathlib import Path
def load_with_backup ( db_path ):
"""Load database with automatic backup restoration."""
db = Database(db_path)
backup_path = Path( str (db_path) + ".backup" )
try :
db.load()
print ( "Database loaded successfully" )
except DatabaseCorruptedError as e:
print ( f "Database corrupted: { e } " )
if backup_path.exists():
print ( "Restoring from backup..." )
shutil.copy(backup_path, db._path)
db.load()
print ( "Backup restored successfully" )
else :
print ( "No backup found, starting with empty database" )
# db is already empty, just save it
db.commit()
return db
db = load_with_backup( "mydata.pkl" )
Implementation Detail
try :
with self ._path.open( "rb" ) as f:
data = pickle.load(f)
except Exception as exc:
raise DatabaseCorruptedError(
"Failed to load database file"
) from exc
if not isinstance (data, dict ):
raise DatabaseCorruptedError( "Invalid database format" )
KeyValidationError
Raised when a database key is invalid.
class KeyValidationError ( DatabaseError ):
"""Raised when a database key is invalid."""
When It’s Raised
Raised by _validate_key() when:
Key is not a string (database.py:66-67)
Key is an empty string (database.py:69-70)
Called by all key-based operations:
set(), get(), delete(), update(), exists()
Common Causes
Passing integer, bytes, or other non-string types as keys
Passing empty string "" as key
Using None as key
Example Usage
from binarydb.database import Database
from binarydb.errors import KeyValidationError
db = Database( "mydata.pkl" )
# Invalid: Integer key
try :
db.set( 123 , "value" )
except KeyValidationError as e:
print (e) # "Database keys must be strings"
# Invalid: Empty string key
try :
db.set( "" , "value" )
except KeyValidationError as e:
print (e) # "Database keys cannot be empty"
# Invalid: None key
try :
db.set( None , "value" )
except KeyValidationError as e:
print (e) # "Database keys must be strings"
# Valid: Non-empty string
db.set( "valid_key" , "value" ) # OK
Validation Helper
You can validate keys before operations:
def safe_set ( db , key , value ):
"""Safely set a value with key validation."""
if not isinstance (key, str ):
key = str (key) # Convert to string
if not key:
raise ValueError ( "Key cannot be empty" )
try :
db.set(key, value)
return True
except KeyValidationError as e:
print ( f "Validation failed: { e } " )
return False
# Usage
safe_set(db, 123 , "value" ) # Converts 123 to "123"
safe_set(db, "" , "value" ) # Raises ValueError before calling db.set()
RecordTypeError
Raised when a stored record has an unexpected type.
class RecordTypeError ( DatabaseError ):
"""Raised when a stored record has an unexpected type."""
When It’s Raised
update() - Attempting to update a non-dictionary record (database.py:128-129)
Common Causes
Calling update() on a string, number, list, or other non-dict value
Assuming a value is a dict when it’s not
Example Usage
from binarydb.database import Database
from binarydb.errors import RecordTypeError
db = Database( "mydata.pkl" )
# Store different types
db.set( "user" , { "name" : "Alice" , "age" : 30 }) # Dict
db.set( "score" , 100 ) # Integer
db.set( "tags" , [ "python" , "database" ]) # List
# Valid: Update dict
db.update( "user" , { "age" : 31 }) # OK
# Invalid: Update non-dict
try :
db.update( "score" , { "value" : 200 })
except RecordTypeError as e:
print (e) # "Cannot update non-dictionary record"
try :
db.update( "tags" , { "new_tag" : "python3" })
except RecordTypeError as e:
print (e) # "Cannot update non-dictionary record"
Safe Update Pattern
def safe_update ( db , key , changes ):
"""Safely update a record with type checking."""
if not db.exists(key):
print ( f "Key ' { key } ' doesn't exist" )
return False
record = db.get(key)
if not isinstance (record, dict ):
print ( f "Cannot update non-dict record (type: { type (record). __name__ } )" )
return False
try :
db.update(key, changes)
return True
except RecordTypeError as e:
print ( f "Update failed: { e } " )
return False
# Usage
db.set( "user" , { "name" : "Alice" })
safe_update(db, "user" , { "age" : 30 }) # Success
db.set( "count" , 42 )
safe_update(db, "count" , { "value" : 100 }) # Fails gracefully
Implementation Detail
record = self ._data[key]
if not isinstance (record, dict ):
raise RecordTypeError( "Cannot update non-dictionary record" )
TransactionError
Raised on invalid transaction operations.
class TransactionError ( DatabaseError ):
"""Raised on invalid transaction operations."""
When It’s Raised
begin() - Starting a nested transaction (database.py:209-210)
rollback() - No active transaction (database.py:221-222)
end() - No active transaction (database.py:236-237)
Common Causes
Calling begin() twice without end() or rollback()
Calling rollback() or end() without begin()
Forgetting to close a transaction
Example Usage
from binarydb.database import Database
from binarydb.errors import TransactionError
db = Database( "mydata.pkl" )
db.load()
# Error: Nested transaction
db.begin()
try :
db.begin() # Error!
except TransactionError as e:
print (e) # "Transaction already active"
db.end()
# Error: No active transaction
try :
db.rollback() # No transaction started
except TransactionError as e:
print (e) # "No active transaction"
try :
db.end() # No transaction started
except TransactionError as e:
print (e) # "No active transaction"
Safe Transaction Pattern
def safe_transaction ( db , operations ):
"""
Execute operations in a transaction with proper error handling.
Args:
db: Database instance
operations: Callable that performs database operations
Returns:
True if transaction succeeded, False otherwise
"""
try :
db.begin()
except TransactionError:
print ( "Transaction already active, rolling back first" )
db.rollback()
db.begin()
try :
operations(db)
db.end()
return True
except Exception as e:
print ( f "Transaction failed: { e } " )
try :
db.rollback()
except TransactionError:
pass # Already rolled back
return False
# Usage
def my_operations ( db ):
db.set( "key1" , "value1" )
db.set( "key2" , "value2" )
db.update( "existing" , { "field" : "value" })
success = safe_transaction(db, my_operations)
if success:
print ( "Transaction committed" )
else :
print ( "Transaction rolled back" )
ConcurrencyError
Raised when concurrent access rules are violated.
class ConcurrencyError ( DatabaseError ):
"""Raised when concurrent access rules are violated."""
When It’s Raised
Currently not raised by any code in the provided implementation. Reserved for future concurrency features like file locking or multi-process access control.
Potential Future Usage
# Hypothetical future implementation
from binarydb.database import Database
from binarydb.errors import ConcurrencyError
db = Database( "mydata.pkl" )
try :
db.load() # Might acquire lock
except ConcurrencyError as e:
print ( "Database is locked by another process" )
# Wait and retry, or fail gracefully
Comprehensive Error Handling Example
from binarydb.database import Database
from binarydb.errors import (
DatabaseError,
DatabaseIOError,
DatabaseCorruptedError,
KeyValidationError,
RecordTypeError,
TransactionError,
)
import logging
import time
logging.basicConfig( level = logging. INFO )
class SafeDatabase :
"""Wrapper for Database with comprehensive error handling."""
def __init__ ( self , path , max_retries = 3 ):
self .db = Database(path)
self .max_retries = max_retries
self ._load_database()
def _load_database ( self ):
"""Load database with retry logic."""
for attempt in range ( self .max_retries):
try :
self .db.load()
logging.info( f "Database loaded: { len ( self .db) } records" )
return
except DatabaseCorruptedError as e:
logging.error( f "Database corrupted: { e } " )
if attempt == self .max_retries - 1 :
logging.warning( "Starting with empty database" )
return
time.sleep( 1 )
except DatabaseIOError as e:
logging.error( f "I/O error loading database: { e } " )
if attempt == self .max_retries - 1 :
raise
time.sleep( 1 )
def set ( self , key , value ):
"""Safely set a value."""
try :
self .db.set(key, value)
return True
except KeyValidationError as e:
logging.error( f "Invalid key ' { key } ': { e } " )
return False
def get ( self , key , default = None ):
"""Safely get a value."""
try :
return self .db.get(key, default)
except KeyValidationError as e:
logging.error( f "Invalid key ' { key } ': { e } " )
return default
def update ( self , key , changes ):
"""Safely update a record."""
try :
self .db.update(key, changes)
return True
except KeyValidationError as e:
logging.error( f "Invalid key ' { key } ': { e } " )
return False
except KeyError :
logging.error( f "Key ' { key } ' not found" )
return False
except RecordTypeError as e:
logging.error( f "Cannot update ' { key } ': { e } " )
return False
def transact ( self , operations ):
"""Execute operations in a transaction."""
try :
self .db.begin()
except TransactionError as e:
logging.warning( f "Transaction error: { e } " )
return False
try :
operations( self .db)
self .db.end()
logging.info( "Transaction committed" )
return True
except DatabaseError as e:
logging.error( f "Transaction failed: { e } " )
try :
self .db.rollback()
except TransactionError:
pass
return False
def commit ( self ):
"""Safely commit changes."""
for attempt in range ( self .max_retries):
try :
self .db.commit()
logging.info( "Database committed successfully" )
return True
except DatabaseIOError as e:
logging.error( f "Commit failed (attempt { attempt + 1 } ): { e } " )
if attempt == self .max_retries - 1 :
logging.error( "Max retries reached, data not saved!" )
return False
time.sleep( 1 )
def close ( self ):
"""Close database safely."""
try :
self .db.close()
logging.info( "Database closed" )
except DatabaseError as e:
logging.error( f "Error closing database: { e } " )
# Usage
try :
db = SafeDatabase( "mydata.pkl" )
# Safe operations
db.set( "user:1" , { "name" : "Alice" })
db.update( "user:1" , { "email" : "[email protected] " })
# Safe transaction
def create_user ( db ):
db.set( "user:2" , { "name" : "Bob" })
db.set( "user:2:prefs" , { "theme" : "dark" })
db.transact(create_user)
# Safe commit
db.commit()
finally :
db.close()
Best Practices
Catch specific exceptions first
Handle specific errors before catching base DatabaseError: try :
db.load()
except DatabaseCorruptedError:
# Handle corruption specifically
restore_from_backup()
except DatabaseIOError:
# Handle I/O errors specifically
log_and_retry()
except DatabaseError:
# Handle any other database error
log_error()
Use from exc to preserve the original exception: try :
db.commit()
except DatabaseIOError as e:
# Original OSError is available via e.__cause__
logging.error( f "Save failed: { e } " , exc_info = True )
raise CustomError( "Failed to save" ) from e
Always close transactions
Use try-finally or context managers to ensure transactions are closed: db.begin()
try :
# ... operations ...
db.end()
except Exception :
db.rollback()
raise
Next Steps
Database Operations Master CRUD operations and error handling
Transactions Handle transaction errors effectively