Skip to content

Commit c5d6fb6

Browse files
upload codes
1 parent a922b9b commit c5d6fb6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2345
-1
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,6 @@ cython_debug/
172172

173173
# PyPI configuration file
174174
.pypirc
175+
176+
# test files
177+
tests/files/

README.md

Lines changed: 211 additions & 1 deletion
Large diffs are not rendered by default.

docs/pipeline.png

20.3 KB
Loading

langpipe/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .lpnode import LPNode, LPNodeType, LPNodeState
2+
from .lpbegin import LPBegin
3+
from .lpend import LPEnd
4+
from .lpclassifier import LPClassifier
5+
from .lpextractor import LPExtractor
6+
from .lpaggregator import LPAggregator
7+
from .lpgenerator import LPGenerator
8+
from .lpchatter import LPChatter
9+
from .lpbaseinvoker import LPBaseInvoker
10+
from .lpbaserouter import LPBaseRouter
11+
from .lpsqlcreator import LPSQLCreator
12+
from .lpboardrender import LPBoardRender

langpipe/lpaggregator.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import json
2+
from ollama import generate
3+
from .lpnode import LPNode, LPNodeType
4+
5+
class LPAggregator(LPNode):
    """
    data aggregation using LLM base on Ollama.
    access `lpdata['global_vars']['aggregated_data']` for the aggregated data.
    """
    def __init__(self, name, aggregate_desc=None, model='minicpm-v:8b') -> None:
        # name: node name shown in the pipeline.
        # aggregate_desc: optional question/instruction that overrides
        #   lpdata['query'] as the question the aggregation answers.
        # model: Ollama model tag used for generation.
        super().__init__(name, LPNodeType.LLM, model)
        self.__aggregate_desc = aggregate_desc
        # holds the model's consolidated answer after _handle() completes
        self.__aggregated_data = None
        # prompt template (Chinese): {0} = JSON dump of global_vars,
        # {1} = the question to answer.
        self.__aggregate_prompt_template = """
你是一个智能信息聚合器(Aggregator),负责整合多个来源的信息,并基于所有可用数据生成高质量、清晰且有逻辑性的最终回答。
以下是所有可用的信息:
---
{0}
---

以下是待回答的问题:
---
{1}
---

任务要求:
1. **信息整合**:充分利用所有来源的数据,确保信息完整,不遗漏任何重要内容。
2. **语义流畅**:避免直接罗列数据,而是用自然语言组织,使回答清晰易懂。

请给出最终整合后的完整回答:
"""

    def _handle(self, lpdata) -> None:
        """Ask the LLM to merge all global_vars into one final answer."""
        # question defaults to the pipeline query unless an explicit
        # aggregate description was supplied at construction time
        query = lpdata['query'] if self.__aggregate_desc is None else self.__aggregate_desc
        prompt = self.__aggregate_prompt_template.format(json.dumps(lpdata['global_vars'], indent=4, ensure_ascii=False), query)

        # top_k=1 / low temperature: aggregation should be near-deterministic
        response = generate(model=self.model,
                            prompt=prompt,
                            options={
                                'top_k': 1,
                                'temperature': 0.5
                            })
        self.__aggregated_data = response['response']

        # update records
        # NOTE(review): assumes the base class appended a record for this
        # node before _handle() runs — confirm against LPNode.run
        messages = lpdata['records'][-1]['messages']
        message = {}
        message['role'] = 'user'
        message['content'] = prompt
        messages.append(message)

        message = {}
        message['role'] = 'assistant'
        message['content'] = response['response']
        messages.append(message)

    def _after_handle(self, lpdata) -> None:
        """Publish the aggregated answer into the flowing lpdata."""
        super()._after_handle(lpdata)

        # update local vars
        record = lpdata['records'][-1]
        record['local_vars']['__aggregate_desc'] = self.__aggregate_desc

        # update global variables: the aggregate becomes both the pipeline's
        # final output and a global var downstream nodes can read
        lpdata['final_out'] = self.__aggregated_data
        lpdata['global_vars']['aggregated_data'] = self.__aggregated_data

langpipe/lpbaseinvoker.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
2+
from abc import ABC, abstractmethod
3+
from typing import final
4+
from .lpnode import LPNode, LPNodeType
5+
6+
class LPBaseInvoker(ABC, LPNode):
    """
    Abstract base for invoker nodes.

    An invoker calls out of the pipeline: external services, databases,
    or third-party tools. Subclasses implement `_invoke(lpdata)`; the
    `_handle` entry point is sealed and simply delegates to it.
    """
    def __init__(self, name) -> None:
        super().__init__(name, LPNodeType.Invoke, None)

    @final
    def _handle(self, lpdata) -> None:
        # Sealed: subclasses override _invoke(), never _handle().
        self._invoke(lpdata)

    @abstractmethod
    def _invoke(self, lpdata) -> None:
        """Perform the external call. Implemented by derived classes."""
        ...

