Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,6 @@ permissions:
contents: write

jobs:
release-notes:
name: Generate Release Notes
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Generate release notes
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
gh release edit ${{ github.event.release.tag_name }} --generate-notes

build:
name: Build distribution
runs-on: ubuntu-latest
Expand Down
6 changes: 3 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# DeepAgents Backends - Agent Instructions
# AGENTS.md: Deep Agents Remote Backends

## Project Overview

This library provides **S3Backend** and **PostgresBackend** implementations of LangChain DeepAgents' `BackendProtocol` for remote file storage. All operations are async-native with sync wrappers.
This library provides **S3Backend** and **PostgresBackend** implementations of [LangChain Deep Agents'](https://github.com/langchain-ai/deepagents) `BackendProtocol` for remote file storage and middleware operations.

**Installation:** `uv add deepagents-backends` or `uv sync` for development.

Expand All @@ -26,7 +26,7 @@ Each backend implements both sync and async versions:
- `ls_info`/`als_info`, `glob_info`/`aglob_info`, `grep_raw`/`agrep_raw`
- `upload_files`/`aupload_files`, `download_files`/`adownload_files`

Sync methods use `asyncio.get_event_loop().run_until_complete()`.
Sync methods use `run_async_safely()`.

## Build and Test Commands

Expand Down
6 changes: 3 additions & 3 deletions examples/basic_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- postgres_deep_agent.py - Full PostgreSQL backend with DeepAgent
- composite_backend.py - Hybrid S3 + PostgreSQL storage

These backends implement DeepAgents' BackendProtocol for remote file storage.
These backends implement Deep Agents' BackendProtocol for remote file storage.

Usage:
uv run examples/basic_usage.py
Expand Down Expand Up @@ -126,7 +126,7 @@ async def postgres_backend_operations() -> None:
edit_result = await backend.aedit(
"/project/main.py",
"Hello from PostgreSQL!",
"Hello from DeepAgents!",
"Hello from Deep Agents!",
)
print(f"Edit result: {edit_result}")

Expand Down Expand Up @@ -157,7 +157,7 @@ async def postgres_backend_operations() -> None:

async def main() -> None:
"""Run all examples."""
print("DeepAgents Remote Backends - Low-level API Examples")
print("Deep Agents Remote Backends - Low-level API Examples")
print("=" * 60)
print()
print("For DeepAgent integration examples, see:")
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "deepagents-backends"
version = "0.1.0"
description = "S3 and PostgreSQL remote backends for LangChain DeepAgents"
version = "0.1.1"
description = "S3 and PostgreSQL remote backends for LangChain Deep Agents"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
Expand Down
107 changes: 83 additions & 24 deletions src/deepagents_backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
DeepAgents Remote Backends
Deep Agents Remote Backends

S3 and PostgreSQL backend implementations for LangChain's DeepAgents.
S3 and PostgreSQL backend implementations for LangChain's Deep Agents.
Supports any S3-compatible storage (AWS S3, MinIO, etc.) and PostgreSQL
with connection pooling for optimal performance.
"""
Expand All @@ -12,15 +12,14 @@
import fnmatch
import json
import re
import threading
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime, timezone
from io import BytesIO
from pathlib import PurePosixPath
from typing import TYPE_CHECKING, Any, AsyncIterator
from typing import TYPE_CHECKING, Any, AsyncIterator, Coroutine

import aioboto3
import psycopg
import psycopg_pool
import wcmatch.glob as wcglob
from botocore.config import Config as BotoConfig
Expand All @@ -46,6 +45,66 @@
__all__ = ["S3Backend", "S3Config", "PostgresBackend", "PostgresConfig"]


class _AsyncThread(threading.Thread):
"""helper thread class for running async coroutines in a separate thread"""

def __init__(self, coroutine: Coroutine[Any, Any, Any]):
self.coroutine = coroutine
self.result = None
self.exception = None

super().__init__(daemon=True)

def run(self):
try:
self.result = asyncio.run(self.coroutine)
except Exception as e:
self.exception = e


def run_async_safely[T](coroutine: Coroutine[Any, Any, T], timeout: float | None = None) -> T:
"""safely runs a coroutine with handling of an existing event loop.

This function detects if there's already a running event loop and uses
a separate thread if needed to avoid the "asyncio.run() cannot be called
from a running event loop" error. This is particularly useful in environments
like Jupyter notebooks, FastAPI applications, or other async frameworks.

Args:
coroutine: The coroutine to run
timeout: max seconds to wait for. None means hanging forever

Returns:
The result of the coroutine

Raises:
Any exception raised by the coroutine
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop and loop.is_running():
# There's a running loop, use a separate thread
thread = _AsyncThread(coroutine)
thread.start()
thread.join(timeout=timeout)

if thread.is_alive():
raise TimeoutError("The operation timed out after %f seconds" % timeout)

if thread.exception:
raise thread.exception

return thread.result
else:
if timeout:
coroutine = asyncio.wait_for(coroutine, timeout)

return asyncio.run(coroutine)


# =============================================================================
# S3 Backend (S3-compatible: AWS S3, MinIO, etc.)
# =============================================================================
Expand All @@ -70,7 +129,7 @@ class S3Config:

class S3Backend(BackendProtocol):
"""
S3-compatible backend for DeepAgents file operations.
S3-compatible backend for Deep Agents file operations.

Supports AWS S3, MinIO, and any S3-compatible object storage.
All operations are async-native using aioboto3.
Expand Down Expand Up @@ -187,7 +246,7 @@ async def _list_keys(self, prefix: str = "") -> list[dict[str, Any]]:

def ls_info(self, path: str) -> list[FileInfo]:
"""Sync wrapper for als_info."""
return asyncio.get_event_loop().run_until_complete(self.als_info(path))
return run_async_safely(self.als_info(path))

async def als_info(self, path: str) -> list[FileInfo]:
"""List files in a directory."""
Expand Down Expand Up @@ -230,7 +289,7 @@ async def als_info(self, path: str) -> list[FileInfo]:

def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str:
"""Sync wrapper for aread."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aread(file_path, offset, limit)
)

