Skip to main content

LDAP Protocol

Impacket’s LDAP implementation provides comprehensive support for querying and modifying Active Directory via the Lightweight Directory Access Protocol, including advanced features like paging, signing, and Kerberos authentication.

Overview

The LDAP implementation is located in impacket/ldap/ and includes:
  • Connection types - LDAP (389), LDAPS (636), Global Catalog (3268)
  • Authentication - Simple bind, NTLM, Kerberos
  • Operations - Search, add, modify, delete
  • Advanced features - Paging, signing, channel binding

Active Directory

LDAP is the primary protocol for querying and modifying Active Directory. Impacket supports all major LDAP operations needed for enumeration and exploitation.

Connection Types

LDAP Connection (Port 389)

From ldap.py:78-150, establishing basic LDAP connection:
from impacket.ldap import ldap

# Connect to LDAP
ldap_conn = ldap.LDAPConnection(
    url='ldap://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)

# Authenticate with credentials
ldap_conn.login(
    user='jdoe',
    password='P@ssw0rd',
    domain='CORP'
)

print("Connected to LDAP")

LDAPS Connection (Port 636)

LDAPS provides encryption via TLS/SSL. Channel binding is automatically configured for secure authentication.
# Secure LDAP with TLS
ldap_conn = ldap.LDAPConnection(
    url='ldaps://dc01.corp.local',  # Note: ldaps://
    baseDN='DC=corp,DC=local'
)

ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')
print("Connected via LDAPS (TLS)")

Global Catalog (Port 3268)

# Global Catalog for forest-wide searches
ldap_conn = ldap.LDAPConnection(
    url='gc://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)

ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')
print("Connected to Global Catalog")

Authentication Methods

# Simple password authentication (NTLM)
ldap_conn.login(
    user='jdoe',
    password='P@ssw0rd',
    domain='CORP'
)

# With NT hash
ldap_conn.login(
    user='jdoe',
    password='',
    domain='CORP',
    lmhash='',
    nthash='31d6cfe0d16ae931b73c59d7e0c089c0'
)

Search Operations

from impacket.ldap import ldap
from impacket.ldap.ldapasn1 import SearchResultEntry

ldap_conn = ldap.LDAPConnection('ldap://dc01.corp.local', 'DC=corp,DC=local')
ldap_conn.login('jdoe', 'P@ssw0rd', 'CORP')

# Search for all users
ldap_filter = '(objectClass=user)'
attributes = ['sAMAccountName', 'mail', 'memberOf']

ldap_conn.search(
    searchFilter=ldap_filter,
    attributes=attributes,
    sizeLimit=100
)

# Process results
for entry in ldap_conn.getResponseIterator():
    if isinstance(entry, SearchResultEntry):
        dn = entry['objectName']
        attrs = entry['attributes']
        
        for attr in attrs:
            name = str(attr['type'])
            values = [str(val) for val in attr['vals']]
            print(f"{name}: {', '.join(values)}")
        print()

Search with Scope

from impacket.ldap.ldapasn1 import Scope

# Scope types
# - BASE: Search only the base DN
# - ONE: Search immediate children only  
# - SUB: Search entire subtree (default)

# Search specific OU only (one level)
ldap_conn.search(
    searchBase='OU=Users,DC=corp,DC=local',
    searchFilter='(objectClass=user)',
    scope=Scope('singleLevel'),
    attributes=['sAMAccountName']
)

Complex Filters

# AND condition
ldap_filter = '(&(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
# Users that are NOT disabled

# OR condition  
ldap_filter = '(|(objectClass=user)(objectClass=group))'
# Users OR groups

# Nested conditions
ldap_filter = '''
(&
  (objectClass=user)
  (objectCategory=person)
  (!(userAccountControl:1.2.840.113556.1.4.803:=2))
  (|(memberOf=CN=Admins,DC=corp,DC=local)(memberOf=CN=IT,DC=corp,DC=local))
)
'''
# Active users in Admin or IT groups

ldap_conn.search(
    searchFilter=ldap_filter,
    attributes=['sAMAccountName', 'distinguishedName']
)
Use paged searches for large result sets to avoid server limits. Default page size is 1000.
from impacket.ldap.ldapasn1 import SimplePagedResultsControl

# Search with paging
page_size = 500
cookie = ''

while True:
    # Create paging control
    if cookie:
        paging_control = SimplePagedResultsControl(
            criticality=False,
            size=page_size,
            cookie=cookie
        )
    else:
        paging_control = SimplePagedResultsControl(
            criticality=False,
            size=page_size
        )
    
    ldap_conn.search(
        searchFilter='(objectClass=user)',
        attributes=['sAMAccountName'],
        searchControls=[paging_control]
    )
    
    # Process page results
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            # Process entry
            pass
    
    # Get cookie for next page
    controls = ldap_conn.getResponseControls()
    for control in controls:
        if control['controlType'] == '1.2.840.113556.1.4.319':  # Paged results
            cookie = control['controlValue']['cookie']
            break
    
    # No more pages
    if not cookie:
        break

print("Paged search complete")

Common AD Queries

Enumerate Users

def enumerate_users(ldap_conn):
    """
    Enumerate all active users
    """
    ldap_filter = '''
    (&
      (objectClass=user)
      (objectCategory=person)
      (!(userAccountControl:1.2.840.113556.1.4.803:=2))
    )
    '''
    
    attributes = [
        'sAMAccountName',
        'displayName',
        'mail',
        'memberOf',
        'lastLogon',
        'pwdLastSet',
        'userAccountControl'
    ]
    
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )
    
    users = []
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            user = {}
            for attr in entry['attributes']:
                name = str(attr['type'])
                values = [str(val) for val in attr['vals']]
                user[name] = values[0] if len(values) == 1 else values
            users.append(user)
    
    return users

# Usage
users = enumerate_users(ldap_conn)
for user in users:
    print(f"{user['sAMAccountName']}: {user.get('mail', 'N/A')}")

Enumerate Groups

def enumerate_groups(ldap_conn):
    """
    Enumerate all groups with members
    """
    ldap_filter = '(objectClass=group)'
    attributes = ['sAMAccountName', 'member', 'description']
    
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )
    
    groups = []
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            group = {}
            for attr in entry['attributes']:
                name = str(attr['type'])
                values = [str(val) for val in attr['vals']]
                group[name] = values
            groups.append(group)
    
    return groups

