LDAP Protocol
Impacket’s LDAP implementation provides comprehensive support for querying and modifying Active Directory via the Lightweight Directory Access Protocol, including advanced features like paging, signing, and Kerberos authentication.
Overview
The LDAP implementation is located in impacket/ldap/ and includes:
- Connection types - LDAP (389), LDAPS (636), Global Catalog (3268)
- Authentication - Simple bind, NTLM, Kerberos
- Operations - Search, add, modify, delete
- Advanced features - Paging, signing, channel binding
Active Directory
LDAP is the primary protocol for querying and modifying Active Directory. Impacket supports all major LDAP operations needed for enumeration and exploitation.
Connection Types
LDAP Connection (Port 389)
From ldap.py:78-150, establishing a basic LDAP connection:
from impacket.ldap import ldap
# Connect to LDAP: the ldap:// scheme selects the plain LDAP connection
# type; baseDN is the directory root used by this connection.
ldap_conn = ldap.LDAPConnection(
    url='ldap://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)
# Authenticate with credentials (user name, password and domain are passed
# as separate arguments)
ldap_conn.login(
    user='jdoe',
    password='P@ssw0rd',
    domain='CORP'
)
print("Connected to LDAP")
LDAPS Connection (Port 636)
LDAPS provides encryption via TLS/SSL. Channel binding is automatically configured for secure authentication.
# Secure LDAP with TLS: only the URL scheme differs from the plain LDAP
# example; login() is called here with positional arguments.
ldap_conn = ldap.LDAPConnection(
    url='ldaps://dc01.corp.local', # Note: ldaps://
    baseDN='DC=corp,DC=local'
)
ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')
print("Connected via LDAPS (TLS)")
Global Catalog (Port 3268)
# Global Catalog for forest-wide searches: selected with the gc:// scheme.
ldap_conn = ldap.LDAPConnection(
    url='gc://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)
ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')
print("Connected to Global Catalog")
Authentication Methods
- NTLM
- Kerberos
# Simple password authentication (NTLM)
ldap_conn.login(
    user='jdoe',
    password='P@ssw0rd',
    domain='CORP'
)
# With NT hash: the password is left empty and the hash is supplied instead.
# The value shown is a placeholder (the well-known NT hash of an empty
# password); substitute the account's real hash.
ldap_conn.login(
    user='jdoe',
    password='',
    domain='CORP',
    lmhash='',
    nthash='31d6cfe0d16ae931b73c59d7e0c089c0'
)
# Kerberos authentication (from ldap.py:172-300)
# Password-based: kdcHost names the KDC to contact.
ldap_conn.kerberosLogin(
    user='jdoe',
    password='P@ssw0rd',
    domain='CORP.LOCAL',
    kdcHost='dc01.corp.local'
)
# With AES key: password is left empty; the key shown is truncated
# placeholder text, not a usable key.
ldap_conn.kerberosLogin(
    user='jdoe',
    password='',
    domain='CORP.LOCAL',
    aesKey='e5c4d39c...',
    kdcHost='dc01.corp.local'
)
# With existing tickets: `tgt` and `tgs` are not defined in this snippet and
# must be obtained beforehand; useCache=False is passed explicitly.
ldap_conn.kerberosLogin(
    user='jdoe',
    password='',
    domain='CORP.LOCAL',
    TGT=tgt,
    TGS=tgs,
    useCache=False
)
Search Operations
Basic Search
from impacket.ldap import ldap
from impacket.ldap.ldapasn1 import SearchResultEntry
# Bind, issue one search for user objects, then print every returned
# attribute as "name: v1, v2, ..." with a blank line after each entry.
ldap_conn = ldap.LDAPConnection('ldap://dc01.corp.local', 'DC=corp,DC=local')
ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')

# Search for all users (at most 100 results requested)
ldap_filter = '(objectClass=user)'
attributes = ['sAMAccountName', 'mail', 'memberOf']
ldap_conn.search(searchFilter=ldap_filter, attributes=attributes, sizeLimit=100)

# Process results; anything that is not a SearchResultEntry is skipped.
for entry in ldap_conn.getResponseIterator():
    if not isinstance(entry, SearchResultEntry):
        continue
    dn = entry['objectName']  # distinguished name of the entry (not printed)
    attrs = entry['attributes']
    for attr in attrs:
        name = str(attr['type'])
        values = [str(val) for val in attr['vals']]
        print(f"{name}: {', '.join(values)}")
    print()
Search with Scope
from impacket.ldap.ldapasn1 import Scope
# Scope types (the string given to Scope() is the RFC 4511 value name, as
# with 'singleLevel' below):
# - BASE -> 'baseObject':   Search only the base DN
# - ONE  -> 'singleLevel':  Search immediate children only
# - SUB  -> 'wholeSubtree': Search entire subtree (default)
# Search specific OU only (one level)
ldap_conn.search(
    searchBase='OU=Users,DC=corp,DC=local',
    searchFilter='(objectClass=user)',
    scope=Scope('singleLevel'),
    attributes=['sAMAccountName']
)
Complex Filters
# RFC 4515 filter strings have no whitespace between components, so the
# multi-part filter below is assembled from adjacent string literals instead
# of a triple-quoted block (which would embed newlines in the filter).

# AND condition combined with NOT: the 1.2.840.113556.1.4.803 matching rule
# is a bitwise AND, and bit 2 is ACCOUNTDISABLE.
ldap_filter = '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
# Users that are NOT disabled

# OR condition
ldap_filter = '(|(objectClass=user)(objectClass=group))'
# Users OR groups

# Nested conditions
ldap_filter = (
    '(&'
    '(objectClass=user)'
    '(objectCategory=person)'
    '(!(userAccountControl:1.2.840.113556.1.4.803:=2))'
    '(|(memberOf=CN=Admins,DC=corp,DC=local)(memberOf=CN=IT,DC=corp,DC=local))'
    ')'
)
# Active users in Admin or IT groups
ldap_conn.search(
    searchFilter=ldap_filter,
    attributes=['sAMAccountName', 'distinguishedName']
)
Paged Search
Use paged searches for large result sets to avoid server limits. Default page size is 1000.
from impacket.ldap.ldapasn1 import SimplePagedResultsControl
# Search with paging: request the result set in pages of `page_size`
# entries, feeding the cookie returned with each page into the next request.
PAGED_RESULTS_OID = '1.2.840.113556.1.4.319'  # Paged results control
page_size = 500
cookie = ''
while True:
    # Create paging control; the cookie is only passed once the server has
    # returned one.
    control_kwargs = {'criticality': False, 'size': page_size}
    if cookie:
        control_kwargs['cookie'] = cookie
    paging_control = SimplePagedResultsControl(**control_kwargs)

    ldap_conn.search(
        searchFilter='(objectClass=user)',
        attributes=['sAMAccountName'],
        searchControls=[paging_control]
    )

    # Process page results
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            # Process entry
            pass

    # Get cookie for next page. Reset it first: if the response carries no
    # paged-results control, the previous cookie must not be reused, or the
    # loop would request the same page forever.
    cookie = ''
    for control in ldap_conn.getResponseControls():
        if control['controlType'] == PAGED_RESULTS_OID:
            cookie = control['controlValue']['cookie']
            break

    # No more pages
    if not cookie:
        break
print("Paged search complete")
Common AD Queries
Enumerate Users
def enumerate_users(ldap_conn):
    """
    Enumerate all active users.

    Searches for person/user objects whose ACCOUNTDISABLE bit is not set and
    returns one dict per entry, keyed by attribute name.

    Args:
        ldap_conn: connection object providing search() and
            getResponseIterator().

    Returns:
        list of dicts. An attribute with exactly one value is stored as a
        plain string; any other number of values is stored as a list of
        strings, so callers must handle both shapes.
    """
    # Built from adjacent literals so the filter contains no whitespace or
    # newlines (RFC 4515 filter grammar has no provision for them).
    ldap_filter = (
        '(&'
        '(objectClass=user)'
        '(objectCategory=person)'
        '(!(userAccountControl:1.2.840.113556.1.4.803:=2))'
        ')'
    )
    attributes = [
        'sAMAccountName',
        'displayName',
        'mail',
        'memberOf',
        'lastLogon',
        'pwdLastSet',
        'userAccountControl',
    ]
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )

    users = []
    for entry in ldap_conn.getResponseIterator():
        # Skip anything that is not an actual result entry.
        if not isinstance(entry, SearchResultEntry):
            continue
        user = {}
        for attr in entry['attributes']:
            values = [str(val) for val in attr['vals']]
            user[str(attr['type'])] = values[0] if len(values) == 1 else values
        users.append(user)
    return users
# Usage: print each account name with its mail address ('N/A' when the
# entry has no mail attribute).
users = enumerate_users(ldap_conn)
for user in users:
    mail = user.get('mail', 'N/A')
    print(f"{user['sAMAccountName']}: {mail}")
Enumerate Groups
def enumerate_groups(ldap_conn):
    """
    Enumerate all groups with members.

    Returns a list with one dict per group entry; every attribute maps to a
    list of its values rendered as strings.
    """
    ldap_conn.search(
        searchFilter='(objectClass=group)',
        attributes=['sAMAccountName', 'member', 'description'],
    )
    return [
        {
            str(attr['type']): [str(val) for val in attr['vals']]
            for attr in entry['attributes']
        }
        for entry in ldap_conn.getResponseIterator()
        if isinstance(entry, SearchResultEntry)
    ]
# Find privileged groups
def find_privileged_groups(ldap_conn):
    """
    Look up a fixed list of built-in privileged groups by SID.

    Issues one search per SID requesting sAMAccountName and member. Result
    handling is left as a placeholder, so the function returns None.
    """
    well_known_groups = (
        ('S-1-5-32-544', 'Administrators'),
        ('S-1-5-32-548', 'Account Operators'),
        ('S-1-5-32-549', 'Server Operators'),
        ('S-1-5-32-551', 'Backup Operators'),
    )
    for sid, _label in well_known_groups:
        ldap_conn.search(
            searchFilter=f'(objectSid={sid})',
            attributes=['sAMAccountName', 'member'],
        )
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                # Process privileged group
                pass
Enumerate Computers
def enumerate_computers(ldap_conn):
    """
    Enumerate domain computers.

    Returns a list of dicts keyed by attribute name. A single-valued
    attribute is stored as a string, anything else as a list of strings.
    """
    def _collapse(raw_values):
        # One value -> bare string; zero or several -> list of strings.
        texts = [str(item) for item in raw_values]
        return texts[0] if len(texts) == 1 else texts

    wanted = [
        'dNSHostName',
        'operatingSystem',
        'operatingSystemVersion',
        'lastLogon',
        'servicePrincipalName',
    ]
    ldap_conn.search(searchFilter='(objectClass=computer)', attributes=wanted)

    computers = []
    for entry in ldap_conn.getResponseIterator():
        if not isinstance(entry, SearchResultEntry):
            continue
        computers.append(
            {str(attr['type']): _collapse(attr['vals']) for attr in entry['attributes']}
        )
    return computers
# Find domain controllers
def find_domain_controllers(ldap_conn):
    """
    Print the DNS host name of every computer object whose
    userAccountControl has bit 8192 set (bitwise-AND matching rule).
    """
    dc_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=8192))'
    ldap_conn.search(
        searchFilter=dc_filter,
        attributes=['dNSHostName', 'operatingSystem'],
    )
    for entry in ldap_conn.getResponseIterator():
        if not isinstance(entry, SearchResultEntry):
            continue
        for attr in entry['attributes']:
            if str(attr['type']) != 'dNSHostName':
                continue
            dc = str(attr['vals'][0])
            print(f"Domain Controller: {dc}")
