Skip to main content

Status Methods

Methods for checking server status and version information.

getStatus

Get a status report of the MCRIT server with statistics.
def getStatus(self, with_pichash: bool = True) -> dict
with_pichash
bool
default:true
Include PicHash statistics in the status report
Returns: Dictionary containing server status and statistics
Example:
client = McritClient()
status = client.getStatus()
print(f"Total samples: {status['num_samples']}")
print(f"Total families: {status['num_families']}")

getVersion

Get the version of the MCRIT server.
def getVersion(self) -> str
Returns: Version string of the MCRIT server
Example:
version = client.getVersion()
print(f"MCRIT version: {version}")

Sample Methods

Methods for managing and querying binary samples.

addBinarySample

Submit a binary sample for analysis.
def addBinarySample(
    self,
    binary: bytes,
    filename: str = None,
    family: str = None,
    version: str = None,
    is_dump: bool = False,
    base_addr: int = None,
    bitness: int = None
) -> dict
binary
bytes
required
The binary file content as bytes
filename
str
Original filename of the binary
family
str
Malware family name (e.g., “emotet”, “trickbot”)
version
str
Version identifier for the sample
is_dump
bool
default:false
Whether the binary is a memory dump
base_addr
int
Base address for memory dumps, passed as an integer (may be written as a hexadecimal literal, e.g. 0x400000)
bitness
int
Architecture bitness (32 or 64)
Returns: Dictionary with sample information and optional job_id
Example:
with open("malware.exe", "rb") as f:
    binary = f.read()

result = client.addBinarySample(
    binary=binary,
    filename="malware.exe",
    family="emotet",
    version="2023.1",
    bitness=32
)
print(f"Sample ID: {result['sample_info']['sample_id']}")

addReport

Add a pre-disassembled SMDA report.
def addReport(self, smda_report: SmdaReport) -> Tuple[SampleEntry, Optional[str]]
smda_report
SmdaReport
required
SMDA disassembly report object
Returns: Tuple of (SampleEntry, job_id)
Example:
from smda.Disassembler import Disassembler

disassembler = Disassembler()
report = disassembler.disassembleFile("sample.exe")

sample_entry, job_id = client.addReport(report)
print(f"Added sample: {sample_entry.sample_id}")

getSamples

Get all sample entries with optional pagination.
def getSamples(self, start: int = 0, limit: int = 0) -> Dict[int, SampleEntry]
start
int
default:0
Starting sample ID for pagination
limit
int
default:0
Maximum number of samples to return (0 = all)
Returns: Dictionary mapping sample IDs to SampleEntry objects
Example:
# Get first 100 samples
samples = client.getSamples(start=0, limit=100)
for sample_id, sample in samples.items():
    print(f"{sample_id}: {sample.filename}")

getSampleById

Get a specific sample by its ID.
def getSampleById(self, sample_id: int) -> Optional[SampleEntry]
sample_id
int
required
The unique identifier of the sample
Returns: SampleEntry object or None if not found
Example:
sample = client.getSampleById(12345)
if sample:
    print(f"Filename: {sample.filename}")
    print(f"SHA256: {sample.sha256}")
    print(f"Family: {sample.family}")

getSampleBySha256

Get a sample by its SHA256 hash.
def getSampleBySha256(self, sample_sha256: str) -> Optional[SampleEntry]
sample_sha256
str
required
SHA256 hash of the sample (hexadecimal string)
Returns: SampleEntry object or None if not found
Example:
sha256 = "a1b2c3d4e5f6..."
sample = client.getSampleBySha256(sha256)
if sample:
    print(f"Found sample ID: {sample.sample_id}")

getSamplesByFamilyId

Get all samples belonging to a specific family.
def getSamplesByFamilyId(self, family_id: int) -> Optional[List[SampleEntry]]
family_id
int
required
The unique identifier of the family
Returns: List of SampleEntry objects (the return type is Optional, so check for None before iterating)
Example:
samples = client.getSamplesByFamilyId(42)
for sample in samples:
    print(f"Sample: {sample.filename}")

modifySample

Update sample metadata.
def modifySample(
    self,
    sample_id: int,
    family_name: str = None,
    version: str = None,
    component: str = None,
    is_library: bool = None
) -> dict
sample_id
int
required
ID of the sample to modify
family_name
str
New family name
version
str
New version string
component
str
Component identifier
is_library
bool
Mark sample as library code
Example:
client.modifySample(
    sample_id=12345,
    family_name="emotet_v2",
    version="2023.2"
)

