Skelmis/Zonis

Zonis

A coroutine-based callback system for many-to-one IPC setups.

pip install zonis


Simplistic example

Client:

import asyncio

from zonis import Client


async def main():
    client = Client(url="localhost:12344/ws")

    @client.route()
    async def ping():
        print("Pong'd!")
        return "pong"

    await client.start()
    await client.block_until_closed()


asyncio.run(main())
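
To make the `@client.route()` pattern above more concrete, here is a minimal, standard-library-only sketch of how a coroutine might be registered under its function name and later awaited by that name. `RouteTable`, `route` and `dispatch` are hypothetical names chosen for illustration; this is an assumption about the general technique, not Zonis's actual implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Handler = Callable[..., Awaitable[Any]]


class RouteTable:
    """Hypothetical registry mapping route names to coroutine callbacks."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def route(self, name: Optional[str] = None):
        """Register a coroutine; the route name defaults to the function name."""

        def decorator(func: Handler) -> Handler:
            self._routes[name or func.__name__] = func
            return func

        return decorator

    async def dispatch(self, name: str, **kwargs: Any) -> Any:
        """Look up a route by name and await its callback."""
        try:
            handler = self._routes[name]
        except KeyError:
            raise LookupError(f"No route named {name!r}") from None
        return await handler(**kwargs)


table = RouteTable()


@table.route()
async def ping() -> str:
    return "pong"


async def demo() -> str:
    # In a real IPC setup the route name would arrive over the wire.
    return await table.dispatch("ping")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keying the table on the function name keeps the decorator call argument-free, which matches the shape of the client example; an explicit `name` is accepted here only so two functions could share a table without clashing.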

Server:

import asyncio
import json

from fastapi import FastAPI
from starlette.websockets import WebSocket, WebSocketDisconnect

from zonis import UnknownClient
from zonis.server import Server

app = FastAPI()
server = Server(using_fastapi_websockets=True)


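@app.get("/")
async def index():
    try:
        # Ask the connected client to run its "ping" route
        response = await server.request("ping")
        return {"data": response}  # Returns pong
    except UnknownClient as e:
        # Return a JSON-serializable body rather than the exception object
        return {"error": str(e)}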


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # The first message from a client is its identify payload
    d: str = await websocket.receive_text()
    identifier = await server.parse_identify(json.loads(d), websocket)

    try:
        # Keep this handler alive while the connection is in use
        await asyncio.Future()
    except WebSocketDisconnect:
        await server.disconnect(identifier)
        print(f"Closed connection for {identifier}")
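
The "many to one" shape of the example above (several clients, one server that addresses them by identifier) can be sketched without any networking. The following is a rough, in-process illustration using only the standard library. `Hub`, `UnknownClientError`, `request_all` and `make_client` are hypothetical names, and each "client" is simply an async callable, so this is a sketch of the idea under those assumptions rather than a description of how Zonis works internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

# In this sketch a "client" is just an async callable that takes a route name.
ClientCallable = Callable[[str], Awaitable[Any]]


class UnknownClientError(Exception):
    """Hypothetical error: no client is connected under that identifier."""


class Hub:
    """Hypothetical stand-in for the single server side of a many-to-one setup."""

    def __init__(self) -> None:
        self._clients: Dict[str, ClientCallable] = {}

    def connect(self, identifier: str, client: ClientCallable) -> None:
        self._clients[identifier] = client

    def disconnect(self, identifier: str) -> None:
        self._clients.pop(identifier, None)

    async def request(self, route: str, client_identifier: str) -> Any:
        """Send a request to exactly one client."""
        try:
            client = self._clients[client_identifier]
        except KeyError:
            raise UnknownClientError(client_identifier) from None
        return await client(route)

    async def request_all(self, route: str) -> Dict[str, Any]:
        """Send the same request to every connected client concurrently."""
        identifiers = list(self._clients)
        results = await asyncio.gather(
            *(self._clients[i](route) for i in identifiers)
        )
        return dict(zip(identifiers, results))


def make_client(name: str) -> ClientCallable:
    async def handle(route: str) -> str:
        return f"{name} handled {route}"

    return handle


async def demo() -> Tuple[str, Dict[str, Any], str]:
    hub = Hub()
    hub.connect("bot-1", make_client("bot-1"))
    hub.connect("bot-2", make_client("bot-2"))

    one = await hub.request("ping", "bot-2")
    everyone = await hub.request_all("ping")

    hub.disconnect("bot-1")
    try:
        await hub.request("ping", "bot-1")
        missing = ""
    except UnknownClientError as e:
        missing = str(e)
    return one, everyone, missing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Raising a dedicated exception for a missing identifier lets the caller decide how to report it, which is the same role the `except UnknownClient` branch plays in the server example.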

See the examples directory for more use cases.

For documentation, please see here.


Support

Want real-time help? Join the Discord here.


Funding

Want a feature added quickly? Want me to help build your software using this?

Sponsor me here

About

Agnostic IPC for Python programs
