integrations
Modules:
chroma
ChromaDB integration.
Classes:
- AsyncChromaSyncHook – Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.
- ChromaSyncHook – Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.
AsyncChromaSyncHook
AsyncChromaSyncHook(
client: AsyncClientAPI,
collection_name: str,
embedding_fn: (
EmbeddingFunction[Embeddable] | None
) = None,
target_types: set[str] | None = None,
text_field: str | None = None,
text_formatter: TextFormatter | None = None,
metadata_fields: list[str] | None = None,
metadata_formatter: MetadataFormatter | None = None,
)
Bases: AsyncMemoryHook
Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.
This class is responsible for managing connections to a Chroma collection via AsyncClientAPI, extracting and formatting text and metadata as per the provided configuration, and performing operations like deletion or upserting of facts based on various triggers or operations. It also provides flexibility in defining target data types, metadata extraction, text processing, and overall data synchronization rules.
Attributes:
- client (AsyncClientAPI) – The Chroma client instance used for collection access and management.
- collection_name (str) – The name of the Chroma collection to be synchronized.
- embedding_fn (EmbeddingFunction | None) – Optional function for generating embeddings from text.
- target_types (set[str]) – A set of fact types allowed for synchronization. If empty, all types are allowed.
- text_field (str | None) – Name of the field in the fact that contains the text.
- text_formatter (TextFormatter | None) – Optional custom function for extracting text. Overrides text_field if provided.
- metadata_fields (list[str]) – The list of metadata fields to extract from the fact payload. Used if no metadata formatter is provided.
- metadata_formatter (MetadataFormatter | None) – Optional custom function for extracting metadata. Overrides metadata_fields if provided.
Source code in memstate/integrations/chroma.py
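The precedence rules in the attribute list (a formatter overrides its corresponding field setting, and an empty target_types allows every fact type) can be illustrated with a small standalone sketch. The helper names should_sync, extract_text, and extract_metadata, the dict-based fact layout with "type" and "payload" keys, and the fallback of stringifying the whole payload are all assumptions made for illustration; none of them are taken from the memstate source.

```python
from typing import Any, Callable, Optional

Fact = dict[str, Any]  # hypothetical fact shape: {"type": ..., "payload": {...}}


def should_sync(fact: Fact, target_types: set[str]) -> bool:
    # An empty set means "no restriction": every fact type is synchronized.
    return not target_types or fact["type"] in target_types


def extract_text(
    fact: Fact,
    text_field: Optional[str] = None,
    text_formatter: Optional[Callable[[Fact], str]] = None,
) -> str:
    # The formatter, when given, takes precedence over text_field.
    if text_formatter is not None:
        return text_formatter(fact)
    if text_field is not None:
        return str(fact["payload"].get(text_field, ""))
    # Assumed fallback for this sketch: stringify the whole payload.
    return str(fact["payload"])


def extract_metadata(
    fact: Fact,
    metadata_fields: Optional[list[str]] = None,
    metadata_formatter: Optional[Callable[[Fact], dict[str, Any]]] = None,
) -> dict[str, Any]:
    # The formatter, when given, takes precedence over metadata_fields.
    if metadata_formatter is not None:
        return metadata_formatter(fact)
    fields = metadata_fields or []
    # Only fields actually present in the payload are copied.
    return {key: fact["payload"][key] for key in fields if key in fact["payload"]}


fact = {"type": "note", "payload": {"content": "Buy milk", "owner": "ann"}}
print(extract_text(fact, text_field="content"))
print(extract_metadata(fact, metadata_fields=["owner", "missing"]))
```

Keeping the formatter check first mirrors the "Overrides ... if provided" wording in the attribute descriptions.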
ChromaSyncHook
ChromaSyncHook(
client: ClientAPI,
collection_name: str,
embedding_fn: (
EmbeddingFunction[Embeddable] | None
) = None,
target_types: set[str] | None = None,
text_field: str | None = None,
text_formatter: TextFormatter | None = None,
metadata_fields: list[str] | None = None,
metadata_formatter: MetadataFormatter | None = None,
)
Bases: MemoryHook
Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.
This class is responsible for managing connections to a Chroma collection via a Chroma client, extracting and formatting text and metadata as per the provided configuration, and performing operations like deletion or upserting of facts based on various triggers or operations. It also provides flexibility in defining target data types, metadata extraction, text processing, and overall data synchronization rules.
Attributes:
- client (ClientAPI) – The Chroma client instance used for collection access and management.
- collection_name (str) – The name of the Chroma collection to be synchronized.
- embedding_fn (EmbeddingFunction | None) – Optional function for generating embeddings from text.
- target_types (set[str]) – A set of fact types allowed for synchronization. If empty, all types are allowed.
- text_field (str | None) – Name of the field in the fact that contains the text.
- text_formatter (TextFormatter | None) – Optional custom function for extracting text. Overrides text_field if provided.
- metadata_fields (list[str]) – The list of metadata fields to extract from the fact payload. Used if no metadata formatter is provided.
- metadata_formatter (MetadataFormatter | None) – Optional custom function for extracting metadata. Overrides metadata_fields if provided.
Source code in memstate/integrations/chroma.py
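The description above mentions upserting or deleting facts depending on the triggering operation. The following is a rough sketch of that dispatch against an in-memory stand-in. FakeCollection only imitates the keyword arguments of a Chroma collection's upsert and delete methods; the sync_fact helper, the operation labels "commit" and "delete", and the fact layout are hypothetical and not part of memstate.

```python
from typing import Any


class FakeCollection:
    """In-memory stand-in that accepts upsert/delete keyword arguments."""

    def __init__(self) -> None:
        self.rows = {}

    def upsert(self, ids, documents, metadatas) -> None:
        # Insert new rows or overwrite existing ones with the same id.
        for id_, doc, meta in zip(ids, documents, metadatas):
            self.rows[id_] = (doc, meta)

    def delete(self, ids) -> None:
        # Deleting an unknown id is a no-op in this sketch.
        for id_ in ids:
            self.rows.pop(id_, None)


def sync_fact(collection: FakeCollection, op: str, fact: dict[str, Any]) -> None:
    # "op" values are hypothetical labels for the triggering operation.
    if op == "delete":
        collection.delete(ids=[fact["id"]])
    else:
        collection.upsert(
            ids=[fact["id"]],
            documents=[fact["payload"]["content"]],
            metadatas=[{"type": fact["type"]}],
        )


collection = FakeCollection()
fact = {"id": "f1", "type": "note", "payload": {"content": "v1"}}
sync_fact(collection, "commit", fact)
sync_fact(collection, "commit", {**fact, "payload": {"content": "v2"}})
after_upserts = dict(collection.rows)
sync_fact(collection, "delete", fact)
```

Using upsert rather than separate insert and update paths keeps the hook idempotent when the same fact is committed more than once.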
langgraph
LangGraph Checkpointer.
Classes:
- AsyncMemStateCheckpointer – Asynchronously manages the storage, retrieval, and deletion of checkpoint data in memory.
- MemStateCheckpointer – Manages the storage, retrieval, and deletion of checkpoint data in memory.
AsyncMemStateCheckpointer
AsyncMemStateCheckpointer(
memory: AsyncMemoryStore,
serde: SerializerProtocol | None = None,
)
Bases: BaseCheckpointSaver[str]
Asynchronously manages the storage, retrieval, and deletion of checkpoint data in memory.
The AsyncMemStateCheckpointer class enables storing checkpoints using an in-memory storage system, facilitating workflows that require checkpointing and versioning mechanisms. It interacts with a memory store to persist checkpoint data and associated metadata, supporting use cases that require checkpointer objects functioning as temporary memory storage.
Attributes:
- memory (AsyncMemoryStore) – Reference to the memory store used for storage operations.
- serde (SerializerProtocol) – Serializer for serializing checkpoint data.
- fact_type (str) – String identifier for checkpoint facts within the memory store.
- write_type (str) – String identifier for write facts within the memory store.
Methods:
- adelete_thread – Asynchronously deletes a specific thread from memory by its identifier.
- aget_tuple – Asynchronously gets a checkpoint tuple based on the provided configuration.
- alist – Asynchronously lists and yields checkpoint tuples based on the given configuration and filters.
- aput – Asynchronously updates the state of a process by committing checkpoint metadata into memory and returning an updated configuration object.
- aput_writes – Asynchronously stores a sequence of writes by committing them as facts into memory with associated task and thread information.
Source code in memstate/integrations/langgraph.py
adelete_thread
async
adelete_thread(thread_id: str) -> None
Asynchronously deletes a specific thread from memory by its identifier.
This method removes the session associated with the given thread ID from the memory, ensuring that it is no longer retained or accessed in the system.
Parameters:
- thread_id (str) – The unique identifier of the thread to be deleted.
Returns:
- None – This method does not return any value.
Source code in memstate/integrations/langgraph.py
aget_tuple
async
aget_tuple(
config: RunnableConfig,
) -> CheckpointTuple | None
Asynchronously gets a checkpoint tuple based on the provided configuration.
This method queries memory to retrieve facts associated with a specific thread ID from the configuration. Depending on whether a thread timestamp is provided, it selects the most recent fact or the one matching the given timestamp. Finally, it reconstructs the checkpoint tuple based on the retrieved fact's payload.
Parameters:
- config (RunnableConfig) – Configuration object containing thread-specific retrieval data, including thread_id and optionally thread_ts.
Returns:
- CheckpointTuple | None – A CheckpointTuple containing the retrieved checkpoint data if a fact is found, otherwise None.
Source code in memstate/integrations/langgraph.py
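The selection rule described for aget_tuple (latest fact for the thread unless thread_ts pins a specific one) can be sketched in plain Python. This is an illustration under assumptions, not the memstate implementation: select_checkpoint_fact is a hypothetical helper, facts are modelled as dicts with "thread_id" and "ts" keys, and timestamps are assumed to be ISO 8601 strings so that string comparison matches chronological order.

```python
from typing import Any, Optional


def select_checkpoint_fact(
    facts: list[dict[str, Any]], config: dict[str, Any]
) -> Optional[dict[str, Any]]:
    conf = config["configurable"]
    thread_id = conf["thread_id"]
    thread_ts = conf.get("thread_ts")

    # Restrict the search to facts that belong to the requested thread.
    candidates = [f for f in facts if f["thread_id"] == thread_id]

    if thread_ts is not None:
        # A timestamp was given: return the exact match, or None if absent.
        return next((f for f in candidates if f["ts"] == thread_ts), None)

    # No timestamp: return the most recent fact, or None for an unknown thread.
    return max(candidates, key=lambda f: f["ts"], default=None)


facts = [
    {"thread_id": "t1", "ts": "2024-01-01T10:00:00", "payload": "a"},
    {"thread_id": "t1", "ts": "2024-01-01T11:00:00", "payload": "b"},
    {"thread_id": "t2", "ts": "2024-01-01T12:00:00", "payload": "c"},
]
latest = select_checkpoint_fact(facts, {"configurable": {"thread_id": "t1"}})
pinned = select_checkpoint_fact(
    facts, {"configurable": {"thread_id": "t1", "thread_ts": "2024-01-01T10:00:00"}}
)
```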
alist
async
alist(
config: RunnableConfig | None,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> AsyncIterator[CheckpointTuple]
Asynchronously list and yield checkpoint tuples based on given configuration and filters.
This function retrieves fact data stored in memory, optionally applies filters based on configuration or other specified parameters, and yields checkpoint tuples sorted by timestamp. The functionality includes support for limiting the number of facts processed.
Parameters:
- config (RunnableConfig | None) – Configuration information for filtering facts. Optional.
- filter (dict[str, Any] | None, default: None) – Additional criteria for filtering facts based on key-value pairs. Optional.
- before (RunnableConfig | None, default: None) – Restricts results to checkpoints created before the one identified by this configuration. Optional.
- limit (int | None, default: None) – Maximum number of facts to process and yield. Optional.
Returns:
- AsyncIterator[CheckpointTuple] – An iterator over checkpoint tuples derived from the filtered facts.
Source code in memstate/integrations/langgraph.py
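How config, filter, before, and limit might combine in alist can be sketched with a plain async generator over a list of dicts. The names list_checkpoints and collect, the fact layout, and the newest-first ordering are assumptions for this sketch; the description above only states that results are sorted by timestamp.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def list_checkpoints(
    facts: list[dict[str, Any]],
    config: Optional[dict[str, Any]] = None,
    *,
    filter: Optional[dict[str, Any]] = None,
    before: Optional[dict[str, Any]] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[dict[str, Any]]:
    thread_id = config["configurable"]["thread_id"] if config else None
    before_ts = before["configurable"].get("thread_ts") if before else None
    emitted = 0
    # Assumed ordering: newest first.
    for fact in sorted(facts, key=lambda f: f["ts"], reverse=True):
        if thread_id is not None and fact["thread_id"] != thread_id:
            continue
        if before_ts is not None and fact["ts"] >= before_ts:
            continue
        # Every key-value pair in the filter must match the fact's metadata.
        if filter and any(fact["metadata"].get(k) != v for k, v in filter.items()):
            continue
        if limit is not None and emitted >= limit:
            return
        yield fact
        emitted += 1


FACTS = [
    {"thread_id": "t1", "ts": "2024-01-01T00:00:00", "metadata": {"source": "input"}},
    {"thread_id": "t1", "ts": "2024-01-02T00:00:00", "metadata": {"source": "loop"}},
    {"thread_id": "t1", "ts": "2024-01-03T00:00:00", "metadata": {"source": "loop"}},
    {"thread_id": "t2", "ts": "2024-01-04T00:00:00", "metadata": {"source": "loop"}},
]
T1 = {"configurable": {"thread_id": "t1"}}


async def collect(**kwargs: Any) -> list[str]:
    # Drain the async iterator into a list of timestamps.
    return [f["ts"] async for f in list_checkpoints(FACTS, **kwargs)]


latest_two = asyncio.run(collect(config=T1, limit=2))
```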
aput
async
aput(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: dict[str, Any],
) -> RunnableConfig
Asynchronously updates the state of a process by committing checkpoint metadata into memory and returning an updated configuration object.
This method handles storing the provided checkpoint and its associated metadata to facilitate process tracking. It interacts with the memory instance to ensure the relevant details are committed within the appropriate session. After updating memory, it returns a modified configuration containing the updated thread parameters.
Parameters:
- config (RunnableConfig) – The configuration object for the runnable, which must include a thread_id under the configurable key.
- checkpoint (Checkpoint) – The checkpoint object containing state information to be stored in memory.
- metadata (CheckpointMetadata) – Additional metadata corresponding to the checkpoint, providing supplementary details about the stored state.
- new_versions (dict[str, Any]) – A mapping of version keys to their new corresponding values, used to track changes in versions during the execution process.
Returns:
- RunnableConfig – A modified configuration object reflecting the updated thread parameters after committing the provided checkpoint to memory.
Source code in memstate/integrations/langgraph.py
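The contract described for aput (require thread_id under the configurable key, store the checkpoint, return an updated configuration) might look roughly like the sketch below. put_checkpoint and the dict-based store are hypothetical. The use of thread_ts in the returned configuration is an assumption based on the thread_ts key mentioned for aget_tuple, as is reading it from the checkpoint's "ts" entry.

```python
from typing import Any


def put_checkpoint(
    store: dict[str, list[dict[str, Any]]],
    config: dict[str, Any],
    checkpoint: dict[str, Any],
    metadata: dict[str, Any],
) -> dict[str, Any]:
    try:
        thread_id = config["configurable"]["thread_id"]
    except KeyError as exc:
        # Fail early with a clear message when the required key is missing.
        raise ValueError("config must provide configurable.thread_id") from exc

    # Commit the checkpoint and its metadata under the thread's session.
    store.setdefault(thread_id, []).append(
        {"checkpoint": checkpoint, "metadata": metadata}
    )

    # Return a new config that points at the checkpoint just written.
    return {"configurable": {"thread_id": thread_id, "thread_ts": checkpoint["ts"]}}


store: dict[str, list[dict[str, Any]]] = {}
new_config = put_checkpoint(
    store,
    {"configurable": {"thread_id": "t1"}},
    {"ts": "2024-01-01T10:00:00", "channel_values": {"count": 1}},
    {"source": "loop"},
)
```

Returning a fresh dict instead of mutating the caller's config avoids surprising side effects when the same config object is reused across calls.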
aput_writes
async
aput_writes(
config: RunnableConfig,
writes: Sequence[tuple[str, Any]],
task_id: str,
task_path: str = "",
) -> None
Asynchronously executes the operation to store a sequence of writes by committing them as facts into memory with associated task and thread information. Each write entry in the sequence is processed with a specific channel, value, and index to generate a payload, which is then committed.
Parameters:
- config (RunnableConfig) – The configuration object implementing the RunnableConfig interface. It must contain a "configurable" dictionary with a thread ID linked under the key "thread_id".
- writes (Sequence[tuple[str, Any]]) – A sequence of tuples where each tuple contains a string representing the channel and an associated value of type Any to be committed.
- task_id (str) – A string representing the unique identifier for the task that groups all the writes.
- task_path (str, default: '') – A string that represents the path or hierarchy associated with the task. Defaults to an empty string if not provided.
Returns:
- None – This method does not return any value.
Source code in memstate/integrations/langgraph.py
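The description of aput_writes says each write is turned into a payload from its channel, value, and index. A minimal sketch of that step follows; build_write_payloads and the payload key names are assumptions chosen for illustration and may not match the keys memstate actually stores.

```python
from typing import Any, Sequence


def build_write_payloads(
    config: dict[str, Any],
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> list[dict[str, Any]]:
    thread_id = config["configurable"]["thread_id"]
    # enumerate() supplies the positional index that keeps writes ordered.
    return [
        {
            "thread_id": thread_id,
            "task_id": task_id,
            "task_path": task_path,
            "idx": idx,
            "channel": channel,
            "value": value,
        }
        for idx, (channel, value) in enumerate(writes)
    ]


payloads = build_write_payloads(
    {"configurable": {"thread_id": "t1"}},
    [("messages", "hello"), ("counter", 2)],
    task_id="task-42",
)
```

Each resulting payload would then be committed as one write fact for the thread.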
MemStateCheckpointer
MemStateCheckpointer(
memory: MemoryStore,
serde: SerializerProtocol | None = None,
)
Bases: BaseCheckpointSaver[str]
Manages the storage, retrieval, and deletion of checkpoint data in memory.
The MemStateCheckpointer class enables storing checkpoints using an in-memory storage system, facilitating workflows that require checkpointing and versioning mechanisms. It interacts with a memory store to persist checkpoint data and associated metadata, supporting use cases that require checkpointer objects functioning as temporary memory storage.
Attributes:
- memory (MemoryStore) – Reference to the memory store used for storage operations.
- serde (SerializerProtocol) – Serializer for serializing checkpoint data.
- fact_type (str) – String identifier for checkpoint facts within the memory store.
- write_type (str) – String identifier for write facts within the memory store.
Methods:
- delete_thread – Deletes a specific thread from memory by its identifier.
- get_tuple – Gets a checkpoint tuple based on the provided configuration.
- list – Lists and yields checkpoint tuples based on the given configuration and filters.
- put – Updates the state of a process by committing checkpoint metadata into memory and returning an updated configuration object.
- put_writes – Stores a sequence of writes by committing them as facts into memory with associated task and thread information.
Source code in memstate/integrations/langgraph.py
delete_thread
delete_thread(thread_id: str) -> None
Deletes a specific thread from memory by its identifier.
This method removes the session associated with the given thread ID from the memory, ensuring that it is no longer retained or accessed in the system.
Parameters:
- thread_id (str) – The unique identifier of the thread to be deleted.
Returns:
- None – This method does not return any value.
Source code in memstate/integrations/langgraph.py
get_tuple
get_tuple(config: RunnableConfig) -> CheckpointTuple | None
Gets a checkpoint tuple based on the provided configuration.
This method queries memory to retrieve facts associated with a specific thread ID from the configuration. Depending on whether a thread timestamp is provided, it selects the most recent fact or the one matching the given timestamp. Finally, it reconstructs the checkpoint tuple based on the retrieved fact's payload.
Parameters:
- config (RunnableConfig) – Configuration object containing thread-specific retrieval data, including thread_id and optionally thread_ts.
Returns:
- CheckpointTuple | None – A CheckpointTuple containing the retrieved checkpoint data if a fact is found, otherwise None.
Source code in memstate/integrations/langgraph.py
list
list(
config: RunnableConfig | None,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None
) -> Iterator[CheckpointTuple]
List and yield checkpoint tuples based on given configuration and filters.
This function retrieves fact data stored in memory, optionally applies filters based on configuration or other specified parameters, and yields checkpoint tuples sorted by timestamp. The functionality includes support for limiting the number of facts processed.
Parameters:
- config (RunnableConfig | None) – Configuration information for filtering facts. Optional.
- filter (dict[str, Any] | None, default: None) – Additional criteria for filtering facts based on key-value pairs. Optional.
- before (RunnableConfig | None, default: None) – Restricts results to checkpoints created before the one identified by this configuration. Optional.
- limit (int | None, default: None) – Maximum number of facts to process and yield. Optional.
Returns:
- Iterator[CheckpointTuple] – An iterator over checkpoint tuples derived from the filtered facts.
Source code in memstate/integrations/langgraph.py
put
put(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: dict[str, Any],
) -> RunnableConfig
Updates the state of a process by committing checkpoint metadata into memory and returning an updated configuration object.
This method handles storing the provided checkpoint and its associated metadata to facilitate process tracking. It interacts with the memory instance to ensure the relevant details are committed within the appropriate session. After updating memory, it returns a modified configuration containing the updated thread parameters.
Parameters:
- config (RunnableConfig) – The configuration object for the runnable, which must include a thread_id under the configurable key.
- checkpoint (Checkpoint) – The checkpoint object containing state information to be stored in memory.
- metadata (CheckpointMetadata) – Additional metadata corresponding to the checkpoint, providing supplementary details about the stored state.
- new_versions (dict[str, Any]) – A mapping of version keys to their new corresponding values, used to track changes in versions during the execution process.
Returns:
- RunnableConfig – A modified configuration object reflecting the updated thread parameters after committing the provided checkpoint to memory.
Source code in memstate/integrations/langgraph.py
put_writes
put_writes(
config: RunnableConfig,
writes: Sequence[tuple[str, Any]],
task_id: str,
task_path: str = "",
) -> None
Executes the operation to store a sequence of writes by committing them as facts into memory with associated task and thread information. Each write entry in the sequence is processed with a specific channel, value, and index to generate a payload, which is then committed.
Parameters:
- config (RunnableConfig) – The configuration object implementing the RunnableConfig interface. It must contain a "configurable" dictionary with a thread ID linked under the key "thread_id".
- writes (Sequence[tuple[str, Any]]) – A sequence of tuples where each tuple contains a string representing the channel and an associated value of type Any to be committed.
- task_id (str) – A string representing the unique identifier for the task that groups all the writes.
- task_path (str, default: '') – A string that represents the path or hierarchy associated with the task. Defaults to an empty string if not provided.
Returns:
- None – This method does not return any value.
Source code in memstate/integrations/langgraph.py
qdrant
Qdrant integration.
Classes:
- AsyncQdrantSyncHook – Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata.
- FastEmbedEncoder – Default embedding implementation using FastEmbed.
- QdrantSyncHook – Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata.
AsyncQdrantSyncHook
AsyncQdrantSyncHook(
client: AsyncQdrantClient,
collection_name: str,
embedding_fn: EmbeddingFunction | None = None,
target_types: set[str] | None = None,
text_field: str | None = None,
text_formatter: TextFormatter | None = None,
metadata_fields: list[str] | None = None,
metadata_formatter: MetadataFormatter | None = None,
distance: Distance = COSINE,
)
Bases: AsyncMemoryHook
Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata. Designed for real-time updates and integration with Qdrant database collections.
The class provides support for maintaining vector embeddings, setting up collection parameters, formatting metadata, and handling various operations such as inserts, updates, deletions, and session discards. Additionally, it supports embedding functions and configurable distance metrics for vector similarity functionality.
Attributes:
- client (AsyncQdrantClient) – Client instance for interacting with the Qdrant database.
- collection_name (str) – Name of the Qdrant collection to synchronize with.
- embedding_fn (EmbeddingFunction | None) – Function responsible for generating vector embeddings, defaulting to FastEmbedEncoder if not provided.
- target_types (set[str] | None) – Set of target types that define which operations are supported for synchronization. Defaults to an empty set.
- distance (Distance) – Metric to compute vector similarity in Qdrant, e.g., COSINE, EUCLID.
- metadata_fields (list[str] | None) – List of fields to include in the metadata payload.
- metadata_formatter (MetadataFormatter | None) – Formatter function for structuring metadata. Optional.
Source code in memstate/integrations/qdrant.py
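To make the embedding_fn and distance attributes more concrete, here is a toy sketch of a custom embedding callable and of the cosine similarity that the COSINE metric is based on. It assumes that embedding_fn is a callable taking a string and returning a list of floats, which this page does not state explicitly. toy_embed is a deterministic stand-in with no semantic meaning and is not a real embedding model.

```python
import hashlib
import math


def toy_embed(text: str, dim: int = 8) -> list[float]:
    # Deterministic placeholder: derive `dim` floats in [0, 1] from a hash.
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [byte / 255.0 for byte in digest[:dim]]


def cosine_similarity(a: list[float], b: list[float]) -> float:
    # cos(a, b) = (a . b) / (|a| * |b|); defined as 0.0 for a zero vector.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


vec = toy_embed("hello")
same = cosine_similarity(vec, vec)
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
```

A callable with this shape could be passed as embedding_fn in place of the default encoder, provided its output dimension matches the collection's vector size.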
FastEmbedEncoder
FastEmbedEncoder(
model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
options: dict[str, Any] | None = None,
)
Default embedding implementation using FastEmbed. Used if no custom embedding_fn is provided.
This class provides a lightweight wrapper around the FastEmbed library to generate embeddings for text inputs. The encoder initializes with a specific pre-trained model from FastEmbed and can be invoked to generate a numerical vector representation of the provided text.
Attributes:
- model (TextEmbedding) – Instance of the FastEmbed TextEmbedding model used to generate embeddings.
If the FastEmbed library is not installed on the system, an ImportError is raised, advising the user to install the library or provide a custom embedding function.
Parameters:
- model_name (str, default: 'sentence-transformers/all-MiniLM-L6-v2') – The name of the model to be used for text embeddings.
- options (dict[str, Any] | None, default: None) – A dictionary of options for configuring the embedding model. If not provided, an empty dictionary is used.
Raises:
- ImportError – If the FastEmbed library is not installed on the system, this exception is raised, advising the use of pip install fastembed or providing a custom embedding function.
Source code in memstate/integrations/qdrant.py
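The ImportError behaviour described above follows a common optional-dependency pattern: attempt the import lazily and re-raise with an actionable message. The sketch below shows that general pattern only. require_optional is a hypothetical helper and is not claimed to be how FastEmbedEncoder is implemented.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, hint: str) -> ModuleType:
    """Import an optional dependency, or raise ImportError with guidance."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Chain the original error so the root cause stays visible.
        raise ImportError(
            f"{module_name} is required for this feature. {hint}"
        ) from exc


# A module that is always available imports normally.
json_module = require_optional("json", "It ships with Python.")
```

Deferring the import to construction time lets users who supply their own embedding function avoid installing the optional package at all.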
QdrantSyncHook
QdrantSyncHook(
client: QdrantClient,
collection_name: str,
embedding_fn: EmbeddingFunction | None = None,
target_types: set[str] | None = None,
text_field: str | None = None,
text_formatter: TextFormatter | None = None,
metadata_fields: list[str] | None = None,
metadata_formatter: MetadataFormatter | None = None,
distance: Distance = COSINE,
)
Bases: MemoryHook
Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata. Designed for real-time updates and integration with Qdrant database collections.
The class provides support for maintaining vector embeddings, setting up collection parameters, formatting metadata, and handling various operations such as inserts, updates, deletions, and session discards. Additionally, it supports embedding functions and configurable distance metrics for vector similarity functionality.
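Example
import openai
from memstate.integrations.qdrant import FastEmbedEncoder, QdrantSyncHook
# Option 1: FastEmbed with a custom model (client is an existing QdrantClient)
encoder = FastEmbedEncoder(model_name="BAAI/bge-small-en-v1.5", options={"cuda": True})
hook = QdrantSyncHook(client, "memory", embedding_fn=encoder)
# Option 2: OpenAI embeddings; embedding_fn must be a callable, not a precomputed vector
def openai_embedder(text):
    resp = openai.embeddings.create(input=text, model="text-embedding-3-small")
    return resp.data[0].embedding
hook = QdrantSyncHook(client, "memory", embedding_fn=openai_embedder)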
Attributes:
- client (QdrantClient) – Client instance for interacting with the Qdrant database.
- collection_name (str) – Name of the Qdrant collection to synchronize with.
- embedding_fn (EmbeddingFunction | None) – Function responsible for generating vector embeddings, defaulting to FastEmbedEncoder if not provided.
- target_types (set[str] | None) – Set of target types that define which operations are supported for synchronization. Defaults to an empty set.
- distance (Distance) – Metric to compute vector similarity in Qdrant, e.g., COSINE, EUCLID.
- metadata_fields (list[str] | None) – List of fields to include in the metadata payload.
- metadata_formatter (MetadataFormatter | None) – Formatter function for structuring metadata. Optional.