Skip to content

Commit

Permalink
Update example
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Aug 7, 2024
1 parent e3895e5 commit ba49583
Showing 1 changed file with 48 additions and 42 deletions.
90 changes: 48 additions & 42 deletions bindings/ceylon/tests/test_worker/meeting_schedular.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import asyncio
import pickle
import sys
from typing import List

from loguru import logger
from pydantic.dataclasses import dataclass

from ceylon import Agent, CoreAdmin
from ceylon import Agent, CoreAdmin, on_message

admin_port = 8000
admin_peer = "Coordinator"
workspace_id = "time_scheduling"

# For libraries, should be your library's `__name__`

logger.disable("ceylon.agent.common")


@dataclass(repr=True)
class Meeting:
Expand Down Expand Up @@ -60,19 +66,17 @@ def __init__(self, name, available_times):
self.available_times = available_times
super().__init__(name=name, workspace_id=workspace_id, admin_peer=admin_peer, admin_port=admin_port)

async def on_message(self, agent_id: "str", data: "bytes", time: "int"):
data = pickle.loads(data)
if type(data) == AvailabilityRequest:
data: AvailabilityRequest = data
# Check if the time slot is available
if not any(self.is_overlap(slot, data.time_slot, data.time_slot.duration) for slot in self.available_times):
# print(f"Time slot {data.time_slot} is not available for {self.details().name}")
await self.broadcast(pickle.dumps(
AvailabilityResponse(owner=self.details().name, time_slot=data.time_slot, accepted=False)))
else:
# print(f"Time slot {data.time_slot} is available")
await self.broadcast(pickle.dumps(
AvailabilityResponse(owner=self.details().name, time_slot=data.time_slot, accepted=True)))
@on_message(type=AvailabilityRequest)
async def on_availability_request(self, data: AvailabilityRequest):
# Check if the time slot is available
if not any(self.is_overlap(slot, data.time_slot, data.time_slot.duration) for slot in self.available_times):
# print(f"Time slot {data.time_slot} is not available for {self.details().name}")
await self.broadcast_data(
AvailabilityResponse(owner=self.details().name, time_slot=data.time_slot, accepted=False))
else:
# print(f"Time slot {data.time_slot} is available")
await self.broadcast_data(
AvailabilityResponse(owner=self.details().name, time_slot=data.time_slot, accepted=True))

@staticmethod
def is_overlap(slot1: TimeSlot, slot2: TimeSlot, duration: int) -> bool:
Expand Down Expand Up @@ -102,34 +106,36 @@ async def run(self, inputs: "bytes"):
async def on_agent_connected(self, topic: "str", agent_id: "str"):
if self.next_time_slot is None and self.meeting is not None:
self.next_time_slot = TimeSlot(self.meeting.date, 0, self.meeting.duration)
await self.broadcast(pickle.dumps(AvailabilityRequest(time_slot=self.next_time_slot)))

async def on_message(self, agent_id: "str", data: "bytes", time: "int"):
data = pickle.loads(data)
if type(data) == AvailabilityResponse:
data: AvailabilityResponse = data
if data.accepted:
time_slot_key = f"{data.time_slot}"
print(f"{data.owner} accepts {data.time_slot}")
if time_slot_key in self.agreed_slots:
slots = self.agreed_slots[time_slot_key]
if data.owner not in slots:
slots.append(data.owner)
self.agreed_slots[time_slot_key] = slots
if len(slots) >= self.meeting.minimum_participants:
print(f"Meeting {slots} participants agreed on {data.time_slot}")
await self.stop()
else:
self.agreed_slots[time_slot_key] = [data.owner]

current_time_slot = data.time_slot
calculated_next_time_slot = TimeSlot(self.meeting.date, current_time_slot.start_time + 1,
current_time_slot.start_time + 1 + self.meeting.duration)

if calculated_next_time_slot.is_greater_than(self.next_time_slot):
self.next_time_slot = calculated_next_time_slot
# print(f"Next time slot: {self.next_time_slot}")
await self.broadcast(pickle.dumps(AvailabilityRequest(time_slot=self.next_time_slot)))
await self.broadcast_data(AvailabilityRequest(time_slot=self.next_time_slot))

@on_message(type=AvailabilityResponse)
async def on_availability_request(self, data: AvailabilityResponse):
await self.broadcast_data(
AvailabilityResponse(owner=self.details().name, time_slot=data.time_slot, accepted=True))

data: AvailabilityResponse = data
if data.accepted:
time_slot_key = f"{data.time_slot}"
print(f"{data.owner} accepts {data.time_slot}")
if time_slot_key in self.agreed_slots:
slots = self.agreed_slots[time_slot_key]
if data.owner not in slots:
slots.append(data.owner)
self.agreed_slots[time_slot_key] = slots
if len(slots) >= self.meeting.minimum_participants:
print(f"Meeting {slots} participants agreed on {data.time_slot}")
await self.stop()
else:
self.agreed_slots[time_slot_key] = [data.owner]

current_time_slot = data.time_slot
calculated_next_time_slot = TimeSlot(self.meeting.date, current_time_slot.start_time + 1,
current_time_slot.start_time + 1 + self.meeting.duration)

if calculated_next_time_slot.is_greater_than(self.next_time_slot):
self.next_time_slot = calculated_next_time_slot
# print(f"Next time slot: {self.next_time_slot}")
await self.broadcast_data(AvailabilityRequest(time_slot=self.next_time_slot))


async def main():
Expand Down

0 comments on commit ba49583

Please sign in to comment.