The McritClient class provides a Python interface to interact with the MCRIT REST API. It supports all backend operations including sample submission, querying, matching, and data management.

Installation

Install MCRIT from a clone of its repository to get the client library:
pip install -e .
Alternatively, install only the dependencies from the requirements file:
pip install -r requirements.txt
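
After installation, a quick way to verify connectivity is to instantiate the client and query the server, using the getVersion() and getStatus() calls documented below:

from mcrit.client.McritClient import McritClient

# connect to a local MCRIT server and confirm it responds
client = McritClient()
print(f"MCRIT version: {client.getVersion()}")

status = client.getStatus()
print(f"Samples indexed: {status['status']['num_samples']}")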

Basic Usage

Initialization

from mcrit.client.McritClient import McritClient

# Connect to local server
client = McritClient()

# Connect to remote server
client = McritClient(mcrit_server="http://192.168.1.100:8000")

# Connect with API token
client = McritClient(
    mcrit_server="http://192.168.1.100:8000",
    apitoken="your_api_token"
)

# Connect with username
client = McritClient(
    mcrit_server="http://192.168.1.100:8000",
    apitoken="your_api_token",
    username="analyst"
)

Configuration

You can update credentials after initialization:
client.setApitoken("new_token")
client.setUsername("analyst")

Status and Version

Get Server Status

status = client.getStatus(with_pichash=True)
print(f"Families: {status['status']['num_families']}")
print(f"Samples: {status['status']['num_samples']}")
print(f"Functions: {status['status']['num_functions']}")

Get Server Version

version = client.getVersion()
print(f"MCRIT version: {version}")

Sample Submission

Submit SMDA Report

from smda.Disassembler import Disassembler
from smda.common.SmdaReport import SmdaReport

# Disassemble a file
disassembler = Disassembler()
smda_report = disassembler.disassembleFile("/path/to/sample.exe")

# Add metadata
smda_report.family = "malware_family"
smda_report.version = "v1.0"
smda_report.is_library = False

# Submit to MCRIT
sample_entry, job_id = client.addReport(smda_report)
print(f"Sample ID: {sample_entry.sample_id}")
print(f"Job ID: {job_id}")

Submit Binary Sample

# Read binary file
with open("/path/to/sample.exe", "rb") as f:
    binary_data = f.read()

# Submit binary
result = client.addBinarySample(
    binary=binary_data,
    filename="sample.exe",
    family="malware_family",
    version="v1.0",
    is_dump=False,
    base_addr=None,
    bitness=32
)

Submit Memory Dump

# Submit a memory dump with base address
with open("/path/to/dump.bin", "rb") as f:
    dump_data = f.read()

result = client.addBinarySample(
    binary=dump_data,
    filename="dump.bin",
    family="malware_family",
    version="v1.0",
    is_dump=True,
    base_addr=0x400000,
    bitness=32
)

Working with Families

Get Family Information

# Get a specific family
family_entry = client.getFamily(family_id=1, with_samples=True)
print(f"Family: {family_entry.family_name}")
print(f"Number of samples: {len(family_entry.samples)}")

# Get all families
families = client.getFamilies()
for family_id, family_entry in families.items():
    print(f"{family_id}: {family_entry.family_name}")

Check if Family Exists

if client.isFamilyId(1):
    print("Family exists")

Modify Family

# Rename family
client.modifyFamily(family_id=1, family_name="new_name")

# Mark as library
client.modifyFamily(family_id=1, is_library=True)

Delete Family

# Delete family but keep samples
client.deleteFamily(family_id=1, keep_samples=True)

# Delete family and all samples
client.deleteFamily(family_id=1, keep_samples=False)

Working with Samples

Get Sample Information

# Get sample by ID
sample = client.getSampleById(sample_id=1)
print(f"SHA256: {sample.sha256}")
print(f"Family: {sample.family}")
print(f"Functions: {sample.statistics['num_functions']}")

# Get sample by SHA256
sample = client.getSampleBySha256("ca29de1dc8817868c93e54b09f557fe14e40083c0955294df5bd91f52ba469c8")

# Get all samples
samples = client.getSamples(start=0, limit=100)

# Get samples by family
samples = client.getSamplesByFamilyId(family_id=1)

Check if Sample Exists

if client.isSampleId(1):
    print("Sample exists")

Modify Sample Metadata

client.modifySample(
    sample_id=1,
    family_name="new_family",
    version="v2.0",
    component="dropper",
    is_library=False
)

Delete Sample

client.deleteSample(sample_id=1)

Working with Functions

Get Function Information

# Get function by ID
function = client.getFunctionById(function_id=1, with_xcfg=False)
print(f"Function name: {function.function_name}")
print(f"Offset: 0x{function.offset:x}")

# Get function with control flow graph
function = client.getFunctionById(function_id=1, with_xcfg=True)

# Get all functions for a sample
functions = client.getFunctionsBySampleId(sample_id=1)

# Get multiple functions by IDs
function_ids = [1, 2, 3, 4, 5]
functions = client.getFunctionsByIds(function_ids)

# Get only labeled functions
functions = client.getFunctionsByIds(function_ids, with_label_only=True)

Check if Function Exists

if client.isFunctionId(1):
    print("Function exists")

Matching and Queries

Request Matches for Sample

# Request matching for an existing sample
job_id = client.requestMatchesForSample(
    sample_id=1,
    minhash_threshold=0.7,
    pichash_size=None,
    band_matches_required=None,
    force_recalculation=False
)

# Wait for results
result = client.awaitResult(job_id, sleep_time=2)

Query with SMDA Report

from smda.Disassembler import Disassembler

# Disassemble and query
disassembler = Disassembler()
smda_report = disassembler.disassembleFile("/path/to/unknown.exe")

job_id = client.requestMatchesForSmdaReport(
    smda_report=smda_report,
    minhash_threshold=0.7,
    pichash_size=None,
    band_matches_required=None,
    force_recalculation=False
)

result = client.awaitResult(job_id, sleep_time=2, compact=True)

Query with Binary

# Query unmapped binary
with open("/path/to/sample.exe", "rb") as f:
    binary_data = f.read()

job_id = client.requestMatchesForUnmappedBinary(
    binary=binary_data,
    minhash_threshold=0.7,
    disassemble_locally=True,
    force_recalculation=False
)

# Query mapped binary (memory dump)
with open("/path/to/dump.bin", "rb") as f:
    dump_data = f.read()

job_id = client.requestMatchesForMappedBinary(
    binary=dump_data,
    base_address=0x400000,
    minhash_threshold=0.7,
    disassemble_locally=True,
    force_recalculation=False
)

result = client.awaitResult(job_id)

Match Sample vs Sample

# Compare two samples
job_id = client.requestMatchesForSampleVs(
    sample_id=1,
    other_sample_id=2,
    minhash_threshold=0.7
)

result = client.awaitResult(job_id)

Cross Matching

# Match multiple samples against each other
sample_ids = [1, 2, 3, 4, 5]

job_id = client.requestMatchesCross(
    sample_ids=sample_ids,
    sample_group_only=True,  # Only match within this group
    minhash_threshold=0.7
)

result = client.awaitResult(job_id)

Function-Level Matching

# Match a single function
from smda.common.SmdaReport import SmdaReport

# Create SMDA report with single function
smda_report = SmdaReport()  # ... populate with function data

matches = client.getMatchesForSmdaFunction(
    smda_report=smda_report,
    minhash_threshold=0.7,
    exclude_self_matches=True
)

Query by Hash

# Query by PicHash
matches = client.getMatchesForPicHash(pichash=0x1234567890abcdef, summary=False)

# Query by PicBlockHash
matches = client.getMatchesForPicBlockHash(picblockhash=0x1234567890abcdef, summary=True)

Function vs Function Matching

# Compare two functions directly
match_result = client.getMatchFunctionVs(function_id_a=1, function_id_b=2)

Job Management

Get Job Information

# Get job data
job = client.getJobData(job_id="64243b27f3876416bffad86e")
print(f"Job state: {job.state}")
print(f"Progress: {job.progress}")

# Get all jobs
jobs = client.getQueueData(start=0, limit=100)

# Filter jobs by method
jobs = client.getQueueData(method="addBinarySample")

# Filter by state
jobs = client.getQueueData(state="finished")

# Get job count
count = client.getJobCount(filter="matching")

Get Queue Statistics

stats = client.getQueueStatistics(with_refresh=True)
print(f"Queued jobs: {stats['queued']}")
print(f"Running jobs: {stats['running']}")

Delete Jobs

import datetime

# Delete a specific job
client.deleteJob(job_id="64243b27f3876416bffad86e")

# Delete jobs by criteria
client.deleteQueueData(
    method="addBinarySample",
    created_before=datetime.datetime(2023, 1, 1),
    finished_before=datetime.datetime(2023, 6, 1)
)

Wait for Results

from mcrit.client.McritClient import JobTerminatedError

try:
    result = client.awaitResult(job_id, sleep_time=2, compact=False)
    print("Job completed successfully")
except JobTerminatedError:
    print("Job was terminated")

Get Results

# Get result by job ID
result = client.getResultForJob(job_id="64243b27f3876416bffad86e", compact=False)

# Get result by result ID
result = client.getResult(result_id="64243b28cbc77c2df4d8d79f", compact=True)

# Get job for a result
job = client.getJobForResult(result_id="64243b28cbc77c2df4d8d79f")

Search Families

# Search for families
results = client.search_families(
    search_term="wannacry",
    cursor=None,
    is_ascending=True,
    sort_by="family_name",
    limit=10
)

for family_id, family_data in results["search_results"].items():
    print(f"Family {family_id}: {family_data['family_name']}")

# Get next page
if results["cursor"]["forward"]:
    next_results = client.search_families(
        search_term="wannacry",
        cursor=results["cursor"]["forward"]
    )

Search Samples

results = client.search_samples(
    search_term="unpacked",
    cursor=None,
    is_ascending=True,
    sort_by="sha256",
    limit=20
)

for sample_id, sample_data in results["search_results"].items():
    print(f"Sample {sample_id}: {sample_data['filename']}")

Search Functions

results = client.search_functions(
    search_term="WinMain",
    cursor=None,
    is_ascending=True,
    sort_by="function_name",
    limit=50
)

for function_id, function_data in results["search_results"].items():
    print(f"Function {function_id}: {function_data['function_name']}")

Data Import/Export

For details on import and export operations, see the Data Import/Export Guide.

Export Data

# Export all data
export_data = client.getExportData(compress_data=True)

# Export specific samples
export_data = client.getExportData(sample_ids=[1, 2, 3], compress_data=True)

# Save to file
import json
with open("export.mcrit", "w") as f:
    json.dump(export_data, f)

Import Data

import json

# Load from file
with open("export.mcrit", "r") as f:
    import_data = json.load(f)

# Import into MCRIT
result = client.addImportData(import_data)
print(f"Imported: {result['num_samples_imported']} samples")
print(f"Skipped: {result['num_samples_skipped']} samples")

Unique Blocks

Find Unique Blocks in Samples

# Find unique blocks across samples
result = client.requestUniqueBlocksForSamples(sample_ids=[1, 2, 3])

Find Unique Blocks for Family

# Find unique blocks for a family
result = client.requestUniqueBlocksForFamily(family_id=1)

Maintenance Operations

Rebuild Index

# Rebuild the MinHash index
client.rebuildIndex()

Complete MinHashes

# Ensure all functions have MinHashes calculated
client.completeMinhashes()

Recalculate Hashes

# Recalculate PicHashes
client.recalculatePicHashes()

# Recalculate MinHashes
client.recalculateMinHashes()

Respawn Workers

# Restart worker processes
client.respawn()

Error Handling

from mcrit.client.McritClient import McritClient, JobTerminatedError

try:
    client = McritClient()
    job_id = client.requestMatchesForSample(sample_id=1)
    result = client.awaitResult(job_id, sleep_time=2)
except JobTerminatedError:
    print("Job was terminated")
except Exception as e:
    print(f"Error: {e}")

Raw Response Mode

Get raw HTTP responses for custom processing:
client = McritClient(raw_responses=True)

# Returns requests.Response object
response = client.getStatus()
if response.status_code == 200:
    data = response.json()

Best Practices

  • Reuse the same McritClient instance for multiple operations
  • Set appropriate timeouts for long-running operations
  • Use API tokens for authentication in production environments
  • Use awaitResult() to wait for asynchronous jobs to complete
  • Set appropriate sleep_time to balance responsiveness and server load
  • Use compact=True for large result sets to reduce memory usage
  • Use getFunctionsByIds() instead of multiple getFunctionById() calls
  • Process samples in batches for better performance
  • Use force_recalculation=False to leverage cached results
  • Use cursors for paginating through large result sets (see the sketch after this list)
  • Keep the same search_term, is_ascending, and sort_by when using cursors
  • Set appropriate limit values to avoid overwhelming the client
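
As a sketch that combines several of these practices, the loop below reuses a single client instance and pages through a large sample search with cursors, keeping the search parameters identical across pages. It relies only on the search_samples() behavior shown earlier and assumes the cursor's "forward" entry is empty once the last page is reached:

from mcrit.client.McritClient import McritClient

client = McritClient()  # one instance, reused for every request

# Page through all matching samples; search_term, is_ascending, and
# sort_by must stay identical for every cursor request.
cursor = None
while True:
    results = client.search_samples(
        search_term="unpacked",
        cursor=cursor,
        is_ascending=True,
        sort_by="sha256",
        limit=100,
    )
    for sample_id, sample_data in results["search_results"].items():
        print(f"Sample {sample_id}: {sample_data['filename']}")
    cursor = results["cursor"]["forward"]
    if not cursor:
        break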

See Also

  • Data Import/Export Guide