deleteSample

Delete a sample from the database.
def deleteSample(self, sample_id: int) -> dict
sample_id
int
required
ID of the sample to delete
Example:
result = client.deleteSample(12345)

isSampleId

Check if a sample ID exists.
def isSampleId(self, sample_id: int) -> bool
sample_id
int
required
Sample ID to check
Returns: True if the sample exists, False otherwise

Family Methods

Methods for managing malware families.

getFamilies

Get all family entries.
def getFamilies(self) -> Optional[Dict[int, FamilyEntry]]
Returns: Dictionary mapping family IDs to FamilyEntry objects (the return type is Optional, so check for None before iterating)
Example:
families = client.getFamilies()
for family_id, family in families.items():
    print(f"{family_id}: {family.family_name} ({len(family.samples)} samples)")

getFamily

Get a specific family by its ID.
def getFamily(self, family_id: int, with_samples: bool = True) -> Optional[FamilyEntry]
family_id
int
required
The unique identifier of the family
with_samples
bool
default:true
Include sample information in the response
Returns: FamilyEntry object or None if not found
Example:
family = client.getFamily(42, with_samples=True)
if family:
    print(f"Family: {family.family_name}")
    print(f"Samples: {len(family.samples)}")

modifyFamily

Update family metadata.
def modifyFamily(
    self,
    family_id: int,
    family_name: str = None,
    is_library: bool = None
) -> dict
family_id
int
required
ID of the family to modify
family_name
str
New family name
is_library
bool
Mark family as library code
Example:
client.modifyFamily(
    family_id=42,
    family_name="emotet_v2",
    is_library=False
)

deleteFamily

Delete a family from the database.
def deleteFamily(self, family_id: int, keep_samples: bool = False) -> dict
family_id
int
required
ID of the family to delete
keep_samples
bool
default:false
If True, samples are retained without family assignment
Example:
# Delete family but keep samples
client.deleteFamily(42, keep_samples=True)

isFamilyId

Check if a family ID exists.
def isFamilyId(self, family_id: int) -> bool
family_id
int
required
Family ID to check
Returns: True if the family exists, False otherwise

Function Methods

Methods for querying function-level information.

getFunctions

Get all function entries with optional pagination.
def getFunctions(self, start: int = 0, limit: int = 0) -> Dict[int, FunctionEntry]
start
int
default:0
Starting function ID for pagination
limit
int
default:0
Maximum number of functions to return (0 = all)
Returns: Dictionary mapping function IDs to FunctionEntry objects

getFunctionById

Get a specific function by its ID.
def getFunctionById(self, function_id: int, with_xcfg: bool = False) -> Optional[FunctionEntry]
function_id
int
required
The unique identifier of the function
with_xcfg
bool
default:false
Include extended control flow graph data
Returns: FunctionEntry object or None if not found
Example:
function = client.getFunctionById(789, with_xcfg=True)
if function:
    print(f"Function: {function.function_name}")
    print(f"Offset: 0x{function.offset:x}")

getFunctionsBySampleId

Get all functions for a specific sample.
def getFunctionsBySampleId(self, sample_id: int) -> List[FunctionEntry]
sample_id
int
required
ID of the sample
Returns: List of FunctionEntry objects
Example:
functions = client.getFunctionsBySampleId(12345)
print(f"Found {len(functions)} functions")
for func in functions:
    print(f"  - 0x{func.offset:x}: {func.function_name}")

getFunctionsByIds

Get multiple functions by their IDs.
def getFunctionsByIds(
    self,
    function_ids: List[int],
    with_label_only: bool = False
) -> Dict[int, FunctionEntry]
function_ids
List[int]
required
List of function IDs to retrieve
with_label_only
bool
default:false
Only return functions with labels
Returns: Dictionary mapping function IDs to FunctionEntry objects
Example:
function_ids = [100, 101, 102, 103]
functions = client.getFunctionsByIds(function_ids)

isFunctionId

Check if a function ID exists.
def isFunctionId(self, function_id: int) -> bool
function_id
int
required
Function ID to check
Returns: True if the function exists, False otherwise

Matching Methods

Methods for requesting and retrieving code similarity matches.