Find SPNs (Kerberoasting)
def find_spns(ldap_conn):
    """
    Find user accounts with SPNs for Kerberoasting.

    Matches enabled person/user objects that have at least one
    servicePrincipalName, excluding krbtgt.

    Args:
        ldap_conn: connection object providing search() and
            getResponseIterator().

    Returns:
        list of dicts, one per account; every attribute maps to a list of
        string values (so sAMAccountName is read as account[...][0]).
    """
    # Adjacent literals keep the filter free of whitespace and newlines
    # (RFC 4515 filter grammar has no provision for them).
    ldap_filter = (
        '(&'
        '(objectClass=user)'
        '(objectCategory=person)'
        '(servicePrincipalName=*)'
        '(!(samAccountName=krbtgt))'
        '(!(userAccountControl:1.2.840.113556.1.4.803:=2))'
        ')'
    )
    attributes = ['sAMAccountName', 'servicePrincipalName', 'memberOf']
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )

    vulnerable_accounts = []
    for entry in ldap_conn.getResponseIterator():
        if not isinstance(entry, SearchResultEntry):
            continue
        account = {
            str(attr['type']): [str(val) for val in attr['vals']]
            for attr in entry['attributes']
        }
        vulnerable_accounts.append(account)
    return vulnerable_accounts
