Skip to content

backends

Modules:

  • base

    Base storage backend interface.

  • inmemory

    In-memory storage backend implementation.

  • postgres

    Postgres storage backend implementation using SQLAlchemy.

  • redis

    Redis storage backend implementation.

  • sqlite

    SQLite storage backend implementation.

base

Base storage backend interface.

Classes:

AsyncStorageBackend

Bases: ABC

Asynchronous storage interface (non-blocking I/O).

Methods:

  • append_tx

    Log a transaction asynchronously.

  • close

    Cleanup resources asynchronously.

  • delete

    Delete a fact asynchronously.

  • delete_session

    Bulk delete ephemeral facts asynchronously. Returns deleted IDs.

  • delete_txs

    Delete specific transactions from the log by their UUIDs.

  • get_session_facts

    Retrieve all facts belonging to a specific session.

  • get_tx_log

    Retrieve transaction history asynchronously.

  • load

    Load a single fact by ID asynchronously.

  • query

    Find facts matching criteria asynchronously.

  • save

    Upsert a fact asynchronously.

append_tx abstractmethod async

append_tx(tx_data: dict[str, Any]) -> None

Log a transaction asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def append_tx(self, tx_data: dict[str, Any]) -> None:
    """Record a single transaction entry in the backend's log (non-blocking)."""

close async

close() -> None

Cleanup resources asynchronously.

Source code in memstate/backends/base.py
async def close(self) -> None:
    """Release any resources held by the backend.

    The default implementation is a no-op; subclasses with real
    connections override it.
    """
    return None

delete abstractmethod async

delete(id: str) -> None

Delete a fact asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def delete(self, id: str) -> None:
    """Remove the fact identified by ``id`` (non-blocking)."""

delete_session abstractmethod async

delete_session(session_id: str) -> list[str]

Bulk delete ephemeral facts asynchronously. Returns deleted IDs.

Source code in memstate/backends/base.py
@abstractmethod
async def delete_session(self, session_id: str) -> list[str]:
    """Remove every ephemeral fact tied to ``session_id``; return the deleted IDs (non-blocking)."""

delete_txs abstractmethod async

delete_txs(tx_uuids: list[str]) -> None

Delete specific transactions from the log by their UUIDs.

Source code in memstate/backends/base.py
@abstractmethod
async def delete_txs(self, tx_uuids: list[str]) -> None:
    """Remove the transactions whose UUIDs appear in ``tx_uuids`` from the log."""

get_session_facts abstractmethod async

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieve all facts belonging to a specific session.

Source code in memstate/backends/base.py
@abstractmethod
async def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """Return every fact belonging to ``session_id`` (non-blocking)."""

get_tx_log abstractmethod async

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieve transaction history asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Fetch a page of transaction history for ``session_id`` (non-blocking)."""

load abstractmethod async

load(id: str) -> dict[str, Any] | None

Load a single fact by ID asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def load(self, id: str) -> dict[str, Any] | None:
    """Load a single fact by ID asynchronously."""
    pass

query abstractmethod async

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Find facts matching criteria asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def query(
    self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """Find facts matching criteria asynchronously."""
    pass

save abstractmethod async

save(fact_data: dict[str, Any]) -> None

Upsert a fact asynchronously.

Source code in memstate/backends/base.py
@abstractmethod
async def save(self, fact_data: dict[str, Any]) -> None:
    """Insert or replace a fact (non-blocking)."""

StorageBackend

Bases: ABC

Synchronous storage interface (blocking I/O).

Methods:

  • append_tx

    Log a transaction.

  • close

    Cleanup resources (optional).

  • delete

    Delete a fact.

  • delete_session

    Bulk delete ephemeral facts (Working Memory cleanup). Returns deleted IDs.

  • delete_txs

    Delete specific transactions from the log by their UUIDs.

  • get_session_facts

    Retrieve all facts belonging to a specific session.

  • get_tx_log

    Retrieve transaction history (newest first typically, or ordered by seq).

  • load

    Load a single fact by ID.

  • query

    Find facts matching criteria.

  • save

    Upsert a fact.

append_tx abstractmethod

append_tx(tx_data: dict[str, Any]) -> None

Log a transaction.

Source code in memstate/backends/base.py
@abstractmethod
def append_tx(self, tx_data: dict[str, Any]) -> None:
    """Record a single transaction entry in the backend's log."""

close

close() -> None

Cleanup resources (optional).

Source code in memstate/backends/base.py
def close(self) -> None:
    """Release any resources held by the backend.

    The default implementation is a no-op; subclasses with real
    connections override it.
    """
    return None

delete abstractmethod

delete(id: str) -> None

Delete a fact.

Source code in memstate/backends/base.py
@abstractmethod
def delete(self, id: str) -> None:
    """Remove the fact identified by ``id``."""

delete_session abstractmethod

delete_session(session_id: str) -> list[str]

Bulk delete ephemeral facts (Working Memory cleanup). Returns deleted IDs.

Source code in memstate/backends/base.py
@abstractmethod
def delete_session(self, session_id: str) -> list[str]:
    """Remove every ephemeral fact tied to ``session_id``; return the deleted IDs."""

delete_txs abstractmethod

delete_txs(tx_uuids: list[str]) -> None

Delete specific transactions from the log by their UUIDs.

Source code in memstate/backends/base.py
@abstractmethod
def delete_txs(self, tx_uuids: list[str]) -> None:
    """Remove the transactions whose UUIDs appear in ``tx_uuids`` from the log."""

get_session_facts abstractmethod

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieve all facts belonging to a specific session.

Source code in memstate/backends/base.py
@abstractmethod
def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """Return every fact belonging to ``session_id``."""

get_tx_log abstractmethod

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieve transaction history (newest first typically, or ordered by seq).

Source code in memstate/backends/base.py
@abstractmethod
def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Fetch a page of transaction history for ``session_id``."""

load abstractmethod

load(id: str) -> dict[str, Any] | None

Load a single fact by ID.

Source code in memstate/backends/base.py
@abstractmethod
def load(self, id: str) -> dict[str, Any] | None:
    """Load a single fact by ID."""
    pass

query abstractmethod

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Find facts matching criteria.

Source code in memstate/backends/base.py
@abstractmethod
def query(self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """Find facts matching criteria."""
    pass

save abstractmethod

save(fact_data: dict[str, Any]) -> None

Upsert a fact.

Source code in memstate/backends/base.py
@abstractmethod
def save(self, fact_data: dict[str, Any]) -> None:
    """Insert or replace a fact."""

inmemory

In-memory storage backend implementation.

Classes:

AsyncInMemoryStorage

AsyncInMemoryStorage()

Bases: AsyncStorageBackend

Class representing an async in-memory storage backend.

Provides methods for storing, retrieving, deleting, querying, and managing session-related and transaction-log data entirely within memory. This class implements thread-safe operations and supports querying with filtering logic using hierarchical paths in JSON-like structures.

Attributes:

  • _store (dict[str, dict[str, Any]]) –

    Internal storage for facts indexed by their ID.

  • _tx_log (list[dict[str, Any]]) –

    List of transaction log entries.

  • _lock (Lock) –

    Asynchronous lock to ensure safe concurrent access to the storage and transaction log.

Methods:

  • append_tx

    Asynchronously appends a transaction record to the transaction log in a thread-safe manner.

  • close

    Asynchronously closes the current open resource or connection.

  • delete

    Asynchronously removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Asynchronously deletes all facts associated with a given session ID from the store.

  • delete_txs

    Asynchronously removes transactions from the transaction log whose UUIDs match the provided values.

  • get_session_facts

    Asynchronously retrieves all facts associated with a specific session.

  • get_tx_log

    Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Asynchronously loads an item from the store based on the provided identifier.

  • query

    Asynchronously query data from the internal store based on specified filters.

  • save

    Asynchronously saves the given fact data into the internal store. The save operation is thread-safe

Source code in memstate/backends/inmemory.py
def __init__(self) -> None:
    # Facts indexed by their "id" field.
    self._store: dict[str, dict[str, Any]] = {}
    # Append-only list of transaction records.
    self._tx_log: list[dict[str, Any]] = []
    # asyncio lock guarding both structures against concurrent coroutine access.
    self._lock = asyncio.Lock()

append_tx async

append_tx(tx_data: dict[str, Any]) -> None

Asynchronously appends a transaction record to the transaction log in a thread-safe manner.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
async def append_tx(self, tx_data: dict[str, Any]) -> None:
    """Record ``tx_data`` at the tail of the in-memory transaction log.

    The internal asyncio lock is held for the duration of the append so
    concurrent writers cannot interleave.

    Args:
        tx_data: The transaction record to log.

    Returns:
        None
    """
    async with self._lock:
        self._tx_log.append(tx_data)

close async

close() -> None

Asynchronously closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
async def close(self) -> None:
    """Finalize the backend.

    The in-memory backend holds no external resources, so this is a
    no-op kept for interface parity with persistent backends.

    Returns:
        None
    """
    return None

delete async

delete(id: str) -> None

Asynchronously removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
async def delete(self, id: str) -> None:
    """Remove the fact stored under ``id``; unknown IDs are ignored silently.

    Args:
        id: Identifier of the fact to drop from the store.

    Returns:
        None
    """
    async with self._lock:
        if id in self._store:
            del self._store[id]

delete_session async

delete_session(session_id: str) -> list[str]

Asynchronously deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/inmemory.py
async def delete_session(self, session_id: str) -> list[str]:
    """Remove every fact bound to ``session_id`` and report what was removed.

    Args:
        session_id: Session whose associated facts should be purged.

    Returns:
        The identifiers of the facts that were deleted from the store.
    """
    async with self._lock:
        doomed = [key for key, fact in self._store.items() if fact.get("session_id") == session_id]
        for key in doomed:
            self._store.pop(key)
        return doomed

delete_txs async

delete_txs(tx_uuids: list[str]) -> None

Asynchronously removes from the transaction log every transaction whose UUID matches one of the provided transaction UUIDs. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
async def delete_txs(self, tx_uuids: list[str]) -> None:
    """Drop every logged transaction whose ``uuid`` appears in ``tx_uuids``.

    An empty UUID list is a no-op and returns immediately without
    taking the lock.

    Args:
        tx_uuids: UUIDs of the transactions to remove from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    async with self._lock:
        # Set membership keeps the rebuild pass linear in the log size.
        doomed = frozenset(tx_uuids)
        self._tx_log = [entry for entry in self._tx_log if entry["uuid"] not in doomed]

get_session_facts async

get_session_facts(session_id: str) -> list[dict[str, Any]]

