Skip to content

Commit

Permalink
Support browser runner and enhance vector store (amun-ai#723)
Browse files Browse the repository at this point in the history
* Use redis as vector search

* Fix publish_to

* Fix syntax

* add runner; bump hypha-rpc

* Enhance vector store, support persistence to s3

* bump version

* rename service

* small fix

* Fix score return

* Enhance vector store, support persistence to s3

* bump version

* rename service

* small fix

* Fix score return

* Fix unload

* Update workspace.py

* Update workspace.py

* refine clean up logic

* Print error

* Improve task scheduling

* add activity tracker

* Fix teardown

* Improve lifetime management

* Fix applications and daemon

* Fix time

* Fix activity monitor

* Fix event bus; add timeout

* dump vector store

* Fix vector dump

* Fix errors

* Fix tests

* wait long enough

* Fix artifact unload and load

* Fix workspaces unload

* Disable schechdule check

* Add update

* Add update

* Add delay

* Fix test

* Long delay

* Default to 60s
  • Loading branch information
oeway authored Dec 6, 2024
1 parent 5c96b87 commit 74b8c13
Show file tree
Hide file tree
Showing 48 changed files with 2,173 additions and 349 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### 0.20.41

- Add `time_limit` for server apps to limit the running time
- Add `stop_after_inactive` option for server apps to stop the server app after a period of inactivity.
- Support launching server apps worker in a separate service

### 0.20.40
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ svc = await get_remote_service("http://localhost:9527/ws-user-scintillating-lawy
Include the following script in your HTML file to load the `hypha-rpc` client:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].41/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].42/dist/hypha-rpc-websocket.min.js"></script>
```

Use the following code in JavaScript to connect to the server and access an existing service:
Expand Down
6 changes: 3 additions & 3 deletions docs/hypha-quick-tour.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"close_after_inactive: "markdown",
"metadata": {},
"source": [
"# Hypha Demos\n",
"\n",
"## Install client library"
]
},
{
{close_after_inactive
"cell_type": "code",
"execution_count": null,
"metadata": {},
Expand All @@ -24,7 +24,7 @@
" # For native python with pip\n",
" import subprocess\n",
" subprocess.call(['pip', 'install', 'hypha-rpc'])"
]
]close_after_inactive
},
{
"attachments": {},
Expand Down
10 changes: 5 additions & 5 deletions docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ To connect to the server, instead of installing the `imjoy-rpc` module, you will
pip install -U hypha-rpc # new install
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.41` is compatible with Hypha server version `0.20.41`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.42` is compatible with Hypha server version `0.20.42`.

#### 2. Change the imports to use `hypha-rpc`

Expand Down Expand Up @@ -128,10 +128,10 @@ loop.run_forever()
To connect to the server, instead of using the `imjoy-rpc` module, you will need to use the `hypha-rpc` module. The `hypha-rpc` module is a standalone module that provides the RPC connection to the Hypha server. You can include it in your HTML using a script tag:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].41/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].42/dist/hypha-rpc-websocket.min.js"></script>
```

