storage
Classes:
- AsyncMemoryStore – Handles in-memory storage of structured data with schema enforcement, transactional capabilities, and hooks for custom operations.
- Constraint – Represents a constraint with properties for configuration.
- MemoryStore – Handles in-memory storage of structured data with schema enforcement, transactional capabilities, and hooks for custom operations.
- SchemaRegistry – Manages schema registration and validation with a mapping of type names to Pydantic models.
AsyncMemoryStore
AsyncMemoryStore(
storage: AsyncStorageBackend,
hooks: list[AsyncMemoryHook] | None = None,
)
Handles in-memory storage of structured data with schema enforcement, transactional capabilities, and hooks for custom operations.
This class provides a structured method to store and retrieve facts with enforced schema validation and constraints. It also supports mechanisms for transactional logging, model validation, and hook execution during operations.
Attributes:
- storage (AsyncStorageBackend) – Backend storage mechanism for persisting facts and transaction information.
- hooks (list[AsyncMemoryHook]) – List of hooks to be executed during memory operations.
Methods:
- add_hook – Adds a new memory hook to the list of hooks.
- commit – Asynchronously commits a Fact object to the storage, optionally allowing for ephemeral storage, and updates existing records if applicable.
- commit_model – Asynchronously commits a model to the store using the provided schema registry and additional metadata.
- delete – Asynchronously deletes an existing fact from storage identified by the given fact ID.
- discard_session – Asynchronously discards a session and clears related stored data in the storage.
- get – Asynchronously retrieves a fact from the storage based on the provided fact ID.
- promote_session – Asynchronously promotes session-related facts by modifying the session ID to dissociate them from the provided session.
- query – Asynchronously executes a query against the storage with optional type and filter constraints.
- register_schema – Registers a schema in the schema registry and optionally applies a constraint.
- rollback – Asynchronously reverts the state of the storage by rolling back a specified number of transactional operations.
- update – Asynchronously updates an existing fact in the store by applying a patch to its contents.
Source code in memstate/storage.py
add_hook
add_hook(hook: AsyncMemoryHook) -> None
Adds a new memory hook to the list of hooks.
This method registers an AsyncMemoryHook instance in the internal hooks list. An AsyncMemoryHook is an abstraction that can be used to monitor and react to specific memory-related events.
Parameters:
- hook (AsyncMemoryHook) – The hook instance to be added to the hooks list.
Returns:
- None
commit
async
commit(
fact: Fact,
session_id: str | None = None,
ephemeral: bool = False,
actor: str | None = None,
reason: str | None = None,
) -> str
Asynchronously commits a Fact object to the storage, optionally allowing for ephemeral
storage, and updates existing records if applicable. The operation evaluates
constraints such as immutability or uniqueness, handles potential duplicates,
and invokes hooks for logging and notifications. Supports rollback of changes
in case of errors.
Parameters:
- fact (Fact) – The Fact object to be committed. Its payload is validated against the schema registry, and the commit either creates a new entry in the storage or updates an existing one.
- session_id (str | None, default: None) – Optional session identifier associated with the Fact.
- ephemeral (bool, default: False) – Indicates whether the Fact is transient and should not be persisted.
- actor (str | None, default: None) – Optional identifier for the individual or system responsible for initiating the commit. Used for logging and auditing purposes.
- reason (str | None, default: None) – Optional string describing the purpose of the commit. Used primarily for auditing and logging.
Returns:
- str – The unique identifier of the committed fact.
Raises:
- HookError – If an error occurs during hook execution.
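The commit description mentions that hooks run as part of the operation and that changes can be rolled back on error. The following is a minimal sketch of that pattern only, not memstate's implementation: `ToyAsyncStore`, the hook call shape `(operation, fact)`, and this local `HookError` class are all hypothetical stand-ins introduced for illustration.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Optional


class HookError(Exception):
    """Local stand-in for the library's HookError (hypothetical definition)."""


# Hypothetical hook shape: an async callable receiving the operation name and the fact.
Hook = Callable[[str, dict[str, Any]], Awaitable[None]]


class ToyAsyncStore:
    """Toy model of commit-with-rollback; not the real AsyncMemoryStore."""

    def __init__(self, hooks: Optional[list[Hook]] = None) -> None:
        self.facts: dict[str, dict[str, Any]] = {}
        self.hooks = hooks or []

    async def commit(self, payload: dict[str, Any]) -> str:
        fact_id = str(uuid.uuid4())
        self.facts[fact_id] = payload
        try:
            for hook in self.hooks:
                await hook("COMMIT", payload)
        except Exception as exc:
            # Undo the write so a failing hook leaves no partial state behind.
            del self.facts[fact_id]
            raise HookError(str(exc)) from exc
        return fact_id


async def failing_hook(op: str, fact: dict[str, Any]) -> None:
    raise RuntimeError("audit sink unavailable")


async def demo() -> tuple[int, int]:
    ok_store = ToyAsyncStore()
    await ok_store.commit({"name": "Ada"})

    bad_store = ToyAsyncStore(hooks=[failing_hook])
    try:
        await bad_store.commit({"name": "Ada"})
    except HookError:
        pass  # the write was undone before the error surfaced
    return len(ok_store.facts), len(bad_store.facts)


counts = asyncio.run(demo())
```

In this sketch the store without hooks keeps its fact, while the store with the failing hook ends up empty.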
commit_model
async
commit_model(
model: BaseModel,
fact_id: str | None = None,
source: str | None = None,
session_id: str | None = None,
ephemeral: bool = False,
actor: str | None = None,
reason: str | None = None,
) -> str
Asynchronously commits a model to the store using the provided schema registry and additional metadata.
This method commits a given model instance under the schema type derived from its class. Metadata such as fact_id, source, session_id, ephemeral, actor, and reason can be supplied to categorize or provide context for the operation. If the model's schema type is not registered, an error is raised.
Parameters:
- model (BaseModel) – The model instance to commit.
- fact_id (str | None, default: None) – Optional unique identifier for the fact. If not provided, a new UUID is generated.
- source (str | None, default: None) – Optional source of the operation.
- session_id (str | None, default: None) – Optional identifier for the session in which the commit is performed.
- ephemeral (bool, default: False) – Whether the data should be treated as ephemeral.
- actor (str | None, default: None) – Optional identifier for the entity performing the commit.
- reason (str | None, default: None) – Optional description or justification for the commit operation.
Returns:
- str – The result of the commit operation as a string.
Raises:
- MemoryStoreError – If the model's schema type is not registered.
- HookError – If an error occurs during hook execution.
delete
async
delete(
session_id: str | None,
fact_id: str,
actor: str | None = None,
reason: str | None = None,
) -> str
Asynchronously deletes an existing fact from storage identified by the given fact ID. This operation logs the deletion, notifies hooks about the operation, and ensures thread safety during execution.
Parameters:
- session_id (str | None) – Identifier for the session associated with the deletion operation. Required in the signature (it has no default), but may be None.
- fact_id (str) – The unique identifier of the fact to be deleted.
- actor (str | None, default: None) – Optional identifier for the user or system performing the deletion.
- reason (str | None, default: None) – Optional reason or context for the deletion operation.
Returns:
- str – The fact ID of the deleted fact.
Raises:
- MemoryStoreError – If the fact with the given ID is not found in storage.
discard_session
async
discard_session(session_id: str) -> int
Asynchronously discards a session and clears related stored data in the storage.
This method removes all records associated with the given session ID from the storage and logs the operation if any data is cleared.
Parameters:
- session_id (str) – The unique identifier of the session to discard.
Returns:
- int – The number of records cleared from the storage.
get
async
get(fact_id: str) -> dict[str, Any] | None
Asynchronously retrieves a fact from the storage based on the provided fact ID.
This method accesses the underlying storage to load a fact corresponding to the given identifier. If the fact ID does not exist in the storage, the method will return None.
Parameters:
- fact_id (str) – The unique identifier of the fact to retrieve.
Returns:
- dict[str, Any] | None – A dictionary representation of the fact if found, otherwise None.
promote_session
async
promote_session(
session_id: str,
selector: (
Callable[[dict[str, Any]], bool] | None
) = None,
actor: str | None = None,
reason: str | None = None,
) -> list[str]
Asynchronously promotes session-related facts by modifying the session ID to dissociate them from the provided session. This is based on the selector criteria (if provided). The promotion operation will be logged and associated hooks will be notified.
Parameters:
- session_id (str) – The unique identifier of the session whose facts are to be processed for promotion.
- selector (Callable[[dict[str, Any]], bool] | None, default: None) – A callable to filter facts based on custom logic. If provided, only facts passing this filter will be promoted.
- actor (str | None, default: None) – Optional identifier for the user or system performing the promotion operation.
- reason (str | None, default: None) – Optional reason or context for the promotion operation.
Returns:
- list[str] – A list of identifiers for the promoted facts.
query
async
query(
typename: str | None = None,
filters: dict[str, Any] | None = None,
session_id: str | None = None,
) -> list[dict[str, Any]]
Asynchronously executes a query against the storage with optional type and filter constraints.
This method interacts with the underlying storage to filter and retrieve data
based on the provided type and filtering criteria. The typename allows for
filtering objects of a specific type, whereas filters enables more fine-grained
queries by applying a JSON-based filter.
Parameters:
- typename (str | None, default: None) – A string that specifies the type of objects to query. If set to None, no type filtering is applied.
- filters (dict[str, Any] | None, default: None) – A dictionary representing JSON-style filter constraints to apply to the query. If set to None, no filter constraints are applied.
- session_id (str | None, default: None) – Optional identifier for the session associated with the query.
Returns:
- list[dict[str, Any]] – A list of dictionaries containing query results that match the given filters and type constraints.
register_schema
register_schema(
typename: str,
model: type[BaseModel],
constraint: Constraint | None = None,
) -> None
Registers a schema in the schema registry and optionally applies a constraint.
Parameters:
- typename (str) – The unique identifier for the model being registered.
- model (type[BaseModel]) – The Pydantic model class to register.
- constraint (Constraint | None, default: None) – Optional constraint to associate with the type.
Returns:
- None
rollback
async
rollback(session_id: str, steps: int = 1) -> None
Asynchronously reverts the state of the storage by rolling back a specified number of transactional operations. Each operation is extracted from the transaction log and reversed based on its type (e.g., CREATE, UPDATE, DELETE).
Parameters:
- session_id (str) – The unique identifier of the session to roll back.
- steps (int, default: 1) – The number of transactional steps to roll back. Must be a positive integer.
Returns:
- None
update
async
update(
fact_id: str,
patch: dict[str, Any],
actor: str | None = None,
reason: str | None = None,
) -> str
Asynchronously updates an existing fact in the store by applying a patch to its contents. The update process validates the resulting payload using the schema registry and manages concurrent modifications with locking. If the update fails during hook notification, the operation is rolled back to its previous state.
Parameters:
- fact_id (str) – The unique identifier of the fact to be updated.
- patch (dict[str, Any]) – A dictionary representing the modifications to be applied to the current fact's payload.
- actor (str | None, default: None) – Optional identifier for the user or system performing the update.
- reason (str | None, default: None) – Optional reason or context for the update operation.
Returns:
- str – The unique identifier of the updated fact.
Raises:
- MemoryStoreError – If the fact with the specified identifier is not found in the store.
- HookError – If an error occurs during the hook notification process.
Constraint
Represents a constraint with properties for configuration.
This class is used to define constraints with options for a singleton key and immutability. It provides a structure to manage these constraint properties for further processing or validation.
Attributes:
- singleton_key (str | None) – Optional key used to identify a singleton behavior. If set, it implies uniqueness based on the value of the key.
- immutable (bool) – Indicates if the constraint is immutable. If True, the constraint cannot be modified after its creation.
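To make the singleton-key idea concrete, here is a small sketch of how uniqueness based on a key's value could be checked. It is an illustration under assumptions, not the library's logic: `ToyConstraint` only mirrors the two documented attributes, and `find_existing` and the flat fact dictionaries are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ToyConstraint:
    """Mirrors the two documented attributes; not the library class."""

    singleton_key: Optional[str] = None
    immutable: bool = False


def find_existing(
    facts: list[dict[str, Any]],
    new_fact: dict[str, Any],
    constraint: ToyConstraint,
) -> Optional[dict[str, Any]]:
    """Return the stored fact that shares the singleton key value, if any."""
    if constraint.singleton_key is None:
        return None
    wanted = new_fact.get(constraint.singleton_key)
    for fact in facts:
        if fact.get(constraint.singleton_key) == wanted:
            return fact
    return None


stored = [{"email": "ada@example.com", "name": "Ada"}]
by_email = ToyConstraint(singleton_key="email")

# Same email: the existing fact is found, so a store could update it instead of duplicating.
match = find_existing(stored, {"email": "ada@example.com", "name": "Ada L."}, by_email)
# Different email: nothing matches, so a store could create a new fact.
no_match = find_existing(stored, {"email": "bob@example.com"}, by_email)
```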
MemoryStore
MemoryStore(
storage: StorageBackend,
hooks: list[MemoryHook] | None = None,
)
Handles in-memory storage of structured data with schema enforcement, transactional capabilities, and hooks for custom operations.
This class provides a structured method to store and retrieve facts with enforced schema validation and constraints. It also supports mechanisms for transactional logging, model validation, and hook execution during operations.
Attributes:
- storage (StorageBackend) – Backend storage mechanism for persisting facts and transaction information.
- hooks (list[MemoryHook]) – List of hooks to be executed during memory operations.
Methods:
- add_hook – Adds a new memory hook to the list of hooks.
- commit – Commits a Fact object to the storage, optionally allowing for ephemeral storage, and updates existing records if applicable.
- commit_model – Commits a model to the store using the provided schema registry and additional metadata.
- delete – Deletes an existing fact from storage identified by the given fact ID.
- discard_session – Discards a session and clears related stored data in the storage.
- get – Retrieves a fact from the storage based on the provided fact ID.
- promote_session – Promotes session-related facts by modifying the session ID to dissociate them from the provided session.
- query – Executes a query against the storage with optional type and filter constraints.
- register_schema – Registers a schema in the schema registry and optionally applies a constraint.
- rollback – Reverts the state of the storage by rolling back a specified number of transactional operations.
- update – Updates an existing fact in the store by applying a patch to its contents.
add_hook
add_hook(hook: MemoryHook) -> None
Adds a new memory hook to the list of hooks.
This method registers a MemoryHook instance in the internal hooks list. A MemoryHook is an abstraction that can be used to monitor and react to specific memory-related events.
Parameters:
- hook (MemoryHook) – The hook instance to be added to the hooks list.
Returns:
- None
commit
commit(
fact: Fact,
session_id: str | None = None,
ephemeral: bool = False,
actor: str | None = None,
reason: str | None = None,
) -> str
Commits a Fact object to the storage, optionally allowing for ephemeral
storage, and updates existing records if applicable. The operation evaluates
constraints such as immutability or uniqueness, handles potential duplicates,
and invokes hooks for logging and notifications. Supports rollback of changes
in case of errors.
Parameters:
- fact (Fact) – The Fact object to be committed. Its payload is validated against the schema registry, and the commit either creates a new entry in the storage or updates an existing one.
- session_id (str | None, default: None) – Optional session identifier associated with the Fact.
- ephemeral (bool, default: False) – Indicates whether the Fact is transient and should not be persisted.
- actor (str | None, default: None) – Optional identifier for the individual or system responsible for initiating the commit. Used for logging and auditing purposes.
- reason (str | None, default: None) – Optional string describing the purpose of the commit. Used primarily for auditing and logging.
Returns:
- str – The unique identifier of the committed fact.
Raises:
- HookError – If an error occurs during hook execution.
commit_model
commit_model(
model: BaseModel,
fact_id: str | None = None,
source: str | None = None,
session_id: str | None = None,
ephemeral: bool = False,
actor: str | None = None,
reason: str | None = None,
) -> str
Commits a model to the store using the provided schema registry and additional metadata.
This method commits a given model instance under the schema type derived from its class. Metadata such as fact_id, source, session_id, ephemeral, actor, and reason can be supplied to categorize or provide context for the operation. If the model's schema type is not registered, an error is raised.
Parameters:
- model (BaseModel) – The model instance to commit.
- fact_id (str | None, default: None) – Optional unique identifier for the fact. If not provided, a new UUID is generated.
- source (str | None, default: None) – Optional source of the operation.
- session_id (str | None, default: None) – Optional identifier for the session in which the commit is performed.
- ephemeral (bool, default: False) – Whether the data should be treated as ephemeral.
- actor (str | None, default: None) – Optional identifier for the entity performing the commit.
- reason (str | None, default: None) – Optional description or justification for the commit operation.
Returns:
- str – The result of the commit operation as a string.
Raises:
- MemoryStoreError – If the model's schema type is not registered.
- HookError – If an error occurs during hook execution.
delete
delete(
session_id: str | None,
fact_id: str,
actor: str | None = None,
reason: str | None = None,
) -> str
Deletes an existing fact from storage identified by the given fact ID. This operation logs the deletion, notifies hooks about the operation, and ensures thread safety during execution.
Parameters:
- session_id (str | None) – Identifier for the session in which the deletion is performed. Required in the signature (it has no default), but may be None.
- fact_id (str) – The unique identifier of the fact to be deleted.
- actor (str | None, default: None) – Optional identifier for the user or system performing the deletion.
- reason (str | None, default: None) – Optional reason or context for the deletion operation.
Returns:
- str – The fact ID of the deleted fact.
Raises:
- MemoryStoreError – If the fact with the given ID is not found in storage.
discard_session
discard_session(session_id: str) -> int
Discards a session and clears related stored data in the storage.
This method removes all records associated with the given session ID from the storage and logs the operation if any data is cleared.
Parameters:
- session_id (str) – The unique identifier of the session to discard.
Returns:
- int – The number of records cleared from the storage.
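As a rough illustration of the documented contract (remove everything tied to a session, return the count), here is a toy version over a plain dictionary. The fact layout with a `session_id` field is an assumption made for this sketch; the real storage backend may organize session data differently.

```python
from typing import Any


def discard_session(facts: dict[str, dict[str, Any]], session_id: str) -> int:
    """Remove every fact tagged with session_id and return how many were removed."""
    doomed = [fid for fid, fact in facts.items() if fact.get("session_id") == session_id]
    for fid in doomed:
        del facts[fid]
    return len(doomed)


# Hypothetical fact records keyed by fact ID.
facts = {
    "f1": {"session_id": "s1", "payload": {"note": "draft"}},
    "f2": {"session_id": "s2", "payload": {"note": "other"}},
    "f3": {"session_id": "s1", "payload": {"note": "scratch"}},
}
removed = discard_session(facts, "s1")
```

Collecting the IDs first avoids mutating the dictionary while iterating over it.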
get
get(fact_id: str) -> dict[str, Any] | None
Retrieves a fact from the storage based on the provided fact ID.
This method accesses the underlying storage to load a fact corresponding to the given identifier. If the fact ID does not exist in the storage, the method will return None.
Parameters:
- fact_id (str) – The unique identifier of the fact to retrieve.
Returns:
- dict[str, Any] | None – A dictionary representation of the fact if found, otherwise None.
promote_session
promote_session(
session_id: str,
selector: (
Callable[[dict[str, Any]], bool] | None
) = None,
actor: str | None = None,
reason: str | None = None,
) -> list[str]
Promotes session-related facts by modifying the session ID to dissociate them from the provided session. This is based on the selector criteria (if provided). The promotion operation will be logged and associated hooks will be notified.
Parameters:
- session_id (str) – The unique identifier of the session whose facts are to be processed for promotion.
- selector (Callable[[dict[str, Any]], bool] | None, default: None) – A callable to filter facts based on custom logic. If provided, only facts passing this filter will be promoted.
- actor (str | None, default: None) – Optional identifier for the user or system performing the promotion operation.
- reason (str | None, default: None) – Optional reason or context for the promotion operation.
Returns:
- list[str] – A list of identifiers for the promoted facts.
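The selector semantics can be sketched with a toy function. This is not the library's code: the fact layout is hypothetical, and clearing `session_id` to None is an assumption about what "dissociating" a fact from its session means.

```python
from typing import Any, Callable, Optional

Selector = Callable[[dict[str, Any]], bool]


def promote_session(
    facts: dict[str, dict[str, Any]],
    session_id: str,
    selector: Optional[Selector] = None,
) -> list[str]:
    """Detach matching facts from session_id and return their IDs."""
    promoted = []
    for fact_id, fact in facts.items():
        if fact.get("session_id") != session_id:
            continue
        if selector is not None and not selector(fact):
            continue
        fact["session_id"] = None  # assumption: promoted facts lose their session tag
        promoted.append(fact_id)
    return promoted


facts = {
    "f1": {"session_id": "s1", "payload": {"keep": True}},
    "f2": {"session_id": "s1", "payload": {"keep": False}},
    "f3": {"session_id": "s2", "payload": {"keep": True}},
}
promoted = promote_session(facts, "s1", selector=lambda f: f["payload"]["keep"])
```

Only `f1` is promoted here: `f2` fails the selector and `f3` belongs to another session.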
query
query(
typename: str | None = None,
filters: dict[str, Any] | None = None,
session_id: str | None = None,
) -> list[dict[str, Any]]
Executes a query against the storage with optional type and filter constraints.
This method interacts with the underlying storage to filter and retrieve data
based on the provided type and filtering criteria. The typename allows for
filtering objects of a specific type, whereas filters enables more fine-grained
queries by applying a JSON-based filter.
Parameters:
- typename (str | None, default: None) – A string that specifies the type of objects to query. If set to None, no type filtering is applied.
- filters (dict[str, Any] | None, default: None) – A dictionary representing JSON-style filter constraints to apply to the query. If set to None, no filter constraints are applied.
- session_id (str | None, default: None) – Optional identifier for the session associated with the query.
Returns:
- list[dict[str, Any]] – A list of dictionaries containing query results that match the given filters and type constraints.
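The reference does not spell out the filter syntax, so the following sketch assumes the simplest reading: each key in `filters` must equal the same key in the fact's payload. The fact layout (`type`, `session_id`, `payload`) and the treatment of `session_id=None` as "no session filtering" are likewise assumptions; the real backend may support nested paths or behave differently.

```python
from typing import Any, Optional


def query(
    facts: list[dict[str, Any]],
    typename: Optional[str] = None,
    filters: Optional[dict[str, Any]] = None,
    session_id: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Return facts matching every constraint that was supplied."""
    results = []
    for fact in facts:
        if typename is not None and fact["type"] != typename:
            continue
        if session_id is not None and fact.get("session_id") != session_id:
            continue
        # Assumed filter semantics: flat equality on payload keys.
        if filters and any(fact["payload"].get(k) != v for k, v in filters.items()):
            continue
        results.append(fact)
    return results


facts = [
    {"id": "f1", "type": "user", "session_id": None, "payload": {"name": "Ada", "role": "admin"}},
    {"id": "f2", "type": "user", "session_id": "s1", "payload": {"name": "Bob", "role": "guest"}},
    {"id": "f3", "type": "order", "session_id": None, "payload": {"total": 10}},
]
admin_ids = [f["id"] for f in query(facts, typename="user", filters={"role": "admin"})]
everything = query(facts)
```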
register_schema
register_schema(
typename: str,
model: type[BaseModel],
constraint: Constraint | None = None,
) -> None
Registers a schema in the schema registry and optionally applies a constraint.
Parameters:
- typename (str) – The unique identifier for the model being registered.
- model (type[BaseModel]) – The Pydantic model class to register.
- constraint (Constraint | None, default: None) – Optional constraint to associate with the type.
Returns:
- None
rollback
rollback(session_id: str, steps: int = 1) -> None
Reverts the state of the storage by rolling back a specified number of transactional operations. Each operation is extracted from the transaction log and reversed based on its type (e.g., CREATE, UPDATE, DELETE).
Parameters:
- session_id (str) – The unique identifier of the session to roll back.
- steps (int, default: 1) – The number of transactional steps to roll back. Must be a positive integer.
Returns:
- None
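Reversing log entries by operation type can be sketched as follows. The log entry shape (`op`, `fact_id`, `before`) is hypothetical, and the sketch leaves out the session scoping that the real `session_id` parameter implies; it only shows the general undo technique.

```python
from typing import Any


def rollback(
    facts: dict[str, dict[str, Any]],
    tx_log: list[dict[str, Any]],
    steps: int = 1,
) -> None:
    """Undo the most recent `steps` log entries, newest first."""
    if steps < 1:
        raise ValueError("steps must be a positive integer")
    for _ in range(min(steps, len(tx_log))):
        entry = tx_log.pop()
        if entry["op"] == "CREATE":
            # Undoing a create removes the fact.
            facts.pop(entry["fact_id"], None)
        else:
            # Undoing an UPDATE or a DELETE restores the earlier snapshot.
            facts[entry["fact_id"]] = entry["before"]


# State after create -> update -> delete of fact "f1": the store is empty.
facts: dict[str, dict[str, Any]] = {}
tx_log = [
    {"op": "CREATE", "fact_id": "f1", "before": None},
    {"op": "UPDATE", "fact_id": "f1", "before": {"v": 1}},
    {"op": "DELETE", "fact_id": "f1", "before": {"v": 2}},
]
rollback(facts, tx_log, steps=2)  # undoes the delete, then the update
```

After two steps the fact is back at its originally created value and only the CREATE entry remains in the log.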
update
update(
fact_id: str,
patch: dict[str, Any],
actor: str | None = None,
reason: str | None = None,
) -> str
Updates an existing fact in the store by applying a patch to its contents. The update process validates the resulting payload using the schema registry and manages concurrent modifications with locking. If the update fails during hook notification, the operation is rolled back to its previous state.
Parameters:
- fact_id (str) – The unique identifier of the fact to be updated.
- patch (dict[str, Any]) – A dictionary representing the modifications to be applied to the current fact's payload.
- actor (str | None, default: None) – Optional identifier for the user or system performing the update.
- reason (str | None, default: None) – Optional reason or context for the update operation.
Returns:
- str – The unique identifier of the updated fact.
Raises:
- MemoryStoreError – If the fact with the specified identifier is not found in the store.
- HookError – If an error occurs during the hook notification process.
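The patch, validate, notify, and restore sequence described for update might look roughly like this. Everything here is a stand-in: the shallow dictionary merge is an assumed patch semantic, `require_int_age` is a hypothetical validator in place of the schema registry, the hook call shape is invented, and `KeyError` substitutes for the documented MemoryStoreError. Locking is omitted.

```python
import copy
from typing import Any, Callable


class HookError(Exception):
    """Local stand-in for the library's HookError (hypothetical definition)."""


def update(
    facts: dict[str, dict[str, Any]],
    fact_id: str,
    patch: dict[str, Any],
    validate: Callable[[dict[str, Any]], dict[str, Any]],
    hooks: list[Callable[[str, dict[str, Any]], None]],
) -> str:
    if fact_id not in facts:
        raise KeyError(fact_id)  # the library documents MemoryStoreError here
    previous = copy.deepcopy(facts[fact_id])
    # Assumption: the patch is shallow-merged, then the result is validated before writing.
    candidate = validate({**previous, **patch})
    facts[fact_id] = candidate
    try:
        for hook in hooks:
            hook("UPDATE", candidate)
    except Exception as exc:
        facts[fact_id] = previous  # restore the pre-update state
        raise HookError(str(exc)) from exc
    return fact_id


def require_int_age(payload: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(payload.get("age"), int):
        raise ValueError("age must be an int")
    return payload


def noisy_hook(op: str, fact: dict[str, Any]) -> None:
    raise RuntimeError("sink down")


facts = {"f1": {"name": "Ada", "age": 36}}
update(facts, "f1", {"age": 37}, require_int_age, hooks=[])
try:
    update(facts, "f1", {"age": 38}, require_int_age, hooks=[noisy_hook])
except HookError:
    pass
age_after_failed_update = facts["f1"]["age"]
```

The second update fails in its hook, so the stored age stays at the value written by the first update.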
SchemaRegistry
Manages schema registration and validation with a mapping of type names to Pydantic models.
The SchemaRegistry class allows for the registration of Pydantic models with associated type names. It provides functionality for validating payloads against the registered schemas and for reverse lookup of type names by model class.
Attributes:
- schemas (dict[str, type[BaseModel]]) – A mapping of type names to their registered Pydantic models.
Methods:
- get_type_by_model – Retrieve the type name associated with a given model class.
- register – Registers a model under a specific type name within the schema registry.
- validate – Validates the given payload against the model schema for the specified type name.
get_type_by_model
get_type_by_model(
model_class: type[BaseModel],
) -> str | None
Retrieve the type name associated with a given model class.
This method iterates through a dictionary of schemas and checks if the provided model class matches any value in the dictionary. If a match is found, the corresponding type name is returned. If no match is found, it returns None.
Parameters:
- model_class (type[BaseModel]) – The Pydantic model class to find the corresponding type name for.
Returns:
- str | None – The type name associated with the provided model class, or None if no match is found.
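The reverse lookup described above amounts to a linear scan over the schema mapping. The sketch below uses plain dataclasses as stand-ins for Pydantic models so that it runs with the standard library alone; `User`, `Order`, and the module-level `schemas` dictionary are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    name: str


@dataclass
class Order:
    total: float


# Stand-in for the registry's mapping of type names to model classes.
schemas: dict[str, type] = {"user": User}


def get_type_by_model(model_class: type) -> Optional[str]:
    """Scan the mapping and return the first type name registered for model_class."""
    for typename, model in schemas.items():
        if model is model_class:
            return typename
    return None


user_type = get_type_by_model(User)
order_type = get_type_by_model(Order)  # never registered
```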
register
Registers a model under a specific type name within the schema registry.
This method associates a given model with a unique type name in the internal schema registry. The registered type name and model can later be retrieved or used for validation or other processing purposes.
Parameters:
- typename (str) – The unique identifier for the model being registered.
- model (type[BaseModel]) – The Pydantic model class to register.
Returns:
- None
validate
Validates the given payload against the model schema for the specified type name. If no schema exists for the provided type name, the payload is returned unmodified. If validation fails, an exception containing the details of the validation failure is raised.
Parameters:
- typename (str) – The type name for which the payload is to be validated.
- payload (dict[str, Any]) – The dictionary payload to be validated against the corresponding model schema.
Returns:
- dict[str, Any] – A dictionary containing the validated payload in JSON-serializable format.
Raises:
- ValidationFailed – If the input payload fails validation against the schema.
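The three documented outcomes (pass-through for unknown types, a normalized dictionary on success, an exception on failure) can be shown with a toy registry. This sketch substitutes a dataclass for the Pydantic model, so it only checks field names and defaults, not value types; `User`, `schemas`, and this local `ValidationFailed` class are hypothetical stand-ins.

```python
from dataclasses import asdict, dataclass
from typing import Any


class ValidationFailed(Exception):
    """Local stand-in for the library's ValidationFailed (hypothetical definition)."""


@dataclass
class User:
    name: str
    age: int = 0


schemas: dict[str, type] = {"user": User}


def validate(typename: str, payload: dict[str, Any]) -> dict[str, Any]:
    model = schemas.get(typename)
    if model is None:
        return payload  # no schema registered: payload is returned unmodified
    try:
        # Building the dataclass rejects unknown or missing fields and fills defaults.
        return asdict(model(**payload))
    except TypeError as exc:
        raise ValidationFailed(str(exc)) from exc


untouched = validate("unknown", {"x": 1})
normalized = validate("user", {"name": "Ada"})
try:
    validate("user", {"nope": 1})
    rejected = False
except ValidationFailed:
    rejected = True
```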