Skip to content

serve API schema through WAMP procedure #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/app/main_async.py
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ async def on_startup():


app.add_event_handler("startup", on_startup)
app.set_schema_procedure("io.xconn.schema.get")


@app.register("io.xconn.echo")
14 changes: 9 additions & 5 deletions xconn/_client/async_.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
_handle_result,
_sanitize_incoming_data,
collect_docs,
serve_schema_async,
select_authenticator,
start_server_async,
handle_model_validation,
@@ -24,7 +23,7 @@
from xconn._client.types import ClientConfig
from xconn.client import AsyncClient
from xconn.async_session import AsyncSession
from xconn.types import Event, Invocation, Result
from xconn.types import Event, Invocation, Result, RegisterOptions, InvokeOptions


async def _setup(app: App, session: AsyncSession):
@@ -37,7 +36,7 @@ async def _setup(app: App, session: AsyncSession):
await subscribe_async(session, uri, func)


async def connect_async(app: App, config: ClientConfig, serve_schema=True, start_router: bool = False):
async def connect_async(app: App, config: ClientConfig, start_router: bool = False):
if start_router:
await start_server_async(config)

@@ -69,7 +68,7 @@ async def on_disconnect():
session = await client.connect(config.url, config.realm, on_connect, on_disconnect)
await _setup(app, session)

if serve_schema:
if app.schema_procedure is not None and app.schema_procedure != "":
docs = []

for uri, func in app.procedures.items():
@@ -78,7 +77,12 @@ async def on_disconnect():
for uri, func in app.topics.items():
docs.append(collect_docs(uri, func, "topic"))

await serve_schema_async(config.schema_host, config.schema_port, docs)
async def get_schema(_: Invocation) -> Result:
return Result(args=docs)

options = RegisterOptions(invoke=InvokeOptions.ROUNDROBIN)
await session.register(app.schema_procedure, get_schema, options=options)
print(f"serving schema at procedure {app.schema_procedure}")


@contextlib.asynccontextmanager
21 changes: 3 additions & 18 deletions xconn/_client/cli.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@
import importlib
import os
import sys
import ipaddress

import yaml