# Find privileged groups
def find_privileged_groups(ldap_conn):
    privileged_sids = [
        'S-1-5-32-544',  # Administrators
        'S-1-5-32-548',  # Account Operators
        'S-1-5-32-549',  # Server Operators
        'S-1-5-32-551',  # Backup Operators
    ]
    
    for sid in privileged_sids:
        ldap_filter = f'(objectSid={sid})'
        ldap_conn.search(
            searchFilter=ldap_filter,
            attributes=['sAMAccountName', 'member']
        )
        
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                # Process privileged group
                pass

Enumerate Computers

def enumerate_computers(ldap_conn):
    """
    Enumerate domain computers
    """
    ldap_filter = '(objectClass=computer)'
    attributes = [
        'dNSHostName',
        'operatingSystem',
        'operatingSystemVersion',
        'lastLogon',
        'servicePrincipalName'
    ]
    
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )
    
    computers = []
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            computer = {}
            for attr in entry['attributes']:
                name = str(attr['type'])
                values = [str(val) for val in attr['vals']]
                computer[name] = values[0] if len(values) == 1 else values
            computers.append(computer)
    
    return computers

# Find domain controllers
def find_domain_controllers(ldap_conn):
    ldap_filter = '(&(objectClass=computer)(userAccountControl:1.2.840.113556.1.4.803:=8192))'
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=['dNSHostName', 'operatingSystem']
    )
    
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            for attr in entry['attributes']:
                if str(attr['type']) == 'dNSHostName':
                    dc = str(attr['vals'][0])
                    print(f"Domain Controller: {dc}")

Find SPNs (Kerberoasting)

