26 changes: 26 additions & 0 deletions examples/java/build.gradle
@@ -158,3 +158,29 @@ task wordCount(type:JavaExec) {
systemProperties = System.getProperties()
args = ["--output=/tmp/output.txt"]
}

// Run any example by class name.
// This task builds the classpath from the runner found in the exec.args property.
task exec(type: JavaExec) {
mainClass = System.getProperty("mainClass")
def execArgs = System.getProperty("exec.args")
String runner
if (execArgs) {
def runnerPattern = /runner[ =]([A-Za-z]+)/
def matcher = execArgs =~ runnerPattern
if (matcher) {
runner = matcher[0][1]
      runner = runner.substring(0, 1).toLowerCase() + runner.substring(1)
if (!(runner in (preCommitRunners + nonPreCommitRunners))) {
throw new GradleException("Unsupported runner: " + runner)
}
}
}
if (runner) {
classpath = sourceSets.main.runtimeClasspath + configurations."${runner}PreCommit"
} else {
classpath = sourceSets.main.runtimeClasspath
}
systemProperties System.getProperties()
args execArgs ? execArgs.split() : []
}
@@ -59,6 +59,7 @@ public interface Options extends PipelineOptions {
void setRateLimiterDomain(String value);
}

// [START RateLimiterSimpleJava]
static class CallExternalServiceFn extends DoFn<String, String> {
private final String rlsAddress;
private final String rlsDomain;
@@ -111,6 +112,7 @@ public void processElement(ProcessContext c) throws Exception {
c.output("Processed: " + element);
}
}
// [END RateLimiterSimpleJava]

public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
@@ -52,6 +52,7 @@ def run(argv=None):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

# [START RateLimiterVertexPy]
# Initialize the EnvoyRateLimiter
rate_limiter = EnvoyRateLimiter(
service_address=known_args.rls_address,
@@ -67,6 +68,7 @@ def run(argv=None):
project=known_args.project,
location=known_args.location,
rate_limiter=rate_limiter)
# [END RateLimiterVertexPy]

# Input features for the model
features = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0],
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/examples/rate_limiter_simple.py
@@ -29,6 +29,7 @@
from apache_beam.utils import shared


# [START RateLimiterSimplePython]
class SampleApiDoFn(beam.DoFn):
"""A DoFn that simulates calling an external API with rate limiting."""
def __init__(self, rls_address, domain, descriptors):
@@ -61,6 +62,9 @@ def process(self, element):
yield element


# [END RateLimiterSimplePython]


def parse_known_args(argv):
"""Parses args for the workflow."""
parser = argparse.ArgumentParser()
@@ -59,6 +59,9 @@ Pipeline patterns demonstrate common Beam use cases. Pipeline patterns are based
* [Create a cache on a batch pipeline](/documentation/patterns/shared-class/#create-a-cache-on-a-batch-pipeline)
* [Create a cache and update it regularly on a streaming pipeline](/documentation/patterns/shared-class/#create-a-cache-and-update-it-regularly-on-a-streaming-pipeline)

**Rate limiting patterns** - Patterns for rate limiting DoFns and Transforms in Beam pipelines
* [Rate limiting DoFns and Transforms](/documentation/patterns/rate-limiting)

## Contributing a pattern

To contribute a new pipeline pattern, create [a feature request](https://github.com/apache/beam/issues/new?labels=new+feature%2Cawaiting+triage&template=feature.yml&title=%5BFeature+Request%5D%3A+) and add details to the issue description. See [Get started contributing](/contribute/) for more information.
@@ -0,0 +1,97 @@
---
title: "Rate limiting patterns"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Rate limiting patterns

Apache Beam is built to maximize throughput by scaling workloads across thousands of workers. However, this massive parallelism requires coordination when pipelines interact with external systems that enforce strict quotas, such as third-party REST APIs, databases, or internal microservices. Without a centralized rate limiting mechanism, independent workers can collectively exceed the capacity of these systems, resulting in service degradation or broad IP blocking.

## Centralized Rate Limit Service

The recommended approach for global rate limiting in Beam is to use a centralized Rate Limit Service (RLS).

A production-ready Terraform module to deploy this service on GKE is available in the Beam repository:
[`envoy-ratelimiter`](https://github.com/apache/beam/tree/master/examples/terraform/envoy-ratelimiter)

To deploy the rate-limiting infrastructure on GKE:

1. Update `terraform.tfvars` with your project variables to adjust rules and domains.
2. Run the helper deploy script: `./deploy.sh`

The script automates the deployment and, on completion, prints the internal load balancer IP address that you pass to your pipeline.

---

{{< language-switcher java py >}}

## Using RateLimiter

To rate limit requests in your pipeline, create a `RateLimiter` client in your `DoFn`'s setup phase and acquire permits before making external calls in the process phase.
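The acquire-before-call flow is the same regardless of transport. The sketch below substitutes a local token bucket for the remote Envoy client purely to illustrate the pattern; the names here (`LocalTokenBucket`, `call_with_rate_limit`) are illustrative stand-ins, not Beam's API.

```python
import time


class LocalTokenBucket:
    """Stand-in for a remote rate limit client: grants up to `rate` permits per second."""

    def __init__(self, rate, clock=time.monotonic):
        self.rate = rate
        self.clock = clock
        self.tokens = float(rate)
        self.last = clock()

    def allow(self, permits=1):
        """Return True if `permits` tokens are available, consuming them if so."""
        now = self.clock()
        # Refill proportionally to elapsed time, capped at the bucket size.
        self.tokens = min(float(self.rate), self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= permits:
            self.tokens -= permits
            return True
        return False


def call_with_rate_limit(limiter, request, make_call, wait_secs=0.01):
    """Block until a permit is granted, then perform the external call."""
    while not limiter.allow():
        time.sleep(wait_secs)
    return make_call(request)
```

In a real `DoFn`, the limiter client is created once in setup and `allow()` is checked in the process method before each batch of external calls, so all workers share the same global quota.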

{{< paragraph class="language-java" >}}
In Java, use the `RateLimiter` interface and the `EnvoyRateLimiterFactory` implementation to coordinate with the Envoy service. Create `RateLimiterOptions` with your service address, initialize the client in `@Setup` using `EnvoyRateLimiterFactory`, and call `rateLimiter.allow(batchSize)` in `@ProcessElement` to acquire a batch of permits.
{{< /paragraph >}}

{{< highlight java >}}
{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java" RateLimiterSimpleJava >}}
{{< /highlight >}}

{{< paragraph class="language-py" >}}
In Python, use `EnvoyRateLimiter` together with <a href="/documentation/patterns/shared-class/" style="text-decoration: underline;">Shared</a> to coordinate a single client instance across threads. Initialize the client in `setup()` using `Shared`, and call `self.rate_limiter.allow()` in `process()` to acquire rate-limiting permits before executing API calls.
{{< /paragraph >}}

{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/rate_limiter_simple.py" RateLimiterSimplePython >}}
{{< /highlight >}}

{{< paragraph class="language-py" >}}
If you are using **RunInference** for remote model inference (e.g., Vertex AI), you can pass the `EnvoyRateLimiter` directly to the `ModelHandler`. The model handler coordinates the rate limit internally across your distributed workers.
{{< /paragraph >}}

{{< highlight py >}}
{{< code_sample "sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py" RateLimiterVertexPy >}}
{{< /highlight >}}

---

## Running Example Pipelines with RateLimiter

Once your Rate Limit Service is deployed and has an internal IP address, you can run your pipeline against that address.

{{< highlight java >}}
# Get the IP from your RLS deployment
export RLS_ADDRESS="<INTERNAL_IP>:8081"

./gradlew :examples:java:exec -DmainClass=org.apache.beam.examples.RateLimiterSimple \
-Dexec.args="--runner=<RUNNER> \
--rateLimiterAddress=${RLS_ADDRESS} \
--rateLimiterDomain=mongo_cps"
{{< /highlight >}}

{{< highlight py >}}
# Get the IP from your RLS deployment
export RLS_ADDRESS="<INTERNAL_IP>:8081"

python -m apache_beam.examples.rate_limiter_simple \
--runner=<RUNNER> \
--rls_address=${RLS_ADDRESS}
{{< /highlight >}}

## AutoScaler Integration

The throttling time and signals from the RateLimiter have to be picked up by the autoscaler. This allows the autoscaler to scale down workers when the pipeline is throttled by the external service, preventing unnecessary resource usage.

**Dataflow** currently supports this AutoScaler integration for **Batch RunnerV2** only; AutoScaler integration for streaming mode is a known limitation.
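As a sketch of how a throttling signal might be produced (assuming a simple wall-clock accumulator with an injectable clock for deterministic testing; the counter names a runner actually consumes are internal and not shown here), a worker can record the time it spends blocked waiting for permits:

```python
class ThrottleTracker:
    """Accumulates time spent blocked waiting for rate-limit permits.

    A runner's autoscaler could read such a metric to avoid scaling up
    while the pipeline is throttled by an external service.
    """

    def __init__(self, clock):
        self.clock = clock  # injectable for deterministic tests
        self.throttled_secs = 0.0

    def acquire(self, limiter, wait_fn, interval=0.05):
        """Spin until `limiter()` grants a permit, recording blocked time."""
        start = self.clock()
        while not limiter():
            wait_fn(interval)
        self.throttled_secs += self.clock() - start
```

The accumulated `throttled_secs` is the kind of signal the autoscaler needs: sustained growth means workers are idle-blocked on the external quota, so adding more workers would not increase throughput.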
@@ -213,6 +213,7 @@
<li><a href="/documentation/patterns/schema/">Schema</a></li>
<li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
<li><a href="/documentation/patterns/grouping-elements-for-efficient-external-service-calls/">Grouping elements for efficient external service calls</a></li>
<li><a href="/documentation/patterns/rate-limiting/">Rate limiting DoFns and Transforms</a></li>
<li><a href="/documentation/patterns/shared-class/">Cache using a shared object</a></li>
</ul>
</li>