We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.41` is compatible with Hypha server version `0.20.41`.
We also changed our versioning strategy, we use the same version number for the server and client, so it's easier to match the client and server versions. For example, `hypha-rpc` version `0.20.42` is compatible with Hypha server version `0.20.42`.

#### 2. Change the connection method and use camelCase for service function names

Expand All @@ -149,7 +149,7 @@ Here is a suggested list of search and replace operations to update your code:
Here is an example of how the updated code might look:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].41/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].42/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main(){
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down Expand Up @@ -197,7 +197,7 @@ We created a tutorial to introduce this new feature: [service type annotation](.
Here is a quick example in JavaScript:

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].41/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].42/dist/hypha-rpc-websocket.min.js"></script>

<script>
async function main(){
Expand Down
2 changes: 1 addition & 1 deletion docs/service-type-annotation.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ if __name__ == "__main__":
**JavaScript Client: Service Usage**

```html
<script src="https://cdn.jsdelivr.net/npm/[email protected].41/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/[email protected].42/dist/hypha-rpc-websocket.min.js"></script>
<script>
async function main() {
const server = await hyphaWebsocketClient.connectToServer({"server_url": "https://hypha.amun.ai"});
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/aks-hypha.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ replicaCount: 1
image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
tag: "0.20.41"
tag: "0.20.42"
serviceAccount:
create: true
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.20.41
version: 0.20.42

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
2 changes: 1 addition & 1 deletion helm-charts/hypha-server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ image:
repository: ghcr.io/amun-ai/hypha
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "0.20.41"
tag: "0.20.42"

imagePullSecrets: []
nameOverride: ""
Expand Down
2 changes: 1 addition & 1 deletion hypha/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.20.41"
"version": "0.20.42"
}
146 changes: 117 additions & 29 deletions hypha/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from hypha import main_version
from jinja2 import Environment, PackageLoader, select_autoescape
from typing import Any, Dict, List, Optional, Union
from hypha.core import UserInfo, UserPermission, ServiceInfo, ApplicationArtifact
from hypha.runner.browser import BrowserAppRunner
from hypha.core import UserInfo, UserPermission, ServiceInfo, ApplicationManifest
from hypha.utils import (
random_id,
PLUGIN_CONFIG_FIELDS,
Expand All @@ -20,6 +19,7 @@
import base58
import random
from hypha.plugin_parser import convert_config_to_artifact, parse_imjoy_plugin
from hypha.core import WorkspaceInfo

logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger("apps")
Expand Down Expand Up @@ -54,27 +54,38 @@ def __init__(
loader=PackageLoader("hypha"), autoescape=select_autoescape()
)
self.templates_dir = Path(__file__).parent / "templates"
self._runners = None

def shutdown(_) -> None:
asyncio.ensure_future(self.shutdown())

self.event_bus.on_local("shutdown", shutdown)

async def client_disconnected(info: dict) -> None:
"""Handle client disconnected event."""
# {"id": client_id, "workspace": ws}
client_id = info["id"]
full_client_id = info["workspace"] + "/" + client_id
if full_client_id in self._sessions:
app_info = self._sessions.pop(full_client_id, None)
try:
await app_info["_runner"].stop(full_client_id)
except Exception as exp:
logger.warning(f"Failed to stop browser tab: {exp}")

self.event_bus.on_local("client_disconnected", client_disconnected)
store.set_server_app_controller(self)

async def get_runners(self):
# start the browser runner
server = await self.store.get_public_api()
svcs = await server.list_services("public/server-app-worker")
if not svcs:
return []
runners = [await server.get_service(svc["id"]) for svc in svcs]
if runners:
return runners
elif self._runners:
return self._runners
self._runners = [
BrowserAppRunner(in_docker=self.in_docker),
BrowserAppRunner(in_docker=self.in_docker),
]
return self._runners
else:
[]

async def setup_applications_collection(self, overwrite=True, context=None):
"""Set up the workspace."""
Expand Down Expand Up @@ -205,7 +216,7 @@ async def install(
"public_url": public_url,
}
)
ApplicationArtifact.model_validate(artifact_obj)
ApplicationManifest.model_validate(artifact_obj)