requestMatchesForSmdaReport

Request matches for a disassembled SMDA report.
def requestMatchesForSmdaReport(
    self,
    smda_report: SmdaReport,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    force_recalculation: bool = False
) -> str
smda_report
SmdaReport
required
SMDA disassembly report
minhash_threshold
float
Minimum similarity threshold (0.0 to 1.0)
pichash_size
int
PicHash size parameter
band_matches_required
int
Number of band matches required
force_recalculation
bool
default:false
Force recalculation of hashes
Returns: Job ID for tracking the matching job
Example:
from smda.Disassembler import Disassembler

disassembler = Disassembler()
report = disassembler.disassembleFile("unknown.exe")

job_id = client.requestMatchesForSmdaReport(
    report,
    minhash_threshold=0.8
)

# Wait for results
result = client.awaitResult(job_id)

requestMatchesForUnmappedBinary

Request matches for an unmapped binary file.
def requestMatchesForUnmappedBinary(
    self,
    binary: bytes,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    disassemble_locally: bool = True,
    force_recalculation: bool = False
) -> str
binary
bytes
required
Binary file content
disassemble_locally
bool
default:true
Disassemble locally before sending (faster)
Returns: Job ID for tracking the matching job
Example:
with open("sample.exe", "rb") as f:
    binary = f.read()

job_id = client.requestMatchesForUnmappedBinary(
    binary,
    minhash_threshold=0.75,
    disassemble_locally=True
)

requestMatchesForMappedBinary

Request matches for a mapped binary (memory dump).
def requestMatchesForMappedBinary(
    self,
    binary: bytes,
    base_address: int,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    disassemble_locally: bool = True,
    force_recalculation: bool = False
) -> str
binary
bytes
required
Binary file content
base_address
int
required
Base address where the binary was loaded in memory
Returns: Job ID for tracking the matching job

requestMatchesForSample

Request matches for an existing sample in the database.
def requestMatchesForSample(
    self,
    sample_id: int,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    force_recalculation: bool = False
) -> str
sample_id
int
required
ID of the sample to match against the database
Returns: Job ID for tracking the matching job
Example:
job_id = client.requestMatchesForSample(
    sample_id=12345,
    minhash_threshold=0.8
)
result = client.awaitResult(job_id)

requestMatchesForSampleVs

Request matches between two specific samples.
def requestMatchesForSampleVs(
    self,
    sample_id: int,
    other_sample_id: int,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    force_recalculation: bool = False
) -> str
sample_id
int
required
ID of the first sample
other_sample_id
int
required
ID of the second sample
Returns: Job ID for tracking the matching job
Example:
job_id = client.requestMatchesForSampleVs(
    sample_id=100,
    other_sample_id=200
)
result = client.awaitResult(job_id)

requestMatchesCross

Request cross-matching between multiple samples.
def requestMatchesCross(
    self,
    sample_ids: List[int],
    sample_group_only: bool = False,
    minhash_threshold: float = None,
    pichash_size: int = None,
    band_matches_required: int = None,
    force_recalculation: bool = False
) -> str
sample_ids
List[int]
required
List of sample IDs to cross-match
sample_group_only
bool
default:false
Only match within the provided sample group
Returns: Job ID for tracking the matching job
Example:
sample_ids = [100, 101, 102, 103]
job_id = client.requestMatchesCross(
    sample_ids,
    sample_group_only=True,
    minhash_threshold=0.7
)

getMatchFunctionVs

Get match information between two specific functions.
def getMatchFunctionVs(
    self,
    function_id_a: int,
    function_id_b: int
) -> dict
function_id_a
int
required
ID of the first function
function_id_b
int
required
ID of the second function
Returns: Dictionary with matching information

getMatchesForSmdaFunction

Get matches for a single function from an SMDA report.
def getMatchesForSmdaFunction(
    self,
    smda_report: SmdaReport,
    minhash_threshold: float = None,
    pichash_size: int = None,
    force_recalculation: bool = None,
    band_matches_required: int = None,
    exclude_self_matches: bool = False
) -> dict
smda_report
SmdaReport
required
SMDA report containing a single function
exclude_self_matches
bool
default:false
Exclude matches to the same function
Returns: Dictionary with matching results

getMatchesForPicHash