def find_spns(ldap_conn):
    """
    Find user accounts with SPNs for Kerberoasting
    """
    ldap_filter = '''
    (&
      (objectClass=user)
      (objectCategory=person)
      (servicePrincipalName=*)
      (!(samAccountName=krbtgt))
      (!(userAccountControl:1.2.840.113556.1.4.803:=2))
    )
    '''
    
    attributes = ['sAMAccountName', 'servicePrincipalName', 'memberOf']
    
    ldap_conn.search(
        searchFilter=ldap_filter,
        attributes=attributes
    )
    
    vulnerable_accounts = []
    for entry in ldap_conn.getResponseIterator():
        if isinstance(entry, SearchResultEntry):
            account = {}
            for attr in entry['attributes']:
                name = str(attr['type'])
                values = [str(val) for val in attr['vals']]
                account[name] = values
            vulnerable_accounts.append(account)
    
    return vulnerable_accounts

# Usage
spn_accounts = find_spns(ldap_conn)
for account in spn_accounts:
    print(f"\nUser: {account['sAMAccountName'][0]}")
    print("SPNs:")
    for spn in account.get('servicePrincipalName', []):
        print(f"  - {spn}")

Modify Operations

Add Object

from impacket.ldap.ldapasn1 import AddRequest, Attribute, AttributeValue

def create_user(ldap_conn, username, password, ou):
    """
    Create new user object
    """
    user_dn = f"CN={username},{ou}"
    
    attributes = [
        ('objectClass', ['top', 'person', 'organizationalPerson', 'user']),
        ('cn', [username]),
        ('sAMAccountName', [username]),
        ('userPrincipalName', [f"{username}@corp.local"]),
        ('displayName', [username]),
    ]
    
    ldap_conn.add(
        user_dn,
        attributes
    )
    
    print(f"User {username} created")

# Usage
create_user(
    ldap_conn,
    'testuser',
    'P@ssw0rd123',
    'OU=Users,DC=corp,DC=local'
)

Modify Object

from impacket.ldap.ldapasn1 import Operation

def modify_user_attributes(ldap_conn, user_dn, attributes):
    """
    Modify user attributes
    """
    # Operation types (from ldap.py:72-75)
    # MODIFY_ADD = 0
    # MODIFY_DELETE = 1  
    # MODIFY_REPLACE = 2
    
    modifications = []
    
    for attr_name, attr_value, operation in attributes:
        modifications.append({
            'operation': operation,
            'attribute': {
                'type': attr_name,
                'vals': attr_value if isinstance(attr_value, list) else [attr_value]
            }
        })
    
    ldap_conn.modify(
        user_dn,
        modifications
    )
    
    print(f"Modified {user_dn}")

# Usage - Add email
from impacket.ldap.ldap import MODIFY_REPLACE, MODIFY_ADD

modify_user_attributes(
    ldap_conn,
    'CN=John Doe,OU=Users,DC=corp,DC=local',
    [
        ('mail', '[email protected]', MODIFY_REPLACE),
        ('description', 'IT Administrator', MODIFY_REPLACE)
    ]
)

# Add to group
modify_user_attributes(
    ldap_conn,
    'CN=IT Group,OU=Groups,DC=corp,DC=local',
    [
        ('member', 'CN=John Doe,OU=Users,DC=corp,DC=local', MODIFY_ADD)
    ]
)

Delete Object

def delete_object(ldap_conn, dn):
    """
    Delete LDAP object
    """
    ldap_conn.delete(dn)
    print(f"Deleted {dn}")

# Usage
delete_object(
    ldap_conn,
    'CN=testuser,OU=Users,DC=corp,DC=local'
)

Modify DN (Rename/Move)

def rename_object(ldap_conn, old_dn, new_rdn, new_superior=None):
    """
    Rename or move LDAP object
    """
    ldap_conn.modifyDN(
        old_dn,
        new_rdn,
        deleteOldRdn=True,
        newSuperior=new_superior
    )
    
    print(f"Renamed {old_dn} to {new_rdn}")

# Rename user
rename_object(
    ldap_conn,
    'CN=John Doe,OU=Users,DC=corp,DC=local',
    'CN=Jane Doe'
)

# Move to different OU
rename_object(
    ldap_conn,
    'CN=Jane Doe,OU=Users,DC=corp,DC=local',
    'CN=Jane Doe',
    newSuperior='OU=Admins,DC=corp,DC=local'
)

Security Features

LDAP Signing