# Usage: list every account returned by find_spns() with its SPNs.
spn_accounts = find_spns(ldap_conn)
for account in spn_accounts:
    sam_name = account['sAMAccountName'][0]
    print(f"\nUser: {sam_name}")
    print("SPNs:")
    for spn in account.get('servicePrincipalName', []):
        print(f" - {spn}")
Modify Operations
Add Object
from impacket.ldap.ldapasn1 import AddRequest, Attribute, AttributeValue
def create_user(ldap_conn, username, password, ou, upn_suffix='corp.local'):
    """
    Create new user object.

    Args:
        ldap_conn: connection object providing add(dn, attributes).
        username: used for cn, sAMAccountName, displayName and the UPN prefix.
        password: accepted for interface compatibility but NOT used; this
            function does not set a password on the new object.
        ou: DN of the container; the new object's DN is "CN=<username>,<ou>".
        upn_suffix: domain part of userPrincipalName. Defaults to
            'corp.local', the value that was previously hard-coded.
    """
    user_dn = f"CN={username},{ou}"
    attributes = [
        ('objectClass', ['top', 'person', 'organizationalPerson', 'user']),
        ('cn', [username]),
        ('sAMAccountName', [username]),
        ('userPrincipalName', [f"{username}@{upn_suffix}"]),
        ('displayName', [username]),
    ]
    ldap_conn.add(
        user_dn,
        attributes
    )
    print(f"User {username} created")