Asynchronously retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/inmemory.py
async def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Asynchronously retrieves all facts associated with a specific session.

    This method filters and returns a list of all facts from the internal store
    that match the provided session ID. Each fact is represented as a dictionary,
    and the list may be empty if no facts match the provided session ID.

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of dictionaries, where each dictionary represents a fact related to the specified session.
    """
    # Hold the lock like every other accessor: without it a concurrent
    # delete_session/save could mutate self._store while this coroutine
    # iterates it, raising RuntimeError("dictionary changed size ...").
    async with self._lock:
        return [f for f in self._store.values() if f.get("session_id") == session_id]

get_tx_log async

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/inmemory.py
async def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Return a page of this session's transactions, newest first.

    Args:
        session_id: Session whose transactions are wanted.
        limit: Maximum number of entries returned. Defaults to 100.
        offset: Number of matching entries, counted from the newest,
            to skip before the page starts. Defaults to 0.

    Returns:
        At most ``limit`` transaction dictionaries, most recent first.
    """
    async with self._lock:
        newest_first = [tx for tx in self._tx_log[::-1] if tx.get("session_id") == session_id]
        return newest_first[offset : offset + limit]

load async

load(id: str) -> dict[str, Any] | None

Asynchronously loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/inmemory.py
async def load(self, id: str) -> dict[str, Any] | None:
    """
    Asynchronously loads an item from the store based on the provided identifier.

    This method retrieves the item associated with the given `id`
    from the internal store. If no item is found for the provided
    identifier, it returns ``None``.

    Args:
        id (str): The unique identifier of the item to load.

    Returns:
        The item retrieved from the store or ``None`` if the identifier does not exist in the store.
    """
    async with self._lock:
        return self._store.get(id)