@@ -24,8 +23,6 @@ def handle_start(command_args: CommandArgs):
secret=command_args.secret,
ticket=command_args.ticket,
private_key=command_args.private_key,
schema_host=command_args.schema_host,
schema_port=command_args.schema_port,
)
config.websocket_config = WebsocketConfig(
command_args.open_timeout, command_args.ping_interval, command_args.ping_timeout
@@ -55,9 +52,6 @@ def handle_start(command_args: CommandArgs):

config.authmethod = helpers.select_authmethod(config)

# validate schema host
ipaddress.ip_address(config.schema_host)

split = command_args.app.split(":")
if len(split) != 2:
raise RuntimeError("invalid app argument, must be of format: module:instance")
@@ -69,9 +63,9 @@ def handle_start(command_args: CommandArgs):
raise RuntimeError(f"app instance is of unknown type {type(app)}")

if command_args.asyncio:
run(connect_async(app, config, serve_schema=True, start_router=command_args.start_router))
run(connect_async(app, config, start_router=command_args.start_router))
else:
connect_sync(app, config, serve_schema=True, start_router=command_args.start_router)
connect_sync(app, config, start_router=command_args.start_router)


def handle_init(
@@ -80,8 +74,6 @@ def handle_init(
authid: str,
authmethod: str,
secret: str,
schema_host: str,
schema_port: int,
open_timeout: int,
ping_interval: int,
ping_timeout: int,
@@ -99,8 +91,6 @@ def handle_init(
"authid": authid,
"authmethod": authmethod,
"secret": secret,
"schema_host": schema_host,
"schema_port": schema_port,
"websocket_config": {
"open_timeout": open_timeout,
"ping_interval": ping_interval,
@@ -127,8 +117,6 @@ def add_client_subparser(subparsers):
start.add_argument("--realm", type=str)
start.add_argument("--directory", type=str, default=".")
start.add_argument("--asyncio", action="store_true", default=False)
start.add_argument("--schema-host", type=str, default="127.0.0.1")
start.add_argument("--schema-port", type=int, default=9000)
start.add_argument("--router", action="store_true", default=False)
start.add_argument("--authid", type=str)
start.add_argument("--secret", type=str)
@@ -138,6 +126,7 @@ def add_client_subparser(subparsers):
start.add_argument("--open-timeout", type=int, default=10)
start.add_argument("--ping-interval", type=int, default=20)
start.add_argument("--ping-timeout", type=int, default=20)
start.add_argument("--schema-proc", type=str)
start.set_defaults(func=lambda args: handle_start(CommandArgs(**vars(args))))

stop = client_subparsers.add_parser("stop", help="Stop client")
@@ -150,8 +139,6 @@ def add_client_subparser(subparsers):
init.add_argument("--authid", type=str, default="anonymous")
init.add_argument("--authmethod", type=str, default="anonymous")
init.add_argument("--secret", type=str, default="")
init.add_argument("--schema-host", type=str, default="127.0.0.1")
init.add_argument("--schema-port", type=int, default=9000)
init.add_argument("--open-timeout", type=int, default=10)
init.add_argument("--ping-interval", type=int, default=20)
init.add_argument("--ping-timeout", type=int, default=20)
@@ -162,8 +149,6 @@ def add_client_subparser(subparsers):
args.authid,
args.authmethod,
args.secret,
args.schema_host,
args.schema_port,
args.open_timeout,
args.ping_interval,
args.ping_timeout,
16 changes: 0 additions & 16 deletions xconn/_client/helpers.py
Original file line number Diff line number Diff line change
@@ -369,22 +369,6 @@ async def serve_schema(_):
return app


async def serve_schema_async(host: str, port: int, docs: list, endpoint="/schema.json"):
app = create_app(docs, endpoint)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
print(f"Schema available at http://{host}:{port}{endpoint}")


def serve_schema_sync(host: str, port: int, docs, endpoint="/schema.json"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(serve_schema_async(host, port, docs, endpoint=endpoint))
loop.run_forever()


def select_authenticator(config: ClientConfig) -> IClientAuthenticator:
if config.authmethod == "cryptosign" or config.authmethod == "wampcra" or config.authmethod == "ticket":
if config.secret == "":
16 changes: 9 additions & 7 deletions xconn/_client/sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import contextlib
import random
import inspect
from multiprocessing import Process
import threading
import time
from typing import Any, Generator
@@ -14,7 +13,6 @@
_handle_result,
_sanitize_incoming_data,
collect_docs,
serve_schema_sync,
select_authenticator,
start_server_sync,
wait_for_server,
@@ -28,7 +26,7 @@
from xconn._client.types import ClientConfig
from xconn.client import Client
from xconn.session import Session
from xconn.types import Event, Invocation, Result
from xconn.types import Event, Invocation, Result, RegisterOptions, InvokeOptions


def _setup(app: App, session: Session):
@@ -41,7 +39,7 @@ def _setup(app: App, session: Session):
subscribe_sync(session, uri, func)


def connect_sync(app: App, config: ClientConfig, serve_schema: bool = False, start_router: bool = False):
def connect_sync(app: App, config: ClientConfig, start_router: bool = False):
if start_router:
threading.Thread(target=start_server_sync, args=(config,), daemon=True).start()

@@ -76,7 +74,7 @@ def on_disconnect():
session = client.connect(config.url, config.realm, on_connect, on_disconnect)
_setup(app, session)

if serve_schema:
if app.schema_procedure is not None and app.schema_procedure != "":
docs = []

for uri, func in app.procedures.items():
@@ -85,8 +83,12 @@ def on_disconnect():
for uri, func in app.topics.items():
docs.append(collect_docs(uri, func, "topic"))

docs_process = Process(target=serve_schema_sync, args=(config.schema_host, config.schema_port, docs))
docs_process.start()
def get_schema(_: Invocation) -> Result:
return Result(args=docs)

options = RegisterOptions(invoke=InvokeOptions.ROUNDROBIN)
session.register(app.schema_procedure, get_schema, options=options)
print(f"serving schema at procedure {app.schema_procedure}")


@contextlib.contextmanager
12 changes: 4 additions & 8 deletions xconn/_client/types.py
Original file line number Diff line number Diff line change
@@ -4,8 +4,6 @@


class ClientConfig(BaseModel):
schema_host: str
schema_port: int
url: str
realm: str
websocket_config: WebsocketConfig = WebsocketConfig()
@@ -23,16 +21,14 @@ class CommandArgs(BaseModel):
realm: str | None = None
directory: str | None = None
asyncio: bool
schema_host: str
schema_port: int
no_config: bool
start_router: bool = Field(alias="router")

open_timeout: int
ping_interval: int
ping_timeout: int

authid: str | None = None
ticket: str | None = None
secret: str | None = None
private_key: str | None = None

open_timeout: int
ping_interval: int
ping_timeout: int
12 changes: 10 additions & 2 deletions xconn/app.py
Original file line number Diff line number Diff line change
@@ -122,10 +122,11 @@ def __init__(self):
self._procedures = {}
self._topics = {}

self._session: Session | AsyncSession = None
self._session: Session | AsyncSession | None = None
self._components: list[Component] = []

self._startup_handler: Callable | Awaitable[None] = None
self._startup_handler: Callable | Awaitable[None] | None = None
self._schema_procedure: str | None = None

def set_session(self, session: Session | AsyncSession):
self._session = session
@@ -142,6 +143,10 @@ def set_session(self, session: Session | AsyncSession):
def components(self) -> list[Component]:
return self._components

@property
def schema_procedure(self) -> str:
return self._schema_procedure

def include_component(self, component: Component, prefix: str = "") -> None:
if prefix is None or len(prefix) == 0:
self._procedures.update(component.procedures)
@@ -160,3 +165,6 @@ def add_event_handler(self, event_type: str, handler: Callable | Awaitable):
raise ValueError(f"event_type {event_type} is not supported")

self._startup_handler = handler

def set_schema_procedure(self, uri: str):
self._schema_procedure = uri