Get all matches for a specific PicHash value.
def getMatchesForPicHash(self, pichash: int, summary: bool = False) -> dict
pichash
int
required
PicHash value (64-bit integer)
summary
bool
default:false
Return only summary information
Returns: Dictionary with matching functions
Example:
matches = client.getMatchesForPicHash(0x1234567890abcdef)
for match in matches:
    print(f"Function: {match['function_id']}")

getMatchesForPicBlockHash

Get all matches for a specific PicBlockHash value.
def getMatchesForPicBlockHash(self, picblockhash: int, summary: bool = False) -> dict
picblockhash
int
required
PicBlockHash value (64-bit integer)
summary
bool
default:false
Return only summary information
Returns: Dictionary with matching basic blocks

Query Methods

Methods for searching and querying the database.

search_families

Search for families by name.
def search_families(
    self,
    search_term: str,
    cursor: str = None,
    is_ascending: bool = True,
    sort_by: str = None,
    limit: int = None
) -> dict
search_term
str
required
Search query string
cursor
str
Pagination cursor from previous search
is_ascending
bool
default:true
Sort results in ascending order when True (the default); descending when False
sort_by
str
Field to sort by
limit
int
Maximum results per page
Returns: Dictionary with search results and pagination cursors
Example:
result = client.search_families("emotet", limit=50)
families = result["search_results"]
for family_id, family in families.items():
    print(f"{family_id}: {family['family_name']}")

# Get next page
if result["cursor"]["forward"]:
    next_result = client.search_families(
        "emotet",
        cursor=result["cursor"]["forward"],
        limit=50
    )

search_samples

Search for samples by filename or hash.
def search_samples(
    self,
    search_term: str,
    cursor: str = None,
    is_ascending: bool = True,
    sort_by: str = None,
    limit: int = None
) -> dict
search_term
str
required
Search query string (filename, hash, etc.)
Returns: Dictionary with search results and pagination cursors
Example:
result = client.search_samples("malware.exe")
for sample_id, sample in result["search_results"].items():
    print(f"{sample_id}: {sample['filename']}")

search_functions

Search for functions by name or characteristics.
def search_functions(
    self,
    search_term: str,
    cursor: str = None,
    is_ascending: bool = True,
    sort_by: str = None,
    limit: int = None
) -> dict
search_term
str
required
Search query string
Returns: Dictionary with search results and pagination cursors

Job Methods

Methods for managing and monitoring asynchronous jobs.

getJobData

Get information about a specific job.
def getJobData(self, job_id: str) -> Job
job_id
str
required
Unique identifier of the job
Returns: Job object with status information
Example:
job = client.getJobData(job_id)
print(f"Status: {job.status}")
print(f"Progress: {job.progress}%")
if job.is_finished:
    print(f"Result ID: {job.result}")

getJobCount

Get the count of jobs matching a filter.
def getJobCount(self, filter: str = None) -> int
filter
str
Filter expression for jobs
Returns: Number of jobs matching the filter

getQueueData

Get queue data with filtering options.
def getQueueData(
    self,
    start: int = 0,
    limit: int = 0,
    method: str = None,
    filter: str = None,
    state: str = None,
    ascending: bool = False
) -> List[Job]
start
int
default:0
Starting position for pagination
limit
int
default:0
Maximum number of jobs to return
method
str
Filter by job method
filter
str
Additional filter expression
state
str
Filter by job state (“queued”, “started”, “finished”, “failed”)
ascending
bool
default:false
Sort results in ascending order when True; descending when False (the default)
Returns: List of Job objects
Example:
# Get all running jobs
jobs = client.getQueueData(state="started", limit=100)
for job in jobs:
    print(f"{job.job_id}: {job.method} - {job.progress}%")

getQueueStatistics

Get summary statistics about the job queue.
def getQueueStatistics(self, with_refresh: bool = False) -> dict
with_refresh
bool
default:false
Refresh statistics before returning
Returns: Dictionary with queue statistics
Example:
stats = client.getQueueStatistics()
print(f"Queued jobs: {stats['queued']}")
print(f"Running jobs: {stats['started']}")
print(f"Finished jobs: {stats['finished']}")

getResultForJob

Get the result for a completed job.
def getResultForJob(self, job_id: str, compact: bool = False) -> dict
job_id
str
required
Job identifier
compact
bool
default:false
Return compact result format
Returns: Dictionary with job results
Example:
result = client.getResultForJob(job_id, compact=True)
if result:
    print(f"Matches found: {len(result['matches'])}")

