Status Methods
Methods for checking server status and version information.
getStatus
Get a status report of the MCRIT server with statistics.
def getStatus(self, with_pichash: bool = True) -> dict
Include PicHash statistics in the status report
Returns: Dictionary containing server status and statistics
Example:
client = McritClient()
status = client.getStatus()
print(f"Total samples: {status['num_samples']}")
print(f"Total families: {status['num_families']}")
getVersion
Get the version of the MCRIT server.
def getVersion(self) -> str
Returns: Version string of the MCRIT server
Example:
version = client.getVersion()
print(f"MCRIT version: {version}")
Sample Methods
Methods for managing and querying binary samples.
addBinarySample
Submit a binary sample for analysis.
def addBinarySample(
self,
binary: bytes,
filename: str = None,
family: str = None,
version: str = None,
is_dump: bool = False,
base_addr: int = None,
bitness: int = None
) -> dict
The binary file content as bytes
Original filename of the binary
Malware family name (e.g., “emotet”, “trickbot”)
Version identifier for the sample
Whether the binary is a memory dump
Base address for memory dumps (hexadecimal)
Architecture bitness (32 or 64)
Returns: Dictionary with sample information and optional job_id
Example:
with open("malware.exe", "rb") as f:
binary = f.read()
result = client.addBinarySample(
binary=binary,
filename="malware.exe",
family="emotet",
version="2023.1",
bitness=32
)
print(f"Sample ID: {result['sample_info']['sample_id']}")
addReport
Add a pre-disassembled SMDA report.
def addReport(self, smda_report: SmdaReport) -> Tuple[SampleEntry, Optional[str]]
SMDA disassembly report object
Returns: Tuple of (SampleEntry, job_id)
Example:
from smda.Disassembler import Disassembler
disassembler = Disassembler()
report = disassembler.disassembleFile("sample.exe")
sample_entry, job_id = client.addReport(report)
print(f"Added sample: {sample_entry.sample_id}")
getSamples
Get all sample entries with optional pagination.
def getSamples(self, start: int = 0, limit: int = 0) -> Dict[int, SampleEntry]
Starting sample ID for pagination
Maximum number of samples to return (0 = all)
Returns: Dictionary mapping sample IDs to SampleEntry objects
Example:
# Get first 100 samples
samples = client.getSamples(start=0, limit=100)
for sample_id, sample in samples.items():
print(f"{sample_id}: {sample.filename}")
getSampleById
Get a specific sample by its ID.
def getSampleById(self, sample_id: int) -> Optional[SampleEntry]
The unique identifier of the sample
Returns: SampleEntry object or None if not found
Example:
sample = client.getSampleById(12345)
if sample:
print(f"Filename: {sample.filename}")
print(f"SHA256: {sample.sha256}")
print(f"Family: {sample.family}")
getSampleBySha256
Get a sample by its SHA256 hash.
def getSampleBySha256(self, sample_sha256: str) -> Optional[SampleEntry]
SHA256 hash of the sample (hexadecimal string)
Returns: SampleEntry object or None if not found
Example:
sha256 = "a1b2c3d4e5f6..."
sample = client.getSampleBySha256(sha256)
if sample:
print(f"Found sample ID: {sample.sample_id}")
getSamplesByFamilyId
Get all samples belonging to a specific family.
def getSamplesByFamilyId(self, family_id: int) -> Optional[List[SampleEntry]]
The unique identifier of the family
Returns: List of SampleEntry objects
Example:
samples = client.getSamplesByFamilyId(42)
for sample in samples:
print(f"Sample: {sample.filename}")
modifySample
Update sample metadata.
def modifySample(
self,
sample_id: int,
family_name: str = None,
version: str = None,
component: str = None,
is_library: bool = None
) -> dict
ID of the sample to modify
Mark sample as library code
Example:
client.modifySample(
sample_id=12345,
family_name="emotet_v2",
version="2023.2"
)
deleteSample
Delete a sample from the database.
def deleteSample(self, sample_id: int) -> dict
ID of the sample to delete
Example:
result = client.deleteSample(12345)
isSampleId
Check if a sample ID exists.
def isSampleId(self, sample_id: int) -> bool
Returns: True if the sample exists, False otherwise
Family Methods
Methods for managing malware families.
getFamilies
Get all family entries.
def getFamilies(self) -> Optional[Dict[int, FamilyEntry]]
Returns: Dictionary mapping family IDs to FamilyEntry objects
Example:
families = client.getFamilies()
for family_id, family in families.items():
print(f"{family_id}: {family.family_name} ({len(family.samples)} samples)")
getFamily
Get a specific family by its ID.
def getFamily(self, family_id: int, with_samples: bool = True) -> Optional[FamilyEntry]
The unique identifier of the family
Include sample information in the response
Returns: FamilyEntry object or None if not found
Example:
family = client.getFamily(42, with_samples=True)
if family:
print(f"Family: {family.family_name}")
print(f"Samples: {len(family.samples)}")
modifyFamily
Update family metadata.
def modifyFamily(
self,
family_id: int,
family_name: str = None,
is_library: bool = None
) -> dict
ID of the family to modify
Mark family as library code
Example:
client.modifyFamily(
family_id=42,
family_name="emotet_v2",
is_library=False
)
deleteFamily
Delete a family from the database.
def deleteFamily(self, family_id: int, keep_samples: bool = False) -> dict
ID of the family to delete
If True, samples are retained without family assignment
Example:
# Delete family but keep samples
client.deleteFamily(42, keep_samples=True)
isFamilyId
Check if a family ID exists.
def isFamilyId(self, family_id: int) -> bool
Returns: True if the family exists, False otherwise
Function Methods
Methods for querying function-level information.
getFunctions
Get all function entries with optional pagination.
def getFunctions(self, start: int = 0, limit: int = 0) -> Dict[int, FunctionEntry]
Starting function ID for pagination
Maximum number of functions to return (0 = all)
Returns: Dictionary mapping function IDs to FunctionEntry objects
getFunctionById
Get a specific function by its ID.
def getFunctionById(self, function_id: int, with_xcfg: bool = False) -> Optional[FunctionEntry]
The unique identifier of the function
Include extended control flow graph data
Returns: FunctionEntry object or None if not found
Example:
function = client.getFunctionById(789, with_xcfg=True)
if function:
print(f"Function: {function.function_name}")
print(f"Offset: 0x{function.offset:x}")
getFunctionsBySampleId
Get all functions for a specific sample.
def getFunctionsBySampleId(self, sample_id: int) -> List[FunctionEntry]
Returns: List of FunctionEntry objects
Example:
functions = client.getFunctionsBySampleId(12345)
print(f"Found {len(functions)} functions")
for func in functions:
print(f" - 0x{func.offset:x}: {func.function_name}")
getFunctionsByIds
Get multiple functions by their IDs.
def getFunctionsByIds(
self,
function_ids: List[int],
with_label_only: bool = False
) -> Dict[int, FunctionEntry]
List of function IDs to retrieve
Only return functions with labels
Returns: Dictionary mapping function IDs to FunctionEntry objects
Example:
function_ids = [100, 101, 102, 103]
functions = client.getFunctionsByIds(function_ids)
isFunctionId
Check if a function ID exists.
def isFunctionId(self, function_id: int) -> bool
Returns: True if the function exists, False otherwise
Matching Methods
Methods for requesting and retrieving code similarity matches.
requestMatchesForSmdaReport
Request matches for a disassembled SMDA report.
def requestMatchesForSmdaReport(
self,
smda_report: SmdaReport,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
force_recalculation: bool = False
) -> str
Minimum similarity threshold (0.0 to 1.0)
Number of band matches required
Force recalculation of hashes
Returns: Job ID for tracking the matching job
Example:
from smda.Disassembler import Disassembler
disassembler = Disassembler()
report = disassembler.disassembleFile("unknown.exe")
job_id = client.requestMatchesForSmdaReport(
report,
minhash_threshold=0.8
)
# Wait for results
result = client.awaitResult(job_id)
requestMatchesForUnmappedBinary
Request matches for an unmapped binary file.
def requestMatchesForUnmappedBinary(
self,
binary: bytes,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
disassemble_locally: bool = True,
force_recalculation: bool = False
) -> str
Disassemble locally before sending (faster)
Returns: Job ID for tracking the matching job
Example:
with open("sample.exe", "rb") as f:
binary = f.read()
job_id = client.requestMatchesForUnmappedBinary(
binary,
minhash_threshold=0.75,
disassemble_locally=True
)
Request matches for a mapped binary (memory dump).
def requestMatchesForMappedBinary(
self,
binary: bytes,
base_address: int,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
disassemble_locally: bool = True,
force_recalculation: bool = False
) -> str
Base address where the binary was loaded in memory
Returns: Job ID for tracking the matching job
requestMatchesForSample
Request matches for an existing sample in the database.
def requestMatchesForSample(
self,
sample_id: int,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
force_recalculation: bool = False
) -> str
ID of the sample to match against the database
Returns: Job ID for tracking the matching job
Example:
job_id = client.requestMatchesForSample(
sample_id=12345,
minhash_threshold=0.8
)
result = client.awaitResult(job_id)
requestMatchesForSampleVs
Request matches between two specific samples.
def requestMatchesForSampleVs(
self,
sample_id: int,
other_sample_id: int,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
force_recalculation: bool = False
) -> str
Returns: Job ID for tracking the matching job
Example:
job_id = client.requestMatchesForSampleVs(
sample_id=100,
other_sample_id=200
)
result = client.awaitResult(job_id)
requestMatchesCross
Request cross-matching between multiple samples.
def requestMatchesCross(
self,
sample_ids: List[int],
sample_group_only: bool = False,
minhash_threshold: float = None,
pichash_size: int = None,
band_matches_required: int = None,
force_recalculation: bool = False
) -> str
List of sample IDs to cross-match
Only match within the provided sample group
Returns: Job ID for tracking the matching job
Example:
sample_ids = [100, 101, 102, 103]
job_id = client.requestMatchesCross(
sample_ids,
sample_group_only=True,
minhash_threshold=0.7
)
getMatchFunctionVs
Get match information between two specific functions.
def getMatchFunctionVs(
self,
function_id_a: int,
function_id_b: int
) -> dict
ID of the second function
Returns: Dictionary with matching information
getMatchesForSmdaFunction
Get matches for a single function from an SMDA report.
def getMatchesForSmdaFunction(
self,
smda_report: SmdaReport,
minhash_threshold: float = None,
pichash_size: int = None,
force_recalculation: bool = None,
band_matches_required: int = None,
exclude_self_matches: bool = False
) -> dict
SMDA report containing a single function
Exclude matches to the same function
Returns: Dictionary with matching results
getMatchesForPicHash
Get all matches for a specific PicHash value.
def getMatchesForPicHash(self, pichash: int, summary: bool = False) -> dict
PicHash value (64-bit integer)
Return only summary information
Returns: Dictionary with matching functions
Example:
matches = client.getMatchesForPicHash(0x1234567890abcdef)
for match in matches:
print(f"Function: {match['function_id']}")
getMatchesForPicBlockHash
Get all matches for a specific PicBlockHash value.
def getMatchesForPicBlockHash(self, picblockhash: int, summary: bool = False) -> dict
PicBlockHash value (64-bit integer)
Return only summary information
Returns: Dictionary with matching basic blocks
Query Methods
Methods for searching and querying the database.
search_families
Search for families by name.
def search_families(
self,
search_term: str,
cursor: str = None,
is_ascending: bool = True,
sort_by: str = None,
limit: int = None
) -> dict
Pagination cursor from previous search
Returns: Dictionary with search results and pagination cursors
Example:
result = client.search_families("emotet", limit=50)
families = result["search_results"]
for family_id, family in families.items():
print(f"{family_id}: {family['family_name']}")
# Get next page
if result["cursor"]["forward"]:
next_result = client.search_families(
"emotet",
cursor=result["cursor"]["forward"],
limit=50
)
search_samples
Search for samples by filename or hash.
def search_samples(
self,
search_term: str,
cursor: str = None,
is_ascending: bool = True,
sort_by: str = None,
limit: int = None
) -> dict
Search query string (filename, hash, etc.)
Returns: Dictionary with search results and pagination cursors
Example:
result = client.search_samples("malware.exe")
for sample_id, sample in result["search_results"].items():
print(f"{sample_id}: {sample['filename']}")
search_functions
Search for functions by name or characteristics.
def search_functions(
self,
search_term: str,
cursor: str = None,
is_ascending: bool = True,
sort_by: str = None,
limit: int = None
) -> dict
Returns: Dictionary with search results and pagination cursors
Job Methods
Methods for managing and monitoring asynchronous jobs.
getJobData
Get information about a specific job.
def getJobData(self, job_id: str) -> Job
Unique identifier of the job
Returns: Job object with status information
Example:
job = client.getJobData(job_id)
print(f"Status: {job.status}")
print(f"Progress: {job.progress}%")
if job.is_finished:
print(f"Result ID: {job.result}")
getJobCount
Get the count of jobs matching a filter.
def getJobCount(self, filter: str = None) -> int
Filter expression for jobs
Returns: Number of jobs matching the filter
getQueueData
Get queue data with filtering options.
def getQueueData(
self,
start: int = 0,
limit: int = 0,
method: str = None,
filter: str = None,
state: str = None,
ascending: bool = False
) -> List[Job]
Starting position for pagination
Maximum number of jobs to return
Additional filter expression
Filter by job state (“queued”, “started”, “finished”, “failed”)
Returns: List of Job objects
Example:
# Get all running jobs
jobs = client.getQueueData(state="started", limit=100)
for job in jobs:
print(f"{job.job_id}: {job.method} - {job.progress}%")
getQueueStatistics
Get summary statistics about the job queue.
def getQueueStatistics(self, with_refresh: bool = False) -> dict
Refresh statistics before returning
Returns: Dictionary with queue statistics
Example:
stats = client.getQueueStatistics()
print(f"Queued jobs: {stats['queued']}")
print(f"Running jobs: {stats['started']}")
print(f"Finished jobs: {stats['finished']}")
getResultForJob
Get the result for a completed job.
def getResultForJob(self, job_id: str, compact: bool = False) -> dict
Return compact result format
Returns: Dictionary with job results
Example:
result = client.getResultForJob(job_id, compact=True)
if result:
print(f"Matches found: {len(result['matches'])}")
getResult
Get a result by its result ID.
def getResult(self, result_id: str, compact: bool = False) -> dict
Return compact result format
Returns: Dictionary with result data
getJobForResult
Get the job information for a specific result.
def getJobForResult(self, result_id: str) -> Job
Returns: Job object
awaitResult
Wait for a job to complete and return its result.
def awaitResult(
self,
job_id: str,
sleep_time: int = 2,
compact: bool = False
) -> dict
Job identifier to wait for
Seconds to wait between status checks
Return compact result format
Returns: Dictionary with job results
Raises: JobTerminatedError if the job is terminated
Example:
from mcrit.client.McritClient import JobTerminatedError
job_id = client.requestMatchesForSample(12345)
try:
result = client.awaitResult(job_id, sleep_time=5)
print(f"Job completed successfully")
except JobTerminatedError:
print(f"Job was terminated")
deleteJob
Delete a job from the queue.
def deleteJob(self, job_id: str) -> dict
deleteQueueData
Delete multiple jobs matching criteria.
def deleteQueueData(
self,
method: str = None,
created_before: datetime.datetime = None,
finished_before: datetime.datetime = None
) -> dict
Delete jobs of this method type
Delete jobs created before this timestamp
Delete jobs finished before this timestamp
Example:
from datetime import datetime, timedelta
# Delete finished jobs older than 7 days
week_ago = datetime.now() - timedelta(days=7)
client.deleteQueueData(finished_before=week_ago)
Import/Export Methods
Methods for importing and exporting MCRIT data.
getExportData
Export samples and their data.
def getExportData(
self,
sample_ids: List[int] = None,
compress_data: bool = True
) -> dict
List of sample IDs to export (None = all samples)
Returns: Dictionary with export data
Example:
import json
# Export specific samples
export_data = client.getExportData(sample_ids=[100, 101, 102])
# Save to file
with open("export.json", "w") as f:
json.dump(export_data, f)
addImportData
Import previously exported data.
def addImportData(self, import_data: dict) -> dict
Dictionary with import data (from getExportData)
Returns: Import result information
Example:
import json
# Load from file
with open("export.json", "r") as f:
import_data = json.load(f)
# Import into MCRIT
result = client.addImportData(import_data)
print(f"Import completed: {result}")
Unique Blocks Methods
Methods for analyzing unique code blocks.
requestUniqueBlocksForSamples
Request unique block analysis for multiple samples.
def requestUniqueBlocksForSamples(self, sample_ids: List[int]) -> dict
List of sample IDs to analyze
Returns: Dictionary with unique block information
Example:
sample_ids = [100, 101, 102]
unique_blocks = client.requestUniqueBlocksForSamples(sample_ids)
for sample_id, blocks in unique_blocks.items():
print(f"Sample {sample_id}: {len(blocks)} unique blocks")
requestUniqueBlocksForFamily
Request unique block analysis for a family.
def requestUniqueBlocksForFamily(self, family_id: int) -> dict
Returns: Dictionary with unique block information
Example:
unique_blocks = client.requestUniqueBlocksForFamily(42)
Maintenance Methods
Methods for server maintenance and recalculation.
completeMinhashes
Complete missing MinHash calculations.
def completeMinhashes(self) -> dict
Returns: Status information
rebuildIndex
Rebuild the matching index.
def rebuildIndex(self) -> dict
Returns: Status information
recalculatePicHashes
Recalculate all PicHash values.
def recalculatePicHashes(self) -> dict
Returns: Status information
recalculateMinHashes
Recalculate all MinHash values.
def recalculateMinHashes(self) -> dict
Returns: Status information
respawn
Restart worker processes.
def respawn(self) -> dict
Returns: Status information