postgres

This module contains the PostgresMessageHistory class, which is used to store chat message history in a PostgreSQL database.

Classes:

Name | Description
PostgresMessageHistory | Chat message history that stores history in a PostgreSQL database.

Attributes:

Name | Type | Description
DEFAULT_CONNECTION_STRING | str | A connection string for a PostgreSQL database.
DEFAULT_TABLE_NAME | str | A table name for the PostgreSQL database.

DEFAULT_CONNECTION_STRING module-attribute

DEFAULT_CONNECTION_STRING = (
    "postgresql://postgres:postgres@localhost:5432/postgres"
)

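The default connection string, used when no connection_string is passed to PostgresMessageHistory. It points at a server on localhost, port 5432, with the user, password, and database name all set to postgres.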
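To make the parts of the default value explicit, the following sketch splits the connection string with the standard library. The helper name `describe_connection_string` is hypothetical and not part of declarai; psycopg2 does its own parsing when it connects.

```python
from urllib.parse import urlsplit

DEFAULT_CONNECTION_STRING = "postgresql://postgres:postgres@localhost:5432/postgres"


def describe_connection_string(dsn: str) -> dict:
    """Split a URL-style PostgreSQL connection string into its parts."""
    parts = urlsplit(dsn)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # The path component carries the database name after the leading slash.
        "dbname": parts.path.lstrip("/"),
    }


info = describe_connection_string(DEFAULT_CONNECTION_STRING)
```

For anything other than local development, the credentials would normally come from configuration or the environment rather than from this default.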

DEFAULT_TABLE_NAME module-attribute

DEFAULT_TABLE_NAME = 'message_store'

The default name of the table that stores messages, used when no table_name is passed to PostgresMessageHistory.

PostgresMessageHistory

PostgresMessageHistory(
    session_id: str,
    connection_string: Optional[str] = DEFAULT_CONNECTION_STRING,
    table_name: str = DEFAULT_TABLE_NAME,
)

Bases: BaseChatMessageHistory

Chat message history that stores history in a PostgreSQL database.

Parameters:

Name | Type | Description | Default
session_id | str | Arbitrary key that is used to store the messages for a single chat session. | required
connection_string | Optional[str] | Database connection string. | DEFAULT_CONNECTION_STRING
table_name | str | Name of the table to use. | DEFAULT_TABLE_NAME

Methods:

Name | Description
__del__ | Destructor to close cursor and connection.
_initialize_tables | Initialize the tables if they don't exist.
add_message | Add a message to the database.
clear | Clear session memory from the database.
close | Close cursor and connection.

Attributes:

Name | Type | Description
history | List[Message] | Retrieve the messages from the database.

Source code in src/declarai/memory/postgres.py
def __init__(
    self,
    session_id: str,
    connection_string: Optional[str] = DEFAULT_CONNECTION_STRING,
    table_name: str = DEFAULT_TABLE_NAME,
):
    try:
        import psycopg2  # pylint: disable=import-outside-toplevel
    except ImportError as exc:
        # Chain the original error and keep a space between the two sentences.
        raise ImportError(
            "Cannot import psycopg2. "
            "Please install psycopg2 to use PostgresMessageHistory."
        ) from exc
    self.conn = psycopg2.connect(connection_string)
    self.cursor = self.conn.cursor()
    self.table_name = table_name
    self.session_id = session_id
    self._initialize_tables()
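The constructor imports psycopg2 inside `__init__` so that the package is only required when this class is actually used. A minimal sketch of the same optional-dependency pattern, using only the standard library, is shown here; `require_module` is a hypothetical helper and not part of declarai.

```python
import importlib
from types import ModuleType


def require_module(name: str, feature: str) -> ModuleType:
    """Import `name` lazily, raising a readable error if it is missing."""
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise ImportError(
            f"Cannot import {name}. Please install {name} to use {feature}."
        ) from exc


# A module that is available is simply returned.
json_module = require_module("json", "JSON serialization")

# A missing module yields the friendlier message, chained to the original error.
try:
    require_module("module_that_is_not_installed", "the demo feature")
except ImportError as err:
    message = str(err)
```

Deferring the import keeps the rest of the package importable on machines without a PostgreSQL driver.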

history property

history: List[Message]

Retrieve the messages from the database.

__del__

__del__()

Destructor to close cursor and connection.

Source code in src/declarai/memory/postgres.py
def __del__(self):
    """Destructor to close cursor and connection."""
    if hasattr(self, "cursor"):
        self.cursor.close()
    if hasattr(self, "conn"):
        self.conn.close()

_initialize_tables

_initialize_tables()

Initialize the tables if they don't exist.

Source code in src/declarai/memory/postgres.py
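def _initialize_tables(self):
    """Initialize the tables if they don't exist."""
    from psycopg2 import sql

    # Compose the table name as a quoted identifier, as add_message does,
    # so that both statements always refer to the same table.
    create_table_query = sql.SQL(
        """CREATE TABLE IF NOT EXISTS {} (
        id SERIAL PRIMARY KEY,
        session_id TEXT NOT NULL,
        message JSONB NOT NULL
    );"""
    ).format(sql.Identifier(self.table_name))
    self.cursor.execute(create_table_query)
    self.conn.commit()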
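A table name cannot be passed as a bound query parameter; it always becomes part of the statement text. If `table_name` can come from configuration or user input, it may be worth checking it before constructing the history object. The sketch below is one possible check; `validate_table_name` and its rules are assumptions, not something declarai provides.

```python
import re

# Assumption: only plain identifiers (letters, digits, underscores) are allowed.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# PostgreSQL truncates identifiers longer than 63 bytes by default.
MAX_IDENTIFIER_LENGTH = 63


def validate_table_name(name: str) -> str:
    """Return `name` unchanged if it looks like a plain SQL identifier."""
    if len(name) > MAX_IDENTIFIER_LENGTH or not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe or invalid table name: {name!r}")
    return name


safe_name = validate_table_name("message_store")

# Anything containing spaces, quotes, or statement separators is rejected.
try:
    validate_table_name("message_store; DROP TABLE users")
    rejected = False
except ValueError:
    rejected = True
```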

add_message

add_message(message: Message) -> None

Add a message to the database.

Source code in src/declarai/memory/postgres.py
def add_message(self, message: Message) -> None:
    """Add a message to the database."""
    from psycopg2 import sql

    query = sql.SQL("INSERT INTO {} (session_id, message) VALUES (%s, %s);").format(
        sql.Identifier(self.table_name)
    )
    self.cursor.execute(query, (self.session_id, json.dumps(message.dict())))
    self.conn.commit()
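The storage pattern used here (serialize each message to JSON, store it under a session key, read it back in insertion order) can be tried without a PostgreSQL server. The sketch below uses an in-memory SQLite database as a stand-in. `DemoMessage` and its field names are hypothetical, `load_messages` is only a guess at how retrieval could work, and a TEXT column stands in for JSONB.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass


@dataclass
class DemoMessage:
    """Hypothetical stand-in for declarai's Message; field names are assumed."""
    role: str
    message: str


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS message_store ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "session_id TEXT NOT NULL, "
    "message TEXT NOT NULL)"  # TEXT stands in for PostgreSQL's JSONB
)


def add_message(session_id: str, message: DemoMessage) -> None:
    """Serialize the message to JSON and store it under the session key."""
    conn.execute(
        "INSERT INTO message_store (session_id, message) VALUES (?, ?)",
        (session_id, json.dumps(asdict(message))),
    )
    conn.commit()


def load_messages(session_id: str) -> list:
    """Read the session's messages back in insertion order."""
    rows = conn.execute(
        "SELECT message FROM message_store WHERE session_id = ? ORDER BY id",
        (session_id,),
    ).fetchall()
    return [DemoMessage(**json.loads(row[0])) for row in rows]


add_message("session-1", DemoMessage(role="user", message="Hello"))
add_message("session-1", DemoMessage(role="assistant", message="Hi there"))
restored = load_messages("session-1")
```

Note that SQLite uses `?` placeholders where psycopg2 uses `%s`; the parameter binding idea is the same.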

clear

clear() -> None

Clear session memory from the database.

Source code in src/declarai/memory/postgres.py
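def clear(self) -> None:
    """Clear session memory from the database."""
    from psycopg2 import sql

    # Compose the table name as a quoted identifier, matching add_message.
    query = sql.SQL("DELETE FROM {} WHERE session_id = %s;").format(
        sql.Identifier(self.table_name)
    )
    self.cursor.execute(query, (self.session_id,))
    self.conn.commit()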
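Because the DELETE is filtered by `session_id`, clearing one session leaves the rows of every other session in the shared table untouched. The following sketch illustrates this with an in-memory SQLite database as a stand-in for PostgreSQL; the session keys and payloads are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE message_store ("
    "id INTEGER PRIMARY KEY, session_id TEXT NOT NULL, message TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO message_store (session_id, message) VALUES (?, ?)",
    [("a", '{"n": 1}'), ("a", '{"n": 2}'), ("b", '{"n": 3}')],
)


def count_rows(session_id: str) -> int:
    """Count the rows stored for one session."""
    cursor = conn.execute(
        "SELECT COUNT(*) FROM message_store WHERE session_id = ?", (session_id,)
    )
    return cursor.fetchone()[0]


# Same shape as clear(): delete only the rows that belong to session "a".
conn.execute("DELETE FROM message_store WHERE session_id = ?", ("a",))
conn.commit()

remaining_a = count_rows("a")
remaining_b = count_rows("b")
```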

close

close()

Close cursor and connection.

Source code in src/declarai/memory/postgres.py
def close(self):
    """Close cursor and connection."""
    self.cursor.close()
    self.conn.close()
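Python does not guarantee when `__del__` runs, so calling `close()` explicitly gives more predictable cleanup. Since the class exposes a `close()` method, one option is to wrap it in `contextlib.closing`. The sketch below demonstrates the idea with `FakeHistory`, a hypothetical stand-in, so that it runs without a database.

```python
from contextlib import closing


class FakeHistory:
    """Hypothetical stand-in that exposes the same close() method."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


fake = FakeHistory()
try:
    # closing() calls fake.close() when the block exits, even on an error.
    with closing(fake) as history:
        raise RuntimeError("simulated failure while using the history")
except RuntimeError:
    pass

was_closed = fake.closed
```

With the real class, the same shape would presumably be `with closing(PostgresMessageHistory(session_id="...")) as history:`, assuming a reachable database.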