SafeNetworking enriches firewall threat events with malware intelligence from AutoFocus. The processing pipeline implements intelligent caching, parallel processing, and adaptive rate limiting to maximize throughput while staying within API quotas.
Events are classified based on cache availability:
for hit in searchResponse.hits: domainName = hit['SFN']['domain_name'] # Check if domain is cached domainSearch = Search(index="sfn-domain-details") \ .query("match", name=domainName) if domainSearch.execute(): priDocIds.append(entry) # Primary: use cache else: secDocIds.append(entry) # Secondary: needs lookup
From project/dns/runner.py:44-66Primary Events: Domain intelligence cached in sfn-domain-details
Faster processing (no API call)
Cache validated for age (DNS_DOMAIN_INFO_MAX_AGE)
If expired, treated as secondary
Secondary Events: Domain not cached or cache expired
Both primary and secondary events are processed in parallel:
def processDNS(): priDocIds, secDocIds = unprocessedEventSearch() # Cap at 16 for AutoFocus minute point limits multiProcNum = min(app.config['DNS_POOL_COUNT'], 16) # Process primary events (cached domains) with Pool(multiProcNum) as pool: results = pool.map(searchDomain, priDocIds) # Process secondary events (AutoFocus lookups) with Pool(multiProcNum) as pool: results = pool.map(searchDomain, secDocIds)
From project/dns/runner.py:85-107
Rate Limiting: AutoFocus allows maximum 16 API calls per minute. The system enforces DNS_POOL_COUNT + URL_POOL_COUNT <= 16 to prevent quota violations.
AutoFocus searches are asynchronous. SafeNetworking polls for completion:
cookie = queryData['af_cookie']cookieURL = resultURL + cookie# Poll until timeout or completion thresholdfor timer in range(lookupTimeout): # Default: 2 minutes time.sleep(61) # Wait 1 minute cookieResults = requests.post(url=cookieURL, data=resultData) domainData = cookieResults.json() # Check if enough results returned if domainData['af_complete_percentage'] >= maxPercentage: # Default: 20% break
From project/dns/dnsutils.py:418-438
Performance Optimization: SafeNetworking doesn’t wait for 100% completion. If 20% of results are returned within 2 minutes, it proceeds with available data. This balances accuracy with throughput.
Each malware sample contains multiple tags. SafeNetworking extracts and caches them:
def processTagList(tagObj): tagList = list() sample = tagObj['_source'] if 'tag' in sample: for tagName in sample['tag']: tagData = processTag(tagName) # 2 API points if not cached tagList.append(tagData) return tagList
From project/dns/dnsutils.py:131-150The processTag() function checks sfn-tag-details cache before querying AutoFocus.
AutoFocus returns quota info with each response. SafeNetworking monitors and reacts:
if 'message' in queryData: if "Daily Bucket Exceeded" in queryData['message']: checkAfPoints(queryData['bucket_info']) # Retry after throttling/sleeping elif "Minute Bucket Exceeded" in queryData['message']: checkAfPoints(queryData['bucket_info']) # Retry after throttling/sleeping
IoT processing differs from DNS - it pulls from an external honeypot database:
def processIoT(): # Calculate time since last update latestTime = round(getLatestTime('sfn-iot-details')) # in minutes # Query external API for new IPs hpQuery = f"{app.config['IOT_DB_URL']}/query_sfn_ip?gap={latestTime}" queryResponse = requests.get(url=hpQuery) updateDict = literal_eval(queryResponse.text)
API failures are logged but don’t crash the system:
if 'af_cookie' not in queryData: app.logger.error(f"Unable to retrieve domain info from AutoFocus") # Return empty result set return [('2000-01-01T00:00:00', 'NA', [('No Samples Returned for Domain', ...)])]
# Process up to 1000 events per cycleapp.config['DNS_EVENT_QUERY_SIZE'] = 1000# Run cycle every 5 secondsapp.config['DNS_POOL_TIME'] = 5# Theoretical maximum: 12,000 events/minute# Actual: Limited by AutoFocus quota and cache hit rate