langpipe/lpbaserouter.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import threading
2+
from abc import ABC, abstractmethod
3+
from typing import final
4+
from .lpnode import LPNode, LPNodeType
5+
6+
class LPBaseRouter(ABC, LPNode):
    """
    Abstract base for router nodes, which steer lpdata into one of the
    pipeline's branches.
    """
    def __init__(self, name) -> None:
        super().__init__(name, LPNodeType.Invoke, None)

    @abstractmethod
    def _condition_check(self, lpdata) -> int:
        """
        if/elif/.../else condition check, return int value to identity which branch to run.
        - 0 means first branch to run
        - 1 means second branch to run
        - ...
        """
        ...

    def _dispatch(self, lpdata) -> None:
        """Run the branch selected by `_condition_check`, sync or async."""
        branch = self._condition_check(lpdata)

        # out-of-range selection (including negative) routes nowhere
        if not (0 <= branch < len(self.next_nodes)):
            return

        target = self.next_nodes[branch]
        if lpdata['sync']:
            target.run(lpdata)
        else:
            # async mode: hand the branch off to its own thread
            threading.Thread(target=target.run, args=(lpdata,)).start()

langpipe/lpbegin.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from datetime import datetime
2+
from .lpnode import LPNode, LPNodeType
3+
4+
5+
class LPBegin(LPNode):
    """
    Entry node of a pipeline: builds the lpdata payload and starts the run.
    """

    def __init__(self, name) -> None:
        super().__init__(name, LPNodeType.Begin, None)

    def input(self, query, query_images=None, sync=True):
        """
        Construct lpdata and call super().run(), starting the pipeline.

        **parameters**
        - query(str): input text or prompt.
        - query_images([]): input images in cv2.mat format, None by default.
        - sync(bool): run the pipeline in sync or async mode.
        """
        # lpdata is the single payload that flows through every node:
        lpdata = {
            'sync': sync,                                              # running mode
            'begin_t': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),   # begin time
            'end_t': None,                                             # end time, set when pipeline finishes
            'query': query,                                            # input text or prompt
            'query_imgs': query_images,                                # input images
            'final_out': None,                                         # final output generated by pipeline
            'global_vars': {},                                         # globals shared while flowing the pipeline
            'records': [],                                             # per-node handle records
        }

        super().run(lpdata)