Expand All @@ -254,7 +313,7 @@ async def aread(self, file_path: str, offset: int = 0, limit: int = 2000) -> str

def write(self, file_path: str, content: str) -> WriteResult:
"""Sync wrapper for awrite."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.awrite(file_path, content)
)

Expand Down Expand Up @@ -286,7 +345,7 @@ def edit(
replace_all: bool = False,
) -> EditResult:
"""Sync wrapper for aedit."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aedit(file_path, old_string, new_string, replace_all)
)

Expand Down Expand Up @@ -323,7 +382,7 @@ def grep_raw(
self, pattern: str, path: str | None = None, glob: str | None = None
) -> list[GrepMatch] | str:
"""Sync wrapper for agrep_raw."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.agrep_raw(pattern, path, glob)
)

Expand Down Expand Up @@ -359,7 +418,7 @@ async def agrep_raw(

def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:
"""Sync wrapper for aglob_info."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aglob_info(pattern, path)
)

Expand Down Expand Up @@ -390,7 +449,7 @@ async def aglob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:

def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
"""Sync wrapper for aupload_files."""
return asyncio.get_event_loop().run_until_complete(self.aupload_files(files))
return run_async_safely(self.aupload_files(files))

async def aupload_files(
self, files: list[tuple[str, bytes]]
Expand Down Expand Up @@ -423,7 +482,7 @@ async def aupload_files(

def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
"""Sync wrapper for adownload_files."""
return asyncio.get_event_loop().run_until_complete(self.adownload_files(paths))
return run_async_safely(self.adownload_files(paths))

async def adownload_files(self, paths: list[str]) -> list[FileDownloadResponse]:
"""Download multiple files."""
Expand Down Expand Up @@ -494,7 +553,7 @@ def conninfo(self) -> str:

class PostgresBackend(BackendProtocol):
"""
PostgreSQL backend for DeepAgents file operations.
PostgreSQL backend for Deep Agents file operations.

Uses psycopg3 with connection pooling for high-performance async operations.
Files are stored in a table with path as primary key and content as JSONB.
Expand Down Expand Up @@ -640,7 +699,7 @@ async def _list_paths(self, prefix: str = "/") -> list[tuple[str, datetime, int]

def ls_info(self, path: str) -> list[FileInfo]:
"""Sync wrapper for als_info."""
return asyncio.get_event_loop().run_until_complete(self.als_info(path))
return run_async_safely(self.als_info(path))

async def als_info(self, path: str) -> list[FileInfo]:
"""List files in a directory."""
Expand Down Expand Up @@ -674,7 +733,7 @@ async def als_info(self, path: str) -> list[FileInfo]:

def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str:
"""Sync wrapper for aread."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aread(file_path, offset, limit)
)

Expand All @@ -698,7 +757,7 @@ async def aread(self, file_path: str, offset: int = 0, limit: int = 2000) -> str

def write(self, file_path: str, content: str) -> WriteResult:
"""Sync wrapper for awrite."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.awrite(file_path, content)
)

Expand All @@ -725,7 +784,7 @@ def edit(
replace_all: bool = False,
) -> EditResult:
"""Sync wrapper for aedit."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aedit(file_path, old_string, new_string, replace_all)
)

Expand Down Expand Up @@ -762,7 +821,7 @@ def grep_raw(
self, pattern: str, path: str | None = None, glob: str | None = None
) -> list[GrepMatch] | str:
"""Sync wrapper for agrep_raw."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.agrep_raw(pattern, path, glob)
)

Expand Down Expand Up @@ -796,7 +855,7 @@ async def agrep_raw(

def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:
"""Sync wrapper for aglob_info."""
return asyncio.get_event_loop().run_until_complete(
return run_async_safely(
self.aglob_info(pattern, path)
)

Expand All @@ -823,7 +882,7 @@ async def aglob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:

def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
"""Sync wrapper for aupload_files."""
return asyncio.get_event_loop().run_until_complete(self.aupload_files(files))
return run_async_safely(self.aupload_files(files))

async def aupload_files(
self, files: list[tuple[str, bytes]]
Expand Down Expand Up @@ -857,7 +916,7 @@ async def aupload_files(

def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
"""Sync wrapper for adownload_files."""
return asyncio.get_event_loop().run_until_complete(self.adownload_files(paths))
return run_async_safely(self.adownload_files(paths))

async def adownload_files(self, paths: list[str]) -> list[FileDownloadResponse]:
"""Download multiple files."""
Expand Down