-
Notifications
You must be signed in to change notification settings - Fork 7
/
orchestrator.py
160 lines (128 loc) · 6.8 KB
/
orchestrator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from langchain import OpenAI
from langchain.chains.base import Chain
from langchain.llms import BaseLLM
from modules.evaluation import EvaluationModule
from langchain.vectorstores import Pinecone
from modules.execution import ExecutionModule
from modules.learning import LearningModule
from modules.memory import MemoryModule
from modules.perception import PerceptionModule
from modules.reasoning import ReasoningModule
from typing import Any, Dict, List, Optional
from colorama import Fore, Back, Style
from langchain.callbacks import get_openai_callback
import rich
class AgentOrchestrator(Chain):
    """Chain that drives a task loop over a set of cooperating agent modules.

    Given an ``objective``, the orchestrator asks the reasoning module to
    initialize its task list and then repeatedly pulls the next task, executes
    it, stores the result in memory and lets the reasoning module update the
    task list. The perception, learning and evaluation steps are present as
    fields but their calls are currently disabled (see the TODO notes in
    ``_call``).
    """

    # Receives the objective and every execution result (see ``_call``).
    memory_module: MemoryModule
    # Task/result pre-processing; its calls are currently commented out.
    perception_module: PerceptionModule
    # Learning from results; its calls are currently commented out.
    learning_module: LearningModule
    # Owns ``task_list`` / ``completed_task_list`` and updates them after each task.
    reasoning_module: ReasoningModule
    # Executes a single task and returns its result.
    execution_module: ExecutionModule
    # Result evaluation / early stop; its calls are currently commented out.
    evaluation_module: EvaluationModule
    # Upper bound on the number of executed tasks; ``None`` means no bound.
    max_iterations: Optional[int] = None

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys this chain expects: only ``"objective"``."""
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        """Return the output keys this chain produces: none."""
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent loop for the given objective.

        The loop ends when no task is pending or when ``max_iterations``
        tasks have been executed, whichever comes first.

        Args:
            inputs: Mapping that must contain the ``"objective"`` key.

        Returns:
            An empty dict, matching the empty ``output_keys``.
        """
        self.memory_module.objective = inputs["objective"]
        self.reasoning_module.initialize_tasks()

        iterations_done = 0
        # Stop as soon as nothing is pending: within this method the only call
        # that can add tasks is update_tasks() below, so an empty list would
        # never refill and looping on it would spin forever when
        # max_iterations is None.
        while self.reasoning_module.task_list:
            with get_openai_callback() as cb:
                # Step 1: Pull the first task
                task = self.reasoning_module.task_list.popleft()
                self.print_task_list()
                self.print_next_task(task)

                # TODO: Enable it back when flow is completely tested
                # # Process the current task using PerceptionModule
                # processed_task = self.perception_module.process_task(task)
                # self.print_optimized_next_task(processed_task)
                processed_task = task

                # Step 2: Execute the task
                execution_result = self.execution_module.execute(processed_task)
                self.print_task_result(execution_result)
                self.reasoning_module.completed_task_list.append(task)
                self.memory_module.store_result(execution_result, processed_task)
                print(f"\n{Fore.LIGHTMAGENTA_EX}Saved new result to memory{Fore.RESET}")

                # TODO: Enable it back when flow is completely tested
                # # Process the execution result using PerceptionModule before storing it in the MemoryModule
                # processed_execution_result = self.perception_module.process_result(execution_result)
                # self.print_optimized_task_result(processed_execution_result)
                processed_execution_result = execution_result

                # TODO: Enable it back when flow is completely tested
                # new_memory = self.learning_module.learn_from(
                #     observation=processed_execution_result,
                #     completed_tasks=list(self.reasoning_module.completed_task_list),
                #     pending_tasks=list(self.reasoning_module.task_list),
                # )
                # self.print_new_memory(new_memory)
                # # Step 3: Store the result in Memory
                # self.memory_module.store(new_memory)
                # print(f"\n{Fore.LIGHTMAGENTA_EX}Saved new learnings to memory{Fore.RESET}")

                # Step 4: Create new tasks and reprioritize task list
                self.reasoning_module.update_tasks(processed_task, processed_execution_result)
                print(f"\n{Fore.LIGHTMAGENTA_EX}Updated tasks based on stored data{Fore.RESET}")

                # # Evaluate the task result
                # is_finished, final_answer = self.evaluation_module.evaluate_from(
                #     observation=processed_execution_result,
                # )
                # self.print_evaluated_task_result(is_finished, final_answer)
                # if is_finished:
                #     break

                # Print what the OpenAI callback recorded during this iteration.
                rich.print(cb)

            iterations_done += 1
            # ``>=`` instead of ``==`` so a limit of 0 or below still stops the
            # loop (after this first iteration) instead of never matching.
            if self.max_iterations is not None and iterations_done >= self.max_iterations:
                break

        print(f"\n{Fore.RED}\n*****TASK ENDING*****\n{Fore.RESET}")
        # self.print_end(final_answer)
        return {}

    @classmethod
    def from_llm(cls, llm: OpenAI, exec_llm: BaseLLM, verbose: bool = False, **kwargs) -> "AgentOrchestrator":
        """Build an orchestrator whose modules all share one ``MemoryModule``.

        Args:
            llm: Model handed to the memory, perception, learning, reasoning
                and evaluation modules.
            exec_llm: Model handed to the execution module only.
            verbose: Forwarded to every module constructor.
            **kwargs: Extra keyword arguments forwarded to the class
                constructor (for example ``max_iterations``).

        Returns:
            A fully wired ``AgentOrchestrator`` instance.
        """
        memory_module = MemoryModule(llm, verbose=verbose)
        # Every other module receives the same memory instance.
        shared = {"memory_module": memory_module, "verbose": verbose}
        return cls(
            memory_module=memory_module,
            perception_module=PerceptionModule(llm, **shared),
            reasoning_module=ReasoningModule(llm, **shared),
            learning_module=LearningModule(llm, **shared),
            execution_module=ExecutionModule(exec_llm, **shared),
            evaluation_module=EvaluationModule(llm, **shared),
            **kwargs,
        )

    def print_task_list(self) -> None:
        """Print the names of the completed tasks followed by the pending ones."""
        print(f"\n{Fore.BLUE}*****Completed*****{Fore.RESET}")
        for task in self.reasoning_module.completed_task_list:
            print(f"- {task['task_name']}")

        print(f"\n{Fore.GREEN}*****Pending*****{Fore.RESET}")
        for task in self.reasoning_module.task_list:
            print(f"- {task['task_name']}")

    def print_next_task(self, task: Dict) -> None:
        """Print the task that is about to be executed."""
        print(f"\n{Fore.LIGHTBLUE_EX}*****Next Task*****{Fore.RESET}")
        rich.print(task)

    def print_optimized_next_task(self, task: Dict) -> None:
        """Print the next task under the "Optimized Next Task" heading."""
        print(f"\n{Fore.LIGHTBLUE_EX}*****Optimized Next Task*****{Fore.RESET}")
        rich.print(task)

    def print_task_result(self, result: str) -> None:
        """Print the result returned by the execution module."""
        print(f"\n{Fore.LIGHTGREEN_EX}*****Task Result*****{Fore.RESET}")
        rich.print(result)

    def print_optimized_task_result(self, result: str) -> None:
        """Print a result under the "Optimized Task Result" heading."""
        print(f"\n{Fore.LIGHTGREEN_EX}*****Optimized Task Result*****{Fore.RESET}")
        rich.print(result)

    def print_evaluated_task_result(self, is_finished: bool, result: str) -> None:
        """Print the finished flag and the evaluated result."""
        print(f"\n{Fore.LIGHTCYAN_EX}*****Evaluated Task Result*****{Fore.RESET}")
        print(f"\n{Fore.LIGHTYELLOW_EX}Is finished: {is_finished}{Fore.RESET}")
        rich.print(result)

    def print_new_memory(self, new_memory: str) -> None:
        """Print a newly produced memory entry."""
        print(f"\n{Fore.LIGHTMAGENTA_EX}*****New Memory*****{Fore.RESET}")
        rich.print(new_memory)

    def print_end(self, final_result) -> None:
        """Print the final result of the run."""
        print(f"\n{Fore.RED}*****End Result*****{Fore.RESET}")
        rich.print(final_result)