LDAP signing ensures message integrity. It’s enabled by default when using NTLM or Kerberos authentication.
# Signing is automatic with SASL authentication
ldap_conn = ldap.LDAPConnection(
    url='ldap://dc01.corp.local',
    baseDN='DC=corp,DC=local',
    signing=True  # Default
)

ldap_conn.login('user', 'pass', 'DOMAIN')
# Messages are now signed

Channel Binding

From ldap.py:152-170, channel binding for LDAPS:
# Channel binding is automatic with LDAPS
ldap_conn = ldap.LDAPConnection(
    url='ldaps://dc01.corp.local',
    baseDN='DC=corp,DC=local'
)

# TLS channel binding value computed from server certificate
# Prevents LDAP relay attacks
ldap_conn.login('user', 'pass', 'DOMAIN')

Advanced Queries

User Account Control Flags

# UAC flags for filtering
UAC_FLAGS = {
    'SCRIPT': 0x0001,
    'ACCOUNTDISABLE': 0x0002,
    'HOMEDIR_REQUIRED': 0x0008,
    'LOCKOUT': 0x0010,
    'PASSWD_NOTREQD': 0x0020,
    'PASSWD_CANT_CHANGE': 0x0040,
    'ENCRYPTED_TEXT_PWD_ALLOWED': 0x0080,
    'NORMAL_ACCOUNT': 0x0200,
    'WORKSTATION_TRUST_ACCOUNT': 0x1000,
    'SERVER_TRUST_ACCOUNT': 0x2000,
    'DONT_EXPIRE_PASSWORD': 0x10000,
    'SMARTCARD_REQUIRED': 0x40000,
    'TRUSTED_FOR_DELEGATION': 0x80000,
    'NOT_DELEGATED': 0x100000,
    'USE_DES_KEY_ONLY': 0x200000,
    'DONT_REQ_PREAUTH': 0x400000,
    'PASSWORD_EXPIRED': 0x800000,
}

# Find users with password never expires
ldap_filter = '''
(&
  (objectClass=user)
  (objectCategory=person)
  (userAccountControl:1.2.840.113556.1.4.803:=65536)
)
'''

# Find users that don't require Kerberos pre-auth (AS-REP Roasting)
ldap_filter = '''
(&
  (objectClass=user)
  (objectCategory=person)
  (userAccountControl:1.2.840.113556.1.4.803:=4194304)
)
'''

# Find computers trusted for unconstrained delegation
ldap_filter = '''
(&
  (objectClass=computer)
  (userAccountControl:1.2.840.113556.1.4.803:=524288)
)
'''

Time-based Queries

import datetime

def filetime_to_datetime(ft):
    """Convert Windows FILETIME to datetime"""
    return datetime.datetime(1601, 1, 1) + datetime.timedelta(microseconds=ft/10)

def datetime_to_filetime(dt):
    """Convert datetime to Windows FILETIME"""
    epoch = datetime.datetime(1601, 1, 1)
    return int((dt - epoch).total_seconds() * 10000000)

# Find users who haven't logged in for 90 days
ninety_days_ago = datetime.datetime.now() - datetime.timedelta(days=90)
ft = datetime_to_filetime(ninety_days_ago)

ldap_filter = f'''
(&
  (objectClass=user)
  (objectCategory=person)
  (lastLogon<={ft})
)
'''

# Find recently created accounts (last 7 days)
seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
ft = datetime_to_filetime(seven_days_ago)

ldap_filter = f'''
(&
  (objectClass=user)
  (objectCategory=person)
  (whenCreated>={ft})
)
'''

Error Handling

Always handle LDAP errors gracefully. Common errors include insufficient permissions, invalid filters, and connection issues.
from impacket.ldap.ldap import LDAPSessionError

try:
    ldap_conn = ldap.LDAPConnection('ldap://dc01.corp.local', 'DC=corp,DC=local')
    ldap_conn.login('user', 'pass', 'DOMAIN')
    
    ldap_conn.search(
        searchFilter='(objectClass=user)',
        attributes=['sAMAccountName']
    )
    
except LDAPSessionError as e:
    print(f"LDAP Error: {e}")
    
except Exception as e:
    print(f"Error: {e}")
    
finally:
    if ldap_conn:
        ldap_conn.close()

Complete Example: AD Enumerator

