Skip to content

Commit

Permalink
Merge pull request #320 from phenobarbital/dev
Browse files Browse the repository at this point in the history
more features on handle_upload
  • Loading branch information
phenobarbital authored Nov 30, 2024
2 parents f7f42c7 + 365b70c commit e0b7bfa
Show file tree
Hide file tree
Showing 18 changed files with 634 additions and 59 deletions.
39 changes: 39 additions & 0 deletions examples/test_sqs_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
from navconfig import config
from navigator.brokers.sqs import SQSConnection
import pandas as pd

AWS_ACCESS_KEY = config.get('AWS_KEY')
AWS_SECRET_KEY = config.get('AWS_SECRET')
AWS_REGION = config.get('AWS_REGION')


async def example_callback(message, processed_message):
# print(f"Processed Message: {processed_message}")
# print(f"Type: {type(processed_message)}")
# print(f"Raw Message: {message}")
metadata = processed_message.get('metadata')
print(metadata)
payload = processed_message.get('payload')
print('PAYLOAD > ', payload)
df = pd.DataFrame([payload])
print(df)


async def main():
connection = SQSConnection(
credentials={
"aws_access_key_id": AWS_ACCESS_KEY,
"aws_secret_access_key": AWS_SECRET_KEY,
"region_name": AWS_REGION
}
)
async with connection as sqs:
await sqs.consume_messages("MainEvent", example_callback)


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
79 changes: 79 additions & 0 deletions examples/test_sqs_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from dataclasses import dataclass
import asyncio
from datamodel import BaseModel
from navconfig import config
from navigator.brokers.sqs import SQSConnection

AWS_ACCESS_KEY = config.get('AWS_KEY')
AWS_SECRET_KEY = config.get('AWS_SECRET')
AWS_REGION = config.get('AWS_REGION')


@dataclass
class Example:
name: str
age: int

class ExampleModel(BaseModel):
name: str
age: int

async def main():
connection = SQSConnection(
credentials={
"aws_access_key_id": AWS_ACCESS_KEY,
"aws_secret_access_key": AWS_SECRET_KEY,
"region_name": AWS_REGION
}
)
async with connection as sqs:
# Create an SQS Queue
queue_name = "MainEvent"
print(f"Creating queue: {queue_name}")
queue = await sqs.create_queue(queue_name)
queue_url = queue.url
print(f"Queue URL: {queue_url}")
# # Publish a JSON Message
# await sqs.publish_message("MyTestQueue", {"key": "value"})
# # Publish JSONPickle
# model = ExampleModel(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", model)
# # Dataclasses:
# mdl = Example(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", mdl)

# # Publish CloudPickle
# class CustomWrapper:
# def __init__(self, data):
# self.data = data

# wrapper = CustomWrapper(data={"nested_key": "nested_value"})
# await sqs.publish_message("MyTestQueue", wrapper)

form = {
"metadata": {
"type": "recapDefinition",
"transactionType": "UPSERT",
"source": "MainEvent",
"client": "global"
},
"payload": {
"formid": 7,
"form_name": "Assembly Tech Form",
"active": True,
"created_on": "2024-09-24T07:13:20-05:00",
"updated_on": "2024-09-25T12:51:53-05:00",
"is_store_stamp": False,
"client_id": 61,
"client_name": "ASSEMBLY",
"orgid": 71
}
}
# Publish plain text
await sqs.publish_message("MainEvent", form)

if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
4 changes: 0 additions & 4 deletions navigator/broker/__init__.py

This file was deleted.

4 changes: 4 additions & 0 deletions navigator/brokers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Navigator Broker.
Producer/Consumer broker Functionality, using RabbitMQ or AWS SQS as the message broker.
"""
107 changes: 107 additions & 0 deletions navigator/brokers/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Base Abstract for all Broker Service connections.
"""
from typing import Optional, Union, Any
from collections.abc import Awaitable, Callable
from abc import ABC, abstractmethod
import asyncio
from navconfig.logging import logging
from .pickle import DataSerializer


class BaseConnection(ABC):
"""
Manages connection and operations over Broker Services.
"""

def __init__(
self,
credentials: Union[str, dict],
timeout: Optional[int] = 5,
**kwargs
):
self._credentials = credentials
self._timeout: int = timeout
self._connection = None
self._monitor_task: Optional[Awaitable] = None
self.logger = logging.getLogger(self.__class__.__name__)
self._queues: dict = {}
self.reconnect_attempts = 0
self.max_reconnect_attempts = kwargs.get(
'max_reconnect_attempts', 3
)
self.reconnect_delay = 1 # Initial delay in seconds
self._lock = asyncio.Lock()
self._serializer = DataSerializer()