# Usage: create "testuser" under the Users OU.
new_user_args = ('testuser', 'P@ssw0rd123', 'OU=Users,DC=corp,DC=local')
create_user(ldap_conn, *new_user_args)
Modify Object
from impacket.ldap.ldapasn1 import Operation
def modify_user_attributes(ldap_conn, user_dn, attributes):
    """
    Modify user attributes.

    `attributes` is an iterable of (name, value, operation) triples. Each is
    turned into a modification dict and the whole list is handed to
    ldap_conn.modify() in a single call.
    """
    # Operation types (from ldap.py:72-75)
    # MODIFY_ADD = 0
    # MODIFY_DELETE = 1
    # MODIFY_REPLACE = 2
    def _as_list(value):
        # A bare value is wrapped; a list is passed through untouched.
        return value if isinstance(value, list) else [value]

    modifications = [
        {
            'operation': operation,
            'attribute': {'type': attr_name, 'vals': _as_list(attr_value)},
        }
        for attr_name, attr_value, operation in attributes
    ]
    ldap_conn.modify(user_dn, modifications)
    print(f"Modified {user_dn}")
# Usage - Add email
from impacket.ldap.ldap import MODIFY_REPLACE, MODIFY_ADD

# The mail value is a concrete example address; the previous text
# '[email protected]' was an e-mail-obfuscation artifact, not an address.
modify_user_attributes(
    ldap_conn,
    'CN=John Doe,OU=Users,DC=corp,DC=local',
    [
        ('mail', 'jdoe@corp.local', MODIFY_REPLACE),
        ('description', 'IT Administrator', MODIFY_REPLACE)
    ]
)
# Add to group: the group object is the one modified, gaining the user's DN
# as an additional `member` value.
modify_user_attributes(
    ldap_conn,
    'CN=IT Group,OU=Groups,DC=corp,DC=local',
    [
        ('member', 'CN=John Doe,OU=Users,DC=corp,DC=local', MODIFY_ADD)
    ]
)
Delete Object
def delete_object(ldap_conn, dn):
    """
    Delete LDAP object.

    Passes `dn` to ldap_conn.delete() and prints a confirmation line.
    """
    target_dn = dn
    ldap_conn.delete(target_dn)
    message = f"Deleted {target_dn}"
    print(message)
# Usage: remove the test account by its DN.
delete_object(ldap_conn, 'CN=testuser,OU=Users,DC=corp,DC=local')
Modify DN (Rename/Move)
def rename_object(ldap_conn, old_dn, new_rdn, new_superior=None):
    """
    Rename or move LDAP object.

    Forwards to ldap_conn.modifyDN() with deleteOldRdn=True; `new_superior`
    (None by default) is passed through as newSuperior.
    """
    modify_dn_options = {'deleteOldRdn': True, 'newSuperior': new_superior}
    ldap_conn.modifyDN(old_dn, new_rdn, **modify_dn_options)
    print(f"Renamed {old_dn} to {new_rdn}")