try:
artifact = await self.artifact_manager.read("applications", context=context)
Expand Down Expand Up @@ -352,7 +363,7 @@ async def start(
timeout: float = 60,
version: str = None,
wait_for_service: Union[str, bool] = None,
time_limit: Optional[int] = 600,
stop_after_inactive: Optional[int] = None,
context: Optional[dict] = None,
):
"""Start the app and keep it alive."""
Expand All @@ -379,10 +390,24 @@ async def start(
artifact_info = await self.artifact_manager.read(
f"applications:{app_id}", version=version, context=context
)
artifact = artifact_info.get("manifest", {})
artifact = ApplicationArtifact.model_validate(artifact)

entry_point = artifact.entry_point
manifest = artifact_info.get("manifest", {})
manifest = ApplicationManifest.model_validate(manifest)
if manifest.singleton:
# check if the app is already running
for session_info in self._sessions.values():
if session_info["app_id"] == app_id:
raise RuntimeError(
f"App {app_id} is a singleton app and already running (id: {session_info['id']})"
)
if manifest.daemon and stop_after_inactive and stop_after_inactive > 0:
raise ValueError("Daemon apps should not have stop_after_inactive set.")
if stop_after_inactive is None:
stop_after_inactive = (
600
if manifest.stop_after_inactive is None
else manifest.stop_after_inactive
)
entry_point = manifest.entry_point
assert entry_point, f"Entry point not found for app {app_id}."
server_url = self.local_base_url
local_url = (
Expand All @@ -404,21 +429,46 @@ async def start(
+ (f"&version={version}" if version else "")
+ (f"&use_proxy=true")
)

runner = random.choice(await self.get_runners())
runners = await self.get_runners()
if not runners:
raise Exception("No server app worker found")
runner = random.choice(runners)

full_client_id = workspace + "/" + client_id
await runner.start(
url=local_url, session_id=full_client_id, time_limit=time_limit
)
self._sessions[full_client_id] = {
metadata = {
"id": full_client_id,
"app_id": app_id,
"workspace": workspace,
"local_url": local_url,
"public_url": public_url,
"_runner": runner,
}

await runner.start(
url=local_url,
session_id=full_client_id,
metadata=metadata,
)
self._sessions[full_client_id] = metadata

# test activity tracker
tracker = self.store.get_activity_tracker()
if not manifest.daemon and stop_after_inactive and stop_after_inactive > 0:

async def _stop_after_inactive():
if full_client_id in self._sessions:
await runner.stop(full_client_id)
logger.info(
f"App {full_client_id} stopped because of inactive for {stop_after_inactive}s."
)

tracker.register(
full_client_id,
inactive_period=stop_after_inactive,
on_inactive=_stop_after_inactive,
entity_type="client",
)

# collecting services registered during the startup of the script
collected_services: List[ServiceInfo] = []
app_info = {
Expand Down Expand Up @@ -452,14 +502,14 @@ def service_added(info: dict):
)

# save the services
artifact.services = collected_services
artifact = ApplicationArtifact.model_validate(
artifact.model_dump(mode="json")
manifest.services = collected_services
manifest = ApplicationManifest.model_validate(
manifest.model_dump(mode="json")
)
await self.artifact_manager.edit(
f"applications:{app_id}",
version=version,
manifest=artifact.model_dump(mode="json"),
manifest=manifest.model_dump(mode="json"),
context=context,
)

Expand Down Expand Up @@ -495,7 +545,9 @@ async def stop(
f"User {user_info.id} does not have permission"
f" to stop app {session_id} in workspace {workspace}."
)
await self._stop(session_id, raise_exception=raise_exception)

async def _stop(self, session_id: str, raise_exception=True):
if session_id in self._sessions:
app_info = self._sessions.pop(session_id, None)
try:
Expand Down Expand Up @@ -543,20 +595,56 @@ async def list_running(self, context: Optional[dict] = None) -> List[str]:
async def list_apps(self, context: Optional[dict] = None):
"""List applications in the workspace."""
try:
ws = context["ws"]
apps = await self.artifact_manager.list_children(
"applications", context=context
f"{ws}/applications", context=context
)
return [app["manifest"] for app in apps]
except KeyError:
return []
raise KeyError(f"Applications collection not found: {ws}")
except Exception as exp:
raise Exception(f"Failed to list apps: {exp}") from exp

async def shutdown(self) -> None:
"""Shutdown the app controller."""
logger.info("Closing the server app controller...")
for app in self._sessions.values():
await self.stop(app["id"])
await self.stop(app["id"], raise_exception=False)

async def prepare_workspace(self, workspace_info: WorkspaceInfo):
"""Prepare the workspace."""
context = {
"ws": workspace_info.id,
"user": self.store.get_root_user().model_dump(),
}
apps = await self.list_apps(context=context)
# start daemon apps
for app in apps:
if app.get("daemon"):
try:
await self.start(app["id"], context=context)
except Exception as exp:
logger.error(
f"Failed to start daemon app: {app['id']}, error: {exp}"
)

async def close_workspace(self, workspace_info: WorkspaceInfo):
"""Archive the workspace."""
# Stop all running apps
for app in list(self._sessions.values()):
if app["workspace"] == workspace_info.id:
await self._stop(app["id"], raise_exception=False)
# Send to all runners
runners = await self.get_runners()
if not runners:
return
for runner in runners:
try:
await runner.close_workspace(workspace_info.id)
except Exception as exp:
logger.warning(
f"Worker failed to close workspace: {workspace_info.id}, error: {exp}"
)

def get_service_api(self) -> Dict[str, Any]:
"""Get a list of service API endpoints."""
Expand Down
Loading

0 comments on commit 74b8c13

Please sign in to comment.