langpipe/lpboardrender.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import cv2
2+
import threading
3+
import numpy as np
4+
from datetime import datetime
5+
from .lpnode import LPNodeState
6+
7+
class LPBoardRender:
    """
    pipeline visualization tools based on OpenCV library.

    Renders the node tree as boxes connected by lines; colors encode each
    node's state (black=pending, red=running, blue=completed).
    """
    def __init__(self, node_size=50, h_spacing=100, v_spacing=80):
        # node_size: square node side length in pixels
        # h_spacing / v_spacing: gaps between tree layers / siblings
        self.__node_size = node_size
        self.__h_spacing = h_spacing
        self.__v_spacing = v_spacing
        # node -> precomputed y coordinate, filled by __draw_board
        self.__y_positions = {}
        self.__img = None
        # per-state draw colors; "Runing" is the enum member's actual spelling
        self.__colors = {
            LPNodeState.Pending: (0, 0, 0), # bgr is black
            LPNodeState.Runing: (0, 0, 255), # bgr is red
            LPNodeState.Completed: (255, 0, 0) # bgr is blue
        }
        # render-loop flag; cleared to stop the background draw thread
        self.__runing = False

    def __del__(self):
        # stop the render loop when the renderer is garbage-collected
        self.__runing = False

    def __get_tree_depth(self, root):
        # depth of the node tree rooted at `root` (1 for a leaf)
        if not root.next_nodes:
            return 1
        return 1 + max(self.__get_tree_depth(child) for child in root.next_nodes)

    def __get_layer_nodes(self, root, depth=0, layers=None):
        # group nodes by depth: {depth: [nodes at that depth]}
        if layers is None:
            layers = {}
        if depth not in layers:
            layers[depth] = []
        layers[depth].append(root)
        for child in root.next_nodes:
            self.__get_layer_nodes(child, depth + 1, layers)
        return layers

    def __draw_node(self, node, x, y):
        # node outline, colored by state
        cv2.rectangle(self.__img, (x, y),
                      (x + self.__node_size, y + self.__node_size),
                      self.__colors[node.state], 2)

        # name, centered inside the box
        text_size = cv2.getTextSize(node.name, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
        text_x = x + (self.__node_size - text_size[0]) // 2
        text_y = y + (self.__node_size + text_size[1]) // 2
        cv2.putText(self.__img, node.name, (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.__colors[node.state], 1)

        # type label, centered near the top edge
        text_size = cv2.getTextSize('[' + node.type.name + ']', cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
        text_x = x + (self.__node_size - text_size[0]) // 2
        text_y = y + text_size[1] + 5
        cv2.putText(self.__img, '[' + node.type.name + ']', (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.__colors[node.state], 1)

        # cost time, near the bottom edge (only once the node finished)
        if node.state == LPNodeState.Completed:
            text_size = cv2.getTextSize(str(node.cost_time) + 'sec', cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
            text_x = x + (self.__node_size - text_size[0]) // 2
            text_y = y + self.__node_size - 5
            cv2.putText(self.__img, str(node.cost_time) + 'sec', (text_x, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.__colors[node.state], 1)

        # child nodes: connect with an edge + dot, then recurse
        child_x = x + self.__node_size + self.__h_spacing
        for child in node.next_nodes:
            child_y = self.__y_positions[child]
            cv2.line(self.__img, (x + self.__node_size, y + self.__node_size // 2),
                     (child_x, child_y + self.__node_size // 2), self.__colors[child.state], 1, cv2.LINE_AA)
            cv2.circle(self.__img, (child_x, child_y + self.__node_size // 2),
                       5, self.__colors[child.state], -1)
            self.__draw_node(child, child_x, child_y)

    def __draw_info(self):
        # wall-clock timestamp in the top-left corner
        cv2.putText(self.__img, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

    def __draw_board(self, root):
        # layout: one column per depth, siblings stacked vertically
        layers = self.__get_layer_nodes(root)
        max_depth = max(layers.keys())
        max_width = max(len(layers[d]) for d in layers)

        img_height = max(400, max_width * (self.__node_size + self.__v_spacing))
        img_width = (max_depth + 1) * (self.__node_size + self.__h_spacing)

        # vertically center each layer's nodes and remember their y positions
        for depth, nodes in layers.items():
            layer_height = len(nodes) * (self.__node_size + self.__v_spacing)
            start_y = (img_height - layer_height + self.__v_spacing) // 2
            self.__y_positions.update({node: start_y + i * (self.__node_size + self.__v_spacing) for i, node in enumerate(nodes)})

        root_x = self.__h_spacing // 2
        root_y = self.__y_positions[root]

        # redraw ~10x/sec until stopped or ESC (27) is pressed in the window
        while self.__runing:
            self.__img = np.ones((img_height, img_width, 3), dtype=np.uint8) * 255
            self.__draw_node(root, root_x, root_y)
            self.__draw_info()
            cv2.imshow("Tree", self.__img)
            if cv2.waitKey(100) & 0xFF == 27:
                break

    def render(self, root, block=True):
        # Start rendering the tree rooted at `root` on a daemon thread;
        # when block=True, wait until the render loop exits.
        # NOTE(review): cv2.imshow from a non-main thread is
        # platform-dependent — confirm on the target OS.
        self.__runing = True
        render_th = threading.Thread(target=self.__draw_board, args=(root,), daemon=True)
        render_th.start()

        if block:
            render_th.join()

langpipe/lpchatter.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import json
2+
from ollama import chat
3+
from .lpnode import LPNode, LPNodeType
4+
5+
class LPChatter(LPNode):
    """
    Chat node backed by an Ollama LLM.

    Unlike plain text generation, the query itself carries the chat
    history. Read `lpdata['global_vars']['chatted_text']` for the reply.
    """
    def __init__(self, name, model='minicpm-v:8b') -> None:
        super().__init__(name, LPNodeType.LLM, model)
        self.__chat_reply = None

    def _handle(self, lpdata) -> None:
        """Send the chat history from the query to the model."""
        # query is a JSON-encoded message list, e.g.
        # [{'role': 'user', 'content': 'Hello, how are you?'}, ...]
        history = json.loads(lpdata['query'])
        response = chat(model=self.model,
                        messages=history,
                        options={
                            'top_k': 100,
                            'temperature': 0.8
                        })
        reply = response['message']['content']
        # NOTE: reasoning models may embed <think>...</think> in the reply
        self.__chat_reply = reply

        # update records: replay the inbound history, then append the reply
        record_messages = lpdata['records'][-1]['messages']
        record_messages.extend(history)
        record_messages.append({'role': 'assistant', 'content': reply})

    def _after_handle(self, lpdata) -> None:
        """Publish the chat reply into the flowing lpdata."""
        super()._after_handle(lpdata)

        # update global variables
        lpdata['final_out'] = self.__chat_reply
        lpdata['global_vars']['chatted_text'] = self.__chat_reply

0 commit comments

Comments
 (0)