getResult

Get a result by its result ID.
def getResult(self, result_id: str, compact: bool = False) -> dict
result_id
str
required
Result identifier
compact
bool
default:false
Return compact result format
Returns: Dictionary with result data

getJobForResult

Get the job information for a specific result.
def getJobForResult(self, result_id: str) -> Job
result_id
str
required
Result identifier
Returns: Job object

awaitResult

Wait for a job to complete and return its result.
def awaitResult(
    self,
    job_id: str,
    sleep_time: int = 2,
    compact: bool = False
) -> dict
job_id
str
required
Job identifier to wait for
sleep_time
int
default:2
Seconds to wait between status checks
compact
bool
default:false
Return compact result format
Returns: Dictionary with job results
Raises: JobTerminatedError if the job is terminated
Example:
from mcrit.client.McritClient import JobTerminatedError

job_id = client.requestMatchesForSample(12345)

try:
    result = client.awaitResult(job_id, sleep_time=5)
    print(f"Job completed successfully")
except JobTerminatedError:
    print(f"Job was terminated")

deleteJob

Delete a job from the queue.
def deleteJob(self, job_id: str) -> dict
job_id
str
required
Job identifier to delete

deleteQueueData

Delete multiple jobs matching criteria.
def deleteQueueData(
    self,
    method: str = None,
    created_before: datetime.datetime = None,
    finished_before: datetime.datetime = None
) -> dict
method
str
Delete jobs of this method type
created_before
datetime
Delete jobs created before this timestamp
finished_before
datetime
Delete jobs finished before this timestamp
Example:
from datetime import datetime, timedelta

# Delete finished jobs older than 7 days
week_ago = datetime.now() - timedelta(days=7)
client.deleteQueueData(finished_before=week_ago)

Import/Export Methods

Methods for importing and exporting MCRIT data.

getExportData

Export samples and their data.
def getExportData(
    self,
    sample_ids: List[int] = None,
    compress_data: bool = True
) -> dict
sample_ids
List[int]
List of sample IDs to export (None = all samples)
compress_data
bool
default:true
Compress the export data
Returns: Dictionary with export data
Example:
import json

# Export specific samples
export_data = client.getExportData(sample_ids=[100, 101, 102])

# Save to file
with open("export.json", "w") as f:
    json.dump(export_data, f)

addImportData

Import previously exported data.
def addImportData(self, import_data: dict) -> dict
import_data
dict
required
Dictionary with import data (from getExportData)
Returns: Import result information
Example:
import json

# Load from file
with open("export.json", "r") as f:
    import_data = json.load(f)

# Import into MCRIT
result = client.addImportData(import_data)
print(f"Import completed: {result}")

Unique Blocks Methods

Methods for analyzing unique code blocks.

requestUniqueBlocksForSamples

Request unique block analysis for multiple samples.
def requestUniqueBlocksForSamples(self, sample_ids: List[int]) -> dict
sample_ids
List[int]
required
List of sample IDs to analyze
Returns: Dictionary with unique block information
Example:
sample_ids = [100, 101, 102]
unique_blocks = client.requestUniqueBlocksForSamples(sample_ids)
for sample_id, blocks in unique_blocks.items():
    print(f"Sample {sample_id}: {len(blocks)} unique blocks")

requestUniqueBlocksForFamily

Request unique block analysis for a family.
def requestUniqueBlocksForFamily(self, family_id: int) -> dict
family_id
int
required
Family ID to analyze
Returns: Dictionary with unique block information
Example:
unique_blocks = client.requestUniqueBlocksForFamily(42)

Maintenance Methods

Methods for server maintenance and recalculation.

completeMinhashes

Complete missing MinHash calculations.
def completeMinhashes(self) -> dict
Returns: Status information

rebuildIndex

Rebuild the matching index.
def rebuildIndex(self) -> dict
Returns: Status information

recalculatePicHashes

Recalculate all PicHash values.
def recalculatePicHashes(self) -> dict
Returns: Status information

recalculateMinHashes

Recalculate all MinHash values.
def recalculateMinHashes(self) -> dict
Returns: Status information

respawn

Restart worker processes.
def respawn(self) -> dict
Returns: Status information

Build docs developers (and LLMs) love