Initial commit

This commit is contained in:
William FH
2024-10-01 17:32:55 -07:00
committed by William Fu-Hinthorn
commit 35d340ef26
26 changed files with 1757 additions and 0 deletions
View File
+9
View File
@@ -0,0 +1,9 @@
# To separate your traces from other application
LANGSMITH_PROJECT=memory-graph
# The following depend on your selected configuration
## LLM choice:
# ANTHROPIC_API_KEY=....
# FIREWORKS_API_KEY=...
# OPENAI_API_KEY=...
+43
View File
@@ -0,0 +1,43 @@
# This workflow will run integration tests for the current project once per day
name: Integration Tests
on:
schedule:
- cron: "37 14 * * *" # Run at 7:37 AM Pacific Time (14:37 UTC) every day
workflow_dispatch: # Allows triggering the workflow manually in GitHub UI
# If another scheduled run starts while this workflow is still running,
# cancel the earlier run in favor of the next run.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
integration-tests:
name: Integration Tests
strategy:
matrix:
os: [ubuntu-latest]
python-version: ["3.11", "3.12"]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv
uv pip install -r pyproject.toml
uv pip install -U pytest-asyncio vcrpy
- name: Run integration tests
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
TAVILY_API_KEY: ${{ secrets.TAVILY_API_KEY }}
LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
LANGSMITH_TRACING: true
run: |
uv run pytest tests/integration_tests
+57
View File
@@ -0,0 +1,57 @@
# This workflow will run unit tests for the current project
name: CI
on:
push:
branches: ["main"]
pull_request:
workflow_dispatch: # Allows triggering the workflow manually in GitHub UI
# If another push to the same PR or branch happens while this workflow is still running,
# cancel the earlier run in favor of the next run.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
unit-tests:
name: Unit Tests
strategy:
matrix:
os: [ubuntu-latest]
python-version: ["3.11", "3.12"]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv
uv pip install -r pyproject.toml
- name: Lint with ruff
run: |
uv pip install ruff
uv run ruff check .
- name: Lint with mypy
run: |
uv pip install mypy
uv run mypy --strict src/
- name: Check README spelling
uses: codespell-project/actions-codespell@v2
with:
ignore_words_file: .codespellignore
path: README.md
- name: Check code spelling
uses: codespell-project/actions-codespell@v2
with:
ignore_words_file: .codespellignore
path: src/
- name: Run tests with pytest
run: |
uv pip install pytest
uv run pytest tests/unit_tests
+164
View File
@@ -0,0 +1,164 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.DS_Store
uv.lock
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 LangChain
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+65
View File
@@ -0,0 +1,65 @@
.PHONY: all format lint test tests test_watch integration_tests docker_tests help extended_tests
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
test:
python -m pytest $(TEST_FILE)
test_watch:
python -m ptw --snapshot-update --now . -- -vv tests/unit_tests
test_profile:
python -m pytest -vv tests/unit_tests/ --profile-svg
extended_tests:
python -m pytest --only-extended $(TEST_FILE)
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=src/
MYPY_CACHE=.mypy_cache
lint: PYTHON_FILES=src
format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --name-only --diff-filter=d main | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=src
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
python -m ruff check .
[ "$(PYTHON_FILES)" = "" ] || python -m ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || python -m ruff check --select I $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || python -m mypy --strict $(PYTHON_FILES)
[ "$(PYTHON_FILES)" = "" ] || mkdir -p $(MYPY_CACHE) && python -m mypy --strict $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)
format format_diff:
ruff format $(PYTHON_FILES)
ruff check --select I --fix $(PYTHON_FILES)
spell_check:
codespell --toml pyproject.toml
spell_fix:
codespell --toml pyproject.toml -w
######################
# HELP
######################
help:
@echo '----'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'
@echo 'test_watch - run unit tests in watch mode'
+355
View File
@@ -0,0 +1,355 @@
# LangGraph Memory Service
[![CI](https://github.com/langchain-ai/memory-template/actions/workflows/unit-tests.yml/badge.svg)](https://github.com/langchain-ai/memory-template/actions/workflows/unit-tests.yml)
[![Integration Tests](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml/badge.svg)](https://github.com/langchain-ai/memory-template/actions/workflows/integration-tests.yml)
[![Open in - LangGraph Studio](https://img.shields.io/badge/Open_in-LangGraph_Studio-00324d.svg?logo=data:image/svg%2bxml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI4NS4zMzMiIGhlaWdodD0iODUuMzMzIiB2ZXJzaW9uPSIxLjAiIHZpZXdCb3g9IjAgMCA2NCA2NCI+PHBhdGggZD0iTTEzIDcuOGMtNi4zIDMuMS03LjEgNi4zLTYuOCAyNS43LjQgMjQuNi4zIDI0LjUgMjUuOSAyNC41QzU3LjUgNTggNTggNTcuNSA1OCAzMi4zIDU4IDcuMyA1Ni43IDYgMzIgNmMtMTIuOCAwLTE2LjEuMy0xOSAxLjhtMzcuNiAxNi42YzIuOCAyLjggMy40IDQuMiAzLjQgNy42cy0uNiA0LjgtMy40IDcuNkw0Ny4yIDQzSDE2LjhsLTMuNC0zLjRjLTQuOC00LjgtNC44LTEwLjQgMC0xNS4ybDMuNC0zLjRoMzAuNHoiLz48cGF0aCBkPSJNMTguOSAyNS42Yy0xLjEgMS4zLTEgMS43LjQgMi41LjkuNiAxLjcgMS44IDEuNyAyLjcgMCAxIC43IDIuOCAxLjYgNC4xIDEuNCAxLjkgMS40IDIuNS4zIDMuMi0xIC42LS42LjkgMS40LjkgMS41IDAgMi43LS41IDIuNy0xIDAtLjYgMS4xLS44IDIuNi0uNGwyLjYuNy0xLjgtMi45Yy01LjktOS4zLTkuNC0xMi4zLTExLjUtOS44TTM5IDI2YzAgMS4xLS45IDIuNS0yIDMuMi0yLjQgMS41LTIuNiAzLjQtLjUgNC4yLjguMyAyIDEuNyAyLjUgMy4xLjYgMS41IDEuNCAyLjMgMiAyIDEuNS0uOSAxLjItMy41LS40LTMuNS0yLjEgMC0yLjgtMi44LS44LTMuMyAxLjYtLjQgMS42LS41IDAtLjYtMS4xLS4xLTEuNS0uNi0xLjItMS42LjctMS43IDMuMy0yLjEgMy41LS41LjEuNS4yIDEuNi4zIDIuMiAwIC43LjkgMS40IDEuOSAxLjYgMi4xLjQgMi4zLTIuMy4yLTMuMi0uOC0uMy0yLTEuNy0yLjUtMy4xLTEuMS0zLTMtMy4zLTMtLjUiLz48L3N2Zz4=)](https://langgraph-studio.vercel.app/templates/open?githubUrl=https://github.com/langchain-ai/memory-template)
This repo provides a simple example of a long-term memory service you can build and deploy using LangGraph.
This graph extracts memories from chat interactions and persists them to its store. This information can later be read via the API to provide personalized context when your bot is responding to a particular user.
The memory graph handles debouncing when processing individual conversations (to help deduplicate work) and supports continuous updates to a single "memory schema" as well as "event-based" memories that can be fetched by recency and filtered.
This repo also provides an example chat bot (in this case, also a simple graph) that connects to the memory graph via the SDK.
Any time you send a message to the chat bot, it will query the memory service to fetch the most up-to-date memories (if any) for the configured user. These memories are put in the system prompt. After responding, it will post the conversation to the memory service to schedule long-term memory formation.
This separation of concerns provides minimal overhead, allows deduplication of memory processing, and ensures you can optimize for better recall.
![Memory Diagram](./static/memory_graph.png)
## Getting Started
This quickstart will get your memory service deployed on [LangGraph Cloud](https://langchain-ai.github.io/langgraph/cloud/). Once created, you can interact with it from any API.
Assuming you have already [installed LangGraph Studio](https://github.com/langchain-ai/langgraph-studio?tab=readme-ov-file#download), to set up:
1. Create a `.env` file.
```bash
cp .env.example .env
```
2. Define required API keys in your `.env` file.
<!--
Setup instruction auto-generated by `langgraph template lock`. DO NOT EDIT MANUALLY.
-->
### Setup Model
The defaults values for `model` are shown below:
```yaml
model: anthropic/claude-3-5-sonnet-20240620
```
Follow the instructions below to get set up, or pick one of the additional options.
#### Anthropic
To use Anthropic's chat models:
1. Sign up for an [Anthropic API key](https://console.anthropic.com/) if you haven't already.
2. Once you have your API key, add it to your `.env` file:
```
ANTHROPIC_API_KEY=your-api-key
```
#### OpenAI
To use OpenAI's chat models:
1. Sign up for an [OpenAI API key](https://platform.openai.com/signup).
2. Once you have your API key, add it to your `.env` file:
```
OPENAI_API_KEY=your-api-key
```
<!--
End setup instructions
-->
3. Open in LangGraph studio. Navigate to the "memory_graph" and click on the "Assistants" drop-down to make a new assistant.
a. Call this assissitant "MyMemoryAssistant".
b. Configure what types of memories you'd like this assistant to be processing by pasting the following as the 'memory_types' configuration:
```json
[
{
"name": "User",
"description": "Update this document to maintain up-to-date information about the user in the conversation.",
"update_mode": "patch",
"parameters": {
"type": "object",
"properties": {
"user_name": {
"type": "string",
"description": "The user's preferred name"
},
"age": {
"type": "integer",
"description": "The user's age"
},
"interests": {
"type": "array",
"items": {
"type": "string"
},
"description": "A list of the user's interests"
}
}
}
},
{
"name": "Note",
"description": "Save notable memories the user has shared with you for later recall.",
"update_mode": "insert",
"parameters": {
"type": "object",
"properties": {
"context": {
"type": "string",
"description": "The situation or circumstance in which the memory occurred that inform when it would be useful to recall this."
},
"content": {
"type": "string",
"description": "The specific information, preference, or event being remembered."
}
},
"required": ["context", "content"]
}
}
]
```
This tells the service to manage two types of memories:
1. A "User" profile - this "`insert`" update_mode generates and updates a single document for a given user.
2. "Note"s - this "`insert`" update_mode generates (or updates) any number of memories of this type for a given user.
You can ignore the user_id for now - that will be provided when calling the service.
Once you've created the assistant, copy the "AssistantID" for use in the next section.
4. Staying in LangGraph Studio, switch to the "chat bot" graph and create an assistant there. Most of the fields can be omitted, since they have reasonable defaults in code. Set the
b. Mem Assistant Id = the AssistantID you copied from the previous step
c. User ID: "my-example-user" (you can set it however you'd like)
The rest of the values can remain blank. The "Memory Service Url" will default to "None" which means it will connect to the graph running in the same deployment as your chat bot. If you are running these in separate deployments, you can provide the deployment URL there.
5. Connect to the chat bot! Click "Open in Studio"
a. Send some messages saying your name and other things the bot should remember.
b. Try creating a new thread and chatting with the bot again - if it's been at least 60 seconds, the bot should now have access to the memories you've saved!
## How it works
This chat bot reads from your memory graph's `Store` to easily list extracted memories.
Connecting to this type of memory service typically follows an interaction pattern similar to the one outlined below:
![Interaction Pattern](./static/memory_interactions.png)
The service waits for a pre-determined interval before it considers the thread "complete". If the user queries a second time within that interval, the memory run is cancelled to avoid duplicate processing of a thread.
## How to evaluate
Memory management can be challenging to get right. To make sure your memory_types suit your applications' needs, we recommend starting from an evaluation set, adding to it over time as you find and address common errors in your service.
We have provided a few example evaluation cases in [the test file here](./tests/integration_tests/test_graph.py). As you can see, the metrics themselves don't have to be terribly complicated, especially not at the outset.
We use [LangSmith's @unit decorator](https://docs.smith.langchain.com/how_to_guides/evaluation/unit_testing#write-a-test) to sync all the evaluations to LangSmith so you can better optimize your system and identify the root cause of any issues that may arise.
## How to customize
Customize memory memory_types: The memory service supports two types of memory memory_types:
1. Patch Schema: This allows updating a single, continuous memory schema with new information from the conversation. You can customize the schema for this type by defining the JSON schema when initializing the memory schema. For example:
```json
{
"type": "object",
"properties": {
"user_name": {
"type": "string",
"description": "The user's preferred name"
},
"age": {
"type": "integer",
"description": "The user's age"
},
"interests": {
"type": "array",
"items": {
"type": "string"
},
"description": "A list of the user's interests"
}
}
}
```
2. Insertion Schema: This allows inserting individual "event" memories, such as key pieces of information or summaries from the conversation. You can define custom memory_types for these event memories by providing a JSON schema when initializing the InsertionMemorySchema. For example:
```json
{
"type": "object",
"properties": {
"context": {
"type": "string",
"description": "The situation or circumstance in which the memory occurred that inform when it would be useful to recall this."
},
"content": {
"type": "string",
"description": "The specific information, preference, or event being remembered."
}
},
"required": ["context", "content"]
}
```
3. Select a different model: We default to anthropic/claude-3-5-sonnet-20240620. You can select a compatible chat model using provider/model-name via configuration. Example: openai/gpt-4.
4. Customize the prompts: We provide default prompts in the graph definition. You can easily update these via configuration.
For quick prototyping, these configurations can be set in the LangGraph Studio UI.
You can also quickly extend this template by:
- Adding additional nodes and edges in [graph.py](./src/memory_graph/graph.py) to modify the memory processing flow.
<!--
Configuration auto-generated by `langgraph template lock`. DO NOT EDIT MANUALLY.
{
"config_schemas": {
"memory": {
"type": "object",
"properties": {
"model": {
"type": "string",
"default": "anthropic/claude-3-5-sonnet-20240620",
"description": "The name of the language model to use for the agent. Should be in the form: provider/model-name.",
"environment": [
{
"value": "anthropic/claude-1.2",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-2.0",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-2.1",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-3-5-sonnet-20240620",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-3-haiku-20240307",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-3-opus-20240229",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-3-sonnet-20240229",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "anthropic/claude-instant-1.2",
"variables": "ANTHROPIC_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-0125",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-0301",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-0613",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-1106",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-16k",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-3.5-turbo-16k-0613",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-0125-preview",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-0314",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-0613",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-1106-preview",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-32k",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-32k-0314",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-32k-0613",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-turbo",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-turbo-preview",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4-vision-preview",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4o",
"variables": "OPENAI_API_KEY"
},
{
"value": "openai/gpt-4o-mini",
"variables": "OPENAI_API_KEY"
}
]
}
}
},
"chatbot": {
"type": "object",
"properties": {}
}
}
}
-->
+12
View File
@@ -0,0 +1,12 @@
{
"dockerfile_lines": [],
"graphs": {
"memory_graph": "./src/memory_graph/graph.py:graph",
"chatbot": "./src/chatbot/graph.py:graph"
},
"env": ".env",
"python_version": "3.11",
"dependencies": [
"."
]
}
+297
View File
File diff suppressed because one or more lines are too long
+59
View File
@@ -0,0 +1,59 @@
[project]
name = "memory-graph"
version = "0.0.1"
description = "A long-term memory processor and example chatbot it works with."
authors = [
{ name = "William Fu-Hinthorn", email = "13333726+hinthornw@users.noreply.github.com" },
]
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.9"
dependencies = [
"langgraph>=0.2.32,<0.3.0",
# Optional (for selecting different models)
"langchain-openai>=0.1.22",
"langchain-anthropic>=0.1.23",
"langchain>=0.2.14",
"langchain-fireworks>=0.1.7",
"python-dotenv>=1.0.1",
"langgraph-sdk>=0.1.32",
"trustcall>=0.0.20",
]
[project.optional-dependencies]
dev = ["mypy>=1.11.1", "ruff>=0.6.1", "pytest-asyncio"]
[build-system]
requires = ["setuptools>=73.0.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["memory_graph", "chatbot"]
[tool.setuptools.package-dir]
"memory_graph" = "src/memory_graph"
"langgraph.templates.memory_graph" = "src/memory_graph"
"chatbot" = "src/chatbot"
"langgraph.templates.chatbot" = "src/chatbot"
[tool.setuptools.package-data]
"*" = ["py.typed"]
[tool.ruff]
lint.select = [
"E", # pycodestyle
"F", # pyflakes
"I", # isort
"D", # pydocstyle
"D401", # First line should be in imperative mood
"T201",
"UP",
]
lint.ignore = ["UP006", "UP007", "UP035", "D417", "E501"]
include = ["*.py", "*.pyi", "*.ipynb"]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["D", "UP"]
"ntbk/*" = ["D", "UP", "T201"]
[tool.ruff.lint.pydocstyle]
convention = "google"
+1
View File
@@ -0,0 +1 @@
"""An example chatbot that connects to the memory processor graph."""
+126
View File
@@ -0,0 +1,126 @@
"""Example chatbot that incorporates user memories."""
import os
import uuid
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from typing import List, Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import AnyMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, add_messages
from langgraph_sdk import get_client
from typing_extensions import Annotated
from chatbot.prompts import SYSTEM_PROMPT
@dataclass
class ChatState:
"""The state of the chatbot."""
messages: Annotated[List[AnyMessage], add_messages]
@dataclass(kw_only=True)
class ChatConfigurable:
"""The configurable fields for the chatbot."""
user_id: str
mem_assistant_id: str
model: str = "claude-3-5-sonnet-20240620"
delay_seconds: int = 60 # For debouncing memory creation
system_prompt: str = SYSTEM_PROMPT
# None will default to connecting to the local deployment
memory_service_url: str | None = None
@classmethod
def from_runnable_config(cls, config: Optional[RunnableConfig] = None):
"""Load configuration."""
configurable = (
config["configurable"] if config and "configurable" in config else {}
)
values = {
f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
for f in fields(cls)
if f.init
}
return cls(**{k: v for k, v in values.items() if v})
def format_memories(memories: Optional[list[dict]]) -> str:
"""Format the user's memories."""
if not memories:
return ""
# Note Bene: You can format better than this....
memories = "\n".join(str(m) for m in memories)
return f"""
## Memories
You have noted the following memorable events from previous interactions with the user.
<memories>
{memories}
</memories>
"""
async def bot(state: ChatState, config: RunnableConfig) -> ChatState:
"""Prompt the bot to resopnd to the user, incorporating memories (if provided)."""
configurable = ChatConfigurable.from_runnable_config(config)
memory_client = get_client(url=configurable.memory_service_url)
namespace = (configurable.user_id,)
# This lists ALL user memories in the provided namespace (up to the `limit`)
# you can also filter by content.
user_memory = await memory_client.store.search_items(namespace)
model = init_chat_model(configurable.model)
prompt = configurable.system_prompt.format(
user_info=format_memories([item["value"] for item in user_memory["items"]]),
time=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
m = await model.ainvoke(
[{"role": "system", "content": prompt}] + state.messages,
)
return {"messages": [m]}
async def schedule_memories(state: ChatState, config: RunnableConfig) -> ChatState:
"""Prompt the bot to resopnd to the user, incorporating memories (if provided)."""
configurable = ChatConfigurable.from_runnable_config(config)
memory_client = get_client(url=configurable.memory_service_url)
memory_thread = uuid.uuid5(
uuid.NAMESPACE_DNS,
configurable.user_id + config["configurable"]["thread_id"],
)
await memory_client.threads.create(thread_id=memory_thread, if_exists="do_nothing")
await memory_client.runs.create(
memory_thread,
assistant_id=configurable.mem_assistant_id,
input={
# the service dedupes messages by ID, so we can send the full convo each time
# if we want
"messages": state.messages,
},
config={
"configurable": {
"user_id": configurable.user_id,
},
},
multitask_strategy="rollback",
# This let's us "debounce" repeated requests to the memory graph
# if the user is actively engaging in a conversation
after_seconds=configurable.delay_seconds,
)
builder = StateGraph(ChatState, config_schema=ChatConfigurable)
builder.add_node(bot)
builder.add_node(schedule_memories)
builder.add_edge("__start__", "bot")
builder.add_edge("bot", "schedule_memories")
graph = builder.compile()
+7
View File
@@ -0,0 +1,7 @@
"""Define default prompts."""
SYSTEM_PROMPT = (
"You are a helpful and friendly chatbot. Get to know the user!"
" Ask questions! Be spontaneous!"
"{user_info}\n\nSystem Time: {time}"
)
+5
View File
@@ -0,0 +1,5 @@
"""Enrichment for a pre-defined schema."""
from memory_graph.graph import graph
__all__ = ["graph"]
+70
View File
@@ -0,0 +1,70 @@
"""Define the configurable parameters for the agent."""
import os
from dataclasses import dataclass, field, fields
from typing import Literal, Optional
from langchain_core.runnables import RunnableConfig
from typing_extensions import Annotated
from memory_graph.prompts import SYSTEM_PROMPT
@dataclass(kw_only=True)
class MemoryConfig:
"""Configuration for memory-related operations."""
name: str
description: str
parameters: dict
"""The JSON Schema of the memory document to manage."""
system_prompt: Optional[str] = SYSTEM_PROMPT
"""The system prompt to use for the memory assistant."""
update_mode: Literal["patch", "insert"] = field(default="patch")
"""Whether to continuously patch the memory, or treat each new
generation as a new memory.
Patching is useful for maintaining a structured profile or core list
of memories. Inserting is useful for maintaining all interactions and
not losing any information.
For patched memories, you can GET the current state at any given time.
For inserted memories, you can query the full history of interactions.
"""
@dataclass(kw_only=True)
class Configuration:
"""Main configuration class for the memory graph system."""
user_id: str = "default"
"""The ID of the user to remember in the conversation."""
model: Annotated[str, {"__template_metadata__": {"kind": "llm"}}] = field(
default="anthropic/claude-3-5-sonnet-20240620",
metadata={
"description": "The name of the language model to use for the agent. "
"Should be in the form: provider/model-name."
},
)
"""The model to use for generating memories. """
memory_types: list[MemoryConfig] = field(default_factory=list)
"""The memory_types for the memory assistant."""
@classmethod
def from_runnable_config(cls, config: Optional[RunnableConfig] = None):
"""Create a Configuration instance from a RunnableConfig."""
configurable = (
config["configurable"] if config and "configurable" in config else {}
)
values = {
f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
for f in fields(cls)
if f.init
}
values["memory_types"] = [
MemoryConfig(**v) for v in (values["memory_types"] or [])
]
return cls(**{k: v for k, v in values.items() if v})
+147
View File
@@ -0,0 +1,147 @@
"""Graphs that extract memories on a schedule."""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import asdict
from langchain_core.runnables import RunnableConfig
from langgraph.constants import Send
from langgraph.graph import StateGraph
from langgraph.store.base import BaseStore
from trustcall import create_extractor
from memory_graph import configuration, utils
from memory_graph.state import ProcessorState, State
logger = logging.getLogger("memory")
async def handle_patch_memory(
state: ProcessorState, config: RunnableConfig, *, store: BaseStore
) -> dict:
"""Extract the user's state from the conversation and update the memory."""
configurable = configuration.Configuration.from_runnable_config(config)
namespace = (configurable.user_id, "user_states", state.function_name)
existing_item = await store.aget(namespace, "memory")
existing = {existing_item.key: existing_item.value} if existing_item else None
memory_config = next(
conf for conf in configurable.memory_types if conf.name == state.function_name
)
extractor = create_extractor(
utils.init_model(configurable.model),
tools=[
{
"name": memory_config.name,
"description": memory_config.description,
"parameters": memory_config.parameters,
}
],
tool_choice=memory_config.name,
)
prepared_messages = utils.prepare_messages(
state.messages, memory_config.system_prompt
)
inputs = {"messages": prepared_messages, "existing": existing}
result = await extractor.ainvoke(inputs, config)
extracted = result["responses"][0].model_dump(mode="json")
# Upsert the memory to storage
await store.aput(namespace, "memory", extracted)
return {"messages": []}
async def handle_insertion_memory(
state: ProcessorState, config: RunnableConfig, *, store: BaseStore
) -> dict[str, list]:
"""Upsert memory events."""
configurable = configuration.Configuration.from_runnable_config(config)
namespace = (configurable.user_id, "events", state.function_name)
existing_items = await store.asearch(namespace, limit=5)
memory_config = next(
conf for conf in configurable.memory_types if conf.name == state.function_name
)
extractor = create_extractor(
utils.init_model(configurable.model),
tools=[
{
"name": memory_config.name,
"description": memory_config.description,
"parameters": memory_config.parameters,
}
],
tool_choice="any",
enable_inserts=True,
)
extracted = await extractor.ainvoke(
{
"messages": utils.prepare_messages(
state.messages, memory_config.system_prompt
),
"existing": (
[
(existing_item.key, state.function_name, existing_item.value)
for existing_item in existing_items
]
if existing_items
else None
),
},
config,
)
await asyncio.gather(
*(
store.aput(
namespace,
rmeta.get("json_doc_id", str(uuid.uuid4())),
r.model_dump(mode="json"),
)
for r, rmeta in zip(extracted["responses"], extracted["response_metadata"])
)
)
return {"messages": []}
# Create the graph + all nodes
builder = StateGraph(State, config_schema=configuration.Configuration)
builder.add_node(handle_patch_memory, input=ProcessorState)
builder.add_node(handle_insertion_memory, input=ProcessorState)
def scatter_schemas(state: State, config: RunnableConfig) -> list[Send]:
"""Route the memory_types for the memory assistant.
These will be executed in parallel.
"""
configurable = configuration.Configuration.from_runnable_config(config)
sends = []
current_state = asdict(state)
for v in configurable.memory_types:
update_mode = v.update_mode
match update_mode:
case "patch":
target = "handle_patch_memory"
case "insert":
target = "handle_insertion_memory"
case _:
raise ValueError(f"Unknown update mode: {update_mode}")
sends.append(
Send(
target,
ProcessorState(**{**current_state, "function_name": v.name}),
)
)
return sends
builder.add_conditional_edges(
"__start__", scatter_schemas, ["handle_patch_memory", "handle_insertion_memory"]
)
graph = builder.compile()
__all__ = ["graph"]
+17
View File
@@ -0,0 +1,17 @@
"""Default prompts used in this project."""
SYSTEM_PROMPT = """You are doing web research on behalf of a user. You are trying to figure out this information:
<info>
{info}
</info>
You have access to the following tools:
- `Search`: call a search tool and get back some results
- `ScrapeWebsite`: scrape a website and get relevant notes about the given request. This will update the notes above.
- `Info`: call this when you are done and have gathered all the relevant info
Here is the information you have about the topic you are researching:
Topic: {topic}"""
+30
View File
@@ -0,0 +1,30 @@
"""Define the shared values."""
from __future__ import annotations
from dataclasses import dataclass
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from typing_extensions import Annotated
@dataclass(kw_only=True)
class State:
"""Main graph state."""
messages: Annotated[list[AnyMessage], add_messages]
"""The messages in the conversation."""
@dataclass(kw_only=True)
class ProcessorState(State):
"""Extractor state."""
function_name: str
__all__ = [
"State",
"ProcessorState",
]
+38
View File
@@ -0,0 +1,38 @@
"""Utility functions used in our graph."""
from typing import Sequence
from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AnyMessage, merge_message_runs
def prepare_messages(
messages: Sequence[AnyMessage], system_prompt: str
) -> list[AnyMessage]:
"""Merge message runs and add instructions before and after to stay on task."""
sys = {
"role": "system",
"content": f"""{system_prompt}
<memory-system>Reflect on following interaction. Use the provided tools to \
retain any necessary memories about the user.</memory-system>
""",
}
m = {
"role": "user",
"content": "## End of conversation\n\n"
"<memory-system>Reflect on the interaction above."
" What memories ought to be retained or updated?</memory-system>",
}
return merge_message_runs([sys] + list(messages) + [m])
def init_model(fully_specified_name: str) -> BaseChatModel:
"""Initialize the configured chat model."""
if "/" in fully_specified_name:
provider, model = fully_specified_name.split("/", maxsplit=1)
else:
provider = None
model = fully_specified_name
return init_chat_model(model, model_provider=provider)
Binary file not shown.

After

Width:  |  Height:  |  Size: 245 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 757 KiB

+1
View File
@@ -0,0 +1 @@
"""Define any integration tests you want in this directory."""
+227
View File
@@ -0,0 +1,227 @@
import uuid
from datetime import datetime
from typing import Literal, Optional
import langsmith as ls
from langgraph.store.memory import InMemoryStore
from pydantic import BaseModel, Field
from memory_graph.graph import builder
class User(BaseModel):
"""Store all important information about a user here."""
preferred_name: Optional[str] = None
current_age: Optional[str] = None
skills: list[str] = Field(description="Various skills the user has.")
favorite_foods: Optional[list[str]]
last_updated: datetime
core_memories: list[str] = Field(
description="Important events and memories that shape the user's identity."
)
topics_discussed: list[str] = Field(
description="topics the user has discussed previously"
)
other_preferences: list[str] = Field(
description="Other preferences the user has expressed that informs how you should interact with them."
)
relationships: list[str] = Field(
description="Store information about friends, family members, coworkers, and other important relationships the user has here. Include relevant information about htem."
)
def create_function(model, description: str = ""):
return {}
def create_memory_function(
model,
description: str = "",
custom_instructions: str = "",
kind: Literal["patch", "insert"] = "patch",
):
return {
"name": model.__name__,
"description": description or model.__doc__ or "",
"parameters": model.model_json_schema(),
"system_prompt": custom_instructions,
"update_mode": kind,
}
@ls.unit
async def test_patch_memory_stored():
mem_store = InMemoryStore()
mem_func = create_memory_function(User)
graph = builder.compile(store=mem_store)
thread_id = str(uuid.uuid4())
user_id = "my-test-user"
config = {
"configurable": {"memory_types": {"User": mem_func}},
"thread_id": thread_id,
"user_id": user_id,
}
await graph.ainvoke(
{"messages": [("user", "My name is Bob. I like fun things")]}, config
)
namespace = ("user_states", user_id, "User")
memories = mem_store.search(namespace)
ls.expect(len(memories)).to_equal(1)
mem = memories[0]
ls.expect(mem.value.get("preferred_name")).to_equal("Bob")
await graph.ainvoke(
{
"messages": [
("user", "Even though my name is Bob, I prefer to go by Robert.")
]
},
config,
)
memories = mem_store.search(namespace)
ls.expect(len(memories)).to_equal(1)
mem = memories[0]
ls.expect(mem.value.get("preferred_name")).to_equal("Robert")
# Check that searching by a different namespace returns no memories
bad_namespace = ("user_states", "my-bad-test-user", "User")
memories = mem_store.search(bad_namespace)
ls.expect(memories).against(lambda x: not x)
class Relationship(BaseModel):
"""A relationship memory type for insertion.
Call for each distinct indivual the user interacts with.
"""
name: str = Field(description="The legal name of the person.")
preferred_name: str = Field(
description="The name of the person in the relationship"
)
relation_to_user: str = Field(
description="The type of relationship "
"(e.g., friend, sister, brother, grandmother, colleague)"
)
recent_interactions: list[str] = Field(
description="List of recent interactions with this person"
)
notes: str = Field(
description="Other important information about this individual and how they relate to the user."
)
@ls.unit
async def test_insertion_memory_stored():
mem_store = InMemoryStore()
mem_func = create_memory_function(
Relationship,
custom_instructions="Extract all relationships mentioned. Call Relationship once per-relationship.",
kind="insert",
)
graph = builder.compile(store=mem_store)
thread_id = str(uuid.uuid4())
user_id = "my-test-user"
config = {
"configurable": {"memory_types": {"Relationship": mem_func}},
"user_id": user_id,
}
await graph.ainvoke(
{
"messages": [
(
"user",
"I've been thinking about my old friend Joanne Steine lately. We met in 3rd grade and were inseparable for years.",
),
(
"assistant",
"It's nice that you're reminiscing about your childhood friend Joanne. Friendships from that age can be very special. How are things between you two now?",
),
(
"user",
"Well, that's the thing. We've been drifting apart over the years. It's sad, but I guess it happens sometimes.",
),
(
"assistant",
"I'm sorry to hear that you and Joanne have been drifting apart. It's true that relationships can change over time. Is there anything specific that's contributed to the distance between you?",
),
(
"user",
"Not really, just life getting in the way I suppose. But you know, it makes me even more grateful for my friend Anthony. Despite living far apart, we still manage to stay connected.",
),
(
"assistant",
"It's wonderful that you have a friend like Anthony who you can maintain a strong connection with despite the distance. Long-lasting friendships like that are truly valuable. How do you and Anthony manage to stay close?",
),
(
"user",
"We make an effort to call each other regularly and share important moments in our lives. It's not always easy, but it's worth it.",
),
]
},
{**config, "thread_id": thread_id},
)
namespace = ("events", user_id, "Relationship")
memories = mem_store.search(namespace)
ls.expect(len(memories)).to_be_greater_than(1)
# Check for Joanne's relationship
joanne_relationship = next(
(mem for mem in memories if "Joanne" in mem.value.get("name", "")), None
)
ls.expect(joanne_relationship).against(lambda x: x is not None)
ls.expect(joanne_relationship.value.get("relation_to_user")).to_contain("friend")
# Check for Anthony's relationship
anthony_relationship = next(
(mem for mem in memories if "Anthony" in mem.value.get("name")), None
)
ls.expect(anthony_relationship).against(lambda x: x is not None)
ls.expect(anthony_relationship.value.get("relation_to_user")).to_contain("friend")
thread_id_2 = str(uuid.uuid4())
# New conversation about Joanne's preferred name
await graph.ainvoke(
{
"messages": [
(
"user",
"I just talked with Joanne. She told me she's going by 'Jo' now.",
),
(
"assistant",
"Oh, that's interesting! It's nice that Joanne - or should I say Jo - shared that with you. How do you feel about her name change?",
),
(
"user",
"I think it suits her. It's a bit strange to get used to, but I'm happy for her."
" She also introduced me to a great person named Nick."
" Nick and i became fast friends. I'm going surfing with him tomorrow.",
),
]
},
{**config, "thread_id": thread_id_2},
)
# Check the memories again
updated_memories = mem_store.search(
namespace,
)
ls.expect(len(updated_memories)).to_equal(
3
) # Now there should be 3 objects: Nick, Joanne, and Anthony
# Check for updated Joanne/Jo relationship
jo_relationship = next(
(mem for mem in updated_memories if "Joanne" in mem.value.get("name")), None
)
ls.expect(jo_relationship).against(lambda x: x is not None)
ls.expect(jo_relationship.value.get("preferred_name")).to_equal("Jo")
# Check for Nick's relationship
nick_relationship = next(
(mem for mem in updated_memories if "Nick" in mem.value.get("name", "")), None
)
ls.expect(nick_relationship).against(lambda x: x is not None)
ls.expect(nick_relationship.value.get("relation_to_user")).to_contain("friend")
+1
View File
@@ -0,0 +1 @@
"""Define any unit tests you may want in this directory."""
+5
View File
@@ -0,0 +1,5 @@
from memory_graph.configuration import Configuration
def test_configuration_from_none() -> None:
Configuration.from_runnable_config()