From 4c9af11f99294f7083baa86203aef6b2509d8db5 Mon Sep 17 00:00:00 2001 From: dewma Date: Fri, 17 Jan 2025 23:26:13 +0530 Subject: [PATCH] Update readme files --- README.md | 12 +- bindings/ceylon/examples/auction/README.md | 74 +++-- .../ceylon/examples/task_manager/README.md | 133 ++++++++ .../ceylon/examples/time_scheduling/README.md | 99 ++++-- docs/examples/connect-through-network.md | 14 +- docs/examples/meeting-sechdular.md | 299 ++++++++++-------- docs/examples/single-item-auction.md | 227 ++++++------- docs/examples/task-manager.md | 199 ++++++++---- 8 files changed, 667 insertions(+), 390 deletions(-) create mode 100644 bindings/ceylon/examples/task_manager/README.md diff --git a/README.md b/README.md index ee78b5d8..32380eaf 100644 --- a/README.md +++ b/README.md @@ -52,12 +52,15 @@ Ceylon provides a range of tutorials and examples to help you get started and ma ### Example Projects -- **News Writing Panel**: [Colab Script](https://colab.research.google.com/drive/1ZMy0Iggni6fCQynBlyI1wL4WW4U_Fman?usp=sharing) - **Meeting Scheduler**: [Colab Script](https://colab.research.google.com/drive/1C-E9BN992k5sZYeJWnVrsWA5_ryaaT8m?usp=sharing) [Read more](https://github.com/ceylonai/ceylon/blob/master/docs/examples/meeting-sechdular.md) - **Single Item Auction**: [Colab Script](https://colab.research.google.com/drive/1C-E9BN992k5sZYeJWnVrsWA5_ryaaT8m?usp=sharing) [Read more](https://github.com/ceylonai/ceylon/blob/master/docs/examples/single-item-auction.md) -- **Task Manager**: [Read more](https://github.com/ceylonai/ceylon/blob/master/docs/examples/task-manager.md) + +### More Examples +- **Task Manager**: [Read more](bindings/ceylon/examples/task_manager) +- **Auction System**: [Read more](bindings/ceylon/examples/auction) +- **Time Scheduling**: [Read more](bindings/ceylon/examples/time_scheduling) - **Connect Through Network**: [Read more](.https://github.com/ceylonai/ceylon/blob/master/docs/examples/connect-through-network.md) ## 🚦 Getting Started @@ -66,9 +69,10 @@ To get started with Ceylon, refer to our detailed [Getting Started Guide](./docs ## 🚧 Roadmap -- [X] LLM Agent Stack -- [X] Job Handling (parallel & sequential) +- [X] Agent Stack +- [X] Python Client Release - [ ] Web Agent +- [ ] Task Manager - [ ] Agent Registry ## 🤝 Contributing diff --git a/bindings/ceylon/examples/auction/README.md b/bindings/ceylon/examples/auction/README.md index 801b40fe..07aff62e 100644 --- a/bindings/ceylon/examples/auction/README.md +++ b/bindings/ceylon/examples/auction/README.md @@ -18,32 +18,37 @@ The system consists of two main types of agents: - `Bid`: Represents a bid made by a bidder, including the bidder's name and bid amount. - `AuctionStart`: Signals the start of the auction with the item details. - `AuctionResult`: Represents the result of the auction, including the winner and winning bid amount. -- `AuctionEnd`: Signals the end of the auction. +- `AuctionEnd`: Signals the end of the auction, allowing bidders to acknowledge the auction completion. ### Agents 1. **Bidder (Worker)** - - Manages individual budgets and places bids. - - Uses a random bidding strategy to determine bid amounts. - - Methods: - - `on_message`: Handles incoming auction messages and places bids. + - Manages individual budgets and places bids. + - Uses a random bidding strategy with multiplier between 1.0 and 10.0 times the starting price. + - Only bids if budget is higher than the starting price. + - Methods: + - `on_message`: Handles incoming auction messages (AuctionStart, AuctionResult, AuctionEnd) and places bids. + - `run`: Maintains the bidder's event loop. 2. **Auctioneer (Admin)** - - Manages the overall auction process. - - Methods: - - `on_agent_connected`: Tracks connected bidders and starts the auction when all are connected. - - `start_auction`: Initiates the auction by broadcasting the item details. - - `on_message`: Processes incoming bids and ends the auction after each bid. - - `end_auction`: Determines the winner and broadcasts the result. + - Manages the overall auction process. + - Methods: + - `on_agent_connected`: Tracks connected bidders and starts the auction when all are connected. + - `start_auction`: Initiates the auction by broadcasting the item details. + - `on_message`: Processes incoming bids. + - `end_auction`: Determines the winner and broadcasts the result when all bids are received. + - `run`: Maintains the auctioneer's event loop. ## How It Works -1. The Auctioneer waits for all Bidders to connect. -2. Once all Bidders are connected, the Auctioneer starts the auction by broadcasting the item details. -3. Bidders receive the auction start message and place their bids using a random strategy. -4. The Auctioneer receives each bid and immediately ends the auction after processing it. -5. The Auctioneer determines the winner (highest bidder) and broadcasts the result. -6. Bidders receive the result and update their status (won/lost). +1. The Auctioneer is initialized with an item and expected number of bidders. +2. Bidders are created with individual budgets and connected to the Auctioneer. +3. The Auctioneer waits for all Bidders to connect. +4. Once all Bidders are connected, the Auctioneer starts the auction by broadcasting the item details. +5. Bidders receive the auction start message and place their bids using a random multiplier strategy. +6. The Auctioneer collects all bids and ends the auction after receiving bids from all bidders. +7. The Auctioneer determines the winner (highest bidder) and broadcasts the result. +8. Bidders receive the result, update their budgets if they won, and acknowledge the auction end. ## Running the Code @@ -51,7 +56,7 @@ To run the single-item auction simulation: 1. Ensure you have the required dependencies installed: ``` - pip install asyncio pydantic ceylon + pip install asyncio loguru ceylon ``` 2. Save the code in a file (e.g., `single_item_auction.py`). @@ -61,28 +66,37 @@ To run the single-item auction simulation: python single_item_auction.py ``` -4. The script will simulate the auction process and output the results, including connections, bids, and the final - auction result. +## Default Configuration + +The default setup includes: + +- A "Rare Painting" item with starting price of $1,000 +- Three bidders: + - Alice (Budget: $1,500) + - Bob (Budget: $1,200) + - Charlie (Budget: $2,000) +- Random bidding strategy with multipliers between 1.0x and 10.0x the starting price ## Customization You can customize the simulation by modifying the `main` function: -- Adjust the item's name and starting price. -- Change the number of Bidders and their budgets. -- Modify the bidding strategy in the `Bidder` class for more complex behavior. +- Adjust the item's name and starting price in the Item initialization +- Change the number of bidders and their budgets +- Modify the bidding strategy in the `Bidder.on_message` method +- Adjust the random multiplier range for more conservative or aggressive bidding ## Note -This example uses the Ceylon framework for agent communication. Ensure you have the Ceylon library properly installed -and configured in your environment. +This implementation uses the Ceylon framework for agent communication and the Loguru library for logging. Make sure you have these libraries properly installed in your environment. ## Limitations and Potential Improvements -- The current implementation ends the auction after the first bid, which may not be realistic for most auction - scenarios. -- There's no mechanism for multiple bidding rounds or time-based auction closure. -- The random bidding strategy might result in unrealistic bid amounts. -- Error handling and edge cases (e.g., no bids received) could be improved. +- The auction ends after receiving bids from all bidders, with no support for multiple bidding rounds +- The random bidding strategy is relatively simple and could be enhanced with more sophisticated algorithms +- There's no timeout mechanism for bidders who fail to submit a bid +- Error handling could be improved for edge cases like network failures or disconnected bidders +- The system could be extended to support multiple items or concurrent auctions +- Bidder authentication and bid verification could be added for security These limitations provide opportunities for extending and improving the system for more realistic auction simulations. \ No newline at end of file diff --git a/bindings/ceylon/examples/task_manager/README.md b/bindings/ceylon/examples/task_manager/README.md new file mode 100644 index 00000000..ba4156f0 --- /dev/null +++ b/bindings/ceylon/examples/task_manager/README.md @@ -0,0 +1,133 @@ +# Distributed Task Management System + +This project implements a distributed task management system using the Ceylon framework. It simulates a workforce of +agents with different skill levels performing tasks of varying difficulty. + +## Overview + +The system consists of two main types of agents: + +1. **Workers**: Agents with different skill levels who can perform tasks +2. **Task Manager**: Central coordinator that distributes tasks and monitors completion + +The system demonstrates skill-based task execution where success depends on the worker's skill level matching or +exceeding the task's difficulty. + +## Components + +### Data Classes + +- `Task`: Represents a task to be performed + - id: Unique identifier + - description: Task description + - difficulty: Integer value from 1-10 indicating task complexity + +- `TaskAssignment`: Message containing a task to be assigned + - task: The Task to be performed + +- `TaskResult`: Message containing the outcome of a task + - task_id: ID of the completed task + - worker: Name of the worker who performed the task + - success: Boolean indicating task completion success + +### Agents + +1. **WorkerAgent (Worker)** + - Represents an individual worker with specific skills + - Properties: + - name: Worker's identifier + - skill_level: Integer (1-10) representing worker's capabilities + - has_task: Boolean tracking if worker is currently assigned a task + - Methods: + - `on_message`: Handles task assignments and simulates task execution + - `run`: Maintains the worker's event loop + +2. **TaskManager (Admin)** + - Coordinates task distribution and monitors completion + - Properties: + - tasks: List of tasks to be assigned + - expected_workers: Number of workers expected to connect + - task_results: Collection of completed task results + - tasks_assigned: Boolean tracking if tasks have been distributed + - Methods: + - `on_agent_connected`: Triggers task distribution when all workers connect + - `assign_tasks`: Distributes tasks to connected workers + - `on_message`: Processes task completion results + - `end_task_management`: Summarizes task completion statistics + +## How It Works + +1. The TaskManager initializes with a list of tasks and expected number of workers +2. Workers connect to the system, each with their own skill level +3. Once all workers are connected, the TaskManager distributes tasks +4. Workers receive tasks and attempt to complete them based on their skill level + - Success occurs if worker's skill_level >= task difficulty + - Task execution time is simulated based on task difficulty +5. Workers report task completion results back to the TaskManager +6. The TaskManager collects all results and generates a completion report + +## Running the Code + +1. Install required dependencies: + ``` + pip install asyncio loguru ceylon + ``` + +2. Run the script: + ``` + python task_manager.py + ``` + +## Default Configuration + +The example includes: + +- Three tasks of increasing difficulty: + 1. Simple calculation (difficulty: 2) + 2. Data analysis (difficulty: 5) + 3. Machine learning model training (difficulty: 8) +- Three workers with different skill levels: + 1. Junior (skill level: 3) + 2. Intermediate (skill level: 6) + 3. Senior (skill level: 9) + +## Output + +The system provides detailed logging of: + +- Worker initialization and connections +- Task assignments +- Task completions with success/failure status +- Final success rate and detailed results using checkmarks (✓) and crosses (✗) + +## Customization + +You can customize the simulation by modifying the `main` function: + +- Add or remove tasks with different difficulties +- Change the number of workers and their skill levels +- Modify task descriptions and complexities +- Adjust the task execution simulation time + +## Note + +This implementation uses: + +- Ceylon framework for agent communication +- Loguru for enhanced logging +- Pickle for message serialization +- Asyncio for asynchronous execution + +## Limitations and Potential Improvements + +- Fixed one-to-one task assignment (each worker gets exactly one task) +- No task prioritization or queuing system +- No support for task dependencies or workflows +- Limited to synchronous task completion (no parallel task execution) +- No task reassignment on failure +- No worker load balancing +- No persistent storage of task results +- No error recovery mechanism for failed tasks +- No consideration of worker specializations beyond skill level + +These limitations provide opportunities for extending the system for more complex task management scenarios. \ No newline at end of file diff --git a/bindings/ceylon/examples/time_scheduling/README.md b/bindings/ceylon/examples/time_scheduling/README.md index fbad1d17..3cebbc9f 100644 --- a/bindings/ceylon/examples/time_scheduling/README.md +++ b/bindings/ceylon/examples/time_scheduling/README.md @@ -17,40 +17,60 @@ based on the availabilities of all participants. ### Data Classes -- `Meeting`: Represents the meeting to be scheduled, including name, date, duration, and minimum required participants. -- `TimeSlot`: Represents a specific time slot with a date, start time, and end time. -- `AvailabilityRequest`: Used by the Coordinator to request availability from Participants. -- `AvailabilityResponse`: Used by Participants to respond to availability requests. +- `Meeting`: Represents the meeting to be scheduled, including: + - name: Name of the meeting + - date: Date for the meeting + - duration: Length of the meeting in hours + - minimum_participants: Minimum number of participants required +- `TimeSlot`: Represents a specific time slot with: + - date: Date of the slot + - start_time: Start hour (0-23) + - end_time: End hour + - duration: Property that calculates slot length +- `AvailabilityRequest`: Used by the Coordinator to request availability for a specific time slot +- `AvailabilityResponse`: Used by Participants to respond with their availability, including: + - owner: Participant name + - time_slot: The proposed time slot + - accepted: Whether the participant is available +- `RunnerInput`: Wrapper class for the meeting request input ### Agents 1. **Participant (Worker)** - - Manages individual schedules and responds to availability requests. - - Methods: - - `on_message`: Handles incoming availability requests. - - `is_overlap`: Checks if a given time slot overlaps with available times. + - Manages individual schedules with predefined available time slots + - Methods: + - `on_message`: Handles availability requests and responds with acceptance/rejection + - `is_overlap`: Checks if a proposed time slot overlaps with available times + - Maintains a list of available TimeSlots for the participant 2. **Coordinator (Admin)** - - Manages the overall scheduling process. - - Methods: - - `run`: Initializes the scheduling process with a meeting request. - - `on_agent_connected`: Triggers the initial availability request. - - `on_message`: Processes responses from Participants and determines if a suitable time slot is found. + - Manages the overall scheduling process + - State tracking: + - meeting: Current meeting being scheduled + - agreed_slots: Dictionary tracking participant agreements for each time slot + - next_time_slot: Next time slot to be proposed + - Methods: + - `run`: Initializes the scheduling process with a meeting request + - `on_agent_connected`: Triggers initial availability request when agents connect + - `on_message`: Processes availability responses and proposes new time slots + - `is_overlap`: Utility method to check time slot overlaps ## How It Works -1. The Coordinator sends out an initial availability request to all Participants. -2. Participants check their schedules and respond with their availability. -3. The Coordinator collects responses and keeps track of agreed time slots. -4. If a time slot with enough participants is found, the process ends successfully. -5. If not, the Coordinator continues to propose new time slots until a suitable one is found or all possibilities are - exhausted. +1. The Coordinator is initialized with a Meeting request (name, duration, date, minimum participants) +2. Participants are created with their individual available time slots +3. When all participants connect, the Coordinator starts proposing time slots beginning at hour 0 +4. For each proposed time slot: + - Coordinator sends an AvailabilityRequest to all participants + - Participants check their schedules and respond with AvailabilityResponse + - Coordinator tracks agreements and proposes the next time slot if needed +5. The process continues until either: + - A time slot with enough agreeing participants is found + - All possible time slots for the day are exhausted ## Running the Code -To run the time scheduling simulation: - -1. Ensure you have the required dependencies installed: +1. Install required dependencies: ``` pip install asyncio pydantic ceylon ``` @@ -60,17 +80,40 @@ To run the time scheduling simulation: python time_scheduling.py ``` -3. The script will simulate the scheduling process and output the results, including which participants agreed on which - time slots. +## Default Configuration + +The example includes: +- Meeting "Meeting 1" with 2-hour duration and minimum 3 participants +- Five participants (Alice, Bob, Charlie, David, Kevin) with different availability windows +- Each participant has two available time slots during the day +- Date set to "2024-07-21" ## Customization You can customize the simulation by modifying the `main` function: -- Adjust the number of Participants and their availabilities. -- Modify the Meeting parameters (duration, date, minimum participants). +- Adjust the Meeting parameters (duration, minimum participants) +- Add or remove Participants +- Modify Participant availability windows +- Change the date of the meeting ## Note -This example uses the Ceylon framework for agent communication. Make sure you have the Ceylon library properly installed -and configured in your environment. \ No newline at end of file +This implementation uses: +- Ceylon framework for agent communication +- Pydantic for data validation and serialization +- Pickle for message serialization +- Asyncio for asynchronous execution + +## Limitations and Potential Improvements + +- Currently only supports single-day scheduling +- Time slots are integer-based (hour granularity) +- No support for recurring meetings +- No consideration of time zones +- Limited to searching sequential time slots +- No preference weighting for participants +- No support for required vs optional participants +- No persistence of schedules between runs + +These limitations provide opportunities for extending the system for more realistic scheduling scenarios. \ No newline at end of file diff --git a/docs/examples/connect-through-network.md b/docs/examples/connect-through-network.md index a994e070..1e5a7494 100644 --- a/docs/examples/connect-through-network.md +++ b/docs/examples/connect-through-network.md @@ -24,16 +24,16 @@ On the computer that will act as the server admin: ```python import pickle from loguru import logger -from ceylon import CoreAdmin, Agent +from ceylon.base.agents import Admin, Worker from ceylon.ceylon import AgentDetail -class ServerAdminAgent(CoreAdmin): +class ServerAdminAgent(Admin): async def on_agent_connected(self, topic: "str", agent: AgentDetail): logger.info(f"ServerAdminAgent on_agent_connected {self.details().name}", agent.id, agent.name, agent.role) -class WorkerAgent1(Agent): +class WorkerAgent1(Worker): async def run(self, inputs: "bytes"): logger.info(f"WorkerAgent1 on_run {self.details().name}", inputs) @@ -81,11 +81,11 @@ On a different computer that will act as the worker: import asyncio import pickle from loguru import logger -from ceylon import Agent -from ceylon.ceylon import uniffi_set_event_loop +from ceylon.base.agents import Admin, Worker +from ceylon.ceylon.ceylon import uniffi_set_event_loop -class WorkerAgent1(Agent): +class WorkerAgent1(Worker): async def run(self, inputs: "bytes"): logger.info(f"WorkerAgent1 on_run {self.details().name}", inputs) @@ -98,7 +98,7 @@ admin_port = 8000 worker_1 = WorkerAgent1("worker_2", "server_admin", role="Whatever", admin_port=admin_port, admin_peer=f"{admin_peer_id}@{admin_ip}:{admin_port}") -worker_1.run_worker(pickle.dumps({})) +worker_1.run(pickle.dumps({})) ``` ### 2.2 Understanding the Worker Script diff --git a/docs/examples/meeting-sechdular.md b/docs/examples/meeting-sechdular.md index 22bd1616..30d5659d 100644 --- a/docs/examples/meeting-sechdular.md +++ b/docs/examples/meeting-sechdular.md @@ -1,196 +1,239 @@ -# Simplified Meeting Scheduler Tutorial +# Advanced Meeting Scheduler Tutorial ## Introduction -This tutorial will show you how to create a simple meeting scheduler using Python. The scheduler will help find a time -when everyone can attend a meeting. +This tutorial will show you how to create a distributed meeting scheduler using Python with Ceylon framework. The scheduler finds optimal meeting times by coordinating between multiple participants using an agent-based approach. -## Step 1: Install the Required Tools +## Step 1: Set Up the Environment -First, we need to install some special tools (called libraries) that will help us build our scheduler. You can install -these using a program called pip. Type this into your computer's command line: +First, install the required dependencies: -``` -pip install ceylon +```bash +pip install ceylon pydantic ``` -## Step 2: Define the Building Blocks +## Step 2: Define the Data Models -Now, we'll create some "blueprints" for the different parts of our scheduler. These blueprints are called classes, and -they help us organize our code. +We'll use Pydantic for data validation and serialization. Here are our core data models: ```python +from pydantic import BaseModel from pydantic.dataclasses import dataclass +from typing import List, Any +# Input model for the scheduler +class RunnerInput(BaseModel): + request: Any + + class Config: + arbitrary_types_allowed = True -@dataclass +@dataclass(repr=True) class Meeting: - name: str # The name of the meeting - date: str # The date of the meeting - duration: int # How long the meeting will last - minimum_participants: int # The smallest number of people needed for the meeting - + name: str # Meeting name + date: str # Meeting date + duration: int # Duration in hours + minimum_participants: int # Minimum required participants -@dataclass +@dataclass(repr=True) class TimeSlot: - date: str # The date of the time slot - start_time: int # When the time slot starts - end_time: int # When the time slot ends - - -@dataclass + date: str # Date of the slot + start_time: int # Start hour (0-23) + end_time: int # End hour (0-23) + + @property + def duration(self): + return self.end_time - self.start_time + + def is_greater_than(self, other): + return self.end_time > other.end_time + +@dataclass(repr=True) class AvailabilityRequest: - time_slot: TimeSlot # The time slot we're asking about - + time_slot: TimeSlot -@dataclass +@dataclass(repr=True) class AvailabilityResponse: - owner: str # The name of the person responding - time_slot: TimeSlot # The time slot they're responding about - accepted: bool # Whether they can attend (True) or not (False) + owner: str # Participant name + time_slot: TimeSlot + accepted: bool # Availability status ``` -## Step 3: Create the Participant +## Step 3: Implement the Participant Agent -Next, we'll create a "Participant" who can tell us if they're available for a meeting. +The Participant class represents each meeting attendee: ```python -from ceylon import Agent, on_message -from typing import List - - -class Participant(Agent): - name: str # The participant's name - available_times: List[TimeSlot] # A list of times when they're free +class Participant(Worker): + name: str + available_times: List[TimeSlot] def __init__(self, name, available_times): self.name = name self.available_times = available_times - super().__init__(name=name, workspace_id="time_scheduling", admin_peer="Coordinator", admin_port=8000) - - @on_message(type=AvailabilityRequest) - async def on_availability_request(self, data: AvailabilityRequest): - # Check if the participant is available at the requested time - if self.is_available(data.time_slot): - # If available, send a positive response - await self.broadcast_data(AvailabilityResponse(owner=self.name, time_slot=data.time_slot, accepted=True)) - else: - # If not available, send a negative response - await self.broadcast_data(AvailabilityResponse(owner=self.name, time_slot=data.time_slot, accepted=False)) - - def is_available(self, requested_slot: TimeSlot) -> bool: - # Check if the requested time slot overlaps with any of the participant's available times - for available_slot in self.available_times: - if self.slots_overlap(available_slot, requested_slot): - return True - return False + super().__init__(name=name, workspace_id=workspace_id, + admin_peer=admin_peer, admin_port=admin_port) + + async def on_message(self, agent_id: str, data: bytes, time: int): + data = pickle.loads(data) + if type(data) == AvailabilityRequest: + data: AvailabilityRequest = data + # Check availability and respond + is_available = any(self.is_overlap(slot, data.time_slot, + data.time_slot.duration) + for slot in self.available_times) + + response = AvailabilityResponse( + owner=self.details().name, + time_slot=data.time_slot, + accepted=is_available + ) + await self.broadcast(pickle.dumps(response)) @staticmethod - def slots_overlap(slot1: TimeSlot, slot2: TimeSlot) -> bool: - # Check if two time slots overlap - return slot1.start_time < slot2.end_time and slot2.start_time < slot1.end_time + def is_overlap(slot1: TimeSlot, slot2: TimeSlot, duration: int) -> bool: + latest_start = max(slot1.start_time, slot2.start_time) + earliest_end = min(slot1.end_time, slot2.end_time) + return earliest_end - latest_start >= duration ``` -## Step 4: Create the Coordinator +## Step 4: Implement the Coordinator -The Coordinator is in charge of finding a time that works for everyone. +The Coordinator manages the scheduling process: ```python -from ceylon import CoreAdmin, on_message -import pickle - - -class Coordinator(CoreAdmin): +class Coordinator(Admin): meeting: Meeting = None agreed_slots = {} next_time_slot = None def __init__(self): - super().__init__(name="time_scheduling", port=8000) + super().__init__(name=workspace_id, port=admin_port) - async def run(self, inputs: "bytes"): - # Unpack the meeting details - self.meeting = pickle.loads(inputs) + async def run(self, inputs: bytes): + input: RunnerInput = pickle.loads(inputs) + self.meeting = input.request print("Meeting Schedule request: ", self.meeting) - async def on_agent_connected(self, topic: "str", agent_id: "str"): - # When a new participant connects, start asking about availability + async def on_agent_connected(self, topic: str, agent_id: str): if self.next_time_slot is None and self.meeting is not None: - self.next_time_slot = TimeSlot(self.meeting.date, 0, self.meeting.duration) - await self.broadcast_data(AvailabilityRequest(time_slot=self.next_time_slot)) - - @on_message(type=AvailabilityResponse) - async def on_availability_request(self, data: AvailabilityResponse): - # Process responses from participants + self.next_time_slot = TimeSlot( + self.meeting.date, 0, self.meeting.duration + ) + await self.broadcast(pickle.dumps( + AvailabilityRequest(time_slot=self.next_time_slot) + )) + + async def on_message(self, agent_id: str, data: bytes, time: int): + data = pickle.loads(data) + if type(data) == AvailabilityResponse: + await self.handle_availability_response(data) + + async def handle_availability_response(self, data: AvailabilityResponse): if data.accepted: - # If the participant is available, add them to the list for this time slot time_slot_key = f"{data.time_slot}" + print(f"{data.owner} accepts {data.time_slot}") + + # Track acceptances and check if we have enough participants if time_slot_key in self.agreed_slots: - self.agreed_slots[time_slot_key].append(data.owner) + slots = self.agreed_slots[time_slot_key] + if data.owner not in slots: + slots.append(data.owner) + self.agreed_slots[time_slot_key] = slots + if len(slots) >= self.meeting.minimum_participants: + print(f"Meeting {slots} participants agreed on {data.time_slot}") + await self.stop() + return else: self.agreed_slots[time_slot_key] = [data.owner] - # Check if we have enough participants for this time slot - if len(self.agreed_slots[time_slot_key]) >= self.meeting.minimum_participants: - print(f"Meeting scheduled! Participants: {self.agreed_slots[time_slot_key]}, Time: {data.time_slot}") - await self.stop() - return - - # If we haven't found a suitable time yet, try the next time slot - self.next_time_slot = TimeSlot(self.meeting.date, self.next_time_slot.start_time + 1, - self.next_time_slot.start_time + 1 + self.meeting.duration) - await self.broadcast_data(AvailabilityRequest(time_slot=self.next_time_slot)) + # Try next time slot + current_time_slot = data.time_slot + next_time_slot = TimeSlot( + self.meeting.date, + current_time_slot.start_time + 1, + current_time_slot.start_time + 1 + self.meeting.duration + ) + + if next_time_slot.is_greater_than(self.next_time_slot): + self.next_time_slot = next_time_slot + await self.broadcast(pickle.dumps( + AvailabilityRequest(time_slot=self.next_time_slot) + )) ``` ## Step 5: Run the Scheduler -Finally, we'll set up our participants and start the scheduler. +Here's how to set up and run the scheduler: ```python -import asyncio - - async def main(): - # Create our participants with their available times - alice = Participant("Alice", [TimeSlot("2024-07-21", 9, 12), TimeSlot("2024-07-21", 14, 18)]) - bob = Participant("Bob", [TimeSlot("2024-07-21", 10, 13), TimeSlot("2024-07-21", 15, 17)]) - charlie = Participant("Charlie", [TimeSlot("2024-07-21", 11, 14), TimeSlot("2024-07-21", 16, 18)]) - - # Create our coordinator + # Create participants with their available times + participants = [ + Participant("Alice", [ + TimeSlot("2024-07-21", 9, 12), + TimeSlot("2024-07-21", 14, 18) + ]), + Participant("Bob", [ + TimeSlot("2024-07-21", 10, 13), + TimeSlot("2024-07-21", 15, 17) + ]), + Participant("Charlie", [ + TimeSlot("2024-07-21", 11, 14), + TimeSlot("2024-07-21", 16, 18) + ]), + Participant("David", [ + TimeSlot("2024-07-21", 11, 14), + TimeSlot("2024-07-21", 16, 18) + ]), + Participant("Kevin", [ + TimeSlot("2024-07-21", 10, 13), + TimeSlot("2024-07-21", 15, 17) + ]) + ] + + # Create and run coordinator coordinator = Coordinator() - - # Start the scheduling process - meeting = Meeting(name="Team Meeting", duration=2, date="2024-07-21", minimum_participants=3) + meeting = Meeting( + name="Meeting 1", + duration=2, + date="2024-07-21", + minimum_participants=3 + ) + await coordinator.arun_admin( - inputs=pickle.dumps(meeting), - workers=[alice, bob, charlie] + inputs=pickle.dumps(RunnerInput(request=meeting)), + workers=participants ) - if __name__ == '__main__': asyncio.run(main()) ``` ## How It Works -1. We set up our "Participants" (Alice, Bob, and Charlie) and tell the computer when they're free. -2. We create a "Coordinator" who's in charge of finding a time that works for everyone. -3. The Coordinator asks each Participant if they're free at a certain time. -4. Each Participant checks their schedule and tells the Coordinator if they can make it. -5. If enough people can make it, the Coordinator schedules the meeting. -6. If not enough people can make it, the Coordinator tries a different time. -7. This keeps going until a good time is found or we run out of options. - -## Real-World Improvements - -To make this work better in the real world, we could: - -1. Connect it to people's actual calendars (like Google Calendar). -2. Send out email reminders about the meeting. -3. Allow for scheduling multiple meetings at once. -4. Let people join or leave the meeting after it's been scheduled. -5. Give some meetings or people higher priority. -6. Suggest different days if no time works for everyone. -7. Create a user-friendly website for people to use the scheduler. - -These improvements would make our simple scheduler much more useful for real-life situations! \ No newline at end of file +1. The scheduler uses a distributed agent-based architecture where each participant is an independent agent. +2. The Coordinator initiates the scheduling process by sending availability requests for time slots. +3. Each Participant agent checks their availability and responds to requests. +4. The Coordinator tracks responses and finds a time slot that works for the minimum required participants. +5. The system uses efficient overlap detection to check time slot compatibility. + +## Key Improvements from Basic Version + +1. Uses Pydantic for robust data validation +2. Implements proper serialization/deserialization +3. Adds duration-aware time slot overlap detection +4. Supports multiple participants beyond the minimum required +5. Includes better error handling and type safety +6. Uses asynchronous communication for better performance + +## Potential Enhancements + +1. Add persistence layer for storing scheduling history +2. Implement priority-based scheduling +3. Add support for recurring meetings +4. Implement calendar integration (Google Calendar, Outlook) +5. Add conflict resolution for competing meeting requests +6. Implement notification system for scheduled meetings +7. Add support for different time zones +8. Create a REST API interface for web/mobile clients \ No newline at end of file diff --git a/docs/examples/single-item-auction.md b/docs/examples/single-item-auction.md index 55d0508d..e6fcab45 100644 --- a/docs/examples/single-item-auction.md +++ b/docs/examples/single-item-auction.md @@ -2,190 +2,159 @@ ## Introduction -In this guide, we will walk you through the process of building a single-item auction system using the Ceylon framework. The system simulates an auction where multiple bidders compete to purchase an item, with an auctioneer managing the entire process. +This guide demonstrates how to build a real-time single-item auction system using the Ceylon framework. The system enables multiple bidders to compete for an item while an auctioneer manages the bidding process using Ceylon's agent-based architecture. ## System Overview -The auction system is composed of two primary components: +The auction system consists of two main components: -1. **Auctioneer**: Manages the auction, collects bids, and determines the winner. -2. **Bidders**: Participate in the auction by placing bids. +1. **Auctioneer (Admin Agent)**: Controls the auction flow by: + - Managing bidder connections + - Broadcasting auction start + - Collecting and processing bids + - Determining and announcing the winner -Ceylon's agent-based architecture allows seamless communication and coordination between these components, making it ideal for such a system. +2. **Bidders (Worker Agents)**: Participate in the auction by: + - Connecting to the auction system + - Placing bids within their budget + - Receiving auction results ## Prerequisites -Before you dive into the code, ensure you have the following: +- Python 3.7 or higher +- Ceylon framework (`pip install ceylon`) +- Basic understanding of: + - Asynchronous programming in Python + - Agent-based architectures + - The Ceylon framework -- Python 3.7 or higher installed on your system. -- The Ceylon framework installed (`pip install ceylon`). -- A basic understanding of asynchronous programming in Python. +## Implementation Details -## Step-by-Step Implementation +### Data Models -### 1. Defining Data Structures - -The first step in building our auction system is to define the data structures that will be used for communication between the auctioneer and bidders. We use Pydantic dataclasses for this purpose: +The system uses dataclasses for message passing between agents: ```python -from pydantic.dataclasses import dataclass - -@dataclass(repr=True) +@dataclass class Item: name: str starting_price: float -@dataclass(repr=True) +@dataclass class Bid: bidder: str amount: float -@dataclass(repr=True) +@dataclass class AuctionStart: item: Item -@dataclass(repr=True) +@dataclass class AuctionResult: winner: str winning_bid: float -@dataclass(repr=True) +@dataclass class AuctionEnd: pass ``` -- **Item**: Represents the item being auctioned, including its name and starting price. -- **Bid**: Represents a bid placed by a bidder, including the bidder's name and the bid amount. -- **AuctionStart**: Signals the start of the auction, carrying information about the item. -- **AuctionResult**: Contains the result of the auction, including the winner's name and the winning bid. -- **AuctionEnd**: Signals the end of the auction process. - -### 2. Implementing the Bidder Agent +### Auctioneer Implementation -Next, we'll implement the bidder agent. Each bidder will listen for the start of the auction and place a bid if their budget allows. +The Auctioneer extends Ceylon's `Admin` class and manages the auction lifecycle: ```python -import random -from ceylon import Agent, on_message - -class Bidder(Agent): - name: str - budget: float - - def __init__(self, name: str, budget: float): - self.name = name - self.budget = budget - super().__init__(name=name, workspace_id="single_item_auction", admin_peer="Auctioneer", admin_port=8000, role="bidder") - - @on_message(type=AuctionStart) - async def on_auction_start(self, data: AuctionStart): - if self.budget > data.item.starting_price: - random_i = random.randint(100, 1000) - bid_amount = min(self.budget, data.item.starting_price * random_i / 100) - await self.broadcast_data(Bid(bidder=self.name, amount=bid_amount)) - - @on_message(type=AuctionResult) - async def on_auction_result(self, data: AuctionResult): - if data.winner == self.name: - self.budget -= data.winning_bid - print(f"{self.name} won the auction for ${data.winning_bid:.2f}") +class Auctioneer(Admin): + def __init__(self, item: Item, expected_bidders: int, name="auctioneer", port=8888): + super().__init__(name=name, port=port) + self.item = item + self.expected_bidders = expected_bidders + self.bids = [] + self.auction_ended = False ``` -- **Initialization**: Each `Bidder` is initialized with a name and budget. -- **Handling Auction Start**: The bidder listens for the `AuctionStart` message and places a random bid within their budget. -- **Handling Auction Result**: The bidder listens for the `AuctionResult` message and deducts the winning bid amount from their budget if they win. +Key features: +- Tracks connected bidders +- Broadcasts auction start when all bidders connect +- Processes incoming bids +- Determines and announces the winner +- Manages auction completion -### 3. Implementing the Auctioneer +### Bidder Implementation -The auctioneer is responsible for managing the auction, including starting it, collecting bids, and determining the winner. +Bidders extend Ceylon's `Worker` class and handle auction participation: ```python -from ceylon import CoreAdmin -from typing import List - -class Auctioneer(CoreAdmin): - item: Item - bids: List[Bid] = [] - expected_bidders: int - connected_bidders: int = 0 - - def __init__(self, item: Item, expected_bidders: int): - self.item = item - self.expected_bidders = expected_bidders - super().__init__(name="single_item_auction", port=8000) - - async def on_agent_connected(self, topic: str, agent_id: str): - self.connected_bidders += 1 - print(f"Bidder {agent_id} connected. {self.connected_bidders}/{self.expected_bidders} bidders connected.") - if self.connected_bidders == self.expected_bidders: - print("All bidders connected. Starting the auction.") - await self.start_auction() - - async def start_auction(self): - print(f"Starting auction for {self.item.name} with starting price ${self.item.starting_price}") - await self.broadcast_data(AuctionStart(item=self.item)) - - @on_message(type=Bid) - async def on_bid(self, bid: Bid): - self.bids.append(bid) - print(f"Received bid from {bid.bidder} for ${bid.amount:.2f}") - await self.end_auction() - - async def end_auction(self): - if not self.bids: - print(f"No bids received for {self.item.name}") - else: - winning_bid = max(self.bids, key=lambda x: x.amount) - result = AuctionResult(winner=winning_bid.bidder, winning_bid=winning_bid.amount) - await self.broadcast_data(result) - print(f"Auction ended. Winner: {result.winner}, Winning Bid: ${result.winning_bid:.2f}") - await self.stop() - - await self.broadcast_data(AuctionEnd()) +class Bidder(Worker): + def __init__(self, name: str, budget: float, workspace_id=DEFAULT_WORKSPACE_ID, + admin_peer="", admin_port=8888): + super().__init__(name=name, workspace_id=workspace_id, + admin_peer=admin_peer, admin_port=admin_port) + self.budget = budget + self.has_bid = False ``` -- **Initialization**: The `Auctioneer` is initialized with the auction item and the expected number of bidders. -- **Handling Agent Connection**: The auctioneer waits for all bidders to connect before starting the auction. -- **Starting the Auction**: The auctioneer broadcasts the `AuctionStart` message to all bidders. -- **Handling Bids**: The auctioneer collects bids and ends the auction after receiving bids. -- **Ending the Auction**: The auctioneer determines the highest bid and announces the winner. +Key features: +- Maintains bidder budget +- Implements bidding strategy +- Processes auction messages +- Handles win/loss results -### 4. Running the Auction System +## Running the System -Finally, we set up the auction environment by creating the auctioneer and bidders, and then running the system. +To start the auction system: ```python -if __name__ == '__main__': - item = Item("Rare Painting", 1000) +async def main(): + # Create auction item + item = Item("Rare Painting", 1000.0) + # Initialize auctioneer + auctioneer = Auctioneer(item, expected_bidders=3) + admin_details = auctioneer.details() + + # Create bidders bidders = [ - Bidder("Alice", 1500), - Bidder("Bob", 1200), - Bidder("Charlie", 2000), + Bidder("Alice", 1500.0, admin_peer=admin_details.id), + Bidder("Bob", 1200.0, admin_peer=admin_details.id), + Bidder("Charlie", 2000.0, admin_peer=admin_details.id) ] - auctioneer = Auctioneer(item, expected_bidders=len(bidders)) - auctioneer.run_admin(inputs=b"", workers=bidders) + # Run the auction + await auctioneer.arun_admin(b"", bidders) + +if __name__ == "__main__": + asyncio.run(main()) ``` -- **Item Creation**: An item to be auctioned is created with a name and starting price. -- **Bidder Creation**: Three bidders are created with different budgets. -- **Auctioneer Setup**: An auctioneer is created with the item and the expected number of bidders. -- **System Execution**: The auction system is run, with the auctioneer managing the process and bidders placing bids. +## System Features + +- **Real-time Bidding**: Immediate bid processing and updates +- **Automatic Winner Selection**: Highest bid wins automatically +- **Budget Management**: Bidders cannot exceed their budget +- **Graceful Completion**: Clean shutdown after auction ends +- **Error Handling**: Robust message processing with error logging + +## Advanced Features -## Key Ceylon Framework Features Used +The current implementation includes: +- Random bidding strategy with multipliers +- Budget constraints +- Automatic auction completion +- Logging with loguru -1. **Agent-based Architecture**: The system uses `Agent` for bidders and `CoreAdmin` for the auctioneer. -2. **Message Handling**: The `@on_message` decorator handles specific message types, making it easy to react to different events in the auction process. -3. **Asynchronous Programming**: Ceylon leverages Python's async capabilities, ensuring efficient communication and coordination between agents. -4. **Broadcast Messaging**: The `broadcast_data()` method is used to send messages to all agents, ensuring everyone is informed of the auction's progress. +## Future Enhancements -## Potential Enhancements +Consider these potential improvements: +1. Multiple round support +2. Time-based auction endings +3. Different auction types (Dutch, Silent, etc.) +4. Reserve prices and minimum bid increments +5. Proxy bidding support +6. Real-time bid updates to all participants +7. Transaction history and audit logs +8. Automated testing suite -While the current implementation is functional, there are several ways to extend and enhance the system: +## Contributing -1. Implement a more sophisticated bidding strategy for the bidders. -2. Add support for multiple rounds of bidding. -3. Introduce a time limit for the auction, adding urgency to the bidding process. -4. Implement different auction types, such as Dutch auctions or silent auctions. -5. Improve error handling and account for edge cases, such as network failures or disconnected agents. \ No newline at end of file +Feel free to submit issues and enhancement requests. Contributions are welcome! \ No newline at end of file diff --git a/docs/examples/task-manager.md b/docs/examples/task-manager.md index 0d34dcca..01720d3e 100644 --- a/docs/examples/task-manager.md +++ b/docs/examples/task-manager.md @@ -2,29 +2,22 @@ ### Overview -In this tutorial, you'll learn how to create a distributed task management system using Python. This system will distribute tasks to worker agents based on their skill levels, execute the tasks asynchronously, and collect the results. We will use several Python libraries including `asyncio`, `pydantic`, and a custom distributed agent system called `ceylon`. +In this tutorial, you'll learn how to create a distributed task management system using Python. This system will distribute tasks to worker agents based on their skill levels, execute the tasks asynchronously, and collect the results. We will use several Python libraries including `asyncio`, `ceylon`, and `loguru` for logging. ### Prerequisites Before starting, ensure you have the following Python packages installed: -- `asyncio`: For asynchronous task handling. -- `pydantic`: For data validation and management. -- `loguru`: For logging. -- `ceylon`: For creating distributed agents. - -You can install the necessary packages using pip: - ```bash -pip install pydantic loguru ceylon +pip install loguru ceylon ``` ### Step 1: Define Data Structures -We'll start by defining the data structures that will represent tasks, task assignments, and task results. +We'll start by defining the data structures that will represent tasks, task assignments, and task results using Python's dataclasses. ```python -from pydantic.dataclasses import dataclass +from dataclasses import dataclass @dataclass class Task: @@ -32,12 +25,10 @@ class Task: description: str difficulty: int # 1-10 scale - @dataclass class TaskAssignment: task: Task - @dataclass class TaskResult: task_id: int @@ -47,66 +38,103 @@ class TaskResult: ### Step 2: Create the Worker Agent -The `WorkerAgent` class represents a worker that performs tasks. Each worker has a name and a skill level that determines its ability to complete tasks of varying difficulties. +The `WorkerAgent` class represents a worker that performs tasks. Each worker has a name and a skill level that determines its ability to complete tasks of varying difficulties. We inherit from Ceylon's `Worker` class. ```python import asyncio -from ceylon import Agent, on_message +import pickle +from ceylon.base.agents import Worker from loguru import logger -class WorkerAgent(Agent): - def __init__(self, name: str, skill_level: int): +class WorkerAgent(Worker): + def __init__(self, name: str, skill_level: int, + workspace_id=DEFAULT_WORKSPACE_ID, + admin_peer="", + admin_port=8000): self.name = name self.skill_level = skill_level - super().__init__(name=name, workspace_id="task_management", admin_port=8000) - - @on_message(type=TaskAssignment) - async def on_task_assignment(self, data: TaskAssignment): - logger.info(f"{self.name} received task: {data.task.description}") - # Simulate task execution - await asyncio.sleep(data.task.difficulty) - success = self.skill_level >= data.task.difficulty - await self.broadcast_data(TaskResult(task_id=data.task.id, worker=self.name, success=success)) + self.has_task = False + super().__init__( + name=name, + workspace_id=workspace_id, + admin_peer=admin_peer, + admin_port=admin_port + ) + logger.info(f"Worker {name} initialized with skill level {skill_level}") + + async def on_message(self, agent_id: str, data: bytes, time: int): + try: + message = pickle.loads(data) + + if isinstance(message, TaskAssignment) and not self.has_task: + logger.info(f"{self.name} received task: {message.task.description}") + self.has_task = True + + # Simulate task execution + await asyncio.sleep(message.task.difficulty) + success = self.skill_level >= message.task.difficulty + + result = TaskResult(task_id=message.task.id, worker=self.name, success=success) + await self.broadcast(pickle.dumps(result)) + logger.info(f"{self.name} completed task {message.task.id} with success={success}") + + except Exception as e: + logger.error(f"Error processing message in worker: {e}") ``` ### Step 3: Create the Task Manager -The `TaskManager` class is responsible for assigning tasks to workers and collecting the results. +The `TaskManager` class is responsible for assigning tasks to workers and collecting the results. We inherit from Ceylon's `Admin` class. ```python -from ceylon import CoreAdmin - -class TaskManager(CoreAdmin): - tasks = [] - workers = [] - task_results = [] +from ceylon.base.agents import Admin - def __init__(self, tasks, workers): - self.workers = workers +class TaskManager(Admin): + def __init__(self, tasks: List[Task], expected_workers: int, + name="task_manager", port=8000): + super().__init__(name=name, port=port) self.tasks = tasks - super().__init__(name="task_management", port=8000) - - async def on_agent_connected(self, topic: str, agent_id: AgentDetail): - logger.info(f"Worker {agent_id} connected") - if len(self.workers) == len(self.tasks): + self.expected_workers = expected_workers + self.task_results = [] + self.tasks_assigned = False + logger.info(f"Task Manager initialized with {len(tasks)} tasks") + + async def on_agent_connected(self, topic: str, agent_id: str): + await super().on_agent_connected(topic, agent_id) + connected_count = len(self.get_connected_agents()) + logger.info(f"Worker connected. {connected_count}/{self.expected_workers} workers connected.") + + if connected_count == self.expected_workers and not self.tasks_assigned: + logger.info("All workers connected. Starting task distribution.") await self.assign_tasks() async def assign_tasks(self): - for task, worker in zip(self.tasks, self.workers): - await self.broadcast_data(TaskAssignment(task=task)) - - @on_message(type=TaskResult) - async def on_task_result(self, result: TaskResult): - self.task_results.append(result) - logger.info( - f"Received result for task {result.task_id} from {result.worker}: {'Success' if result.success else 'Failure'}") - if len(self.task_results) == len(self.tasks): - await self.end_task_management() - - async def end_task_management(self): - success_rate = sum(1 for result in self.task_results if result.success) / len(self.task_results) - logger.info(f"All tasks completed. Success rate: {success_rate:.2%}") - await self.stop() + if self.tasks_assigned: + return + + self.tasks_assigned = True + connected_workers = self.get_connected_agents() + + for task, worker in zip(self.tasks, connected_workers): + assignment = TaskAssignment(task=task) + await self.broadcast(pickle.dumps(assignment)) + logger.info(f"Assigned task {task.id} to worker {worker.name}") + + async def on_message(self, agent_id: str, data: bytes, time: int): + try: + message = pickle.loads(data) + if isinstance(message, TaskResult): + self.task_results.append(message) + logger.info( + f"Received result for task {message.task_id} from {message.worker}: " + f"{'Success' if message.success else 'Failure'}" + ) + + if len(self.task_results) == len(self.tasks): + await self.end_task_management() + + except Exception as e: + logger.error(f"Error processing message in manager: {e}") ``` ### Step 4: Main Execution Script @@ -114,7 +142,7 @@ class TaskManager(CoreAdmin): Finally, we need a script to create tasks, instantiate workers, and run the task manager. ```python -if __name__ == '__main__': +async def main(): # Create tasks tasks = [ Task(id=1, description="Simple calculation", difficulty=2), @@ -122,18 +150,61 @@ if __name__ == '__main__': Task(id=3, description="Machine learning model training", difficulty=8), ] - # Create workers + # Create task manager + task_manager = TaskManager(tasks, expected_workers=3) + admin_details = task_manager.details() + + # Create workers with proper admin_peer workers = [ - WorkerAgent("Junior", skill_level=3), - WorkerAgent("Intermediate", skill_level=6), - WorkerAgent("Senior", skill_level=9), + WorkerAgent("Junior", skill_level=3, admin_peer=admin_details.id), + WorkerAgent("Intermediate", skill_level=6, admin_peer=admin_details.id), + WorkerAgent("Senior", skill_level=9, admin_peer=admin_details.id), ] - # Create and run task manager - task_manager = TaskManager(tasks, workers) - task_manager.run_admin(inputs=b"", workers=workers) + try: + logger.info("Starting task management system...") + await task_manager.arun_admin(b"", workers) + except KeyboardInterrupt: + logger.info("Shutting down task management system...") + +if __name__ == "__main__": + logger.info("Initializing task management system...") + asyncio.run(main()) ``` ### Running the System -To run the distributed task management system, simply execute the main script. The `TaskManager` will distribute tasks to workers based on their skill levels, and each worker will either succeed or fail in completing the task. The results will be collected and logged. \ No newline at end of file +To run the distributed task management system: + +1. Save the code as `task_management.py` +2. Ensure you have the required dependencies installed +3. Run the script: +```bash +python task_management.py +``` + +The system will: +- Create three workers with different skill levels +- Assign tasks of varying difficulty +- Execute tasks asynchronously +- Show detailed results including success/failure status for each task +- Calculate and display the overall success rate + +### Key Features + +- Asynchronous task execution +- Skill-based task assignment +- Proper message serialization using pickle +- Robust error handling +- Detailed logging of system events +- Connection state tracking +- Task completion monitoring +- Success rate calculation + +### Implementation Notes + +- Workers use broadcast messaging for task results +- Task Manager tracks worker connections +- Task assignments are only made once all workers are connected +- Each worker tracks its task state to prevent duplicate processing +- System uses proper serialization for message passing \ No newline at end of file