def get_connection(self) -> Optional[Union[Callable, Awaitable]]:
if not self._connection:
raise RuntimeError('No connection established.')
return self._connection

def get_serializer(self) -> DataSerializer:
return self._serializer

async def __aenter__(self) -> 'BaseConnection':
await self.connect()
return self

async def __aexit__(self, exc_type, exc, tb) -> None:
await self.disconnect()

@abstractmethod
async def connect(self) -> None:
"""
Creates a Connection to Broker Service.
"""
raise NotImplementedError

@abstractmethod
async def disconnect(self) -> None:
"""
Disconnect from Broker Service.
"""
raise NotImplementedError

async def ensure_connection(self) -> None:
"""
Ensures that the connection is active.
"""
if self._connection is None:
await self.connect()

@abstractmethod
async def publish_message(
self,
exchange_name: str,
routing_key: str,
body: Union[str, list, dict, Any],
**kwargs
) -> None:
"""
Publish a message to the Broker Service.
"""
raise NotImplementedError

@abstractmethod
async def consume_messages(
self,
queue_name: str,
callback: Callable,
**kwargs
) -> None:
"""
Consume messages from the Broker Service.
"""
raise NotImplementedError

@abstractmethod
async def process_message(
self,
body: bytes,
properties: Any
) -> str:
"""
Process a message from the Broker Service.
"""
raise NotImplementedError
36 changes: 28 additions & 8 deletions navigator/broker/pickle.py → navigator/brokers/pickle.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from typing import Any
import base64
from dataclasses import dataclass
import jsonpickle
from jsonpickle.handlers import BaseHandler
from jsonpickle.unpickler import loadclass
import msgpack
import cloudpickle
from datamodel import BaseModel
from datamodel import Model, BaseModel


class ModelHandler(jsonpickle.handlers.BaseHandler):
class ModelHandler(BaseHandler):

"""ModelHandler.
This class can handle with serializable Data Models.
"""
Expand All @@ -17,15 +21,14 @@ def flatten(self, obj, data):
def restore(self, obj):
module_and_type = obj['py/object']
mdl = loadclass(module_and_type)
if hasattr(mdl, '__new__'):
cls = mdl.__new__(mdl)
else:
cls = object.__new__(mdl)

cls.__dict__ = self.context.restore(obj['__dict__'], reset=False)
cls = mdl.__new__(mdl) # Create a new instance without calling __init__
# cls.__dict__ = self.context.restore(obj['__dict__'], reset=False)
# Restore attributes from the saved __dict__
cls.__dict__.update(self.context.restore(obj['__dict__'], reset=False))
return cls

jsonpickle.handlers.registry.register(BaseModel, ModelHandler, base=True)
jsonpickle.handlers.registry.register(Model, ModelHandler, base=True)


class DataSerializer:
Expand All @@ -49,6 +52,7 @@ def decode(data: str) -> Any:
try:
return jsonpickle.decode(data)
except Exception as err:
print('JSON ', err)
raise RuntimeError(err) from err

@staticmethod
Expand All @@ -71,3 +75,19 @@ def unserialize(data: Any) -> dict:
return cloudpickle.loads(decoded_data)
except Exception as err:
raise RuntimeError(err) from err

def pack(self, data: Any) -> bytes:
"""Pack Data.
"""
try:
return msgpack.packb(data)
except Exception as err:
raise RuntimeError(err) from err

def unpack(self, data) -> Any:
"""Unpack Data.
"""
try:
return msgpack.unpackb(data)
except Exception as err:
raise RuntimeError(err) from err
3 changes: 3 additions & 0 deletions navigator/brokers/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Using RabbitMQ as Message Broker.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from navconfig.logging import logging
from navigator.applications.base import BaseApplication
from .rabbit import RabbitMQConnection
from .pickle import DataSerializer
from ..pickle import DataSerializer


# Disable Debug Logging for AIORMQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from navigator_auth.conf import (
AUTH_SESSION_OBJECT
)
from ..conf import BROKER_MANAGER_QUEUE_SIZE
from ...conf import BROKER_MANAGER_QUEUE_SIZE
from .rabbit import RabbitMQConnection
from .pickle import DataSerializer
from ..pickle import DataSerializer


# Disable Debug Logging for AIORMQ
Expand All @@ -31,6 +31,10 @@ class BrokerManager(RabbitMQConnection, metaclass=Singleton):
Args:
dsn: RabbitMQ DSN.
queue_size: Size of Asyncio Queue for enqueuing messages before send.
num_workers: Number of workers to process the queue.
timeout: Timeout for RabbitMQ Connection.
"""
_name_: str = "broker_manager"
Expand Down
Loading

0 comments on commit e0b7bfa

Please sign in to comment.