Installation
pip install -e clients/python
Requires Python 3.10+, grpcio >= 1.78.0, protobuf >= 6.31.1, and a running Membrane daemon (default: localhost:9090).
Quick start
from membrane import MembraneClient, Sensitivity, TrustContext
# Connect to the running Membrane daemon
client = MembraneClient("localhost:9090")
# Ingest an event
record = client.ingest_event(
event_kind="file_edit",
ref="src/main.py",
summary="Refactored authentication module",
sensitivity=Sensitivity.LOW,
)
print(f"Created record: {record.id}")
# Retrieve memories relevant to a task
trust = TrustContext(
max_sensitivity=Sensitivity.MEDIUM,
authenticated=True,
actor_id="agent-1",
)
records = client.retrieve("fix the login bug", trust=trust, limit=5)
for r in records:
    print(f" [{r.type.value}] {r.id} (salience={r.salience:.2f})")
# Reinforce a useful memory
client.reinforce(record.id, actor="agent-1", rationale="Used successfully")
# Clean up
client.close()
Context manager
with MembraneClient("localhost:9090") as client:
    record = client.ingest_observation(
        subject="user",
        predicate="prefers",
        obj={"language": "Python"},
        sensitivity=Sensitivity.LOW,
    )
MembraneClient
Constructor
MembraneClient(
addr: str = "localhost:9090",
*,
tls: bool = False,
tls_ca_cert: str | None = None,
api_key: str | None = None,
timeout: float | None = None,
)
addr — gRPC server address in host:port format. Defaults to "localhost:9090".
tls — Enable TLS transport. When True and tls_ca_cert is not provided, system root certificates are used.
tls_ca_cert — Path to a PEM-encoded CA certificate for server verification. Implies tls=True.
api_key — Bearer token sent as authorization metadata on every RPC.
timeout — Default timeout in seconds for all RPC calls. None means no timeout.
TLS and authentication example
client = MembraneClient(
"membrane.example.com:443",
tls=True, # use TLS transport
tls_ca_cert="/path/to/ca.pem", # optional custom CA
api_key="your-api-key", # Bearer token auth
timeout=10.0, # default timeout in seconds
)
Ingestion methods
ingest_event
Create an episodic record from an event.
def ingest_event(
self,
event_kind: str,
ref: str,
*,
summary: str = "",
sensitivity: Sensitivity | str = Sensitivity.LOW,
source: str = "python-client",
tags: Sequence[str] | None = None,
scope: str = "",
timestamp: str | None = None,
) -> MemoryRecord
event_kind — Kind of event, e.g. "file_edit", "tool_call", "error".
ref — Reference identifier for the event source.
summary — Human-readable summary of the event.
sensitivity — Sensitivity classification. Defaults to Sensitivity.LOW.
source — Provenance source identifier. Defaults to "python-client".
tags — Tags for categorization and retrieval filtering.
scope — Visibility scope for the record.
timestamp — RFC 3339 timestamp. Defaults to now.
record = client.ingest_event(
source="my-agent",
event_kind="tool_call",
ref="task#1",
summary="Ran database migration successfully",
tags=["db", "migration"],
)
ingest_tool_output
Create an episodic record from a tool invocation.
def ingest_tool_output(
self,
tool_name: str,
*,
args: dict[str, Any] | None = None,
result: Any = None,
sensitivity: Sensitivity | str = Sensitivity.LOW,
source: str = "python-client",
depends_on: Sequence[str] | None = None,
tags: Sequence[str] | None = None,
scope: str = "",
timestamp: str | None = None,
) -> MemoryRecord
tool_name — Name of the tool that produced output.
args — Arguments passed to the tool.
result — Result returned by the tool (any JSON-serializable value).
depends_on — IDs of records this output depends on.
record = client.ingest_tool_output(
"run_tests",
args={"suite": "auth"},
result={"passed": 42, "failed": 0},
tags=["tests", "auth"],
)
ingest_observation
Create a semantic record from a subject-predicate-object triple.
def ingest_observation(
self,
subject: str,
predicate: str,
obj: Any,
*,
sensitivity: Sensitivity | str = Sensitivity.LOW,
source: str = "python-client",
tags: Sequence[str] | None = None,
scope: str = "",
timestamp: str | None = None,
) -> MemoryRecord
subject — The subject of the observation.
predicate — The predicate relating subject to object.
obj — The object value (any JSON-serializable value).
record = client.ingest_observation(
subject="user",
predicate="prefers",
obj={"language": "Python"},
sensitivity=Sensitivity.LOW,
)
ingest_outcome
Attach an outcome to an existing episodic record.
def ingest_outcome(
self,
target_record_id: str,
outcome_status: OutcomeStatus | str,
*,
source: str = "python-client",
timestamp: str | None = None,
) -> MemoryRecord
target_record_id — ID of the record to attach the outcome to.
outcome_status
OutcomeStatus | str
required
One of "success", "failure", or "partial".
from membrane import OutcomeStatus
client.ingest_outcome(record.id, OutcomeStatus.SUCCESS)
ingest_working_state
Create a working memory snapshot for a task thread.
def ingest_working_state(
self,
thread_id: str,
state: str,
*,
next_actions: Sequence[str] | None = None,
open_questions: Sequence[str] | None = None,
context_summary: str = "",
active_constraints: Sequence[dict[str, Any]] | None = None,
sensitivity: Sensitivity | str = Sensitivity.LOW,
source: str = "python-client",
tags: Sequence[str] | None = None,
scope: str = "",
timestamp: str | None = None,
) -> MemoryRecord
thread_id — Identifier for the task thread.
state — Current task state: "planning", "executing", "blocked", "waiting", or "done".
context_summary — Human-readable summary of current context.
active_constraints
Sequence[dict[str, Any]] | None
Active constraints as JSON-serializable dicts.
client.ingest_working_state(
thread_id="session-001",
state="executing",
next_actions=["run tests", "deploy"],
context_summary="Backend initialized, frontend pending, docs TODO",
)
Retrieval methods
retrieve
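Retrieve memory records relevant to a task descriptor. Returns only the list of records, without selector metadata (backward-compatible helper); use retrieve_with_selection to also get the selector metadata.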
def retrieve(
self,
task_descriptor: str,
*,
trust: TrustContext | None = None,
memory_types: Sequence[MemoryType | str] | None = None,
min_salience: float = 0.0,
limit: int = 10,
) -> list[MemoryRecord]
task_descriptor — Natural-language description of the current task.
trust — Trust context controlling access. Defaults to a minimal context with Sensitivity.LOW.
memory_types
Sequence[MemoryType | str] | None
Filter by memory types: "episodic", "working", "semantic", "competence", "plan_graph".
min_salience — Minimum salience threshold. Defaults to 0.0.
limit — Maximum records to return. Defaults to 10.
trust = TrustContext(
max_sensitivity=Sensitivity.MEDIUM,
authenticated=True,
actor_id="agent-1",
)
records = client.retrieve(
"database operations",
trust=trust,
memory_types=["semantic", "competence"],
limit=5,
)
retrieve_with_selection
Retrieve records plus optional selector metadata (ranked candidates, confidence).
def retrieve_with_selection(
self,
task_descriptor: str,
*,
trust: TrustContext | None = None,
memory_types: Sequence[MemoryType | str] | None = None,
min_salience: float = 0.0,
limit: int = 10,
) -> RetrieveResult
Returns a RetrieveResult dataclass:
@dataclass
class RetrieveResult:
    records: list[MemoryRecord]
    selection: SelectionResult | None # optional selector metadata

@dataclass
class SelectionResult:
    selected: list[MemoryRecord]
    confidence: float
    needs_more: bool
retrieve_by_id
Fetch a single record by its ID.
def retrieve_by_id(
self,
record_id: str,
*,
trust: TrustContext | None = None,
) -> MemoryRecord
record = client.retrieve_by_id("rec-uuid-here")
Revision methods
supersede
Replace a record with a new version.
def supersede(
self,
old_id: str,
new_record: dict[str, Any] | MemoryRecord,
actor: str,
rationale: str,
) -> MemoryRecord
updated = client.supersede(
old_record.id,
{**old_record.to_dict(), "payload": {"version": "2.0"}},
actor="agent-1",
rationale="Updated version field",
)
fork
Create a conditional variant of a record.
def fork(
self,
source_id: str,
forked_record: dict[str, Any] | MemoryRecord,
actor: str,
rationale: str,
) -> MemoryRecord
variant = client.fork(
source_record.id,
{**source_record.to_dict(), "scope": "dev"},
actor="agent-1",
rationale="Different behavior for dev environment",
)
retract
Soft-delete a record.
def retract(
self,
record_id: str,
actor: str,
rationale: str,
) -> None
client.retract(record.id, actor="agent-1", rationale="No longer accurate")
merge
Combine multiple records into a single consolidated record.
def merge(
self,
record_ids: Sequence[str],
merged_record: dict[str, Any] | MemoryRecord,
actor: str,
rationale: str,
) -> MemoryRecord
merged = client.merge(
[id1, id2, id3],
{"type": "semantic", "payload": {"fact": "combined knowledge"}},
actor="agent-1",
rationale="Consolidating duplicate semantic records",
)
contest
Mark a record as contested due to conflicting evidence.
def contest(
self,
record_id: str,
contesting_ref: str,
actor: str,
rationale: str,
) -> None
client.contest(
record.id,
conflicting_record.id,
actor="agent-1",
rationale="New evidence contradicts this",
)
Reinforcement methods
reinforce
Boost a record’s salience score.
def reinforce(
self,
record_id: str,
actor: str,
rationale: str,
) -> None
client.reinforce(record.id, actor="agent-1", rationale="Used successfully in task")
penalize
Reduce a record’s salience score.
def penalize(
self,
record_id: str,
amount: float,
actor: str,
rationale: str,
) -> None
client.penalize(record.id, amount=0.2, actor="agent-1", rationale="Led to incorrect result")
get_metrics
Retrieve a point-in-time metrics snapshot from the daemon.
def get_metrics(self) -> dict[str, Any]
metrics = client.get_metrics()
print(metrics["total_records"], metrics["avg_salience"])
close
Close the underlying gRPC channel. The context manager calls this automatically.
Core types
Sensitivity
class Sensitivity(str, Enum):
    PUBLIC = "public"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    HYPER = "hyper"
Controls access during retrieval. Records above the caller’s max_sensitivity are returned in redacted form.
TrustContext
@dataclass
class TrustContext:
    max_sensitivity: Sensitivity = Sensitivity.LOW # highest sensitivity the caller may access
    authenticated: bool = False
    actor_id: str = ""
    scopes: list[str] = field(default_factory=list)
MemoryRecord
@dataclass
class MemoryRecord:
    id: str
    type: MemoryType # "episodic" | "working" | "semantic" | "competence" | "plan_graph"
    sensitivity: Sensitivity
    confidence: float # 0–1 applicability score
    salience: float # 0–1 current importance score
    scope: str
    tags: list[str]
    created_at: str
    updated_at: str
    lifecycle: Lifecycle | None
    provenance: Provenance | None
    relations: list[Relation]
    payload: Any # type-specific structured payload
    audit_log: list[AuditEntry]
MemoryType
class MemoryType(str, Enum):
    EPISODIC = "episodic"
    WORKING = "working"
    SEMANTIC = "semantic"
    COMPETENCE = "competence"
    PLAN_GRAPH = "plan_graph"
OutcomeStatus
class OutcomeStatus(str, Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
Relation
@dataclass
class Relation:
    target_id: str = "" # ID of the related MemoryRecord
    kind: str = "" # relationship type (maps to Go Predicate)
    weight: float = 1.0 # strength of the relationship [0, 1]
The Python Relation.kind field maps to Go’s schema.Relation.Predicate. Common values: supports, contradicts, derived_from, supersedes, contested_by.
Additional exported types
All of the following are importable from the top-level membrane package:
from membrane import (
AuditAction, # Enum: create, revise, fork, merge, delete, reinforce, decay
AuditEntry, # Dataclass: action, actor, timestamp, rationale
DecayCurve, # Enum: EXPONENTIAL = "exponential"
DecayProfile, # Dataclass: curve, half_life_seconds
DeletionPolicy, # Enum: AUTO_PRUNE, MANUAL_ONLY, NEVER
EdgeKind, # Enum: DATA = "data", CONTROL = "control"
Lifecycle, # Dataclass: decay, last_reinforced_at, deletion_policy
ProvenanceKind, # Enum: EVENT, ARTIFACT, TOOL_CALL, OBSERVATION, OUTCOME
ProvenanceSource, # Dataclass: kind, ref, timestamp
Provenance, # Dataclass: sources (list[ProvenanceSource])
Relation, # Dataclass: target_id, kind, weight
RetrieveResult, # Dataclass: records, selection
RevisionStatus, # Enum: ACTIVE, CONTESTED, RETRACTED
SelectionResult, # Dataclass: selected, confidence, needs_more
TaskState, # Enum: PLANNING, EXECUTING, BLOCKED, WAITING, DONE
ValidityMode, # Enum: GLOBAL, CONDITIONAL, TIMEBOXED
)
Error handling
Failed RPCs raise grpc.RpcError. Inspect e.code() and e.details() for the gRPC status:
import grpc
from membrane import MembraneClient
client = MembraneClient("localhost:9090")
try:
    record = client.retrieve_by_id("nonexistent-id")
except grpc.RpcError as e:
    print(e.code()) # e.g. StatusCode.NOT_FOUND
    print(e.details()) # human-readable message
finally:
    client.close()