# Rename user: only the RDN changes, the object stays in the same OU.
rename_object(
    ldap_conn,
    'CN=John Doe,OU=Users,DC=corp,DC=local',
    'CN=Jane Doe'
)
# Move to different OU: same RDN, new parent container. The keyword must be
# `new_superior` (the parameter name of rename_object); `newSuperior` is not
# a parameter of that function and would raise TypeError.
rename_object(
    ldap_conn,
    'CN=Jane Doe,OU=Users,DC=corp,DC=local',
    'CN=Jane Doe',
    new_superior='OU=Admins,DC=corp,DC=local'
)
Security Features
LDAP Signing
LDAP signing ensures message integrity. It’s enabled by default when using NTLM or Kerberos authentication.
# Signing is automatic with SASL authentication; signing=True is spelled
# out here only to make the setting visible.
ldap_conn = ldap.LDAPConnection(
    url='ldap://dc01.corp.local',
    baseDN='DC=corp,DC=local',
    signing=True # Default
)
ldap_conn.login('user', 'pass', 'DOMAIN')
# Messages are now signed
Channel Binding
From ldap.py:152-170, channel binding for LDAPS:
# Channel binding is automatic with LDAPS: no extra argument is passed,
# only the ldaps:// URL scheme.
ldap_conn = ldap.LDAPConnection(
    url='ldaps://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)
# TLS channel binding value computed from server certificate
# Prevents LDAP relay attacks
ldap_conn.login('user', 'pass', 'DOMAIN')
Advanced Queries
User Account Control Flags
# UAC flags for filtering: bit values of the userAccountControl attribute.
# Use the decimal value with the bitwise-AND matching rule, e.g.
# (userAccountControl:1.2.840.113556.1.4.803:=2) for ACCOUNTDISABLE.
UAC_FLAGS = {
    'SCRIPT': 0x0001,
    'ACCOUNTDISABLE': 0x0002,
    'HOMEDIR_REQUIRED': 0x0008,
    'LOCKOUT': 0x0010,
    'PASSWD_NOTREQD': 0x0020,
    'PASSWD_CANT_CHANGE': 0x0040,
    'ENCRYPTED_TEXT_PWD_ALLOWED': 0x0080,
    'TEMP_DUPLICATE_ACCOUNT': 0x0100,
    'NORMAL_ACCOUNT': 0x0200,
    'INTERDOMAIN_TRUST_ACCOUNT': 0x0800,
    'WORKSTATION_TRUST_ACCOUNT': 0x1000,
    'SERVER_TRUST_ACCOUNT': 0x2000,
    'DONT_EXPIRE_PASSWORD': 0x10000,
    'MNS_LOGON_ACCOUNT': 0x20000,
    'SMARTCARD_REQUIRED': 0x40000,
    'TRUSTED_FOR_DELEGATION': 0x80000,
    'NOT_DELEGATED': 0x100000,
    'USE_DES_KEY_ONLY': 0x200000,
    'DONT_REQ_PREAUTH': 0x400000,
    'PASSWORD_EXPIRED': 0x800000,
    'TRUSTED_TO_AUTH_FOR_DELEGATION': 0x1000000,
    'PARTIAL_SECRETS_ACCOUNT': 0x04000000,
}
# Each filter below is assembled from adjacent string literals so that it
# contains no whitespace or newlines (RFC 4515 filter grammar has no
# provision for them). Every example rebinds `ldap_filter`; use one at a time.

# Find users with password never expires (DONT_EXPIRE_PASSWORD = 65536)
ldap_filter = (
    '(&'
    '(objectClass=user)'
    '(objectCategory=person)'
    '(userAccountControl:1.2.840.113556.1.4.803:=65536)'
    ')'
)
# Find users that don't require Kerberos pre-auth (AS-REP Roasting)
# (DONT_REQ_PREAUTH = 4194304)
ldap_filter = (
    '(&'
    '(objectClass=user)'
    '(objectCategory=person)'
    '(userAccountControl:1.2.840.113556.1.4.803:=4194304)'
    ')'
)
# Find computers trusted for unconstrained delegation
# (TRUSTED_FOR_DELEGATION = 524288)
ldap_filter = (
    '(&'
    '(objectClass=computer)'
    '(userAccountControl:1.2.840.113556.1.4.803:=524288)'
    ')'
)
Time-based Queries
import datetime
def filetime_to_datetime(ft):
    """Convert Windows FILETIME to datetime.

    Args:
        ft: count of 100-nanosecond intervals since 1601-01-01, as an int or
            anything int() accepts (e.g. the decimal string read from an
            LDAP attribute).

    Returns:
        A naive datetime.datetime offset from 1601-01-01 by that interval.
        Sub-microsecond remainders are truncated.

    Raises:
        OverflowError: if the result falls outside the range datetime can
            represent.
    """
    # Integer division keeps full precision; float division (ft / 10) cannot
    # represent present-day values exactly because they exceed 2**53.
    microseconds = int(ft) // 10
    return datetime.datetime(1601, 1, 1) + datetime.timedelta(microseconds=microseconds)
def datetime_to_filetime(dt):
    """Convert datetime to Windows FILETIME.

    Args:
        dt: datetime.datetime. A naive value is used as-is; an aware value
            is converted to UTC first.

    Returns:
        int count of 100-nanosecond intervals between 1601-01-01 and `dt`.
    """
    if dt.tzinfo is not None:
        # Aware and naive datetimes cannot be subtracted; normalise to UTC.
        dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    epoch = datetime.datetime(1601, 1, 1)
    # Exact integer arithmetic: total_seconds() returns a float, which loses
    # microsecond accuracy for intervals this large.
    microseconds = (dt - epoch) // datetime.timedelta(microseconds=1)
    return microseconds * 10
# Directory timestamps are UTC, so the cut-offs are computed in UTC. tzinfo
# is dropped because datetime_to_filetime() subtracts a naive epoch.
now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

# Find users who haven't logged in for 90 days.
# lastLogon holds a FILETIME integer. NOTE: lastLogon is not replicated
# between domain controllers; lastLogonTimestamp is the replicated
# (coarser) alternative.
ninety_days_ago = now_utc - datetime.timedelta(days=90)
ft = datetime_to_filetime(ninety_days_ago)
ldap_filter = (
    '(&'
    '(objectClass=user)'
    '(objectCategory=person)'
    f'(lastLogon<={ft})'
    ')'
)

# Find recently created accounts (last 7 days).
# whenCreated uses Generalized-Time syntax (YYYYMMDDHHMMSS.0Z), not
# FILETIME, so the cut-off is formatted as a timestamp string.
seven_days_ago = now_utc - datetime.timedelta(days=7)
created_after = seven_days_ago.strftime('%Y%m%d%H%M%S.0Z')
ldap_filter = (
    '(&'
    '(objectClass=user)'
    '(objectCategory=person)'
    f'(whenCreated>={created_after})'
    ')'
)
Error Handling
Always handle LDAP errors gracefully. Common errors include insufficient permissions, invalid filters, and connection issues.
from impacket.ldap.ldap import LDAPSessionError
# Bind the name before the try block: if the LDAPConnection constructor
# itself raises, the finally clause would otherwise hit an unbound (or
# stale) `ldap_conn`.
ldap_conn = None
try:
    ldap_conn = ldap.LDAPConnection('ldap://dc01.corp.local', 'DC=corp,DC=local')
    ldap_conn.login('user', 'pass', 'DOMAIN')
    ldap_conn.search(
        searchFilter='(objectClass=user)',
        attributes=['sAMAccountName']
    )
except LDAPSessionError as e:
    # LDAP-level failures are reported separately from everything else.
    print(f"LDAP Error: {e}")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close only a connection that was actually created.
    if ldap_conn is not None:
        ldap_conn.close()
Complete Example: AD Enumerator
from impacket.ldap import ldap
from impacket.ldap.ldapasn1 import SearchResultEntry
import sys
def enumerate_ad(target, username, password, domain):
    """
    Comprehensive Active Directory enumeration.

    Connects to ldap://<target>, binds as <domain>\\<username>, then prints
    domain information, user and group counts, Domain Admins members, a
    sample of computers and the accounts that have SPNs.

    Args:
        target: host name or address of the LDAP server.
        username: account name used for the bind.
        password: password used for the bind.
        domain: DNS domain name; each dot-separated label becomes a DC=
            component of the base DN.

    Returns:
        0 on success, 1 if any exception was raised (the error is printed).
        The connection is closed in both cases.
    """
    # Imported locally so the base-object scope can be expressed the same
    # way as elsewhere (Scope('<value name>')) instead of the string 'base',
    # which is not a Scope value name.
    from impacket.ldap.ldapasn1 import Scope

    ldap_url = f'ldap://{target}'
    base_dn = ','.join(f'DC={label}' for label in domain.split('.'))
    print(f"[*] Connecting to {ldap_url}")
    ldap_conn = ldap.LDAPConnection(ldap_url, base_dn)
    try:
        print(f"[*] Authenticating as {domain}\\{username}")
        ldap_conn.login(username, password, domain)
        print("[+] Authentication successful\n")

        # Enumerate domain info
        print("[*] Domain Information:")
        ldap_conn.search(
            searchBase=base_dn,
            searchFilter='(objectClass=domain)',
            scope=Scope('baseObject'),
            attributes=['distinguishedName', 'name', 'whenCreated']
        )
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                for attr in entry['attributes']:
                    print(f" {attr['type']}: {attr['vals'][0]}")

        # Count users
        print("\n[*] Enumerating users...")
        ldap_conn.search(
            searchFilter='(&(objectClass=user)(objectCategory=person))',
            attributes=['sAMAccountName'],
            sizeLimit=0
        )
        user_count = sum(
            1 for entry in ldap_conn.getResponseIterator()
            if isinstance(entry, SearchResultEntry)
        )
        print(f"[+] Found {user_count} users")

        # Count groups
        print("\n[*] Enumerating groups...")
        ldap_conn.search(
            searchFilter='(objectClass=group)',
            attributes=['sAMAccountName'],
            sizeLimit=0
        )
        group_count = sum(
            1 for entry in ldap_conn.getResponseIterator()
            if isinstance(entry, SearchResultEntry)
        )
        print(f"[+] Found {group_count} groups")

        # Find administrators (direct members of Domain Admins under CN=Users)
        print("\n[*] Enumerating Domain Admins...")
        ldap_conn.search(
            searchFilter='(memberOf=CN=Domain Admins,CN=Users,' + base_dn + ')',
            attributes=['sAMAccountName', 'distinguishedName']
        )
        print("Domain Admins:")
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                for attr in entry['attributes']:
                    if str(attr['type']) == 'sAMAccountName':
                        print(f" - {attr['vals'][0]}")

        # Find computers (sample of at most 10)
        print("\n[*] Enumerating computers...")
        ldap_conn.search(
            searchFilter='(objectClass=computer)',
            attributes=['dNSHostName', 'operatingSystem'],
            sizeLimit=10
        )
        print("Sample computers:")
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                hostname = ''
                os_name = ''  # not named `os`, to avoid shadowing the stdlib module name
                for attr in entry['attributes']:
                    attr_type = str(attr['type'])
                    if attr_type == 'dNSHostName':
                        hostname = str(attr['vals'][0])
                    elif attr_type == 'operatingSystem':
                        os_name = str(attr['vals'][0])
                print(f" - {hostname}: {os_name}")

        # Find SPNs. The filter is built from adjacent literals so it holds
        # no whitespace or newlines (RFC 4515 has no provision for them).
        print("\n[*] Finding accounts with SPNs...")
        ldap_conn.search(
            searchFilter=(
                '(&'
                '(objectClass=user)'
                '(servicePrincipalName=*)'
                '(!(samAccountName=krbtgt))'
                ')'
            ),
            attributes=['sAMAccountName', 'servicePrincipalName']
        )
        spn_count = 0
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                spn_count += 1
                for attr in entry['attributes']:
                    if str(attr['type']) == 'sAMAccountName':
                        print(f" - {attr['vals'][0]}")
        print(f"\n[+] Found {spn_count} accounts with SPNs")
    except Exception as e:
        # Top-level boundary of the script: report and signal failure.
        print(f"[-] Error: {e}")
        return 1
    finally:
        ldap_conn.close()
        print("\n[*] Connection closed")
    return 0
if __name__ == '__main__':
    # Four positional arguments are required; extra ones are ignored.
    if len(sys.argv) < 5:
        print(f"Usage: {sys.argv[0]} <target> <username> <password> <domain>")
        sys.exit(1)
    target, username, password, domain = sys.argv[1:5]
    exit_code = enumerate_ad(target, username, password, domain)
    sys.exit(exit_code)
Related Topics
- Kerberos - Kerberos authentication for LDAP
- SMB Protocol - SMB for accessing domain controllers
References
- Source: impacket/ldap/
- RFC 4511: LDAP Protocol
- [MS-ADTS]: Active Directory Technical Specification
- [MS-ADSC]: Active Directory Schema Classes