from impacket.ldap import ldap
from impacket.ldap.ldapasn1 import SearchResultEntry
import sys

def enumerate_ad(target, username, password, domain):
    """
    Comprehensive Active Directory enumeration
    """
    ldap_url = f'ldap://{target}'
    base_dn = ','.join([f'DC={x}' for x in domain.split('.')])
    
    print(f"[*] Connecting to {ldap_url}")
    ldap_conn = ldap.LDAPConnection(ldap_url, base_dn)
    
    try:
        print(f"[*] Authenticating as {domain}\\{username}")
        ldap_conn.login(username, password, domain)
        print("[+] Authentication successful\n")
        
        # Enumerate domain info
        print("[*] Domain Information:")
        ldap_conn.search(
            searchBase=base_dn,
            searchFilter='(objectClass=domain)',
            scope='base',
            attributes=['distinguishedName', 'name', 'whenCreated']
        )
        
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                for attr in entry['attributes']:
                    print(f"  {attr['type']}: {attr['vals'][0]}")
        
        # Count users
        print("\n[*] Enumerating users...")
        ldap_conn.search(
            searchFilter='(&(objectClass=user)(objectCategory=person))',
            attributes=['sAMAccountName'],
            sizeLimit=0
        )
        
        user_count = 0
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                user_count += 1
        
        print(f"[+] Found {user_count} users")
        
        # Count groups
        print("\n[*] Enumerating groups...")
        ldap_conn.search(
            searchFilter='(objectClass=group)',
            attributes=['sAMAccountName'],
            sizeLimit=0
        )
        
        group_count = 0
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                group_count += 1
        
        print(f"[+] Found {group_count} groups")
        
        # Find administrators
        print("\n[*] Enumerating Domain Admins...")
        ldap_conn.search(
            searchFilter='(memberOf=CN=Domain Admins,CN=Users,' + base_dn + ')',
            attributes=['sAMAccountName', 'distinguishedName']
        )
        
        print("Domain Admins:")
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                for attr in entry['attributes']:
                    if str(attr['type']) == 'sAMAccountName':
                        print(f"  - {attr['vals'][0]}")
        
        # Find computers
        print("\n[*] Enumerating computers...")
        ldap_conn.search(
            searchFilter='(objectClass=computer)',
            attributes=['dNSHostName', 'operatingSystem'],
            sizeLimit=10
        )
        
        print("Sample computers:")
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                hostname = ''
                os = ''
                for attr in entry['attributes']:
                    if str(attr['type']) == 'dNSHostName':
                        hostname = str(attr['vals'][0])
                    elif str(attr['type']) == 'operatingSystem':
                        os = str(attr['vals'][0])
                print(f"  - {hostname}: {os}")
        
        # Find SPNs
        print("\n[*] Finding accounts with SPNs...")
        ldap_conn.search(
            searchFilter='''
            (&
              (objectClass=user)
              (servicePrincipalName=*)
              (!(samAccountName=krbtgt))
            )
            ''',
            attributes=['sAMAccountName', 'servicePrincipalName']
        )
        
        spn_count = 0
        for entry in ldap_conn.getResponseIterator():
            if isinstance(entry, SearchResultEntry):
                spn_count += 1
                for attr in entry['attributes']:
                    if str(attr['type']) == 'sAMAccountName':
                        print(f"  - {attr['vals'][0]}")
        
        print(f"\n[+] Found {spn_count} accounts with SPNs")
        
    except Exception as e:
        print(f"[-] Error: {e}")
        return 1
        
    finally:
        ldap_conn.close()
        print("\n[*] Connection closed")
    
    return 0

if __name__ == '__main__':
    if len(sys.argv) < 5:
        print(f"Usage: {sys.argv[0]} <target> <username> <password> <domain>")
        sys.exit(1)
    
    target = sys.argv[1]
    username = sys.argv[2]
    password = sys.argv[3]
    domain = sys.argv[4]
    
    sys.exit(enumerate_ad(target, username, password, domain))

References

  • Source: impacket/ldap/
  • RFC 4511: LDAP Protocol
  • [MS-ADTS]: Active Directory Technical Specification
  • [MS-ADSC]: Active Directory Schema Classes

Build docs developers (and LLMs) love