Skip to content

Commit

Permalink
add python
Browse files Browse the repository at this point in the history
  • Loading branch information
liqiankun1111 committed Jun 15, 2024
1 parent fc35860 commit f84af39
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 16 deletions.
61 changes: 60 additions & 1 deletion _posts/Kubernetes/Tools/2020-07-20-client_go_informer.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,66 @@ type Type struct {
delayingQueue 的代码逻辑还是很清晰的. 首先使用数据结构小顶堆 minheap 来排列定时任务(使用readyAt作为大小依据). 当添加定时任务时, 把该任务扔到一个 chan 里, 然后由一个独立的协程监听该 chan, 把任务扔到 heap 中, 该独立协程会从堆里找到最近到期的任务, 并对该任务的进行到期监听, 当定时后期后, 会把到期的定时任务添加到 queue 队列中.
待确认:一个workqueue内只有一个类型的crd。
一个workqueue内只有一个类型的crd?manager中可以设置多个controller,但是一个controller中只有一个Reconciler。一个Reconciler 一般只处理单个crd,一个controller会持有一个workequeue,进而可以认为一个workqueue内只有一个类型的crd。
### 限速为何不好使
一次在开发业务时,有碰到一个场景,crd 变更 ==> workqueue ==> reconcile,在reconcile中故意 `return ctrl.Result{RequeueAfter: 5s}, nil`,则链路变成了 crd 变更 ==> workqueue ==> reconcile ==> workqueue ==> reconcile...,每5s就可以触发一次reconcile运行(为了实现每5s调用某个外部的api接口),难点来了,外部api 有限速要求,为此为controller queue配了ratelimiter,结果发现不好使。从 controller-runtime reconcileHandler代码可以看到,限速逻辑只有在err!=nil 等非延迟场景有效,RequeueAfter非空时,执行了 c.Queue.AddAfter 而不是 c.Queue.AddRateLimited。 因此 使用 RequeueAfter 来让step reconcile方法每隔xx秒 执行时,配的ratelimit 没用上。
```go
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
...
defer c.Queue.Done(obj)
c.reconcileHandler(ctx, obj)
return true
}
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
...
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
case result.RequeueAfter > 0:
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
case result.Requeue:
c.Queue.AddRateLimited(req)
default:
c.Queue.Forget(obj)
}
```
为了让 ratelimit 有用,crd create event 首次进入workqueue 时就应该限速,进而 `crd 变更 ==> workqueue ==> reconcile ==> workqueue ==> reconcile...` 整个循环 就成限速的了。也就是应该改controller的resource event handler,即自定义controller.Watches方法。
```go
func Add(mgr ctrl.Manager, ctx *manager.ShuttleContext, options controller.Options) error {
r := &Reconciler{
Client: mgr.GetClient(),
log: ctrl.Log.WithName("xx"),
}
ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&v1.xx{}).
Watches(r.xxEventHandler()).
Complete(r)
}
func (r *Reconciler) xxEventHandler() handler.EventHandler {
return handler.Funcs{
CreateFunc: func(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
req = xx(e.Object)
queue.AddRateLimited(req)
},
DeleteFunc: func(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
req = xx(e.Object)
queue.AddRateLimited(req)
},
UpdateFunc: func(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
req = xx(e.ObjectNew)
queue.AddRateLimited(req)
},
}
}
```
## controller.Run/ Watch event 消费
Expand Down
2 changes: 1 addition & 1 deletion _posts/MachineLearning/2021-08-18-gpu.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ GPU 的架构;内存管理;任务管理;数据类型。
[为什么深度学习需要使用GPU?](https://time.geekbang.org/column/article/105401)相比cpu,gpu
1. gpu核心很多,上千个。
2. gpu内存带宽更高,速度快就贵,所以显存容量一般不大。因为 CPU 首先得取得数据, 才能进行运算, 所以很多时候,限制我们程序运行速度的并非是 CPU 核的处理速度, 而是数据访问的速度。
3. 控制流,cpu 控制流很强,**alu 只占cpu的一小部分**。gpu 则要少用控制语句。现代 CPU 里的晶体管变得越来越多,越来越复杂,其实已经不是用来实现“计算”这个核心功能,而是拿来实现处理乱序执行、进行分支预测,以及高速缓存。GPU 专门用于高度并行计算,因此设计时更多的晶体管用于数据处理,而不是数据缓存和流量控制。GPU 只有 取指令、指令译码、ALU 以及执行这些计算需要的寄存器和缓存。PS: 将更多晶体管用于数据处理,例如浮点计算,有利于高度并行计算。
3. 控制流,cpu 控制流很强,**alu 只占cpu的一小部分**。gpu 则要少用控制语句。现代 CPU 里的晶体管变得越来越多,越来越复杂,其实已经不是用来实现“计算”这个核心功能,而是拿来实现处理乱序执行、进行分支预测,以及高速缓存。GPU 专门用于高度并行计算,因此设计时更多的晶体管用于数据处理,而不是数据缓存和流量控制。GPU 只有 取指令、指令译码、ALU 以及执行这些计算需要的寄存器和缓存。PS: 将更多晶体管用于数据处理,例如浮点计算,有利于高度并行计算。我们一般习惯将cpu的控制单元和计算单元视为一个整体,而gpu 一般会独立看待控制单元和计算单元,所以觉得它们差别很大。
4. 编程,cpu 是各种编程语言,编译器成熟。

![](/public/upload/basic/gpu_develop.png)
Expand Down
121 changes: 117 additions & 4 deletions _posts/MachineLearning/2024-05-16-langchain_graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ keywords: langchain langgraph lcel

编程语言大类上可以分为命令式编程和声明式编程,前者深入细节,各种 if else、各种 while/for,程序员掌控每个像素;后者把任务「描述」清楚,重点在业务流程翻译成所用的语言上,具体怎么实现甩给别人(大部分是系统自带)。由于这一波 LLMs 强大的理解、生成能力,**关注细节的命令式编程似乎不再需要**,而偏重流程或者说业务逻辑编排的 pipeline 能力的声明式编程,成了主流「编程」方式。

推理阶段的RAG Flow分成四种主要的基础模式:顺序、条件、分支与循环。PS: 一个llm 业务有各种基本概念,prompt/llm/memory,整个工作流产出一个流式输出,处理链路上包含多个step,且step有复杂的关系(顺序、条件、分支与循环)。一个llm 业务开发的核心就是个性化各种原子能力 以及组合各种原子能力。
RAG 流程是指在 RAG 系统中,从输入查询到输出生成文本的整个工作流程。这个流程通常涉及多个模块和操作符的协同工作,包括但不限于检索器、生成器以及可能的预处理和后处理模块。RAG 流程的设计旨在使得 LLM(大语言模型)能够在生成文本时利用外部知识库或文档集,从而提高回答的准确性和相关性。推理阶段的RAG Flow分成四种主要的基础模式:顺序、条件、分支与循环。PS: 一个llm 业务有各种基本概念,prompt/llm/memory,整个工作流产出一个流式输出,处理链路上包含多个step,且step有复杂的关系(顺序、条件、分支与循环)。一个llm 业务开发的核心就是个性化各种原子能力 以及组合各种原子能力。

以一个RAG Agent 的工作流程为例
1. 根据问题,路由器决定是从向量存储中检索上下文还是进行网页搜索。
Expand All @@ -25,6 +25,52 @@ keywords: langchain langgraph lcel
5. 如果上下文被评为不相关,则进行网页搜索以检索内容。
6. 检索后,文档评分器对从网页搜索生成的内容进行评分。如果发现相关,则使用 LLM 进行综合,然后呈现响应。

[高级 RAG 检索策略之流程与模块化](https://mp.weixin.qq.com/s/WeAcAevUPemPKhQLhId3Vg)业界一个共识是RAG的演进:Naive RAG ==> Advanced RAG ==> Modular RAG。要落地Modular RAG,便是定义模块以及将模块串起来的Pipeline。比如LlamaIndex 的探索。PS: pipeline/add_modules/add_link
```
retriever = index.as_retriever()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"retriever": retriever,
"output": SimpleSummarize(),
}
)
p.add_link("input", "retriever")
p.add_link("input", "output", dest_key="query_str")
p.add_link("retriever", "output", dest_key="nodes")
```
完整的流水线
```
evaluator = RagasComponent()
p = QueryPipeline(verbose=True)
p.add_modules(
{
"input": InputComponent(),
"query_rewriter": query_rewriter,
"retriever": retriever,
"meta_replacer": meta_replacer,
"reranker": reranker,
"output": TreeSummarize(),
"evaluator": evaluator,
}
)
p.add_link("input", "query_rewriter")
p.add_link("input", "query_rewriter", src_key="input")
p.add_link("query_rewriter", "retriever")
p.add_link("retriever", "meta_replacer")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "reranker", src_key="input", dest_key="query_str")
p.add_link("meta_replacer", "reranker", dest_key="nodes")
p.add_link("input", "output", dest_key="query_str")
p.add_link("input", "output", src_key="input", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
p.add_link("input", "evaluator", src_key="input", dest_key="question")
p.add_link("input", "evaluator", src_key="ground_truth", dest_key="ground_truth")
p.add_link("reranker", "evaluator", dest_key="nodes")
p.add_link("output", "evaluator", dest_key="answer")
```

## LCEL

在 LangChain 里只要实现了Runnable接口,并且有invoke方法,都可以成为链。实现了Runnable接口的类,可以拿上一个链的输出作为自己的输入。
Expand Down Expand Up @@ -61,7 +107,7 @@ chain.stream("dog")
|Retriever| Single string| List of Documents|
|Tool| Single string or dictionary, depending on the tool| Depends on the tool|

### 基石Runnable
### 模块化抽象Runnable

我们使用的所有LCEL相关的组件都继承自RunnableSerializable,RunnableSequence 顾名思义就按顺序执行的Runnable,分为两部分Runnable和Serializable。其中Serializable是继承自Pydantic的BaseModel。(py+pedantic=Pydantic,是非常流行的参数验证框架)Serializable提供了,将Runnable序列化的能力。而Runnable,则是LCEL组件最重要的一个抽象类,它有几个重要的抽象方法。

Expand Down Expand Up @@ -162,7 +208,7 @@ class Runnable(Generic[Input, Output], ABC):
return RunnableWithFallbacks(self,fallbacks,...)
```

### 一些实践
### Runnable串联

```python
def add_one(x: int) -> int:
Expand Down Expand Up @@ -299,8 +345,75 @@ LangGraph 三个核心要素
2. 在创建了StateGraph之后,我们需要向其中添加Nodes(节点)。添加节点是通过`graph.add_node(name, value)`语法来完成的。其中,`name`参数是一个字符串,用于在添加边时引用这个节点。`value`参数应该可以是函数或runnable 接口,它们将在节点被调用时执行。其输入应为状态图的全局状态变量,在执行完毕之后也会输出一组键值对,字典中的键是State对象中要更新的属性。说白了,Nodes(节点)的责任是“执行”,在执行完毕之后会更新StateGraph的状态。
3. 节点通过边相互连接,形成了一个有向无环图(DAG),边有几种类型:
1. Normal Edges:即确定的状态转移,这些边表示一个节点总是要在另一个节点之后被调用。
2. Conditional Edges:输入是一个节点,输出是一个mapping,连接到所有可能的输出节点,同时附带一个判断函数,根据全局状态变量的当前值判断流转到哪一个输出节点上,以充分发挥大语言模型的思考能力。
2. Conditional Edges:输入是一个节点,输出是一个mapping,连接到所有可能的输出节点,同时附带一个判断函数(输入是StateGraph,输出是Literal),根据全局状态变量的当前值判断流转到哪一个输出节点上,以充分发挥大语言模型的思考能力。

当我们使用这三个核心要素构建图之后,通过图对象的compile方法可以将图转换为一个 Runnable对象(Runnable也有Runnable.get_graph 转为Graph对象),之后就能使用与lcel完全相同的接口调用图对象。

```python
class Graph:
def __init__(self) -> None:
self.nodes: dict[str, Runnable] = {}
self.edges = set[tuple[str, str]]()
self.branches: defaultdict[str, dict[str, Branch]] = defaultdict(dict)
self.support_multiple_edges = False
self.compiled = False
```

langgraph 代码的主要流程 构建node、edge,然后将其组为graph,自然 langchain 会提供很多现成封装,将各种组件封装为 node/edge。比如两个 为tool 提供了 ToolNode(将tool转为 node,因为node 一般入参是stateGraph,出餐是dict), tools_condition(是一个入参包含stateGraph 的函数,返回Literal)

```python
web_search_tool = TavilySearchResults(k=3)
tools = [web_search_tool]
retrieve = ToolNode(tools)
...
workflow.add_conditional_edges(
"agent",
# Assess agent decision
tools_condition,
{
# Translate the condition outputs to nodes in our graph
"tools": "retrieve",
END: END,
},
)
workflow.add_node("retrieve", retrieve)
```

```python
from langgraph_core.tools import BaseTool
class BaseTool(RunnableSerializable[Union[str, Dict], Any]):
name: str
description: str
def invoke(self, input: Union[str, Dict],config: Optional[RunnableConfig] = None,**kwargs: Any,) -> Any:
...
return self.run(...)
class Tool(BaseTool):
description: str = ""
func: Optional[Callable[..., str]]
coroutine: Optional[Callable[..., Awaitable[str]]] = None

from langgraph.prebuilt import ToolNode
class ToolNode(RunnableCallable):
def __init__( self,tools: Sequence[BaseTool],*,name: str = "tools",tags: Optional[list[str]] = None,) -> None:
super().__init__(self._func, self._afunc, name=name, tags=tags, trace=False)
self.tools_by_name = {tool.name: tool for tool in tools}
def _func(self, input: Union[list[AnyMessage], dict[str, Any]], config: RunnableConfig) -> Any:
message = messages[-1]
def run_one(call: ToolCall):
output = self.tools_by_name[call["name"]].invoke(call["args"], config)
return ToolMessage(...output...)
with get_executor_for_config(config) as executor:
outputs = [*executor.map(run_one, message.tool_calls)]
return outputs 或者 {"messages": outputs}

def tools_condition(state: Union[list[AnyMessage], dict[str, Any]],) -> Literal["tools", "__end__"]:
if isinstance(state, list):
ai_message = state[-1]
elif messages := state.get("messages", []):
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools"
return "__end__"
```
Loading

0 comments on commit f84af39

Please sign in to comment.