From 00707b7df02f062ceba9328a688682d3acf25b52 Mon Sep 17 00:00:00 2001 From: Simon Li Date: Thu, 5 Mar 2020 15:04:05 +0000 Subject: [PATCH 1/3] =?UTF-8?q?copy=20clients.createSession=20=E2=86=92=20?= =?UTF-8?q?asyncCreateSession?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/omero/clients.py | 114 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/src/omero/clients.py b/src/omero/clients.py index 3bb5652f6..62d54b44d 100755 --- a/src/omero/clients.py +++ b/src/omero/clients.py @@ -706,6 +706,120 @@ def createSession(self, username=None, password=None): finally: self.__lock.release() + async def createAsyncSession(self, username=None, password=None): + """ + This is a copy of createSesson + except that: + - the session is created asynchronously + - keep alive is not initialised + """ + import omero + + self.__lock.acquire() + try: + + # Checking state + + if self.__sf: + raise omero.ClientError( + "Session already active. " + "Create a new omero.client or closeSession()") + + if not self.__ic: + if not self.__previous: + raise omero.ClientError( + "No previous data to recreate communicator.") + self._initData(self.__previous) + self.__previous = None + + # Check the required properties + + if not username: + username = self.getProperty("omero.user") + elif isinstance(username, omero.RString): + username = username.val + + if not username or len(username) == 0: + raise omero.ClientError("No username specified") + + if not password: + password = self.getProperty("omero.pass") + elif isinstance(password, omero.RString): + password = password.val + + if not password: + raise omero.ClientError("No password specified") + + # Acquire router and get the proxy + prx = None + retries = 0 + while retries < 3: + reason = None + if retries > 0: + self.__logger.warning( + "%s - createSession retry: %s" % (reason, retries)) + try: + ctx = self.getContext() + ctx[omero.constants.AGENT] = self.__agent + if self.__ip is not None: + ctx[omero.constants.IP] = self.__ip + rtr = self.getRouter(self.__ic) + prx = rtr.createSession(username, password, ctx) + + # Create the adapter + self.__oa = self.__ic.createObjectAdapterWithRouter( + "omero.ClientCallback", rtr) + self.__oa.activate() + + id = Ice.Identity() + id.name = self.__uuid + id.category = rtr.getCategoryForClient() + + self.__cb = BaseClient.CallbackI(self.__ic, self.__oa, id) + self.__oa.add(self.__cb, id) + + break + except omero.WrappedCreateSessionException as wrapped: + if not wrapped.concurrency: + raise wrapped # We only retry concurrency issues. + reason = "%s:%s" % (wrapped.type, wrapped.reason) + retries = retries + 1 + except Ice.ConnectTimeoutException as cte: + reason = "Ice.ConnectTimeoutException:%s" % str(cte) + retries = retries + 1 + + if not prx: + raise omero.ClientError("Obtained null object prox") + + # Check type + self.__sf = omero.api.ServiceFactoryPrx.uncheckedCast(prx) + if not self.__sf: + raise omero.ClientError( + "Obtained object proxy is not a ServiceFactory") + + # Configure keep alive + self.startKeepAlive() + + # Set the client callback on the session + # and pass it to icestorm + try: + + raw = self.__oa.createProxy(self.__cb.id) + self.__sf.setCallback( + omero.api.ClientCallbackPrx.uncheckedCast(raw)) + # self.__sf.subscribe("/public/HeartBeat", raw) + except: + self.__del__() + raise + + # Set the session uuid in the implicit context + self.getImplicitContext().put( + omero.constants.SESSIONUUID, self.getSessionId()) + + return self.__sf + finally: + self.__lock.release() + def enableKeepAlive(self, seconds): """ Resets the "omero.keep_alive" property on the current From bce0e87662b3357879a3ee7ae55a7cb40887fd4e Mon Sep 17 00:00:00 2001 From: Simon Li Date: Thu, 5 Mar 2020 15:17:04 +0000 Subject: [PATCH 2/3] Add async service wrappers --- src/omero/iceasync.py | 133 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 src/omero/iceasync.py diff --git a/src/omero/iceasync.py b/src/omero/iceasync.py new file mode 100644 index 000000000..c29423e0f --- /dev/null +++ b/src/omero/iceasync.py @@ -0,0 +1,133 @@ +import asyncio +from functools import partial, update_wrapper +import logging + + +logger = logging.getLogger(__name__) + + +def _firstline_truncate(s): + lines = "{}\n".format(s).splitlines() + if len(lines[0]) > 80 or len(lines) > 1: + s = lines[0][:79] + "…" + return s + + +async def ice_async(loop, func, *args, **kwargs): + """ + Wrap an asynchronous Ice service method so it can be used with asyncio + + loop: The event loop + func: The Ice service method + *args: Positional arguments for the Ice service method + *kwargs: Keyword arguments for the Ice service method + """ + # https://docs.python.org/3.6/library/asyncio-task.html#example-future-with-run-until-complete + + # Ice runs in a different thread from asyncio so must use + # call_soon_threadsafe + # https://docs.python.org/3.6/library/asyncio-dev.html#concurrency-and-multithreading + + future = loop.create_future() + + def exception_cb(ex): + logger.warning("exception_cb: %s", _firstline_truncate(ex)) + loop.call_soon_threadsafe(future.set_exception, ex) + + def response_cb(result=None, *outparams): + logger.debug("response_cb: %s", _firstline_truncate(result)) + loop.call_soon_threadsafe(future.set_result, result) + + a = func(*args, **kwargs, _response=response_cb, _ex=exception_cb) + logger.debug( + "_exec_ice_async(%s) sent:%s completed:%s", + func.__name__, + a.isSent(), + a.isCompleted(), + ) + + result = await future + return result + + +class AsyncService: + def __init__(self, svc, loop=None): + """ + Convert an OMERO Ice service to an async service + + svc: The OMERO Ice service + loop: The async event loop (optional) + """ + + # This would be easier in Python 3.7 since Future.get_loop() returns + # the loop the Future is bound to so there's no need to pass it + # https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.get_loop + if not loop: + loop = asyncio.get_event_loop() + methods = { + m for m in dir(svc) + if callable(getattr(svc, m)) and not m.startswith("_") + } + + # Ice methods come in sync (`f`) and async (`begin_f`…`end_f`) versions + # https://doc.zeroc.com/ice/3.6/language-mappings/python-mapping/client-side-slice-to-python-mapping/asynchronous-method-invocation-ami-in-python + # Replace each set of functions with a single async function `f`. + # Uses `update_wrapper` to copy the original signature for `f` to the + # wrapped function. + async_methods = {m for m in methods if m.startswith("begin_")} + for async_m in async_methods: + sync_m = async_m[6:] + methods.remove(sync_m) + methods.remove("begin_" + sync_m) + methods.remove("end_" + sync_m) + setattr( + self, + sync_m, + update_wrapper( + partial(ice_async, loop, getattr(svc, async_m)), + getattr(svc, sync_m), + ), + ) + for sync_m in methods: + setattr( + self, + sync_m, + update_wrapper(partial( + getattr(svc, sync_m)), getattr(svc, sync_m)), + ) + + +async def _getServiceWrapper(getsvc_m, loop): + svc = await getsvc_m() + return AsyncService(svc, loop) + + +class AsyncSession(AsyncService): + def __init__(self, session, loop=None): + """ + Wrap a session from client.getSession() so all services are async + + session: The OMERO session + loop: The async event loop (optional) + """ + + # This will wrap methods including getXxxService(), but we need to also + # wrap the results of those services + super().__init__(session, loop) + getsvc_methods = { + m + for m in dir(self) + if callable(getattr(self, m)) + and m.startswith("get") + and m.endswith("Service") + } + + for getsvc_m in getsvc_methods: + setattr( + self, + getsvc_m, + update_wrapper( + partial(_getServiceWrapper, getattr(self, getsvc_m), loop), + getattr(session, getsvc_m), + ), + ) From a6583d0b80aeb38148234f628d5a741af63a8577 Mon Sep 17 00:00:00 2001 From: Simon Li Date: Thu, 5 Mar 2020 15:17:26 +0000 Subject: [PATCH 3/3] Fill in createAsyncSession --- src/omero/clients.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/omero/clients.py b/src/omero/clients.py index 62d54b44d..44b27d80a 100755 --- a/src/omero/clients.py +++ b/src/omero/clients.py @@ -33,6 +33,7 @@ import re import ssl import uuid +from .iceasync import AsyncService, AsyncSession from past.builtins import basestring @@ -764,7 +765,8 @@ async def createAsyncSession(self, username=None, password=None): if self.__ip is not None: ctx[omero.constants.IP] = self.__ip rtr = self.getRouter(self.__ic) - prx = rtr.createSession(username, password, ctx) + prx = await AsyncService(rtr).createSession( + username, password, _ctx=ctx) # Create the adapter self.__oa = self.__ic.createObjectAdapterWithRouter( @@ -792,23 +794,22 @@ async def createAsyncSession(self, username=None, password=None): raise omero.ClientError("Obtained null object prox") # Check type - self.__sf = omero.api.ServiceFactoryPrx.uncheckedCast(prx) - if not self.__sf: + sf = omero.api.ServiceFactoryPrx.uncheckedCast(prx) + if not sf: raise omero.ClientError( "Obtained object proxy is not a ServiceFactory") + self.__sf = AsyncSession(sf) - # Configure keep alive - self.startKeepAlive() + # Don't automatically configure keep alive # Set the client callback on the session # and pass it to icestorm try: - raw = self.__oa.createProxy(self.__cb.id) - self.__sf.setCallback( + await self.__sf.setCallback( omero.api.ClientCallbackPrx.uncheckedCast(raw)) # self.__sf.subscribe("/public/HeartBeat", raw) - except: + except BaseException: self.__del__() raise