
integrations

Modules:

chroma

ChromaDB integration.

Classes:

  • AsyncChromaSyncHook

    Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.

  • ChromaSyncHook

    Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.

AsyncChromaSyncHook

AsyncChromaSyncHook(
    client: AsyncClientAPI,
    collection_name: str,
    embedding_fn: (
        EmbeddingFunction[Embeddable] | None
    ) = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
)

Bases: AsyncMemoryHook

Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.

This class manages access to a Chroma collection through an AsyncClientAPI, extracts and formats text and metadata according to the configured fields or formatters, and upserts or deletes collection entries in response to memory operations. It can also restrict synchronization to selected fact types. The constructor does not contact Chroma: the collection handle (_collection) starts as None.

Example
client = await chromadb.AsyncHttpClient()
hook = AsyncChromaSyncHook(client, "my_collection", text_field="content")
store = AsyncMemoryStore(AsyncInMemoryStorage(), hooks=[hook])
await store.commit_model(...)

Attributes:

  • client (AsyncClientAPI) –

    The Chroma client instance used for collection access and management.

  • collection_name (str) –

    The name of the Chroma collection to be synchronized.

  • embedding_fn (EmbeddingFunction | None) –

    Optional function for generating embeddings from text.

  • target_types (set[str]) –

    A set of fact types allowed for synchronization. If empty, all types are allowed.

  • text_field (str | None) –

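    Name of the payload field whose value is used as the document text. If neither text_field nor text_formatter is set, the whole payload is converted with str().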

  • text_formatter (TextFormatter | None) –

    Optional custom function for extracting text. Overrides text_field if provided.

  • metadata_fields (list[str]) –

    The list of metadata fields to extract from the fact payload. Used if no metadata formatter is provided.

  • metadata_formatter (MetadataFormatter | None) –

    Optional custom function for extracting metadata. Overrides metadata_fields if provided.

Source code in memstate/integrations/chroma.py
def __init__(
    self,
    client: AsyncClientAPI,
    collection_name: str,
    embedding_fn: EmbeddingFunction[Embeddable] | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
):
    self.client = client
    self.collection_name = collection_name
    self.embedding_fn = embedding_fn

    self._collection: AsyncCollection | None = None

    self.target_types = target_types or set()

    if text_formatter is not None:
        self._extract_text = text_formatter
    elif text_field:
        self._extract_text = lambda data: str(data.get(text_field, ""))
    else:
        self._extract_text = lambda data: str(data)

    self.metadata_fields = metadata_fields or []
    self.metadata_formatter = metadata_formatter

ChromaSyncHook

ChromaSyncHook(
    client: ClientAPI,
    collection_name: str,
    embedding_fn: (
        EmbeddingFunction[Embeddable] | None
    ) = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
)

Bases: MemoryHook

Handles synchronization of memory data with Chroma collections by integrating fact operations and data transformations.

This class fetches or creates the named Chroma collection in its constructor (via get_or_create_collection, passing embedding_fn), extracts and formats text and metadata according to the configured fields or formatters, and upserts or deletes collection entries in response to memory operations. It can also restrict synchronization to selected fact types.

Attributes:

  • client (ClientAPI) –

    The Chroma client instance used for collection access and management.

  • collection_name (str) –

    The name of the Chroma collection to be synchronized.

  • embedding_fn (EmbeddingFunction | None) –

    Optional function for generating embeddings from text.

  • target_types (set[str]) –

    A set of fact types allowed for synchronization. If empty, all types are allowed.

  • text_field (str | None) –

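    Name of the payload field whose value is used as the document text. If neither text_field nor text_formatter is set, the whole payload is converted with str().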

  • text_formatter (TextFormatter | None) –

    Optional custom function for extracting text. Overrides text_field if provided.

  • metadata_fields (list[str]) –

    The list of metadata fields to extract from the fact payload. Used if no metadata formatter is provided.

  • metadata_formatter (MetadataFormatter | None) –

    Optional custom function for extracting metadata. Overrides metadata_fields if provided.

Source code in memstate/integrations/chroma.py
def __init__(
    self,
    client: ClientAPI,
    collection_name: str,
    embedding_fn: EmbeddingFunction[Embeddable] | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
):
    self.client = client
    self.collection = client.get_or_create_collection(
        name=collection_name,
        embedding_function=embedding_fn,
    )
    self.target_types = target_types or set()

    if text_formatter is not None:
        self._extract_text = text_formatter
    elif text_field:
        self._extract_text = lambda data: str(data.get(text_field, ""))
    else:
        self._extract_text = lambda data: str(data)

    self.metadata_fields = metadata_fields or []
    self.metadata_formatter = metadata_formatter

langgraph

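LangGraph checkpointer integration.

Classes:

  • AsyncMemStateCheckpointer

    Asynchronously manages the storage, retrieval, and deletion of checkpoint data in memory.

  • MemStateCheckpointer

    Manages the storage, retrieval, and deletion of checkpoint data in memory.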

AsyncMemStateCheckpointer

AsyncMemStateCheckpointer(
    memory: AsyncMemoryStore,
    serde: SerializerProtocol | None = None,
)

Bases: BaseCheckpointSaver[str]

Asynchronously manages the storage, retrieval, and deletion of checkpoint data in memory.

AsyncMemStateCheckpointer implements LangGraph's BaseCheckpointSaver interface on top of an AsyncMemoryStore. Each checkpoint is committed as a fact of type fact_type, and each write passed to aput_writes as a fact of type write_type, in a session whose ID is the LangGraph thread_id. Where the data is ultimately kept depends on the storage backend behind the memory store.

Attributes:

  • memory (AsyncMemoryStore) –

    Reference to the memory store used for storage operations.

  • serde (SerializerProtocol) –

    Serializer for checkpoint data. Defaults to JsonPlusSerializer when none is passed.

  • fact_type (str) –

    Fact type used for checkpoint facts in the memory store ("langgraph_checkpoint").

  • write_type (str) –

    Fact type used for write facts in the memory store ("langgraph_write").

Methods:

  • adelete_thread

    Asynchronously deletes a specific thread from memory by its identifier.

  • aget_tuple

    Asynchronously gets a checkpoint tuple based on the provided configuration.

  • alist

    Asynchronously lists and yields checkpoint tuples based on the given configuration and filters.

  • aput

    Asynchronously commits a checkpoint and its metadata into memory and returns an updated configuration object.

  • aput_writes

    Asynchronously stores a sequence of writes by committing them as facts into memory with associated task and thread information.

Source code in memstate/integrations/langgraph.py
def __init__(self, memory: AsyncMemoryStore, serde: SerializerProtocol | None = None) -> None:
    super().__init__(serde=serde or JsonPlusSerializer())
    self.memory = memory
    self.fact_type = "langgraph_checkpoint"
    self.write_type = "langgraph_write"

adelete_thread async

adelete_thread(thread_id: str) -> None

Asynchronously deletes a specific thread from memory by its identifier.

This method removes the session associated with the given thread ID from the memory, ensuring that it is no longer retained or accessed in the system.

Parameters:

  • thread_id
    (str) –

    The unique identifier of the thread to be deleted.

Returns:

  • None

    This method does not return any value.

Source code in memstate/integrations/langgraph.py
async def adelete_thread(self, thread_id: str) -> None:
    """
    Asynchronously deletes a specific thread from memory by its identifier.

    This method removes the session associated with the given thread ID
    from the memory, ensuring that it is no longer retained or accessed
    in the system.

    Args:
        thread_id (str): The unique identifier of the thread to be deleted.

    Returns:
        This method does not return any value.
    """
    await self.memory.discard_session(thread_id)

aget_tuple async

aget_tuple(
    config: RunnableConfig,
) -> CheckpointTuple | None

Asynchronously gets a checkpoint tuple based on the provided configuration.

This method queries memory for checkpoint facts in the session named by config["configurable"]["thread_id"] and sorts them newest first. If the configuration also carries a thread_ts, it returns the checkpoint whose payload has that thread_ts; otherwise it returns the most recent one. It then rebuilds a CheckpointTuple from the fact's payload, with parent_config set to None and pending_writes taken from the checkpoint's pending_sends.

Parameters:

  • config
    (RunnableConfig) –

    Configuration object containing thread-specific retrieval data, including thread_id and optionally thread_ts.

Returns:

  • CheckpointTuple | None

    A CheckpointTuple containing the retrieved checkpoint data if a fact is found, otherwise None.

Source code in memstate/integrations/langgraph.py
async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """
    Asynchronously gets a checkpoint tuple based on the provided configuration.

    This method queries memory to retrieve facts associated with a specific thread ID
    from the configuration. Depending on whether a thread timestamp is provided,
    it selects the most recent fact or the one matching the given timestamp.
    Finally, it reconstructs the checkpoint tuple based on the retrieved fact's payload.

    Args:
        config (RunnableConfig): Configuration object containing thread-specific retrieval
            data, including `thread_id` and optionally `thread_ts`.

    Returns:
        A `CheckpointTuple` containing the retrieved checkpoint data if a fact is found, otherwise `None`.
    """
    thread_id = config["configurable"]["thread_id"]
    thread_ts = config["configurable"].get("thread_ts")

    facts = await self.memory.storage.query(type_filter=self.fact_type, json_filters={"session_id": thread_id})

    if not facts:
        return None

    facts.sort(key=lambda x: x["ts"], reverse=True)

    fact = None
    if thread_ts:
        for f in facts:
            if f["payload"].get("thread_ts") == thread_ts:
                fact = f
                break
    else:
        fact = facts[0]

    if not fact:
        return None

    payload = fact["payload"]
    checkpoint = payload["checkpoint"]
    pending_sends = checkpoint.get("pending_sends") or []

    return CheckpointTuple(
        config=config,
        checkpoint=checkpoint,
        metadata=payload["metadata"],
        parent_config=None,
        pending_writes=pending_sends,
    )

alist async

alist(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[CheckpointTuple]

Asynchronously lists and yields checkpoint tuples based on the given configuration and filters.

This function queries the stored checkpoint facts, restricts them to the thread named by config["configurable"]["thread_id"] when one is provided, and yields checkpoint tuples newest first. If limit is set, at most limit tuples are yielded. The filter and before arguments are accepted for interface compatibility but are not applied by the current implementation.

Parameters:

  • config
    (RunnableConfig | None) –

    Configuration whose configurable.thread_id, if present, restricts results to that thread. Optional.

  • filter
    (dict[str, Any] | None, default: None ) –

    Additional key-value criteria for filtering facts. Optional; not applied by the current implementation.

  • before
    (RunnableConfig | None, default: None ) –

    Configuration marking the checkpoint before which results should be listed. Optional; not applied by the current implementation.

  • limit
    (int | None, default: None ) –

    Maximum number of facts to process and yield. Optional.

Returns:

  • AsyncIterator[CheckpointTuple]

    An async iterator over checkpoint tuples derived from the filtered facts, newest first.

Source code in memstate/integrations/langgraph.py
async def alist(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> AsyncIterator[CheckpointTuple]:
    """
    Asynchronously list and yield checkpoint tuples based on given configuration and filters.

    This function retrieves fact data stored in memory, optionally applies
    filters based on configuration or other specified parameters, and yields
    checkpoint tuples sorted by timestamp. The functionality includes support
    for limiting the number of facts processed.

    Args:
        config (RunnableConfig | None): Configuration information for filtering facts. Optional.
        filter (dict[str, Any] | None): Additional criteria for filtering facts based on key-value pairs. Optional.
        before (RunnableConfig | None): Configuration object to apply filter before a certain criterion. Optional.
        limit (int | None): Maximum number of facts to process and yield. Optional.

    Returns:
        An iterator over checkpoint tuples derived from the filtered facts.
    """

    json_filters = {}
    if config and "configurable" in config:
        thread_id = config["configurable"].get("thread_id")
        if thread_id:
            json_filters["session_id"] = thread_id

    # Fetch checkpoint facts, scoped to the thread's session when a thread_id is known
    facts = await self.memory.storage.query(
        type_filter=self.fact_type, json_filters=json_filters if json_filters else None
    )
    facts.sort(key=lambda x: x["ts"], reverse=True)

    if limit:
        facts = facts[:limit]

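    # Yield one CheckpointTuple per fact, newest first
    for fact in facts:
        payload = fact["payload"]
        yield CheckpointTuple(
            config={
                "configurable": {
                    "thread_id": payload.get("thread_id") or json_filters.get("session_id"),
                    "thread_ts": payload["thread_ts"],
                }
            },
            checkpoint=payload["checkpoint"],
            metadata=payload["metadata"],
            # Pass by keyword: the fourth positional field of CheckpointTuple is parent_config
            pending_writes=(payload.get("checkpoint") or {}).get("pending_sends", []),
        )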

aput async

aput(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: dict[str, Any],
) -> RunnableConfig

Asynchronously commits a checkpoint and its metadata into memory and returns an updated configuration object.

The checkpoint, its metadata, and new_versions are committed together as a single fact of type fact_type in the session identified by config["configurable"]["thread_id"], with checkpoint["id"] recorded as thread_ts. The returned configuration carries the same thread_id and the new thread_ts.

Parameters:

  • config
    (RunnableConfig) –

    The configuration object for the runnable, which must include a thread_id under the configurable key.

  • checkpoint
    (Checkpoint) –

    The checkpoint object containing state information to be stored in memory.

  • metadata
    (CheckpointMetadata) –

    Additional metadata corresponding to the checkpoint, providing supplementary details about the stored state.

  • new_versions
    (dict[str, Any]) –

    A mapping of version keys to their new corresponding values, used to track changes in versions during the execution process.

Returns:

  • RunnableConfig

    A modified configuration object reflecting the updated thread parameters after committing the provided checkpoint to memory.

Source code in memstate/integrations/langgraph.py
async def aput(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: dict[str, Any],
) -> RunnableConfig:
    """
    Asynchronously updates the state of a process by committing checkpoint metadata into memory
    and returning an updated configuration object.

    This method handles storing the provided checkpoint and its associated metadata
    to facilitate process tracking. It interacts with the memory instance to ensure
    the relevant details are committed within the appropriate session. After updating
    memory, it returns a modified configuration containing the updated thread
    parameters.

    Args:
        config (RunnableConfig): The configuration object for the runnable, which must include a `thread_id` under the `configurable` key.
        checkpoint (Checkpoint): The checkpoint object containing state information to be stored in memory.
        metadata (CheckpointMetadata): Additional metadata corresponding to the checkpoint, providing
            supplementary details about the stored state.
        new_versions (dict[str, Any]): A mapping of version keys to their new corresponding
            values, used to track changes in versions during the execution process.

    Returns:
        A modified configuration object reflecting the updated thread
            parameters after committing the provided checkpoint to memory.
    """
    thread_id = config["configurable"]["thread_id"]

    payload = {
        "checkpoint": checkpoint,
        "metadata": metadata,
        "new_versions": new_versions,
        "thread_ts": checkpoint["id"],
    }

    # Commit the checkpoint as a fact in the thread's session
    await self.memory.commit(
        Fact(type=self.fact_type, payload=payload, source="langgraph_checkpoint"), session_id=thread_id
    )

    return {
        "configurable": {
            "thread_id": thread_id,
            "thread_ts": checkpoint["id"],
        }
    }
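The fact payload and the returned config that aput builds can be sketched with plain dicts. `build_checkpoint_payload` is a hypothetical helper and the checkpoint is a minimal fake carrying only the `id` key the listing reads. Note that thread_id is not stored in the payload; aput passes it to commit as session_id instead.

```python
from typing import Any


def build_checkpoint_payload(
    config: dict[str, Any],
    checkpoint: dict[str, Any],
    metadata: dict[str, Any],
    new_versions: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return (fact payload, returned config) as aput constructs them (hypothetical)."""
    thread_id = config["configurable"]["thread_id"]  # raises KeyError if missing
    payload = {
        "checkpoint": checkpoint,
        "metadata": metadata,
        "new_versions": new_versions,
        "thread_ts": checkpoint["id"],  # the checkpoint ID doubles as thread_ts
    }
    next_config = {"configurable": {"thread_id": thread_id, "thread_ts": checkpoint["id"]}}
    return payload, next_config


payload, next_config = build_checkpoint_payload(
    {"configurable": {"thread_id": "t1"}}, {"id": "cp-42"}, {"step": 1}, {"messages": 2}
)
print(next_config)  # {'configurable': {'thread_id': 't1', 'thread_ts': 'cp-42'}}
```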

aput_writes async

aput_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

Asynchronously stores a sequence of writes, committing each one as a separate fact of type write_type in the thread's session. Each payload records the task ID, task path, channel, value, the write's index in the sequence, and the thread ID.

Parameters:

  • config
    (RunnableConfig) –

    The configuration object implementing the RunnableConfig interface. It must contain a "configurable" dictionary with a thread ID linked under the key "thread_id".

  • writes
    (Sequence[tuple[str, Any]]) –

    A sequence of tuples where each tuple contains a string representing the channel and an associated value of type Any to be committed.

  • task_id
    (str) –

    A string representing the unique identifier for the task that groups all the writes.

  • task_path
    (str, default: '' ) –

    Optional path or hierarchy associated with the task. Defaults to an empty string.

Returns:

  • None

    This method does not return any value.

Source code in memstate/integrations/langgraph.py
async def aput_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None:
    """
    Asynchronously executes the operation to store a sequence of writes by committing them as facts into memory
    with associated task and thread information. Each write entry in the sequence is processed
    with a specific channel, value, and index to generate a payload, which is then committed.

    Args:
        config (RunnableConfig): The configuration object implementing the `RunnableConfig` interface. It must
            contain a "configurable" dictionary with a thread ID linked under the key "thread_id".
        writes (Sequence[tuple[str, Any]]): A sequence of tuples where each tuple contains a string representing the channel
            and an associated value of type `Any` to be committed.
        task_id (str): A string representing the unique identifier for the task that groups all the writes.
        task_path (str): (optional) A string that represents the path or hierarchy associated with
            the task. Defaults to an empty string if not provided.

    Returns:
        None
    """
    thread_id = config["configurable"]["thread_id"]

    for idx, (channel, value) in enumerate(writes):
        payload = {
            "task_id": task_id,
            "task_path": task_path,
            "channel": channel,
            "value": value,
            "idx": idx,
            "thread_id": thread_id,
        }

        # Commit each write as its own fact in the thread's session
        await self.memory.commit(
            Fact(type=self.write_type, payload=payload, source="langgraph_writes"), session_id=thread_id
        )
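The per-write payloads that aput_writes commits can be built the same way without a memory store. `build_write_payloads` is a hypothetical helper; the listing itself awaits one commit per write rather than batching them, and idx is simply the write's position within the call.

```python
from collections.abc import Sequence
from typing import Any


def build_write_payloads(
    thread_id: str, writes: Sequence[tuple[str, Any]], task_id: str, task_path: str = ""
) -> list[dict[str, Any]]:
    """One payload per (channel, value) write, as aput_writes builds them (hypothetical)."""
    return [
        {
            "task_id": task_id,
            "task_path": task_path,
            "channel": channel,
            "value": value,
            "idx": idx,  # position of the write within this call
            "thread_id": thread_id,
        }
        for idx, (channel, value) in enumerate(writes)
    ]


payloads = build_write_payloads("t1", [("messages", "hi"), ("counter", 1)], task_id="task-7")
print([(p["idx"], p["channel"]) for p in payloads])  # [(0, 'messages'), (1, 'counter')]
```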

MemStateCheckpointer

MemStateCheckpointer(
    memory: MemoryStore,
    serde: SerializerProtocol | None = None,
)

Bases: BaseCheckpointSaver[str]

Manages the storage, retrieval, and deletion of checkpoint data in memory.

MemStateCheckpointer implements LangGraph's BaseCheckpointSaver interface on top of a MemoryStore. Each checkpoint is committed as a fact of type fact_type, and each write passed to put_writes as a fact of type write_type, in a session whose ID is the LangGraph thread_id. Where the data is ultimately kept depends on the storage backend behind the memory store.

Attributes:

  • memory (MemoryStore) –

    Reference to the memory store used for storage operations.

  • serde (SerializerProtocol) –

    Serializer for checkpoint data. Defaults to JsonPlusSerializer when none is passed.

  • fact_type (str) –

    Fact type used for checkpoint facts in the memory store ("langgraph_checkpoint").

  • write_type (str) –

    Fact type used for write facts in the memory store ("langgraph_write").

Methods:

  • delete_thread

    Deletes a specific thread from memory by its identifier.

  • get_tuple

    Gets a checkpoint tuple based on the provided configuration.

  • list

    Lists and yields checkpoint tuples based on the given configuration and filters.

  • put

    Commits a checkpoint and its metadata into memory and returns an updated configuration object.

  • put_writes

    Stores a sequence of writes by committing them as facts into memory with associated task and thread information.

Source code in memstate/integrations/langgraph.py
def __init__(self, memory: MemoryStore, serde: SerializerProtocol | None = None) -> None:
    super().__init__(serde=serde or JsonPlusSerializer())
    self.memory = memory
    self.fact_type = "langgraph_checkpoint"
    self.write_type = "langgraph_write"

delete_thread

delete_thread(thread_id: str) -> None

Deletes a specific thread from memory by its identifier.

This method removes the session associated with the given thread ID from the memory, ensuring that it is no longer retained or accessed in the system.

Parameters:

  • thread_id
    (str) –

    The unique identifier of the thread to be deleted.

Returns:

  • None

    This method does not return any value.

Source code in memstate/integrations/langgraph.py
def delete_thread(self, thread_id: str) -> None:
    """
    Deletes a specific thread from memory by its identifier.

    This method removes the session associated with the given thread ID
    from the memory, ensuring that it is no longer retained or accessed
    in the system.

    Args:
        thread_id (str): The unique identifier of the thread to be deleted.

    Returns:
        This method does not return any value.
    """
    self.memory.discard_session(thread_id)

get_tuple

get_tuple(config: RunnableConfig) -> CheckpointTuple | None

Gets a checkpoint tuple based on the provided configuration.

This method queries memory for checkpoint facts in the session named by config["configurable"]["thread_id"]. If the configuration also carries a thread_ts, it returns the first fact whose payload has that thread_ts; otherwise it returns the most recent fact. It then rebuilds a CheckpointTuple from the fact's payload, with parent_config set to None and pending_writes taken from the checkpoint's pending_sends.

Parameters:

  • config
    (RunnableConfig) –

    Configuration object containing thread-specific retrieval data, including thread_id and optionally thread_ts.

Returns:

  • CheckpointTuple | None

    A CheckpointTuple containing the retrieved checkpoint data if a fact is found, otherwise None.

Source code in memstate/integrations/langgraph.py
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
    """
    Gets a checkpoint tuple based on the provided configuration.

    This method queries memory to retrieve facts associated with a specific thread ID
    from the configuration. Depending on whether a thread timestamp is provided,
    it selects the most recent fact or the one matching the given timestamp.
    Finally, it reconstructs the checkpoint tuple based on the retrieved fact's payload.

    Args:
        config (RunnableConfig): Configuration object containing thread-specific retrieval
            data, including `thread_id` and optionally `thread_ts`.

    Returns:
        A `CheckpointTuple` containing the retrieved checkpoint data if a fact is found, otherwise `None`.
    """
    thread_id = config["configurable"]["thread_id"]
    thread_ts = config["configurable"].get("thread_ts")

    facts = self.memory.query(typename=self.fact_type, filters={"session_id": thread_id})

    if not facts:
        return None

    if thread_ts:
        matching = [f for f in facts if f["payload"].get("thread_ts") == thread_ts]
        fact = matching[0] if matching else None
    else:
        facts.sort(key=lambda x: x["ts"], reverse=True)
        fact = facts[0]

    if not fact:
        return None

    payload = fact["payload"]
    checkpoint = payload["checkpoint"]
    pending_sends = checkpoint.get("pending_sends") or []

    # TODO: If you need support for restoring PENDING writes,
    # you'll need to run query(typename=self.write_type) here and insert them.
    # For a basic checkpointer, this isn't always necessary, since pending_sends is included in the checkpoint.

    return CheckpointTuple(
        config=config,
        checkpoint=checkpoint,
        metadata=payload["metadata"],
        parent_config=None,  #  (optional, skip for now)
        pending_writes=pending_sends,
    )

list

list(
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[CheckpointTuple]

Lists and yields checkpoint tuples based on the given configuration and filters.

This function queries the stored checkpoint facts, restricts them to the thread named by config["configurable"]["thread_id"] when one is provided, and yields checkpoint tuples newest first. If limit is set, at most limit tuples are yielded. The filter and before arguments are accepted for interface compatibility but are not applied by the current implementation.

Parameters:

  • config
    (RunnableConfig | None) –

    Configuration whose configurable.thread_id, if present, restricts results to that thread. Optional.

  • filter
    (dict[str, Any] | None, default: None ) –

    Additional key-value criteria for filtering facts. Optional; not applied by the current implementation.

  • before
    (RunnableConfig | None, default: None ) –

    Configuration marking the checkpoint before which results should be listed. Optional; not applied by the current implementation.

  • limit
    (int | None, default: None ) –

    Maximum number of facts to process and yield. Optional.

Returns:

  • Iterator[CheckpointTuple]

    An iterator over checkpoint tuples derived from the filtered facts, newest first.

Source code in memstate/integrations/langgraph.py
def list(
    self,
    config: RunnableConfig | None,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[CheckpointTuple]:
    """
    List and yield checkpoint tuples based on given configuration and filters.

    This function retrieves fact data stored in memory, optionally applies
    filters based on configuration or other specified parameters, and yields
    checkpoint tuples sorted by timestamp. The functionality includes support
    for limiting the number of facts processed.

    Args:
        config (RunnableConfig | None): Configuration information for filtering facts. Optional.
        filter (dict[str, Any] | None): Additional criteria for filtering facts based on key-value pairs. Optional.
        before (RunnableConfig | None): Configuration object to apply filter before a certain criterion. Optional.
        limit (int | None): Maximum number of facts to process and yield. Optional.

    Returns:
        An iterator over checkpoint tuples derived from the filtered facts.
    """

    json_filters = {}

    if config and "configurable" in config:
        thread_id = config["configurable"].get("thread_id")
        if thread_id:
            json_filters["session_id"] = thread_id

    facts = self.memory.query(typename=self.fact_type, filters=json_filters if json_filters else None)

    facts.sort(key=lambda x: x["ts"], reverse=True)

    if limit:
        facts = facts[:limit]

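    # Yield one CheckpointTuple per fact, newest first
    for fact in facts:
        payload = fact["payload"]
        yield CheckpointTuple(
            config={
                "configurable": {
                    "thread_id": payload.get("thread_id") or json_filters.get("session_id"),
                    "thread_ts": payload["thread_ts"],
                }
            },
            checkpoint=payload["checkpoint"],
            metadata=payload["metadata"],
            # Pass by keyword: the fourth positional field of CheckpointTuple is parent_config
            pending_writes=(payload.get("checkpoint") or {}).get("pending_sends", []),
        )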

put

put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: dict[str, Any],
) -> RunnableConfig

Commits a checkpoint and its metadata into memory and returns an updated configuration object.

The checkpoint, its metadata, and new_versions are committed together as a single fact of type fact_type in the session identified by config["configurable"]["thread_id"], with checkpoint["id"] recorded as thread_ts. The returned configuration carries the same thread_id and the new thread_ts.

Parameters:

  • config
    (RunnableConfig) –

    The configuration object for the runnable, which must include a thread_id under the configurable key.

  • checkpoint
    (Checkpoint) –

    The checkpoint object containing state information to be stored in memory.

  • metadata
    (CheckpointMetadata) –

    Additional metadata corresponding to the checkpoint, providing supplementary details about the stored state.

  • new_versions
    (dict[str, Any]) –

    A mapping of version keys to their new corresponding values, used to track changes in versions during the execution process.

Returns:

  • RunnableConfig

    A modified configuration object reflecting the updated thread parameters after committing the provided checkpoint to memory.

Source code in memstate/integrations/langgraph.py
def put(
    self,
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: dict[str, Any],
) -> RunnableConfig:
    """
    Updates the state of a process by committing checkpoint metadata into memory
    and returning an updated configuration object.

    This method handles storing the provided checkpoint and its associated metadata
    to facilitate process tracking. It interacts with the memory instance to ensure
    the relevant details are committed within the appropriate session. After updating
    memory, it returns a modified configuration containing the updated thread
    parameters.

    Args:
        config (RunnableConfig): The configuration object for the runnable, which must include a `thread_id` under the `configurable` key.
        checkpoint (Checkpoint): The checkpoint object containing state information to be stored in memory.
        metadata (CheckpointMetadata): Additional metadata corresponding to the checkpoint, providing
            supplementary details about the stored state.
        new_versions (dict[str, Any]): A mapping of version keys to their new corresponding
            values, used to track changes in versions during the execution process.

    Returns:
        A modified configuration object reflecting the updated thread
            parameters after committing the provided checkpoint to memory.
    """
    thread_id = config["configurable"]["thread_id"]

    payload = {
        "checkpoint": checkpoint,
        "metadata": metadata,
        "new_versions": new_versions,
        "thread_ts": checkpoint["id"],
    }

    self.memory.commit(
        Fact(type=self.fact_type, payload=payload, source="langgraph_checkpoint"), session_id=thread_id
    )

    return {
        "configurable": {
            "thread_id": thread_id,
            "thread_ts": checkpoint["id"],
        }
    }
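To make the config round-trip concrete, the sketch below reproduces only the dictionary handling of put as a standalone function. next_config is a hypothetical name, and the memory commit is left out.

```python
from typing import Any


def next_config(config: dict[str, Any], checkpoint: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical helper mirroring the return value of `put`:
    # thread_id is carried over and thread_ts is set to the checkpoint id.
    thread_id = config["configurable"]["thread_id"]
    return {
        "configurable": {
            "thread_id": thread_id,
            "thread_ts": checkpoint["id"],
        }
    }


cfg = {"configurable": {"thread_id": "thread-1"}}
print(next_config(cfg, {"id": "ckpt-42"}))
# {'configurable': {'thread_id': 'thread-1', 'thread_ts': 'ckpt-42'}}
```

Any other keys under "configurable" in the incoming config are not carried over into the returned config.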

put_writes

put_writes(
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None

Stores a sequence of writes by committing each one to memory as a separate fact, tagged with its task and thread information. Each (channel, value) pair is combined with its index in the sequence to build a payload, which is then committed.

Parameters:

  • config
    (RunnableConfig) –

    The configuration object for the runnable. It must contain a "configurable" dictionary with the thread ID stored under the key "thread_id".

  • writes
    (Sequence[tuple[str, Any]]) –

    A sequence of tuples where each tuple contains a string representing the channel and an associated value of type Any to be committed.

  • task_id
    (str) –

    A string representing the unique identifier for the task that groups all the writes.

  • task_path
    (str, default: '' ) –

    The path or hierarchy associated with the task. Defaults to an empty string.

Returns:

  • None

    None

Source code in memstate/integrations/langgraph.py
def put_writes(
    self,
    config: RunnableConfig,
    writes: Sequence[tuple[str, Any]],
    task_id: str,
    task_path: str = "",
) -> None:
    """
    Executes the operation to store a sequence of writes by committing them as facts into memory
    with associated task and thread information. Each write entry in the sequence is processed
    with a specific channel, value, and index to generate a payload, which is then committed.

    Args:
        config (RunnableConfig): The configuration object implementing the `RunnableConfig` interface. It must
            contain a "configurable" dictionary with a thread ID linked under the key "thread_id".
        writes (Sequence[tuple[str, Any]]): A sequence of tuples where each tuple contains a string representing the channel
            and an associated value of type `Any` to be committed.
        task_id (str): A string representing the unique identifier for the task that groups all the writes.
        task_path (str): (optional) A string that represents the path or hierarchy associated with
            the task. Defaults to an empty string if not provided.

    Returns:
        None
    """
    thread_id = config["configurable"]["thread_id"]

    for idx, (channel, value) in enumerate(writes):
        payload = {
            "task_id": task_id,
            "task_path": task_path,
            "channel": channel,
            "value": value,
            "idx": idx,
            "thread_id": thread_id,
        }

        self.memory.commit(
            Fact(type=self.write_type, payload=payload, source="langgraph_writes"), session_id=thread_id
        )
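The payload construction in put_writes can be isolated as a pure function, which makes the per-write indexing easy to see. build_write_payloads is a hypothetical helper written for illustration; it mirrors the loop above but does not commit anything.

```python
from typing import Any, Sequence


def build_write_payloads(
    thread_id: str,
    task_id: str,
    writes: Sequence[tuple[str, Any]],
    task_path: str = "",
) -> list[dict[str, Any]]:
    # Hypothetical helper: one payload per (channel, value) pair,
    # numbered by its position in `writes`, as put_writes does.
    return [
        {
            "task_id": task_id,
            "task_path": task_path,
            "channel": channel,
            "value": value,
            "idx": idx,
            "thread_id": thread_id,
        }
        for idx, (channel, value) in enumerate(writes)
    ]


payloads = build_write_payloads("thread-1", "task-a", [("messages", "hi"), ("count", 3)])
print([(p["idx"], p["channel"]) for p in payloads])
# [(0, 'messages'), (1, 'count')]
```

Because put_writes commits inside the loop, a sequence of N writes results in N separate commits, all under the same session_id (the thread ID).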

qdrant

Qdrant integration.

Classes:

  • AsyncQdrantSyncHook

    Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata.

  • FastEmbedEncoder

    Default embedding implementation using FastEmbed.

  • QdrantSyncHook

    Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata.

AsyncQdrantSyncHook

AsyncQdrantSyncHook(
    client: AsyncQdrantClient,
    collection_name: str,
    embedding_fn: EmbeddingFunction | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
    distance: Distance = COSINE,
)

Bases: AsyncMemoryHook

Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata. Designed for real-time updates and integration with Qdrant database collections.

The class maintains vector embeddings, sets up collection parameters, formats metadata, and handles operations such as inserts, updates, deletions, and session discards. It also accepts a custom embedding function and a configurable distance metric for vector similarity search.
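For intuition about the distance setting, the sketch below computes the three common metrics in plain Python. Qdrant performs these computations itself; the functions here are hypothetical helpers for illustration only.

```python
import math


def dot(a: list[float], b: list[float]) -> float:
    # Dot product: larger means more similar (Distance.DOT).
    return sum(x * y for x, y in zip(a, b))


def cosine(a: list[float], b: list[float]) -> float:
    # Cosine similarity: dot product divided by both vector norms (Distance.COSINE).
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def euclid(a: list[float], b: list[float]) -> float:
    # Euclidean distance: smaller means more similar (Distance.EUCLID).
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


a, b = [1.0, 0.0], [2.0, 0.0]
print(dot(a, b), cosine(a, b), euclid(a, b))
# 2.0 1.0 1.0
```

Cosine ignores magnitude ([1, 0] and [2, 0] score as identical), while dot product and Euclidean distance do not. This is one reason to pick the metric your embedding model recommends.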

Attributes:

  • client (AsyncQdrantClient) –

    Client instance for interacting with the Qdrant database.

  • collection_name (str) –

    Name of the Qdrant collection to synchronize with.

  • embedding_fn (EmbeddingFunction | None) –

    Function responsible for generating vector embeddings, defaulting to FastEmbedEncoder if not provided.

  • target_types (set[str] | None) –

    Set of fact types to synchronize. Passing None stores an empty set.

  • distance (Distance) –

    Metric Qdrant uses to compute vector similarity, e.g., COSINE (the default), EUCLID, or DOT.

  • metadata_fields (list[str] | None) –

    List of fields to include in the metadata payload.

  • metadata_formatter (MetadataFormatter | None) –

    Formatter function for structuring metadata. Optional.

Source code in memstate/integrations/qdrant.py
def __init__(
    self,
    client: AsyncQdrantClient,
    collection_name: str,
    embedding_fn: EmbeddingFunction | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
    distance: models.Distance = models.Distance.COSINE,
) -> None:
    self.client = client
    self.collection_name = collection_name
    self.embedding_fn = embedding_fn or FastEmbedEncoder()
    self.target_types = target_types or set()
    self.distance = distance

    self._collection_checked = False

    if text_formatter is not None:
        self._extract_text = text_formatter
    elif text_field:
        self._extract_text = lambda data: str(data.get(text_field, ""))
    else:
        self._extract_text = lambda data: str(data)

    self.metadata_fields = metadata_fields or []
    self.metadata_formatter = metadata_formatter
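Unlike QdrantSyncHook, which calls _ensure_collection() in its constructor, the async hook only sets a _collection_checked flag, since __init__ cannot await client calls. The code that consumes this flag is not shown here. The sketch below shows one common check-once pattern for async setup. LazyCollectionGuard and its methods are hypothetical, and the collection setup is simulated with a counter rather than a real Qdrant call.

```python
import asyncio


class LazyCollectionGuard:
    """Hypothetical check-once guard; not memstate's implementation."""

    def __init__(self) -> None:
        self._collection_checked = False
        self._lock = asyncio.Lock()
        self.create_calls = 0  # counts simulated collection setups

    async def _create_collection(self) -> None:
        # Stand-in for an awaited client call (e.g. checking or creating a collection).
        await asyncio.sleep(0)
        self.create_calls += 1

    async def ensure_collection(self) -> None:
        if self._collection_checked:
            return
        async with self._lock:
            # Re-check inside the lock so concurrent callers set up only once.
            if not self._collection_checked:
                await self._create_collection()
                self._collection_checked = True


async def main() -> int:
    guard = LazyCollectionGuard()
    await asyncio.gather(*(guard.ensure_collection() for _ in range(5)))
    return guard.create_calls


print(asyncio.run(main()))
# 1
```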

FastEmbedEncoder

FastEmbedEncoder(
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
    options: dict[str, Any] | None = None,
)

Default embedding implementation using FastEmbed. Used if no custom embedding_fn is provided.

This class provides a lightweight wrapper around the FastEmbed library to generate embeddings for text inputs. The encoder initializes with a specific pre-trained model from FastEmbed and can be invoked to generate a numerical vector representation of the provided text.

Attributes:

  • model (TextEmbedding) –

    Instance of the FastEmbed TextEmbedding model used to generate embeddings.

If the FastEmbed library is not installed on the system, an ImportError is raised, advising the user to install the library or provide a custom embedding function.

Parameters:

  • model_name

    (str, default: 'sentence-transformers/all-MiniLM-L6-v2' ) –

    The name of the model to be used for text embeddings. Defaults to "sentence-transformers/all-MiniLM-L6-v2".

  • options

    (dict[str, Any] | None, default: None ) –

    A dictionary of options for configuring the embedding model. If not provided, an empty dictionary is used.

Raises:

  • ImportError

    If the FastEmbed library is not installed on the system, this exception is raised, advising the use of pip install fastembed or providing a custom embedding function.

Source code in memstate/integrations/qdrant.py
def __init__(
    self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2", options: dict[str, Any] | None = None
):
    """
    Initializes the embedding model using the specified `model_name` and optional `options`
    dictionary. The class leverages the FastEmbed library for handling text embeddings.

    If the FastEmbed library is not installed on the system, an ImportError is raised,
    advising the user to install the library or provide a custom embedding function.

    Args:
        model_name (str): The name of the model to be used for text embeddings.
            Defaults to "sentence-transformers/all-MiniLM-L6-v2".
        options (dict[str, Any] | None): A dictionary of options for configuring the embedding model.
            If not provided, an empty dictionary is used.

    Raises:
        ImportError: If the FastEmbed library is not installed on the system, this exception
            is raised, advising the use of `pip install fastembed` or providing a custom embedding function.
    """
    try:
        from fastembed import TextEmbedding
    except ImportError as e:
        # Re-raise with installation guidance, keeping the original error as the cause.
        raise ImportError(
            "FastEmbed is not installed. Install it via `pip install fastembed` or pass a custom `embedding_fn`."
        ) from e
    self.model = TextEmbedding(model_name, **(options or {}))
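If FastEmbed is unavailable, you can pass a custom embedding_fn instead. The toy hashing embedder below is a minimal sketch. It assumes EmbeddingFunction is a callable that takes one text string and returns a list of floats; that signature is inferred from how FastEmbedEncoder is described, not confirmed here. HashingEmbedder is hypothetical. It produces deterministic vectors that are useful for tests but carry no semantic meaning.

```python
import hashlib
import math


class HashingEmbedder:
    """Hypothetical test-only embedder: hashes tokens into a fixed-size vector."""

    def __init__(self, dim: int = 64) -> None:
        self.dim = dim

    def __call__(self, text: str) -> list[float]:
        vec = [0.0] * self.dim
        for token in text.lower().split():
            # Stable hash (unlike built-in hash(), which is salted per process).
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            vec[int.from_bytes(digest[:4], "big") % self.dim] += 1.0
        norm = math.sqrt(sum(v * v for v in vec))
        # L2-normalize non-empty vectors; empty text stays all zeros.
        return [v / norm for v in vec] if norm else vec


embed = HashingEmbedder(dim=8)
print(len(embed("hello world")))
# 8
```

Under the same assumption, it would be passed as QdrantSyncHook(client, "memory", embedding_fn=HashingEmbedder()).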

QdrantSyncHook

QdrantSyncHook(
    client: QdrantClient,
    collection_name: str,
    embedding_fn: EmbeddingFunction | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
    distance: Distance = COSINE,
)

Bases: MemoryHook

Handles synchronization of memory updates with a Qdrant collection by managing CRUD operations on vector embeddings and metadata. Designed for real-time updates and integration with Qdrant database collections.

The class maintains vector embeddings, sets up collection parameters, formats metadata, and handles operations such as inserts, updates, deletions, and session discards. It also accepts a custom embedding function and a configurable distance metric for vector similarity search.

Example
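import openai
from qdrant_client import QdrantClient
from memstate.integrations.qdrant import FastEmbedEncoder, QdrantSyncHook
client = QdrantClient(":memory:")
encoder = FastEmbedEncoder(model_name="BAAI/bge-small-en-v1.5", options={"cuda": True})
hook = QdrantSyncHook(client, "memory", embedding_fn=encoder)
def openai_embedder(text: str) -> list[float]:  # pass a callable, not a precomputed vector
    resp = openai.embeddings.create(input=text, model="text-embedding-3-small")
    return resp.data[0].embedding
hook = QdrantSyncHook(client, "memory", embedding_fn=openai_embedder)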

Attributes:

  • client (QdrantClient) –

    Client instance for interacting with the Qdrant database.

  • collection_name (str) –

    Name of the Qdrant collection to synchronize with.

  • embedding_fn (EmbeddingFunction | None) –

    Function responsible for generating vector embeddings, defaulting to FastEmbedEncoder if not provided.

  • target_types (set[str] | None) –

    Set of fact types to synchronize. Passing None stores an empty set.

  • distance (Distance) –

    Metric Qdrant uses to compute vector similarity, e.g., COSINE (the default), EUCLID, or DOT.

  • metadata_fields (list[str] | None) –

    List of fields to include in the metadata payload.

  • metadata_formatter (MetadataFormatter | None) –

    Formatter function for structuring metadata. Optional.

Source code in memstate/integrations/qdrant.py
def __init__(
    self,
    client: QdrantClient,
    collection_name: str,
    embedding_fn: EmbeddingFunction | None = None,
    target_types: set[str] | None = None,
    text_field: str | None = None,
    text_formatter: TextFormatter | None = None,
    metadata_fields: list[str] | None = None,
    metadata_formatter: MetadataFormatter | None = None,
    distance: models.Distance = models.Distance.COSINE,
) -> None:
    self.client = client
    self.collection_name = collection_name

    self.embedding_fn = embedding_fn or FastEmbedEncoder()

    self.target_types = target_types or set()
    self.distance = distance

    if text_formatter is not None:
        self._extract_text = text_formatter
    elif text_field:
        self._extract_text = lambda data: str(data.get(text_field, ""))
    else:
        self._extract_text = lambda data: str(data)

    self.metadata_fields = metadata_fields or []
    self.metadata_formatter = metadata_formatter

    self._ensure_collection()