query async

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Asynchronously query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/inmemory.py
async def query(
    self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Asynchronously query data from the internal store based on specified filters.

    This method iterates through the internal store and filters the data based on
    the provided `type_filter` and `json_filters`. The results will include
    only the entries that match all specified filtering criteria.

    Args:
        type_filter (str | None): Optional filter to include only items with a matching "type" field.
            If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path within the JSON
            data structure, and values represent the required values for inclusion.
            If None, this filter is ignored.

    Returns:
        A list of dictionaries containing the data entries from the internal store that match the specified filters.
    """
    async with self._lock:
        results = []
        for fact in self._store.values():
            if type_filter and fact["type"] != type_filter:
                continue

            if json_filters:
                match = True
                for k, v in json_filters.items():
                    actual_val = self._get_value_by_path(fact, k)
                    if actual_val != v:
                        match = False
                        break
                if not match:
                    continue

            results.append(fact)
        return results

save async

save(fact_data: dict[str, Any]) -> None

Asynchronously saves the given fact data into the internal store. The save operation is thread-safe and ensures data consistency by utilizing a lock mechanism.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
async def save(self, fact_data: dict[str, Any]) -> None:
    """Insert or overwrite a fact, keyed by its ``"id"`` field.

    The asyncio lock is held while the store is mutated so concurrent
    writers cannot interleave.

    Args:
        fact_data: Fact dictionary; must contain an ``"id"`` key used
            as the unique storage key.

    Returns:
        None
    """
    async with self._lock:
        self._store[fact_data["id"]] = fact_data

InMemoryStorage

InMemoryStorage()

Bases: StorageBackend

Class representing an in-memory storage backend.

Provides methods for storing, retrieving, deleting, querying, and managing session-related and transaction-log data entirely within memory. This class implements thread-safe operations and supports querying with filtering logic using hierarchical paths in JSON-like structures.

Attributes:

  • _store (dict[str, dict[str, Any]]) –

    Internal storage for facts indexed by their ID.

  • _tx_log (list[dict[str, Any]]) –

    List of transaction log entries.

  • _lock (RLock) –

    Reentrant lock for synchronizing access to the storage.

Methods:

  • append_tx

    Appends a transaction record to the transaction log in a thread-safe manner.

  • close

    Closes the current open resource or connection.

  • delete

    Removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Deletes all facts associated with a given session ID from the store.

  • delete_txs

    Removes transactions from the transaction log whose UUIDs match the provided values.

  • get_session_facts

    Retrieves all facts associated with a specific session.

  • get_tx_log

    Retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Loads an item from the store based on the provided identifier.

  • query

    Query data from the internal store based on specified filters.

  • save

    Saves the given fact data into the internal store. The save operation is thread-safe

Source code in memstate/backends/inmemory.py
def __init__(self) -> None:
    # Facts indexed by their "id" field.
    self._store: dict[str, dict[str, Any]] = {}
    # Append-only list of transaction records.
    self._tx_log: list[dict[str, Any]] = []
    # Reentrant lock guarding both structures against concurrent thread access.
    self._lock = threading.RLock()

append_tx

append_tx(tx_data: dict[str, Any]) -> None

Appends a transaction record to the transaction log in a thread-safe manner.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
def append_tx(self, tx_data: dict[str, Any]) -> None:
    """Record ``tx_data`` at the tail of the in-memory transaction log.

    The reentrant lock is held for the duration of the append so
    concurrent threads cannot interleave writes.

    Args:
        tx_data: The transaction record to log.

    Returns:
        None
    """
    with self._lock:
        self._tx_log.append(tx_data)

close

close() -> None

Closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
def close(self) -> None:
    """Finalize the backend.

    The in-memory backend holds no external resources, so this is a
    no-op kept for interface parity with persistent backends.

    Returns:
        None
    """
    return None

delete

delete(id: str) -> None

Removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
def delete(self, id: str) -> None:
    """Remove the fact stored under ``id``; unknown IDs are ignored silently.

    Args:
        id: Identifier of the fact to drop from the store.

    Returns:
        None
    """
    with self._lock:
        if id in self._store:
            del self._store[id]

delete_session

delete_session(session_id: str) -> list[str]

Deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/inmemory.py
def delete_session(self, session_id: str) -> list[str]:
    """Remove every fact bound to ``session_id`` and report what was removed.

    Args:
        session_id: Session whose associated facts should be purged.

    Returns:
        The identifiers of the facts that were deleted from the store.
    """
    with self._lock:
        doomed = [key for key, fact in self._store.items() if fact.get("session_id") == session_id]
        for key in doomed:
            self._store.pop(key)
        return doomed

delete_txs

delete_txs(tx_uuids: list[str]) -> None

Removes from the transaction log every transaction whose UUID matches one of the provided transaction UUIDs. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
def delete_txs(self, tx_uuids: list[str]) -> None:
    """Drop every logged transaction whose ``uuid`` appears in ``tx_uuids``.

    An empty UUID list is a no-op and returns immediately without
    taking the lock.

    Args:
        tx_uuids: UUIDs of the transactions to remove from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    with self._lock:
        # Set membership keeps the rebuild pass linear in the log size.
        doomed = frozenset(tx_uuids)
        self._tx_log = [entry for entry in self._tx_log if entry["uuid"] not in doomed]

get_session_facts

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/inmemory.py
def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Retrieves all facts associated with a specific session.

    This method filters and returns a list of all facts from the internal store
    that match the provided session ID. Each fact is represented as a dictionary,
    and the list may be empty if no facts match the provided session ID.

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of dictionaries, where each dictionary represents a fact related to the specified session.
    """
    # Hold the lock like every other accessor: without it another thread's
    # delete_session/save could mutate self._store while this iterates it,
    # raising RuntimeError("dictionary changed size ...").
    with self._lock:
        return [f for f in self._store.values() if f.get("session_id") == session_id]

get_tx_log

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/inmemory.py
def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Returns a page of the transaction log for one session, newest first.

    Entries are walked from the most recent insertion backwards; ``offset``
    skips that many of the newest matching entries and ``limit`` caps the
    page size.

    Args:
        session_id (str): Session whose transactions are requested.
        limit (int): Maximum number of entries to return. Default is 100.
        offset (int): Number of newest matching entries to skip. Default is 0.

    Returns:
        A list of transaction log entry dictionaries, most recent first.
    """
    with self._lock:
        matching = [entry for entry in reversed(self._tx_log) if entry.get("session_id") == session_id]
    return matching[offset : offset + limit]

load

load(id: str) -> dict[str, Any] | None

Loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/inmemory.py
def load(self, id: str) -> dict[str, Any] | None:
    """
    Loads an item from the store based on the provided identifier.

    This method retrieves the item associated with the given `id`
    from the internal store. If no item is found for the provided
    identifier, it returns ``None``.

    Args:
        id (str): The unique identifier of the item to load.

    Returns:
        The item retrieved from the store or ``None`` if the identifier does not exist in the store.
    """
    with self._lock:
        return self._store.get(id)

query

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/inmemory.py
def query(self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    Query data from the internal store based on specified filters.

    This method iterates through the internal store and filters the data based on
    the provided `type_filter` and `json_filters`. The results will include
    only the entries that match all specified filtering criteria.

    Args:
        type_filter (str | None): Optional filter to include only items with a matching "type" field.
            If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path within the JSON
            data structure, and values represent the required values for inclusion.
            If None, this filter is ignored.

    Returns:
        A list of dictionaries containing the data entries from the internal store that match the specified filters.
    """
    with self._lock:
        results = []
        for fact in self._store.values():
            if type_filter and fact["type"] != type_filter:
                continue
            if json_filters:
                match = True
                for k, v in json_filters.items():
                    # The simplest depth-first search payload
                    actual_val = self._get_value_by_path(fact, k)
                    if actual_val != v:
                        match = False
                        break
                if not match:
                    continue
            results.append(fact)
        return results

save

save(fact_data: dict[str, Any]) -> None

Saves the given fact data into the internal store. The save operation is thread-safe and ensures data consistency by utilizing a lock mechanism.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/inmemory.py
def save(self, fact_data: dict[str, Any]) -> None:
    """
    Stores (or overwrites) a fact under its "id" key.

    The write is guarded by the backend's lock so concurrent writers cannot
    interleave.

    Args:
        fact_data (dict[str, Any]): Fact to persist; must contain an "id" key
            holding a unique identifier.

    Returns:
        None
    """
    fact_id = fact_data["id"]
    with self._lock:
        self._store[fact_id] = fact_data

postgres

Postgres storage backend implementation using SQLAlchemy.

Classes:

  • AsyncPostgresStorage

    Async storage backend implementation using PostgreSQL and SQLAlchemy.

  • PostgresStorage

    Storage backend implementation using PostgreSQL and SQLAlchemy.

AsyncPostgresStorage

AsyncPostgresStorage(
    engine_or_url: str | AsyncEngine,
    table_prefix: str = "memstate",
)

Bases: AsyncStorageBackend

Async storage backend implementation using PostgreSQL and SQLAlchemy.

This class provides methods for interacting with a PostgreSQL database to store, retrieve, and manage structured data and logs. It uses SQLAlchemy for ORM capabilities and supports advanced querying and filtering using JSONB.

Example
store = AsyncPostgresStorage(...)
await store.create_tables()

Attributes: _engine (AsyncEngine): SQLAlchemy async engine used to talk to the PostgreSQL database. _metadata (MetaData): SQLAlchemy MetaData object for defining table schemas. _table_prefix (str): Prefix for naming tables to avoid conflicts. _facts_table (Table): SQLAlchemy Table for storing facts data with JSONB indexing. _log_table (Table): SQLAlchemy Table for transaction logs.

Methods:

  • append_tx

    Asynchronously appends a transaction record to the transaction log.

  • close

    Asynchronously closes the current open resource or connection.

  • create_tables

    Helper to create tables asynchronously (uses run_sync).

  • delete

    Asynchronously removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Asynchronously deletes all facts associated with a given session ID from the store.

  • delete_txs

    Asynchronously removes a list of transactions from the transaction log whose session IDs match the provided

  • get_session_facts

    Asynchronously retrieves all facts associated with a specific session.

  • get_tx_log

    Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Asynchronously loads an item from the store based on the provided identifier.

  • query

    Asynchronously query data from the internal store based on specified filters.

  • save

    Asynchronously saves the given fact data into the internal store. The save operation

Source code in memstate/backends/postgres.py
def __init__(self, engine_or_url: str | AsyncEngine, table_prefix: str = "memstate") -> None:
    """
    Initializes the async Postgres storage backend.

    Args:
        engine_or_url: An existing SQLAlchemy AsyncEngine, or a connection URL
            from which one is created.
        table_prefix: Prefix for the facts/log table names to avoid conflicts.
    """
    if isinstance(engine_or_url, str):
        self._engine = create_async_engine(engine_or_url, future=True)
    else:
        self._engine = engine_or_url

    self._metadata = MetaData()
    self._table_prefix = table_prefix

    # Facts are stored as JSONB documents keyed by their string id.
    self._facts_table = Table(
        f"{table_prefix}_facts",
        self._metadata,
        Column("id", String, primary_key=True),
        Column("doc", JSONB, nullable=False),
    )

    self._log_table = Table(
        f"{table_prefix}_log",
        self._metadata,
        Column("seq", Integer, primary_key=True, autoincrement=True),
        Column("session_id", String, index=True, nullable=True),
        Column("entry", JSONB, nullable=False),
        Index(f"ix_{table_prefix}_log_entry_gin", "entry", postgresql_using="gin"),
    )
    # Btree index on the transaction uuid inside the JSONB entry; binding the
    # expression to the table column registers the index with the table.
    # (Removed a stray trailing comma that wrapped this statement in a 1-tuple.)
    Index(f"ix_{table_prefix}_log_uuid", self._log_table.c.entry["uuid"].astext, postgresql_using="btree")

append_tx async

append_tx(tx_data: dict[str, Any]) -> None

Asynchronously appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Asynchronously records a single transaction entry in the log table.

    The session id (if present) is copied into its own indexed column so the
    log can be filtered per session without a JSONB lookup.

    Args:
        tx_data (dict[str, Any]): The transaction payload to persist.

    Returns:
        None
    """
    async with self._engine.begin() as conn:
        insert_stmt = self._log_table.insert().values(
            session_id=tx_data.get("session_id"),
            entry=tx_data,
        )
        await conn.execute(insert_stmt)

close async

close() -> None

Asynchronously closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def close(self) -> None:
    """
    Asynchronously releases the underlying database engine.

    Disposing the engine closes its connection pool; the backend cannot be
    used afterwards unless a new engine is created.

    Returns:
        None
    """
    await self._engine.dispose()

create_tables async

create_tables() -> None

Helper to create tables asynchronously (uses run_sync).

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def create_tables(self) -> None:
    """
    Creates the facts and log tables if they do not already exist.

    SQLAlchemy's DDL emission is synchronous, so it is bridged onto the
    async connection via ``run_sync``.

    Returns:
        None
    """
    async with self._engine.begin() as conn:
        await conn.run_sync(self._metadata.create_all)

delete async

delete(id: str) -> None

Asynchronously removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def delete(self, id: str) -> None:
    """
    Asynchronously deletes the fact row whose primary key equals ``id``.

    Deleting a non-existent id is a no-op.

    Args:
        id (str): Primary key of the fact to remove.

    Returns:
        None
    """
    stmt = delete(self._facts_table).where(self._facts_table.c.id == id)
    async with self._engine.begin() as conn:
        await conn.execute(stmt)

delete_session async

delete_session(session_id: str) -> list[str]

Asynchronously deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/postgres.py
async def delete_session(self, session_id: str) -> list[str]:
    """
    Asynchronously removes every fact belonging to the given session.

    Facts are matched on the ``session_id`` key inside their JSONB document;
    the ids of all deleted rows are collected via ``RETURNING``.

    Args:
        session_id (str): Session whose facts should be purged.

    Returns:
        A list of the fact identifiers that were deleted from the store.
    """
    stmt = (
        delete(self._facts_table)
        .where(self._facts_table.c.doc["session_id"].astext == session_id)
        .returning(self._facts_table.c.id)
    )
    async with self._engine.begin() as conn:
        deleted = await conn.execute(stmt)
        return [row[0] for row in deleted.all()]

delete_txs async

delete_txs(tx_uuids: list[str]) -> None

Asynchronously removes from the transaction log every entry whose stored UUID matches one of the provided transaction UUIDs. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Asynchronously removes transaction log entries whose ``uuid`` field
    (stored inside the JSONB entry) matches one of the provided UUIDs.
    If the provided list is empty, no transactions are processed.

    Args:
        tx_uuids (list[str]): A list of transaction UUIDs to be removed from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    stmt = delete(self._log_table).where(self._log_table.c.entry["uuid"].astext.in_(tx_uuids))

    async with self._engine.begin() as conn:
        await conn.execute(stmt)

get_session_facts async

get_session_facts(session_id: str) -> list[dict[str, Any]]

Asynchronously retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/postgres.py
async def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Asynchronously retrieves every fact whose JSONB document carries the
    given session id. The result may be empty.

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of fact dictionaries belonging to the specified session.
    """
    stmt = select(self._facts_table.c.doc).where(self._facts_table.c.doc["session_id"].astext == session_id)
    async with self._engine.connect() as conn:
        rows = (await conn.execute(stmt)).all()
    return [row[0] for row in rows]

get_tx_log async

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/postgres.py
async def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Asynchronously returns a page of the transaction log for one session,
    newest first.

    Entries are ordered by descending insertion sequence; ``offset`` skips
    that many of the newest entries and ``limit`` caps the page size.

    Args:
        session_id (str): Session whose transactions are requested.
        limit (int): Maximum number of entries to return. Default is 100.
        offset (int): Number of newest matching entries to skip. Default is 0.

    Returns:
        A list of transaction log entry dictionaries, most recent first.
    """
    page_stmt = (
        select(self._log_table.c.entry)
        .where(self._log_table.c.session_id == session_id)
        .order_by(desc(self._log_table.c.seq))
        .offset(offset)
        .limit(limit)
    )
    async with self._engine.connect() as conn:
        rows = (await conn.execute(page_stmt)).all()
    return [row[0] for row in rows]

load async

load(id: str) -> dict[str, Any] | None

Asynchronously loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/postgres.py
async def load(self, id: str) -> dict[str, Any] | None:
    """
    Asynchronously fetches a single fact by its identifier.

    Args:
        id (str): Unique identifier of the fact to look up.

    Returns:
        The stored fact dictionary, or ``None`` when the identifier is unknown.
    """
    stmt = select(self._facts_table.c.doc).where(self._facts_table.c.id == id)
    async with self._engine.connect() as conn:
        row = (await conn.execute(stmt)).first()
    return row[0] if row else None

query async

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Asynchronously query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/postgres.py
async def query(
    self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Asynchronously fetches facts matching the given filters.

    Filtering happens in SQL: ``type_filter`` compares the JSONB "type"
    field as text, and each ``json_filters`` entry compares a dotted path
    inside the document against the expected value (cast to JSONB).

    Args:
        type_filter (str | None): Restrict results to facts whose "type" field equals
            this value. Ignored when None.
        json_filters (dict[str, Any] | None): Mapping of dotted JSON paths to required
            values. Ignored when None.

    Returns:
        A list of fact dictionaries satisfying every supplied filter.
    """
    conditions = []
    if type_filter:
        conditions.append(self._facts_table.c.doc["type"].astext == type_filter)
    if json_filters:
        for path, expected in json_filters.items():
            segments = path.split(".")
            # Walk all but the last segment to reach the parent JSONB node.
            node: Any = self._facts_table.c.doc
            for segment in segments[:-1]:
                node = node[segment]
            conditions.append(node[segments[-1]] == func.to_jsonb(expected))

    stmt = select(self._facts_table.c.doc)
    for condition in conditions:
        stmt = stmt.where(condition)

    async with self._engine.connect() as conn:
        rows = (await conn.execute(stmt)).all()
    return [row[0] for row in rows]

save async

save(fact_data: dict[str, Any]) -> None

Asynchronously saves the given fact data into the store. The save operation is an upsert: an existing fact with the same identifier is overwritten.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
async def save(self, fact_data: dict[str, Any]) -> None:
    """
    Asynchronously upserts the given fact data. If a row with the same "id"
    already exists, its document is overwritten; otherwise a new row is
    inserted. Atomicity comes from Postgres's ON CONFLICT clause, not from
    any client-side lock.

    Args:
        fact_data (dict[str, Any]): A dictionary containing fact data to be stored. The dictionary
            must include an "id" key with a corresponding value as a unique identifier.

    Returns:
        None
    """
    stmt = pg_insert(self._facts_table).values(id=fact_data["id"], doc=fact_data)
    upsert_stmt = stmt.on_conflict_do_update(index_elements=["id"], set_={"doc": stmt.excluded.doc})
    async with self._engine.begin() as conn:
        await conn.execute(upsert_stmt)

PostgresStorage

PostgresStorage(
    engine_or_url: str | Engine,
    table_prefix: str = "memstate",
)

Bases: StorageBackend

Storage backend implementation using PostgreSQL and SQLAlchemy.

This class provides methods for interacting with a PostgreSQL database to store, retrieve, and manage structured data and logs. It uses SQLAlchemy for ORM capabilities and supports advanced querying and filtering using JSONB.

Attributes:

  • _engine (str | Engine) –

    SQLAlchemy Engine or connection URL for interacting with the PostgreSQL database.

  • _metadata (MetaData) –

    SQLAlchemy MetaData object for defining table schemas.

  • _table_prefix (str) –

    Prefix for naming tables to avoid conflicts.

  • _facts_table (Table) –

    SQLAlchemy Table for storing facts data with JSONB indexing.

  • _log_table (Table) –

    SQLAlchemy Table for transaction logs.

Methods:

  • append_tx

    Appends a transaction record to the transaction log.

  • close

    Closes the current open resource or connection.

  • delete

    Removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Deletes all facts associated with a given session ID from the store.

  • delete_txs

    Removes a list of transactions from the transaction log whose session IDs match the provided

  • get_session_facts

    Retrieves all facts associated with a specific session.

  • get_tx_log

    Retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Loads an item from the store based on the provided identifier.

  • query

    Query data from the internal store based on specified filters.

  • save

    Saves the given fact data into the internal store. The save operation

Source code in memstate/backends/postgres.py
def __init__(self, engine_or_url: str | Engine, table_prefix: str = "memstate") -> None:
    """
    Initializes the Postgres storage backend and creates its tables.

    Args:
        engine_or_url: An existing SQLAlchemy Engine, or a connection URL
            from which one is created.
        table_prefix: Prefix for the facts/log table names to avoid conflicts.
    """
    if isinstance(engine_or_url, str):
        self._engine = create_engine(engine_or_url, future=True)
    else:
        self._engine = engine_or_url

    self._metadata = MetaData()
    self._table_prefix = table_prefix

    # --- Define Tables ---
    self._facts_table = Table(
        f"{table_prefix}_facts",
        self._metadata,
        Column("id", String, primary_key=True),
        Column("doc", JSONB, nullable=False),  # JSONB so the document can be indexed
    )

    self._log_table = Table(
        f"{table_prefix}_log",
        self._metadata,
        Column("seq", Integer, primary_key=True, autoincrement=True),
        Column("session_id", String, index=True, nullable=True),
        Column("entry", JSONB, nullable=False),
        Index(f"ix_{table_prefix}_log_entry_gin", "entry", postgresql_using="gin"),
    )
    # Btree index on the transaction uuid inside the JSONB entry; binding the
    # expression to the table column registers the index with the table.
    # (Removed a stray trailing comma that wrapped this statement in a 1-tuple.)
    Index(f"ix_{table_prefix}_log_uuid", self._log_table.c.entry["uuid"].astext, postgresql_using="btree")

    with self._engine.begin() as conn:
        self._metadata.create_all(conn)

append_tx

append_tx(tx_data: dict[str, Any]) -> None

Appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Records a single transaction entry in the log table.

    The session id (if present) is copied into its own indexed column so the
    log can be filtered per session without a JSONB lookup.

    Args:
        tx_data (dict[str, Any]): The transaction payload to persist.

    Returns:
        None
    """
    with self._engine.begin() as conn:
        insert_stmt = self._log_table.insert().values(
            session_id=tx_data.get("session_id"),
            entry=tx_data,
        )
        conn.execute(insert_stmt)

close

close() -> None

Closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
def close(self) -> None:
    """
    Releases the underlying database engine.

    Disposing the engine closes its connection pool; the backend cannot be
    used afterwards unless a new engine is created.

    Returns:
        None
    """
    self._engine.dispose()

delete

delete(id: str) -> None

Removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
def delete(self, id: str) -> None:
    """
    Deletes the fact row whose primary key equals ``id``.

    Deleting a non-existent id is a no-op.

    Args:
        id (str): Primary key of the fact to remove.

    Returns:
        None
    """
    stmt = delete(self._facts_table).where(self._facts_table.c.id == id)
    with self._engine.begin() as conn:
        conn.execute(stmt)

delete_session

delete_session(session_id: str) -> list[str]

Deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/postgres.py
def delete_session(self, session_id: str) -> list[str]:
    """
    Removes every fact belonging to the given session.

    Facts are matched on the ``session_id`` key inside their JSONB document;
    the ids of all deleted rows are collected via ``RETURNING``.

    Args:
        session_id (str): Session whose facts should be purged.

    Returns:
        A list of the fact identifiers that were deleted from the store.
    """
    stmt = (
        delete(self._facts_table)
        .where(self._facts_table.c.doc["session_id"].astext == session_id)
        .returning(self._facts_table.c.id)
    )
    with self._engine.begin() as conn:
        deleted = conn.execute(stmt)
        return [row[0] for row in deleted.all()]

delete_txs

delete_txs(tx_uuids: list[str]) -> None

Removes from the transaction log every entry whose stored UUID matches one of the provided transaction UUIDs. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Removes transaction log entries whose ``uuid`` field (stored inside the
    JSONB entry) matches one of the provided UUIDs. If the provided list is
    empty, no transactions are processed.

    Args:
        tx_uuids (list[str]): A list of transaction UUIDs to be removed from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    stmt = delete(self._log_table).where(self._log_table.c.entry["uuid"].astext.in_(tx_uuids))

    with self._engine.begin() as conn:
        conn.execute(stmt)

get_session_facts

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/postgres.py
def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Return every fact stored for the given session.

    Facts are matched on the ``session_id`` field inside the JSONB document.
    The result may be empty when the session has no facts.

    Args:
        session_id (str): Identifier of the session to look up.

    Returns:
        A list of fact documents (dictionaries) belonging to the session.
    """
    session_col = self._facts_table.c.doc["session_id"].astext
    query_stmt = select(self._facts_table.c.doc).where(session_col == session_id)

    with self._engine.connect() as conn:
        return [row[0] for row in conn.execute(query_stmt).all()]

get_tx_log

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/postgres.py
def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Return a slice of the transaction log for a session, newest first.

    Entries are ordered by descending sequence number, so the most recently
    appended transaction appears first in the result.

    Args:
        session_id (str): The identifier of the session whose transactions should be retrieved.
        limit (int): Maximum number of log entries to return. Default is 100.
        offset (int): Number of entries to skip, counted from the most recent
            one. Default is 0.

    Returns:
        A list of dictionaries, each one a transaction log entry.
    """
    log = self._log_table
    query_stmt = (
        select(log.c.entry)
        .where(log.c.session_id == session_id)
        .order_by(desc(log.c.seq))
        .limit(limit)
        .offset(offset)
    )
    with self._engine.connect() as conn:
        return [row[0] for row in conn.execute(query_stmt).all()]

load

load(id: str) -> dict[str, Any] | None

Loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/postgres.py
def load(self, id: str) -> dict[str, Any] | None:
    """
    Fetch a single fact document by its identifier.

    Args:
        id (str): The unique identifier of the fact to fetch.

    Returns:
        The stored document as a dictionary, or ``None`` when no row exists
        for the given identifier.
    """
    query_stmt = select(self._facts_table.c.doc).where(self._facts_table.c.id == id)
    with self._engine.connect() as conn:
        row = conn.execute(query_stmt).first()
    # SQLAlchemy deserializes the JSONB column automatically.
    return row[0] if row else None

query

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/postgres.py
def query(self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    Select fact documents matching the given filters.

    Both filters are optional; when both are omitted, every document is
    returned. All supplied criteria must match for a document to be included.

    Args:
        type_filter (str | None): When set, only documents whose "type" field
            equals this value are included. Ignored when None.
        json_filters (dict[str, Any] | None): Mapping of dotted JSON paths
            (e.g. "payload.user.id") to required values. Ignored when None.

    Returns:
        A list of dictionaries for the documents that match all filters.
    """
    doc_col = self._facts_table.c.doc
    stmt = select(doc_col)

    # Type filter via JSONB text extraction: doc->>'type'.
    if type_filter:
        stmt = stmt.where(doc_col["type"].astext == type_filter)

    # Each json_filters key is a dotted path into the document,
    # e.g. "payload.role" -> doc['payload']['role'].
    for dotted_path, expected in (json_filters or {}).items():
        *parents, leaf = dotted_path.split(".")

        # Walk the JSONB structure down to the parent of the leaf key.
        node: ColumnElement[Any] = doc_col
        for parent in parents:
            node = node[parent]

        # Compare the leaf as JSONB; to_jsonb() lets int/bool/str values
        # compare with the correct type semantics.
        stmt = stmt.where(node[leaf] == func.to_jsonb(expected))

    with self._engine.connect() as conn:
        return [row[0] for row in conn.execute(stmt).all()]

save

save(fact_data: dict[str, Any]) -> None

Saves the given fact data into the internal store, inserting a new row or updating the existing one keyed by the fact's unique identifier.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/postgres.py
def save(self, fact_data: dict[str, Any]) -> None:
    """
    Saves the given fact data into the internal store, inserting a new row or
    updating the existing one via a native Postgres upsert
    (INSERT ... ON CONFLICT DO UPDATE).

    Args:
        fact_data (dict[str, Any]): A dictionary containing fact data to be stored. The dictionary
            must include an "id" key with a corresponding value as a unique identifier.

    Returns:
        None
    """
    # Postgres Native Upsert (INSERT ... ON CONFLICT DO UPDATE)
    stmt = pg_insert(self._facts_table).values(id=fact_data["id"], doc=fact_data)
    upsert_stmt = stmt.on_conflict_do_update(
        index_elements=["id"], set_={"doc": stmt.excluded.doc}  # Conflict over PK
    )

    with self._engine.begin() as conn:
        conn.execute(upsert_stmt)

redis

Redis storage backend implementation.

Classes:

  • AsyncRedisStorage

    AsyncRedisStorage class provides an async Redis-based implementation for storing and retrieving structured

  • RedisStorage

    RedisStorage class provides a Redis-based implementation for storing and retrieving structured

AsyncRedisStorage

AsyncRedisStorage(
    client_or_url: Union[
        str, Redis
    ] = "redis://localhost:6379/0",
)

Bases: AsyncStorageBackend

AsyncRedisStorage class provides an async Redis-based implementation for storing and retrieving structured data, facilitating the management of session-based storage, type-based indexing, and transaction logs.

This class aims to handle data persistence efficiently using Redis as the backend, enabling features such as loading, saving, querying, and deleting data with support for session-specific and type-specific operations. It includes tools to query and backfill JSON filters and also supports transactional logging.

Attributes:

  • prefix (str) –

    Prefix used for all Redis keys to avoid collisions with other data in the Redis instance.

  • r (Redis) –

    Redis client for performing operations against the Redis database.

  • _owns_client (bool) –

    Flag indicating whether the Redis client was created by the AsyncRedisStorage class.

Methods:

  • append_tx

    Asynchronously appends a transaction record to the transaction log.

  • close

    Asynchronously closes the current open resource or connection.

  • delete

    Asynchronously removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Asynchronously deletes all facts associated with a given session ID from the store.

  • delete_txs

Asynchronously removes transactions from the transaction log whose UUIDs match the provided

  • get_session_facts

    Asynchronously retrieves all facts associated with a specific session.

  • get_tx_log

    Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Asynchronously loads an item from the store based on the provided identifier.

  • query

    Asynchronously query data from the internal store based on specified filters.

  • save

    Asynchronously saves the given fact data into the internal store. The save operation

Source code in memstate/backends/redis.py
def __init__(self, client_or_url: Union[str, "aredis.Redis"] = "redis://localhost:6379/0") -> None:
    """Initialize the backend with either a connection URL or an existing client.

    When a URL string is given, a new async Redis client is created and will
    be closed by this instance; an already-constructed client is used as-is
    and its lifecycle stays with the caller.
    """
    self.prefix = "mem:"

    created_from_url = isinstance(client_or_url, str)
    if created_from_url:
        self.r = aredis.from_url(client_or_url, decode_responses=True)  # type: ignore[no-untyped-call]
    else:
        self.r = client_or_url
    # Only close clients we created ourselves (see close()).
    self._owns_client = created_from_url

append_tx async

append_tx(tx_data: dict[str, Any]) -> None

Asynchronously appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
async def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Asynchronously append one transaction record to the transaction log.

    The record is stored under its UUID key and indexed by sequence number in
    the global log; when a session id is present it is also added to the
    per-session log index. All writes go through one pipeline.

    Args:
        tx_data (dict[str, Any]): Transaction data; must contain "uuid" and
            "seq", and may contain "session_id".

    Returns:
        None
    """
    tx_uuid = tx_data["uuid"]
    tx_seq = tx_data["seq"]
    sid = tx_data.get("session_id")

    async with self.r.pipeline() as pipe:
        pipe.set(self._tx_key(tx_uuid), json.dumps(tx_data, default=str))
        pipe.zadd(f"{self.prefix}tx_log", {tx_uuid: tx_seq})
        if sid:
            pipe.zadd(f"{self.prefix}tx_log:session:{sid}", {tx_uuid: tx_seq})
        await pipe.execute()

close async

close() -> None

Asynchronously closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
async def close(self) -> None:
    """
    Asynchronously release the underlying Redis connection.

    The client is closed only when it was created by this instance; clients
    supplied by the caller are left open. Once closed, the backend must not
    be used again unless reinitialized.

    Returns:
        None
    """
    if not self._owns_client:
        return
    await self.r.aclose()

delete async

delete(id: str) -> None

Asynchronously removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
async def delete(self, id: str) -> None:
    """
    Asynchronously remove a fact and its index entries by identifier.

    Unknown identifiers are ignored silently. The fact is loaded first so
    that its type index (and session index, when present) can be cleaned up
    alongside the main key.

    Args:
        id (str): Identifier of the fact to remove. Must be a string.

    Returns:
        None
    """
    fact = await self.load(id)
    if not fact:
        return

    session_id = fact.get("session_id")
    async with self.r.pipeline() as pipe:
        pipe.delete(self._key(id))
        pipe.srem(f"{self.prefix}type:{fact['type']}", id)
        if session_id:
            pipe.srem(f"{self.prefix}session:{session_id}", id)
        await pipe.execute()

delete_session async

delete_session(session_id: str) -> list[str]

Asynchronously deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/redis.py
async def delete_session(self, session_id: str) -> list[str]:
    """
    Asynchronously remove every fact belonging to the given session.

    Looks up the session's fact-id index, deletes each fact key together
    with the index itself, and reports which ids were removed.

    Args:
        session_id (str): The identifier of the session whose associated facts should be removed.

    Returns:
        The list of fact identifiers that were deleted from the store.
    """
    index_key = f"{self.prefix}session:{session_id}"
    fact_ids = list(await self.r.smembers(index_key))
    if not fact_ids:
        return []

    async with self.r.pipeline() as pipe:
        for fact_id in fact_ids:
            pipe.delete(self._key(fact_id))
        pipe.delete(index_key)
        await pipe.execute()

    return fact_ids

delete_txs async

delete_txs(tx_uuids: list[str]) -> None

Asynchronously removes transactions from the transaction log whose UUIDs match the provided transaction UUIDs. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
async def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Asynchronously removes transactions from the transaction log whose UUIDs
    match the provided transaction UUIDs. If the provided list is empty, no
    transactions are processed.

    Each transaction record is loaded first so that its per-session log index
    can be cleaned up in addition to the global one.

    Args:
        tx_uuids (list[str]): A list of transaction UUIDs to be removed from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    keys_to_load = [self._tx_key(uid) for uid in tx_uuids]
    raw_data = await self.r.mget(keys_to_load)

    sessions_to_clean: dict[str, list[str]] = {}  # {session_id: [uuid, uuid]}
    for raw in raw_data:
        if raw:
            tx = json.loads(raw)
            sid = tx.get("session_id")
            uuid = tx["uuid"]
            if sid:
                if sid not in sessions_to_clean:
                    sessions_to_clean[sid] = []
                sessions_to_clean[sid].append(uuid)

    async with self.r.pipeline() as pipe:
        pipe.delete(*keys_to_load)
        pipe.zrem(f"{self.prefix}tx_log", *tx_uuids)
        for sid, uuids in sessions_to_clean.items():
            pipe.zrem(f"{self.prefix}tx_log:session:{sid}", *uuids)
        await pipe.execute()

get_session_facts async

get_session_facts(session_id: str) -> list[dict[str, Any]]

Asynchronously retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/redis.py
async def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Asynchronously fetch every fact stored for the given session.

    Resolves the session's fact-id index, then bulk-loads the fact documents
    in a single pipeline round trip. Missing keys are skipped, so the result
    may be shorter than the index (or empty).

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of fact documents (dictionaries) belonging to the session.
    """
    fact_ids = await self.r.smembers(f"{self.prefix}session:{session_id}")
    if not fact_ids:
        return []

    async with self.r.pipeline() as pipe:
        for fact_id in fact_ids:
            pipe.get(self._key(fact_id))
        raw_docs = await pipe.execute()

    return [json.loads(doc) for doc in raw_docs if doc]

get_tx_log async

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/redis.py
async def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Asynchronously fetch a slice of the session's transaction log, newest first.

    The per-session sorted-set index is read in reverse score order, then the
    matching transaction records are bulk-loaded with MGET. Records whose keys
    have disappeared are skipped.

    Args:
        session_id (str): The identifier of the session whose transactions should be retrieved.
        limit (int): Maximum number of log entries to return. Default is 100.
        offset (int): Number of entries to skip, counted from the most recent
            one. Default is 0.

    Returns:
        A list of dictionaries, each one a transaction log entry.
    """
    index_key = f"{self.prefix}tx_log:session:{session_id}"
    uuids = await self.r.zrevrange(index_key, offset, offset + limit - 1)
    if not uuids:
        return []

    raw_data = await self.r.mget([self._tx_key(uuid) for uuid in uuids])
    return [json.loads(item) for item in raw_data if item]

load async

load(id: str) -> dict[str, Any] | None

Asynchronously loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/redis.py
async def load(self, id: str) -> dict[str, Any] | None:
    """
    Asynchronously fetch a single fact document by its identifier.

    Args:
        id (str): The unique identifier of the fact to fetch.

    Returns:
        The stored document as a dictionary, or ``None`` when the key does
        not exist.
    """
    raw = await self.r.get(self._key(id))
    if not raw:
        return None
    return json.loads(raw)

query async

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Asynchronously query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/redis.py
async def query(
    self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Asynchronously query fact documents matching the given filters.

    Candidate ids come from the type index when `type_filter` is given,
    otherwise from scanning all fact keys. The documents are then loaded in
    one pipeline and filtered in Python against `json_filters`.

    Args:
        type_filter (str | None): Optional filter to include only items with a matching "type" field.
            If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path within the JSON
            data structure, and values represent the required values for inclusion.
            If None, this filter is ignored.

    Returns:
        A list of dictionaries containing the data entries from the internal store that match the specified filters.
    """
    if type_filter:
        ids = await self.r.smembers(f"{self.prefix}type:{type_filter}")
    else:
        # Use SCAN instead of KEYS: KEYS blocks the Redis server while it
        # walks the whole keyspace, whereas SCAN iterates incrementally.
        # A set also dedupes ids, since SCAN may return a key more than once.
        ids = {k.split(":")[-1] async for k in self.r.scan_iter(match=f"{self.prefix}fact:*")}

    if not ids:
        return []

    async with self.r.pipeline() as pipe:
        for i in list(ids):
            pipe.get(self._key(i))
        raw_docs = await pipe.execute()

    results = []
    for doc_str in raw_docs:
        if not doc_str:
            continue
        fact = json.loads(doc_str)

        if json_filters:
            # Every dotted-path filter must match for the fact to be kept.
            match = True
            for k, v in json_filters.items():
                actual_val = self._get_value_by_path(fact, k)
                if actual_val != v:
                    match = False
                    break
            if not match:
                continue
        results.append(fact)

    return results

save async

save(fact_data: dict[str, Any]) -> None

Asynchronously saves the given fact data into the internal store, writing the document and its type/session index entries in a single pipeline.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
async def save(self, fact_data: dict[str, Any]) -> None:
    """
    Asynchronously saves the given fact data into the internal store, writing
    the document plus its type index entry (and session index entry, when a
    "session_id" is present) in a single pipeline round trip.

    Args:
        fact_data (dict[str, Any]): A dictionary containing fact data to be stored. The dictionary
            must include an "id" key with a corresponding value as a unique identifier,
            and a "type" key used for the type index.

    Returns:
        None
    """
    async with self.r.pipeline() as pipe:
        pipe.set(self._key(fact_data["id"]), json.dumps(fact_data))
        pipe.sadd(f"{self.prefix}type:{fact_data['type']}", fact_data["id"])
        if fact_data.get("session_id"):
            pipe.sadd(f"{self.prefix}session:{fact_data['session_id']}", fact_data["id"])
        await pipe.execute()

RedisStorage

RedisStorage(
    client_or_url: Union[
        str, Redis
    ] = "redis://localhost:6379/0",
)

Bases: StorageBackend

RedisStorage class provides a Redis-based implementation for storing and retrieving structured data, facilitating the management of session-based storage, type-based indexing, and transaction logs.

This class aims to handle data persistence efficiently using Redis as the backend, enabling features such as loading, saving, querying, and deleting data with support for session-specific and type-specific operations. It includes tools to query and backfill JSON filters and also supports transactional logging.

Attributes:

  • prefix (str) –

    Prefix used for all Redis keys to avoid collisions with other data in the Redis instance.

  • r (Redis) –

    Redis client for performing operations against the Redis database.

  • _owns_client (bool) –

    Flag indicating whether the Redis client was created by the RedisStorage class.

Methods:

  • append_tx

    Appends a transaction record to the transaction log.

  • close

    Closes the current open resource or connection.

  • delete

    Removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Deletes all facts associated with a given session ID from the store.

  • delete_txs

Removes transactions from the transaction log whose UUIDs match the provided

  • get_session_facts

    Retrieves all facts associated with a specific session.

  • get_tx_log

    Retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Loads an item from the store based on the provided identifier.

  • query

    Query data from the internal store based on specified filters.

  • save

    Saves the given fact data into the internal store. The save operation

Source code in memstate/backends/redis.py
def __init__(self, client_or_url: Union[str, "redis.Redis"] = "redis://localhost:6379/0") -> None:
    """Initialize the backend with either a connection URL or an existing client.

    When a URL string is given, a new Redis client is created and will be
    closed by this instance; an already-constructed client is used as-is and
    its lifecycle stays with the caller.
    """
    self.prefix = "mem:"

    created_from_url = isinstance(client_or_url, str)
    if created_from_url:
        self.r = redis.from_url(client_or_url, decode_responses=True)  # type: ignore[no-untyped-call]
    else:
        self.r = client_or_url
    # Only close clients we created ourselves (see close()).
    self._owns_client = created_from_url

append_tx

append_tx(tx_data: dict[str, Any]) -> None

Appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Append one transaction record to the transaction log.

    The record is stored under its UUID key and indexed by sequence number in
    the global log; when a session id is present it is also added to the
    per-session log index. All writes go through one pipeline.

    Args:
        tx_data (dict[str, Any]): Transaction data; must contain "uuid" and
            "seq", and may contain "session_id".

    Returns:
        None
    """
    tx_uuid = tx_data["uuid"]
    tx_seq = tx_data["seq"]
    sid = tx_data.get("session_id")

    pipe = self.r.pipeline()
    pipe.set(self._tx_key(tx_uuid), json.dumps(tx_data, default=str))
    pipe.zadd(f"{self.prefix}tx_log", {tx_uuid: tx_seq})
    if sid:
        pipe.zadd(f"{self.prefix}tx_log:session:{sid}", {tx_uuid: tx_seq})
    pipe.execute()

close

close() -> None

Closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
def close(self) -> None:
    """
    Release the underlying Redis connection.

    The client is closed only when it was created by this instance; clients
    supplied by the caller are left open. Once closed, the backend must not
    be used again unless reinitialized.

    Returns:
        None
    """
    if not self._owns_client:
        return
    self.r.close()

delete

delete(id: str) -> None

Removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
def delete(self, id: str) -> None:
    """
    Remove a fact and its index entries by identifier.

    Unknown identifiers are ignored silently.

    Args:
        id (str): Identifier of the fact to remove. Must be a string.

    Returns:
        None
    """
    # Loading first lets us clean up the type/session index sets; skipping
    # the load would be faster but would leave stale ids in the indexes
    # (stale ids are tolerated elsewhere via a load check).
    fact = self.load(id)
    if not fact:
        return

    pipe = self.r.pipeline()
    pipe.delete(self._key(id))
    pipe.srem(f"{self.prefix}type:{fact['type']}", id)
    if fact.get("session_id"):
        pipe.srem(f"{self.prefix}session:{fact['session_id']}", id)
    pipe.execute()

delete_session

delete_session(session_id: str) -> list[str]

Deletes all facts associated with a given session ID from the store.

This method identifies all facts records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/redis.py
def delete_session(self, session_id: str) -> list[str]:
    """
    Delete all facts associated with a given session ID from the store.

    Looks up the session index set, deletes every fact key it references,
    clears the index itself, and returns the identifiers that were removed.

    Args:
        session_id (str): The identifier of the session whose associated facts should be removed.

    Returns:
        A list of fact identifiers that were deleted from the store.
    """
    key = f"{self.prefix}session:{session_id}"
    # Normalize members to str: clients configured without decode_responses
    # return bytes, which would otherwise leak into _key() and into the
    # declared list[str] return value.
    ids = [m for m in (self._to_str(member) for member in self.r.smembers(key)) if m]
    if not ids:
        return []

    pipe = self.r.pipeline()
    for fact_id in ids:
        pipe.delete(self._key(fact_id))
        # Note: cleaning the type index here would require loading each fact;
        # stale type-index entries are tolerated (load() filters them out).
    pipe.delete(key)  # clear the session index itself
    pipe.execute()
    return ids

delete_txs

delete_txs(tx_uuids: list[str]) -> None

Removes the transactions whose UUIDs match the provided list from the transaction log. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Remove the transactions with the given UUIDs from the transaction log.

    An empty input list is a no-op. Per-session log indexes are pruned for
    every transaction that carries a session_id.

    Args:
        tx_uuids (list[str]): A list of transaction UUIDs to be removed from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return

    tx_keys = [self._tx_key(uid) for uid in tx_uuids]

    # Read the payloads first so we know which per-session indexes to prune.
    per_session: dict[str, list[str]] = {}
    for raw in self.r.mget(tx_keys):
        decoded = self._to_str(raw)
        if not decoded:
            continue
        tx = json.loads(decoded)
        sid = tx.get("session_id")
        if sid:
            per_session.setdefault(sid, []).append(tx["uuid"])

    with self.r.pipeline() as pipe:
        pipe.delete(*tx_keys)
        pipe.zrem(f"{self.prefix}tx_log", *tx_uuids)
        for sid, uuids in per_session.items():
            pipe.zrem(f"{self.prefix}tx_log:session:{sid}", *uuids)
        pipe.execute()

get_session_facts

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/redis.py
def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Retrieve all facts belonging to a specific session.

    Reads the session index set and loads every referenced fact document.

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of fact dictionaries; empty when the session has no facts.
    """
    member_ids = self.r.smembers(f"{self.prefix}session:{session_id}")
    if not member_ids:
        return []

    # Fetch all documents in a single round-trip.
    pipe = self.r.pipeline()
    for member in member_ids:
        pipe.get(self._key(member))
    raw_docs = pipe.execute()

    decoded = (self._to_str(raw) for raw in raw_docs)
    return [json.loads(doc) for doc in decoded if doc]

get_tx_log

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/redis.py
def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Return a slice of a session's transaction log, newest entries first.

    Args:
        session_id (str): The identifier of the session whose transactions should be retrieved.
        limit (int): Maximum number of entries to return. Default is 100.
        offset (int): Number of entries to skip, counted from the most recent
            one. Default is 0.

    Returns:
        A list of transaction dictionaries, ordered from newest to oldest.
    """
    index_key = f"{self.prefix}tx_log:session:{session_id}"
    # zrevrange bounds are inclusive on both ends, hence the -1.
    uuids = self.r.zrevrange(index_key, offset, offset + limit - 1)
    if not uuids:
        return []

    raw_entries = self.r.mget([self._tx_key(uuid) for uuid in uuids])
    decoded = (self._to_str(raw) for raw in raw_entries)
    return [json.loads(entry) for entry in decoded if entry is not None]

load

load(id: str) -> dict[str, Any] | None

Loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/redis.py
def load(self, id: str) -> dict[str, Any] | None:
    """
    Fetch a single fact by its identifier.

    Args:
        id (str): The unique identifier of the item to load.

    Returns:
        The stored fact as a dictionary, or ``None`` when no entry exists
        under the given identifier.
    """
    payload = self._to_str(self.r.get(self._key(id)))
    if not payload:
        return None
    return json.loads(payload)

query

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/redis.py
def query(self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    Find facts matching the given criteria.

    Strategy: candidate IDs come from the type index when ``type_filter``
    is given, otherwise from a full key scan; documents are then loaded in
    one pipeline and ``json_filters`` are applied in Python (Redis without
    RediSearch cannot filter by arbitrary JSON paths server-side).

    Args:
        type_filter (str | None): Optional filter to include only facts with a matching
            "type" field. If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path
            within the JSON data structure, and values represent the required values for
            inclusion. If None, this filter is ignored.

    Returns:
        A list of fact dictionaries matching all specified filters.
    """
    if type_filter:
        ids = self.r.smembers(f"{self.prefix}type:{type_filter}")
    else:
        # KEYS is a blocking full scan — acceptable for MVP/small scale.
        # Decode each key before splitting: clients without decode_responses
        # return bytes, and bytes.split(":") would raise TypeError.
        keys = self.r.keys(f"{self.prefix}fact:*")
        decoded_keys = (self._to_str(k) for k in keys)
        ids = [k.split(":")[-1] for k in decoded_keys if k]

    if not ids:
        return []

    # Pipeline the loads to avoid one round-trip per document.
    pipe = self.r.pipeline()
    for fact_id in list(ids):
        pipe.get(self._key(fact_id))
    raw_docs = pipe.execute()

    results: list[dict[str, Any]] = []
    for raw_doc in raw_docs:
        doc_str = self._to_str(raw_doc)
        if doc_str is None:
            continue
        fact = json.loads(doc_str)

        # JSON filter in Python: all filters must match (AND semantics).
        if json_filters:
            if any(self._get_value_by_path(fact, path) != expected for path, expected in json_filters.items()):
                continue
        results.append(fact)

    return results

save

save(fact_data: dict[str, Any]) -> None

Saves the given fact data into the internal store and updates the associated type and session indexes.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/redis.py
def save(self, fact_data: dict[str, Any]) -> None:
    """
    Upsert a fact document and register it in the secondary indexes.

    The fact is serialized to JSON under its "id" key; the type index is
    always updated, and the session index only when a session_id is set.

    Args:
        fact_data (dict[str, Any]): Fact payload to store. Must contain an
            "id" key holding the unique identifier.

    Returns:
        None
    """
    fact_id = fact_data["id"]
    self.r.set(self._key(fact_id), json.dumps(fact_data))
    self.r.sadd(f"{self.prefix}type:{fact_data['type']}", fact_id)
    session = fact_data.get("session_id")
    if session:
        self.r.sadd(f"{self.prefix}session:{session}", fact_id)

sqlite

SQLite storage backend implementation.

Classes:

  • AsyncSQLiteStorage

    Async SQLite-based storage backend for managing structured data and transactional logs.

  • SQLiteStorage

    SQLite-based storage backend for managing structured data and transactional logs.

AsyncSQLiteStorage

AsyncSQLiteStorage(
    connection_or_path: str | Connection = "memory.db",
)

Bases: AsyncStorageBackend

Async SQLite-based storage backend for managing structured data and transactional logs.

This class provides functionality to persistently store, retrieve, and manipulate data and transaction logs using an SQLite database. It supports thread-safe operations, ensures data integrity, and utilizes SQLite-specific features such as WAL mode and JSON querying.

Example
storage = AsyncSQLiteStorage("agent_async.db")
await storage.connect()

Attributes: _conn (str | aiosqlite.Connection): SQLite database connection object. _owns_connection (bool): Specifies whether the SQLiteStorage instance owns the connection and is responsible for closing it. _lock (asyncio.Lock): Threading lock that ensures thread-safe access to the database. _db (aiosqlite.Connection): Async SQLite connection object. _path (str | None): Path to the SQLite database file.

Methods:

  • append_tx

    Asynchronously appends a transaction record to the transaction log.

  • close

    Asynchronously closes the current open resource or connection.

  • connect

    Async initialization. Must be called before use.

  • delete

    Asynchronously removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Asynchronously deletes all facts associated with a given session ID from the store.

  • delete_txs

    Removes a list of transactions from the transaction log whose session IDs match the provided

  • get_session_facts

    Asynchronously retrieves all facts associated with a specific session.

  • get_tx_log

    Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Asynchronously loads an item from the store based on the provided identifier.

  • query

    Asynchronously query data from the internal store based on specified filters.

  • save

    Asynchronously saves the given fact data into the internal store. The save operation

Source code in memstate/backends/sqlite.py
def __init__(self, connection_or_path: str | aiosqlite.Connection = "memory.db") -> None:
    """
    Prepare the backend without opening a connection yet.

    The actual connection is established in ``connect()``. A string
    argument is treated as a database path the instance will own; an
    aiosqlite connection is adopted without taking ownership.

    Args:
        connection_or_path (str | aiosqlite.Connection): Database file path or an
            already-open aiosqlite connection. Defaults to "memory.db".

    Raises:
        ImportError: If the optional aiosqlite dependency is missing.
        ValueError: If the argument is neither a path nor an aiosqlite connection.
    """
    if aiosqlite is None:
        raise ImportError("Run `pip install aiosqlite` to use AsyncSQLiteStorage.")

    self._lock = asyncio.Lock()
    self._owns_connection = False
    self._db: Any = None
    self._path: str | None = None

    if isinstance(connection_or_path, str):
        self._path = connection_or_path
        self._owns_connection = True
    elif isinstance(connection_or_path, aiosqlite.Connection):
        self._db = connection_or_path
        self._owns_connection = False
    else:
        # Mirror SQLiteStorage.__init__: fail fast on unsupported argument
        # types instead of deferring the error to connect().
        raise ValueError(f"Invalid connection type: {type(connection_or_path)}")

append_tx async

append_tx(tx_data: dict[str, Any]) -> None

Asynchronously appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Asynchronously append one transaction record to the tx_log table.

    Args:
        tx_data (dict[str, Any]): Transaction payload; must contain "uuid"
            and "ts" keys. The full payload is stored as JSON.

    Returns:
        None
    """
    row = (tx_data["uuid"], tx_data["ts"], json.dumps(tx_data, default=str))
    async with self._lock:
        await self._db.execute(
            """
            INSERT INTO tx_log(uuid, timestamp, data)
            VALUES (?, ?, ?)
            """,
            row,
        )
        await self._db.commit()

close async

close() -> None

Asynchronously closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def close(self) -> None:
    """
    Asynchronously release the underlying database connection, if any.

    After closing, the instance cannot be used again until ``connect()``
    is called.

    Returns:
        None
    """
    db = self._db
    if db:
        await db.close()

connect async

connect() -> None

Async initialization. Must be called before use.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def connect(self) -> None:
    """
    Async initialization. Must be called before any other operation.

    Opens the database file when this instance owns the connection, then
    configures row access by name and creates the schema.

    Returns:
        None

    Raises:
        ValueError: If no connection is available after initialization.
    """
    if self._owns_connection and self._path:
        self._db = await aiosqlite.connect(self._path)

    db = self._db
    if db is None:
        raise ValueError("Connection not initialized properly.")

    db.row_factory = aiosqlite.Row
    await self._init_db()

delete async

delete(id: str) -> None

Asynchronously removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def delete(self, id: str) -> None:
    """
    Asynchronously remove the fact with the given identifier, if present.

    Deleting a non-existent id is a silent no-op.

    Args:
        id (str): The identifier of the entry to be removed from the store. Must be a string.

    Returns:
        None
    """
    # Lock serializes writers; commit makes the removal durable.
    async with self._lock:
        await self._db.execute("DELETE FROM facts WHERE id = ?", (id,))
        await self._db.commit()

delete_session async

delete_session(session_id: str) -> list[str]

Asynchronously deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/sqlite.py
async def delete_session(self, session_id: str) -> list[str]:
    """
    Asynchronously delete all facts belonging to a session.

    Uses a DELETE ... RETURNING statement (requires SQLite >= 3.35) to
    remove the rows and collect their ids in a single round-trip.

    Args:
        session_id (str): The identifier of the session whose associated facts should be removed.

    Returns:
        A list of fact identifiers that were deleted from the store.
    """
    async with self._lock:
        # async with closes the cursor deterministically, matching the
        # pattern used by load()/query()/get_session_facts().
        async with self._db.execute(
            "DELETE FROM facts WHERE json_extract(data, '$.session_id') = ? RETURNING id", (session_id,)
        ) as cursor:
            rows = await cursor.fetchall()
        ids = [row["id"] for row in rows]
        await self._db.commit()
        return ids

delete_txs async

delete_txs(tx_uuids: list[str]) -> None

Removes the transactions whose UUIDs match the provided list from the transaction log. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Asynchronously remove the transactions with the given UUIDs from the log.

    An empty input list is a no-op.

    Args:
        tx_uuids (list[str]): A list of transaction UUIDs to be removed from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return
    # Build one parameterized IN (...) clause; all values are bound, only
    # the placeholder count is interpolated into the SQL text.
    placeholders = ",".join("?" for _ in tx_uuids)
    statement = f"DELETE FROM tx_log WHERE uuid IN ({placeholders})"  # nosec B608
    async with self._lock:
        await self._db.execute(statement, tuple(tx_uuids))
        await self._db.commit()

get_session_facts async

get_session_facts(session_id: str) -> list[dict[str, Any]]

Asynchronously retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/sqlite.py
async def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Asynchronously retrieve all facts stored for a session.

    Args:
        session_id (str): The identifier of the session whose facts are to be retrieved.

    Returns:
        A list of fact dictionaries; empty when the session has no facts.
    """
    async with self._lock:
        async with self._db.execute(
            "SELECT data FROM facts WHERE json_extract(data, '$.session_id') = ?", (session_id,)
        ) as cursor:
            fetched = await cursor.fetchall()
    # Deserialization happens outside the lock; the rows are already local.
    return [json.loads(record["data"]) for record in fetched]

get_tx_log async

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Asynchronously retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/sqlite.py
async def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Asynchronously return a slice of a session's transaction log, newest first.

    Args:
        session_id (str): The identifier of the session whose transactions should be retrieved.
        limit (int): Maximum number of entries to return. Default is 100.
        offset (int): Number of entries to skip, counted from the most recent
            one. Default is 0.

    Returns:
        A list of transaction dictionaries, ordered from newest to oldest.
    """
    async with self._lock:
        # async with closes the cursor deterministically, matching the
        # pattern used by the other read methods on this class.
        async with self._db.execute(
            "SELECT data FROM tx_log WHERE json_extract(data, '$.session_id') = ? ORDER BY tx_id DESC LIMIT ? OFFSET ?",
            (session_id, limit, offset),
        ) as cursor:
            rows = await cursor.fetchall()
        return [json.loads(row["data"]) for row in rows]

load async

load(id: str) -> dict[str, Any] | None

Asynchronously loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/sqlite.py
async def load(self, id: str) -> dict[str, Any] | None:
    """
    Asynchronously fetch a single fact by its identifier.

    Args:
        id (str): The unique identifier of the item to load.

    Returns:
        The stored fact as a dictionary, or ``None`` when no row exists
        under the given identifier.
    """
    async with self._lock:
        async with self._db.execute("SELECT data FROM facts WHERE id = ?", (id,)) as cursor:
            row = await cursor.fetchone()
    if row is None:
        return None
    return json.loads(row["data"])

query async

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Asynchronously query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/sqlite.py
async def query(
    self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """
    Asynchronously find facts matching the given criteria.

    Builds a parameterized SELECT; filter values are always bound, and
    JSON path keys are validated before being interpolated into the
    json_extract expression.

    Args:
        type_filter (str | None): Optional filter to include only facts with a matching
            "type" column. If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path
            within the JSON data structure, and values represent the required values for
            inclusion. If None, this filter is ignored.

    Returns:
        A list of fact dictionaries matching all specified filters.

    Raises:
        ValueError: If a json_filters key contains characters outside [a-zA-Z0-9_.].
    """
    sql = "SELECT data FROM facts WHERE 1=1"
    bound: list[Any] = []

    if type_filter:
        sql += " AND type = ?"
        bound.append(type_filter)

    for key, value in (json_filters or {}).items():
        # Keys end up in the SQL text itself, so restrict them to a safe
        # character set before interpolation.
        if not re.match(r"^[a-zA-Z0-9_.]+$", key):
            raise ValueError(f"Invalid characters in filter key: {key}")
        sql += f" AND json_extract(data, '$.{key}') = ?"
        bound.append(value)

    async with self._lock:
        async with self._db.execute(sql, tuple(bound)) as cursor:
            rows = await cursor.fetchall()
    return [json.loads(row["data"]) for row in rows]

save async

save(fact_data: dict[str, Any]) -> None

Asynchronously saves the given fact data into the internal store. A lock ensures data consistency during the save operation.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
async def save(self, fact_data: dict[str, Any]) -> None:
    """
    Asynchronously upsert a fact into the facts table.

    The whole payload is serialized to JSON; the "type" column defaults to
    "unknown" when absent. A lock serializes concurrent writers.

    Args:
        fact_data (dict[str, Any]): Fact payload to store. Must contain an
            "id" key holding the unique identifier.

    Returns:
        None
    """
    row = (
        fact_data["id"],
        fact_data.get("type", "unknown"),
        json.dumps(fact_data, default=str),
    )
    async with self._lock:
        await self._db.execute(
            """
            INSERT OR REPLACE INTO facts(id, type, data)
            VALUES (?, ?, ?)
            """,
            row,
        )
        await self._db.commit()

SQLiteStorage

SQLiteStorage(
    connection_or_path: str | Connection = "memory.db",
)

Bases: StorageBackend

SQLite-based storage backend for managing structured data and transactional logs.

This class provides functionality to persistently store, retrieve, and manipulate data and transaction logs using an SQLite database. It supports thread-safe operations, ensures data integrity, and utilizes SQLite-specific features such as WAL mode and JSON querying.

Attributes: _conn (sqlite3.Connection): SQLite database connection object. _owns_connection (bool): Specifies whether the SQLiteStorage instance owns the connection and is responsible for closing it. _lock (threading.RLock): Threading lock that ensures thread-safe access to the database.

Methods:

  • append_tx

    Appends a transaction record to the transaction log.

  • close

    Closes the current open resource or connection.

  • delete

    Removes an entry from the store based on the provided identifier. If the identifier

  • delete_session

    Deletes all facts associated with a given session ID from the store.

  • delete_txs

    Removes a list of transactions from the transaction log whose session IDs match the provided

  • get_session_facts

    Retrieves all facts associated with a specific session.

  • get_tx_log

    Retrieves and returns a portion of the transaction log. The transaction log is accessed in

  • load

    Loads an item from the store based on the provided identifier.

  • query

    Query data from the internal store based on specified filters.

  • save

    Saves the given fact data into the internal store. The save operation

Source code in memstate/backends/sqlite.py
def __init__(self, connection_or_path: str | sqlite3.Connection = "memory.db") -> None:
    """
    Open or adopt a SQLite connection and initialize the schema.

    A string argument is treated as a database path; the instance then
    owns (and will close) the connection. An existing sqlite3.Connection
    is adopted without taking ownership.

    Args:
        connection_or_path (str | sqlite3.Connection): Database file path or an
            already-open connection. Defaults to "memory.db".

    Raises:
        ValueError: If the argument is neither a path nor a sqlite3.Connection.
    """
    self._lock = threading.RLock()
    self._owns_connection = False

    if isinstance(connection_or_path, str):
        # check_same_thread=False: cross-thread access is serialized by
        # self._lock instead of sqlite3's per-thread guard.
        conn = sqlite3.connect(connection_or_path, check_same_thread=False)
        self._owns_connection = True
    elif isinstance(connection_or_path, sqlite3.Connection):
        conn = connection_or_path
    else:
        raise ValueError(f"Invalid connection type: {type(connection_or_path)}")

    conn.row_factory = sqlite3.Row
    self._conn = conn
    self._init_db()

append_tx

append_tx(tx_data: dict[str, Any]) -> None

Appends a transaction record to the transaction log.

Parameters:

  • tx_data
    (dict[str, Any]) –

    A dictionary containing transaction data to be appended.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
def append_tx(self, tx_data: dict[str, Any]) -> None:
    """
    Append one transaction record to the ``tx_log`` table.

    The full record is serialized as JSON into the ``data`` column, while the
    ``uuid`` and ``ts`` fields are mirrored into dedicated columns for lookup.

    Args:
        tx_data: Transaction payload; must contain ``"uuid"`` and ``"ts"`` keys.

    Returns:
        None
    """
    # Serialize outside the lock; json.dumps has no shared state to protect.
    payload = json.dumps(tx_data, default=str)
    with self._lock:
        cursor = self._conn.cursor()
        cursor.execute(
            """
                  INSERT INTO tx_log(uuid, timestamp, data)
                  VALUES (?, ?, ?)
                  """,
            (tx_data["uuid"], tx_data["ts"], payload),
        )
        self._conn.commit()

close

close() -> None

Closes the current open resource or connection.

This method is responsible for cleanup or finalization tasks. It ensures that resources, such as file handles or network connections, are properly released or closed. Once called, the resource cannot be used again unless it is reopened.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
def close(self) -> None:
    """
    Release the underlying database connection if this instance owns it.

    Connections supplied by the caller are left open, since their lifecycle
    belongs to the caller. Once the owned connection is closed, this backend
    can no longer be used.

    Returns:
        None
    """
    if not self._owns_connection:
        return
    self._conn.close()

delete

delete(id: str) -> None

Removes an entry from the store based on the provided identifier. If the identifier does not exist, the method performs no action and completes silently.

Parameters:

  • id
    (str) –

    The identifier of the entry to be removed from the store. Must be a string.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
def delete(self, id: str) -> None:
    """
    Remove the fact identified by ``id`` from the ``facts`` table.

    Deleting an identifier that does not exist is a silent no-op.

    Args:
        id: Identifier of the fact to remove.

    Returns:
        None
    """
    with self._lock:
        self._conn.cursor().execute("DELETE FROM facts WHERE id = ?", (id,))
        self._conn.commit()

delete_session

delete_session(session_id: str) -> list[str]

Deletes all facts associated with a given session ID from the store.

This method identifies all fact records in the store that are linked to the specified session ID, removes them, and returns a list of fact identifiers that were deleted.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose associated facts should be removed.

Returns:

  • list[str]

    A list of fact identifiers that were deleted from the store.

Source code in memstate/backends/sqlite.py
def delete_session(self, session_id: str) -> list[str]:
    """
    Delete every fact whose JSON payload carries the given ``session_id``.

    Uses ``DELETE ... RETURNING`` so the removed ids are collected in the
    same statement that deletes them.

    Args:
        session_id: The session whose associated facts should be removed.

    Returns:
        The identifiers of the facts that were deleted.
    """
    with self._lock:
        cursor = self._conn.cursor()
        cursor.execute(
            "DELETE FROM facts WHERE json_extract(data, '$.session_id') = ? RETURNING id",
            (session_id,),
        )
        deleted = [record["id"] for record in cursor.fetchall()]
        self._conn.commit()
        return deleted

delete_txs

delete_txs(tx_uuids: list[str]) -> None

Removes transactions from the transaction log whose UUIDs match the provided list. If the provided list is empty, no transactions are processed.

Parameters:

  • tx_uuids
    (list[str]) –

    A list of transaction UUIDs to be removed from the log.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
def delete_txs(self, tx_uuids: list[str]) -> None:
    """
    Remove transactions from the log by their UUIDs.

    An empty list is a no-op. The ``IN`` clause is assembled with one ``?``
    placeholder per uuid, so all values are bound safely rather than
    interpolated.

    Args:
        tx_uuids: UUIDs of the transactions to delete from the log.

    Returns:
        None
    """
    if not tx_uuids:
        return
    slots = ",".join(["?"] * len(tx_uuids))
    statement = f"DELETE FROM tx_log WHERE uuid IN ({slots})"  # nosec B608
    with self._lock:
        self._conn.cursor().execute(statement, tuple(tx_uuids))
        self._conn.commit()

get_session_facts

get_session_facts(session_id: str) -> list[dict[str, Any]]

Retrieves all facts associated with a specific session.

This method filters and returns a list of all facts from the internal store that match the provided session ID. Each fact is represented as a dictionary, and the list may be empty if no facts match the provided session ID.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose facts are to be retrieved.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries, where each dictionary represents a fact related to the specified session.

Source code in memstate/backends/sqlite.py
def get_session_facts(self, session_id: str) -> list[dict[str, Any]]:
    """
    Return every fact whose JSON payload carries the given ``session_id``.

    Args:
        session_id: The session whose facts are requested.

    Returns:
        The matching facts as decoded dictionaries; empty if none match.
    """
    with self._lock:
        cursor = self._conn.cursor()
        cursor.execute(
            "SELECT data FROM facts WHERE json_extract(data, '$.session_id') = ?",
            (session_id,),
        )
        rows = cursor.fetchall()
    # Decode outside the lock; the rows are already fully materialized.
    return [json.loads(record["data"]) for record in rows]

get_tx_log

get_tx_log(
    session_id: str, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]

Retrieves and returns a portion of the transaction log. The transaction log is accessed in reverse order of insertion, i.e., the most recently added item is the first in the result.

Parameters:

  • session_id
    (str) –

    The identifier of the session whose transactions should be retrieved.

  • limit
    (int, default: 100 ) –

    The maximum number of transaction log entries to be retrieved. Default is 100.

  • offset
    (int, default: 0 ) –

    The starting position relative to the most recent entry that determines where to begin retrieving the log entries. Default is 0.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries representing the requested subset of the transaction log. The dictionaries contain details of individual transaction log entries.

Source code in memstate/backends/sqlite.py
def get_tx_log(self, session_id: str, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """
    Return a slice of the transaction log for one session, newest first.

    Entries are ordered by descending ``tx_id`` (reverse insertion order),
    so ``offset`` counts back from the most recent entry.

    Args:
        session_id: The session whose transactions should be retrieved.
        limit: Maximum number of entries to return. Defaults to 100.
        offset: Number of most-recent entries to skip before returning
            results. Defaults to 0.

    Returns:
        Decoded transaction records, most recent first.
    """
    statement = (
        "SELECT data FROM tx_log WHERE json_extract(data, '$.session_id') = ? "
        "ORDER BY tx_id DESC LIMIT ? OFFSET ?"
    )
    with self._lock:
        cursor = self._conn.cursor()
        cursor.execute(statement, (session_id, limit, offset))
        rows = cursor.fetchall()
    return [json.loads(record["data"]) for record in rows]

load

load(id: str) -> dict[str, Any] | None

Loads an item from the store based on the provided identifier.

This method retrieves the item associated with the given id from the internal store. If no item is found for the provided identifier, it returns None.

Parameters:

  • id
    (str) –

    The unique identifier of the item to load.

Returns:

  • dict[str, Any] | None

    The item retrieved from the store or None if the identifier does not exist in the store.

Source code in memstate/backends/sqlite.py
def load(self, id: str) -> dict[str, Any] | None:
    """
    Loads an item from the store based on the provided identifier.

    This method retrieves the item associated with the given `id`
    from the internal store. If no item is found for the provided
    identifier, it returns ``None``.

    Args:
        id (str): The unique identifier of the item to load.

    Returns:
        The item retrieved from the store or ``None`` if the identifier does not exist in the store.
    """
    with self._lock:
        c = self._conn.cursor()
        c.execute("SELECT data FROM facts WHERE id = ?", (id,))
        row = c.fetchone()
        return json.loads(row["data"]) if row else None

query

query(
    type_filter: str | None = None,
    json_filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]

Query data from the internal store based on specified filters.

This method iterates through the internal store and filters the data based on the provided type_filter and json_filters. The results will include only the entries that match all specified filtering criteria.

Parameters:

  • type_filter
    (str | None, default: None ) –

    Optional filter to include only items with a matching "type" field. If None, this filter is ignored.

  • json_filters
    (dict[str, Any] | None, default: None ) –

    A dictionary where keys represent the path within the JSON data structure, and values represent the required values for inclusion. If None, this filter is ignored.

Returns:

  • list[dict[str, Any]]

    A list of dictionaries containing the data entries from the internal store that match the specified filters.

Source code in memstate/backends/sqlite.py
def query(self, type_filter: str | None = None, json_filters: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    Query data from the internal store based on specified filters.

    This method iterates through the internal store and filters the data based on
    the provided `type_filter` and `json_filters`. The results will include
    only the entries that match all specified filtering criteria.

    Args:
        type_filter (str | None): Optional filter to include only items with a matching "type" field.
            If None, this filter is ignored.
        json_filters (dict[str, Any] | None): A dictionary where keys represent the path within the JSON
            data structure, and values represent the required values for inclusion.
            If None, this filter is ignored.

    Returns:
        A list of dictionaries containing the data entries from the internal store that match the specified filters.
    """
    query = "SELECT data FROM facts WHERE 1=1"
    params = []

    if type_filter:
        query += " AND type = ?"
        params.append(type_filter)

    if json_filters:
        for key, value in json_filters.items():
            if not re.match(r"^[a-zA-Z0-9_.]+$", key):
                raise ValueError(f"Invalid characters in filter key: {key}")
            query += f" AND json_extract(data, '$.{key}') = ?"
            params.append(value)

    with self._lock:
        c = self._conn.cursor()
        c.execute(query, params)
        return [json.loads(row["data"]) for row in c.fetchall()]

save

save(fact_data: dict[str, Any]) -> None

Saves the given fact data into the internal store. The save operation ensures data consistency by utilizing a lock mechanism.

Parameters:

  • fact_data
    (dict[str, Any]) –

    A dictionary containing fact data to be stored. The dictionary must include an "id" key with a corresponding value as a unique identifier.

Returns:

  • None

    None

Source code in memstate/backends/sqlite.py
def save(self, fact_data: dict[str, Any]) -> None:
    """
    Insert or overwrite a fact in the ``facts`` table.

    The full payload is stored as JSON in the ``data`` column, with ``id``
    and ``type`` mirrored into dedicated columns (``type`` falls back to
    ``"unknown"`` when absent). The lock keeps the write thread-safe.

    Args:
        fact_data: The fact to persist. Must contain an ``"id"`` key whose
            value uniquely identifies the fact.

    Returns:
        None
    """
    record = (
        fact_data["id"],
        fact_data.get("type", "unknown"),
        json.dumps(fact_data, default=str),
    )
    with self._lock:
        self._conn.cursor().execute(
            """
            INSERT OR REPLACE INTO facts(id, type, data)
            VALUES (?, ?, ?)
            """,
            record,
        )
        self._conn.commit()