Skip to main content
The Python ecosystem provides multiple drivers for connecting to YugabyteDB. The most commonly used driver is Psycopg2, which is fully compatible with YugabyteDB’s PostgreSQL-compatible YSQL API.

Installation

pip install psycopg2-binary
For production deployments, build psycopg2 from source for optimal performance. The psycopg2-binary package is recommended for development and testing only.

Quick Start

Basic Connection

import psycopg2

# Connect to YugabyteDB
conn = psycopg2.connect(
    host='localhost',
    port=5433,
    database='yugabyte',
    user='yugabyte',
    password='yugabyte'
)

try:
    # Create a cursor
    cursor = conn.cursor()
    
    # Execute query
    cursor.execute('SELECT version()')
    version = cursor.fetchone()
    print(f'Database version: {version[0]}')
    
    # Commit transaction
    conn.commit()
finally:
    cursor.close()
    conn.close()

Using Context Managers

import psycopg2

# Automatic connection management
with psycopg2.connect(
    host='localhost',
    port=5433,
    dbname='yugabyte',
    user='yugabyte',
    password='yugabyte'
) as conn:
    with conn.cursor() as cursor:
        cursor.execute(
            'SELECT name, age FROM users WHERE id = %s',
            (user_id,)
        )
        result = cursor.fetchone()
        print(f'User: {result}')

Connection Configuration

Connection String

conn = psycopg2.connect(
    "dbname=yugabyte host=localhost port=5433 user=yugabyte password=yugabyte"
)

SSL/TLS Configuration

import psycopg2

# Connect with SSL
conn = psycopg2.connect(
    host='your-cluster.yugabyte.cloud',
    port=5433,
    dbname='yugabyte',
    user='admin',
    password='your-password',
    sslmode='verify-full',
    sslrootcert='/path/to/root.crt'
)

SSL Modes

ModeDescription
disableNo SSL connection
allowTry SSL, fallback to non-SSL
preferPrefer SSL (default)
requireRequire SSL, don’t verify certificate
verify-caRequire SSL, verify CA
verify-fullRequire SSL, verify CA and hostname

Connection Pooling

Simple Connection Pool

from psycopg2 import pool

# Create connection pool
connection_pool = pool.SimpleConnectionPool(
    minconn=1,
    maxconn=10,
    host='localhost',
    port=5433,
    database='yugabyte',
    user='yugabyte',
    password='yugabyte'
)

# Get connection from pool
conn = connection_pool.getconn()

try:
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users')
    results = cursor.fetchall()
finally:
    # Return connection to pool
    connection_pool.putconn(conn)

# Close all connections
connection_pool.closeall()

Threaded Connection Pool

from psycopg2 import pool
import threading

# Thread-safe connection pool
connection_pool = pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=20,
    host='localhost',
    port=5433,
    database='yugabyte',
    user='yugabyte',
    password='yugabyte'
)

def query_database(query):
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()
    finally:
        connection_pool.putconn(conn)

# Use in multiple threads
threads = []
for i in range(5):
    t = threading.Thread(
        target=query_database,
        args=('SELECT * FROM users LIMIT 10',)
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

CRUD Operations

Create (Insert)

# Single insert
cursor.execute(
    "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
    ('John Doe', '[email protected]', 30)
)

# Bulk insert
users_data = [
    ('Alice', '[email protected]', 25),
    ('Bob', '[email protected]', 35),
    ('Charlie', '[email protected]', 28)
]

cursor.executemany(
    "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
    users_data
)

conn.commit()

Read (Select)

# Fetch one
cursor.execute('SELECT * FROM users WHERE id = %s', (1,))
user = cursor.fetchone()
print(f'User: {user}')

# Fetch many
cursor.execute('SELECT * FROM users WHERE age > %s', (25,))
users = cursor.fetchmany(size=10)

# Fetch all
cursor.execute('SELECT * FROM users')
all_users = cursor.fetchall()

# Iterate over results
cursor.execute('SELECT name, email FROM users')
for name, email in cursor:
    print(f'{name}: {email}')

Update

# Update single record
cursor.execute(
    "UPDATE users SET age = %s WHERE id = %s",
    (31, 1)
)

# Update multiple records
cursor.execute(
    "UPDATE users SET status = %s WHERE age > %s",
    ('senior', 50)
)

print(f'Updated {cursor.rowcount} rows')
conn.commit()

Delete

# Delete specific record
cursor.execute('DELETE FROM users WHERE id = %s', (1,))

# Delete with condition
cursor.execute('DELETE FROM users WHERE age < %s', (18,))

print(f'Deleted {cursor.rowcount} rows')
conn.commit()

Async Operations (aiopg)

Async Connection

import asyncio
import aiopg

async def main():
    # Create async connection
    dsn = 'dbname=yugabyte user=yugabyte password=yugabyte host=localhost port=5433'
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute('SELECT * FROM users')
                rows = await cursor.fetchall()
                print(rows)

# Run async code
asyncio.run(main())

Async Connection Pool

import asyncio
import aiopg

async def query_user(pool, user_id):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                'SELECT name, email FROM users WHERE id = %s',
                (user_id,)
            )
            return await cursor.fetchone()

async def main():
    dsn = 'dbname=yugabyte user=yugabyte password=yugabyte host=localhost'
    
    async with aiopg.create_pool(dsn, minsize=5, maxsize=10) as pool:
        # Concurrent queries
        tasks = [query_user(pool, i) for i in range(1, 11)]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

Best Practices

Parameterized Queries

Always use parameterized queries to prevent SQL injection:
# ✅ Good - Parameterized query
cursor.execute(
    'SELECT * FROM users WHERE email = %s',
    (user_email,)
)

# ❌ Bad - String interpolation (vulnerable to SQL injection)
cursor.execute(
    f'SELECT * FROM users WHERE email = \'{user_email}\''
)

Transaction Management

import psycopg2

conn = psycopg2.connect(
    host='localhost',
    port=5433,
    database='yugabyte',
    user='yugabyte',
    password='yugabyte'
)

try:
    cursor = conn.cursor()
    
    # Start transaction (implicit)
    cursor.execute('INSERT INTO accounts (id, balance) VALUES (%s, %s)', (1, 1000))
    cursor.execute('INSERT INTO accounts (id, balance) VALUES (%s, %s)', (2, 2000))
    
    # Commit transaction
    conn.commit()
    print('Transaction committed')
except Exception as e:
    # Rollback on error
    conn.rollback()
    print(f'Transaction rolled back: {e}')
finally:
    cursor.close()
    conn.close()

Cursor Types

# Default cursor
cursor = conn.cursor()

# Dictionary cursor - access by column name
import psycopg2.extras
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute('SELECT name, email FROM users WHERE id = 1')
row = cursor.fetchone()
print(f"Name: {row['name']}, Email: {row['email']}")

# Named tuple cursor
cursor = conn.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
cursor.execute('SELECT name, email FROM users WHERE id = 1')
row = cursor.fetchone()
print(f"Name: {row.name}, Email: {row.email}")

Error Handling

import psycopg2
from psycopg2 import errors

try:
    conn = psycopg2.connect(
        host='localhost',
        port=5433,
        database='yugabyte',
        user='yugabyte',
        password='yugabyte'
    )
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM non_existent_table')
except errors.UndefinedTable as e:
    print(f'Table does not exist: {e}')
except errors.OperationalError as e:
    print(f'Connection error: {e}')
except psycopg2.Error as e:
    print(f'Database error: {e}')
finally:
    if 'cursor' in locals():
        cursor.close()
    if 'conn' in locals():
        conn.close()

Additional Resources

Build docs developers (and LLMs) love