-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
提供实时任务限流&执行状态可视化功能 #397
Comments
flink 内实现限流功能参考为了在 Flink 作业运行时动态设置 updateRate 方法,你需要一个机制来接收外部的配置更新,并将这些更新应用到正在运行的任务实例中。这可以通过多种方式实现,下面我将详细说明一种常见的方法:使用广播变量和 Akka 的 Actor 系统。 使用广播变量和 Akka Actor 系统
首先,在你的主程序(即提交 Flink Job 的地方)中,你需要创建一个广播流,用于发送最新的限流配置给所有并行任务实例。你可以使用一个特殊的 SourceFunction 来生成包含新速率值的消息,并将其与你的主要数据流合并。 // 创建一个广播流,用于发送新的限流配置
DataStream<Double> rateConfigStream = env.addSource(new RateConfigSource())
.broadcast();
// 主要的数据流
DataStream<String> mainDataStream = ...;
// 将两者连接起来
ConnectedStreams<String, Double> connectedStreams = mainDataStream.connect(rateConfigStream);
// 应用自定义的 CoFlatMapFunction 或 RichCoFlatMapFunction 处理逻辑
connectedStreams.flatMap(new DynamicRateControlledFunction());
public class DynamicRateControlledFunction extends RichCoFlatMapFunction<String, Double, String> {
private transient RateLimiter rateLimiter;
private transient BroadcastState<String, Double> broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取广播状态
ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("rate", Double.class);
broadcastState = getRuntimeContext().getBroadcastState(descriptor);
// 初始化默认限流器
double initialRate = broadcastState.get("rate") != null ? broadcastState.get("rate") : 10.0;
rateLimiter = RateLimiter.create(initialRate); // 假设每秒允许10个事件
}
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
if (rateLimiter.tryAcquire()) {
// 如果获取到令牌,则继续处理
out.collect(value);
} else {
// 否则丢弃或者延迟处理
}
}
@Override
public void flatMap2(Double newRate, Collector<String> out) throws Exception {
// 更新广播状态中的限流速率
broadcastState.put("rate", newRate);
// 更新本地限流器的速率
rateLimiter.setRate(newRate);
}
} 在这个版本的 DynamicRateControlledFunction 中,我们重写了 flatMap2 方法来处理来自广播流的新速率值。每当收到新的速率配置时,它会更新广播状态和本地的 RateLimiter 实例。
为了让外部系统能够触发速率更新,你可以设置一个 Akka Actor 来监听 HTTP 请求或其他形式的远程调用。这个 Actor 可以直接向广播流发送消息,或者通过其他方式通知广播流有新的速率配置需要传播。 例如,你可以创建一个简单的 REST API 服务器,当接收到 POST 请求时,它会调用 Akka Actor 发送速率更新消息到广播流。 import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContextExecutor
implicit val system: ActorSystem = ActorSystem("my-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val route =
path("update-rate") {
post {
entity(as[Double]) { newRate =>
// 调用广播流的源函数来发送新的速率配置
RateConfigSource.updateRate(newRate)
complete(s"Rate updated to $newRate")
}
}
}
Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // 让应用程序保持运行 这里的 RateConfigSource.updateRate(newRate) 是一个假设的方法,它应该负责将新的速率值推送到广播流中。具体的实现取决于你如何设计广播流的源函数。 通过这种方式,你可以在 Flink 作业启动后动态地调整每个任务实例的限流速率,而不需要重启整个作业。这种方法不仅提供了灵活性,还保证了高可用性和实时性。 |
是的,可以通过扩展 Flink 的 REST API 或者通过其他方式将外部请求作为输入流发送到正在运行的 Flink 作业中。以下是几种实现这一目标的方法: 1. 使用 Flink 提供的 REST API 和自定义 SourceFunction虽然 Flink 自身提供的 REST API 主要用于作业管理(如提交、取消作业等),但你可以创建一个自定义的 实现步骤:
2. 使用 Flink 提供的 Table & SQL API 和 Flink Restful Table APIFlink 提供了 Table & SQL API,以及从 Flink 1.13 开始引入的 Restful Table API,允许用户通过 REST 接口执行 SQL 查询并操作表。如果你的应用场景允许使用 SQL 来处理数据流,那么你可以考虑通过这种方式将外部请求转化为 SQL 操作的一部分。 3. 使用 Flink Connector 或者自定义 ConnectorFlink 支持多种内置和社区贡献的连接器,它们可以帮助你将外部系统(如数据库、消息队列等)与 Flink 作业集成起来。如果现有的连接器不能满足需求,你可以开发自己的连接器,使它能够监听特定端点的 HTTP 请求,并将这些请求作为数据源提供给 Flink 作业。 4. 使用 Flink Stateful Functions (StateFun)Flink Stateful Functions 是一种构建有状态应用程序的方式,它支持通过 gRPC 协议接收外部调用。如果你的应用需要处理有状态的交互式请求,那么 StateFun 可能是一个不错的选择。它可以很容易地接受来自外部系统的请求,并根据这些请求更新内部状态或触发计算逻辑。 5. 直接集成 Jetty 或其他 Web Server 到 Flink Job理论上,可以在 Flink 作业中直接嵌入一个轻量级的 Web 服务器(如 Jetty),以接收 HTTP 请求并将这些请求的数据作为输入流的一部分。这种方法比较少见,因为它可能会增加代码复杂度并且不容易维护,但它确实是一种可行的技术方案。 推荐做法在大多数情况下,推荐的做法是通过消息队列(如 Apache Kafka)或者其他中间件来解耦前端 Web 服务器和后端 Flink 作业。这样做不仅简化了架构设计,还提高了系统的可扩展性和可靠性。Web 服务器接收到请求后,将请求内容发布到消息队列中;Flink 作业订阅该消息队列,从而将外部请求作为输入流进行处理。这种模式下,即使 Web 服务器或 Flink 作业出现故障,也不会影响整个系统的正常运作。 选择哪种方法取决于你的具体应用场景、技术栈偏好以及对性能和可靠性的要求。 当然可以。下面我将给出一个简单的例子,展示如何使用 Flink Stateful Functions (StateFun) 来处理来自外部 REST API 的请求,并将其作为输入流发送到 Stateful Function 中进行处理。我们将创建一个简单的应用程序,它接收 HTTP 请求,然后根据请求内容调用相应的 Stateful Function。 环境准备首先,请确保你已经安装了必要的工具和库:
示例代码1. 创建 Stateful Function我们将定义一个非常简单的 Stateful Function,该函数接收一个字符串消息并返回一条问候信息。为了简化示例,我们假设这个函数是无状态的,但你可以很容易地扩展它来包含持久化状态。 // GreetingFunction.java
package com.example.statefun;
import io.statefun.flink.core.fn.Context;
import io.statefun.flink.types.Type;
import io.statefun.functions.*;
public class GreetingFunction implements SimpleFunction<String, String> {
@Override
public void invoke(Context context, String input, Emitter emitter) {
// Process the incoming message and emit a greeting response.
String response = "Hello, " + input + "!";
emitter.emit(context.self(), response);
}
@Override
public Type<String> inputType() {
return Types.string();
}
@Override
public Type<String> outputType() {
return Types.string();
}
} 2. 配置 StateFun 应用程序接下来,我们需要配置 StateFun 应用来加载我们的 # statefun.yaml
functions:
- typename: "com.example/greeter"
type: "builtin/python" # or "builtin/java" if you're using Java
spec:
factory-class: "com.example.statefun.GreetingFunction" 请注意,如果你使用的是 Python 函数,则需要调整配置文件以适应 Python 环境。 3. 设置 REST 接收器现在,让我们设置一个简单的 REST 接收器,它将把接收到的 HTTP POST 请求转换为 StateFun 消息并发送给 // RestReceiver.java
package com.example.receiver;
import org.springframework.web.bind.annotation.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.statefun.clients.StateFunClient;
import io.statefun.generated.Address;
import io.statefun.generated.TypedMessage;
@RestController
@RequestMapping("/api")
public class RestReceiver {
private final StateFunClient stateFunClient;
public RestReceiver() {
// Initialize gRPC channel to connect to StateFun.
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
this.stateFunClient = new StateFunClient(channel);
}
@PostMapping("/greet")
public String greet(@RequestParam String name) {
// Prepare the message to send to StateFun.
Address targetAddress = Address.from("com.example/greeter", "greeter");
TypedMessage message = TypedMessage.newBuilder()
.setTypeName("com.example/string")
.setValue(name)
.build();
// Send the message and receive the response.
TypedMessage response = stateFunClient.invoke(targetAddress, message);
// Return the response from the Stateful Function.
return response.getValue();
}
} 在这个例子中,我们使用 Spring Boot 来搭建 REST API,并且利用了 StateFun 客户端库来与 StateFun 进行交互。你需要确保你的应用能够访问 StateFun 服务(在这里是通过 localhost:8080)。 4. 启动 StateFun 和 REST 接收器最后,启动 StateFun 和 REST 接收器:
一旦两者都已启动,你可以通过向 注意事项
这段代码只是一个起点,旨在帮助你理解如何将 REST 请求作为输入流发送到 StateFun 中。随着项目的深入,你可以逐步添加更多功能和改进现有逻辑。 |
为了对正在执行的实时任务进行更细粒度的控制,需要提供任务限流&执行状态可视化功能,以应对实时任务执行期间执行批量任务构建,或者业务系统中有突发事件需要紧急限流。
需要在控制面中提供两个子功能予以配合:
任务限流
功能后,可以立即从可视化看板上观察到执行吞吐的变化The text was updated successfully, but these errors were encountered: