diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 068c0d1b56fd..cf168293968f 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -158,3 +158,29 @@ task wordCount(type:JavaExec) { systemProperties = System.getProperties() args = ["--output=/tmp/output.txt"] } + +// Run any example by using class name +// this task defines class path based on the runner argument +task exec (type:JavaExec) { + mainClass = System.getProperty("mainClass") + def execArgs = System.getProperty("exec.args") + String runner + if (execArgs) { + def runnerPattern = /runner[ =]([A-Za-z]+)/ + def matcher = execArgs =~ runnerPattern + if (matcher) { + runner = matcher[0][1] + runner = runner.substring(0, 1).toLowerCase() + runner.substring(1); + if (!(runner in (preCommitRunners + nonPreCommitRunners))) { + throw new GradleException("Unsupported runner: " + runner) + } + } + } + if (runner) { + classpath = sourceSets.main.runtimeClasspath + configurations."${runner}PreCommit" + } else { + classpath = sourceSets.main.runtimeClasspath + } + systemProperties System.getProperties() + args execArgs ? 
execArgs.split() : [] +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index a33e99e4b239..6e1151369c3a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -59,6 +59,7 @@ public interface Options extends PipelineOptions { void setRateLimiterDomain(String value); } + // [START RateLimiterSimpleJava] static class CallExternalServiceFn extends DoFn { private final String rlsAddress; private final String rlsDomain; @@ -111,6 +112,7 @@ public void processElement(ProcessContext c) throws Exception { c.output("Processed: " + element); } } + // [END RateLimiterSimpleJava] public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); diff --git a/sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py b/sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py index 11ec02fbd54f..c16a8674f17e 100644 --- a/sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py +++ b/sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py @@ -52,6 +52,7 @@ def run(argv=None): pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True + # [START RateLimiterVertexPy] # Initialize the EnvoyRateLimiter rate_limiter = EnvoyRateLimiter( service_address=known_args.rls_address, @@ -67,6 +68,7 @@ def run(argv=None): project=known_args.project, location=known_args.location, rate_limiter=rate_limiter) + # [END RateLimiterVertexPy] # Input features for the model features = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0], diff --git a/sdks/python/apache_beam/examples/rate_limiter_simple.py b/sdks/python/apache_beam/examples/rate_limiter_simple.py index 8cdf1166aadc..00c0a34c3775 100644 --- 
a/sdks/python/apache_beam/examples/rate_limiter_simple.py +++ b/sdks/python/apache_beam/examples/rate_limiter_simple.py @@ -29,6 +29,7 @@ from apache_beam.utils import shared +# [START RateLimiterSimplePython] class SampleApiDoFn(beam.DoFn): """A DoFn that simulates calling an external API with rate limiting.""" def __init__(self, rls_address, domain, descriptors): @@ -61,6 +62,9 @@ def process(self, element): yield element +# [END RateLimiterSimplePython] + + def parse_known_args(argv): """Parses args for the workflow.""" parser = argparse.ArgumentParser() diff --git a/website/www/site/content/en/documentation/patterns/overview.md b/website/www/site/content/en/documentation/patterns/overview.md index a313bae9ddca..527f15f6dccf 100644 --- a/website/www/site/content/en/documentation/patterns/overview.md +++ b/website/www/site/content/en/documentation/patterns/overview.md @@ -59,6 +59,9 @@ Pipeline patterns demonstrate common Beam use cases. Pipeline patterns are based * [Create a cache on a batch pipeline](/documentation/patterns/shared-class/#create-a-cache-on-a-batch-pipeline) * [Create a cache and update it regularly on a streaming pipeline](/documentation/patterns/shared-class/#create-a-cache-and-update-it-regularly-on-a-streaming-pipeline) +**Rate limiting patterns** - Patterns for rate limiting DoFns and Transforms in Beam pipelines +* [Rate limiting DoFns and Transforms](/documentation/patterns/rate-limiting) + ## Contributing a pattern To contribute a new pipeline pattern, create [a feature request](https://github.com/apache/beam/issues/new?labels=new+feature%2Cawaiting+triage&template=feature.yml&title=%5BFeature+Request%5D%3A+) and add details to the issue description. See [Get started contributing](/contribute/) for more information. 
diff --git a/website/www/site/content/en/documentation/patterns/rate-limiting.md b/website/www/site/content/en/documentation/patterns/rate-limiting.md new file mode 100644 index 000000000000..7c13c12ffa21 --- /dev/null +++ b/website/www/site/content/en/documentation/patterns/rate-limiting.md @@ -0,0 +1,97 @@ +--- +title: "Rate limiting patterns" +--- + + +# Rate limiting patterns + +Apache Beam is built to maximize throughput by scaling workloads across thousands of workers. However, this massive parallelism requires coordination when pipelines interact with external systems that enforce strict quotas, such as 3rd-party REST APIs, databases, or internal microservices. Without a centralized rate limiting mechanism, independent workers might exceed the capacity of these systems, resulting in service degradation or broad IP blocking. + +## Centralized Rate Limit Service + +The recommended approach for global rate limiting in Beam is to use a centralized Rate Limit Service (RLS). + +A production-ready Terraform module to deploy this service on GKE is available in the Beam repository: +[`envoy-ratelimiter`](https://github.com/apache/beam/tree/master/examples/terraform/envoy-ratelimiter) + +To deploy the rate-limiting infrastructure on GKE: + +1. Update `terraform.tfvars` with your project variables to adjust rules and domains. +2. Run the helper deploy script: `./deploy.sh` + +This script automates deployment and, upon completion, returns the Internal Load Balancer IP address for your deployment, which you will use in your pipeline. + +--- + +{{< language-switcher java py >}} + +## Using RateLimiter + +To rate limit requests in your pipeline, you can create a RateLimiter client in your `DoFn`'s setup phase and acquire permits before making calls in the process phase. + +{{< paragraph class="language-java" >}} +In Java, use the `RateLimiter` interface and `EnvoyRateLimiterFactory` implementation to coordinate with the Envoy service. 
Create `RateLimiterOptions` with your service address, initialize the client in `@Setup` using `EnvoyRateLimiterFactory`, and call `rateLimiter.allow(batchSize)` in `@ProcessElement` to acquire a batch of permits. +{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java" RateLimiterSimpleJava >}} +{{< /highlight >}} + +{{< paragraph class="language-py" >}} +In Python, use the `EnvoyRateLimiter` and `Shared` to coordinate a single client instance shared across threads. Initialize the client in `setup()` using `shared`, and call `self.rate_limiter.allow()` in `process()` to acquire rate-limiting permits before executing API calls. +{{< /paragraph >}} + +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/rate_limiter_simple.py" RateLimiterSimplePython >}} +{{< /highlight >}} + +{{< paragraph class="language-py" >}} +If you are using **RunInference** for remote model inference (e.g., Vertex AI), you can pass the `EnvoyRateLimiter` directly to the `ModelHandler`. The model handler coordinates the rate limit internally across your distributed workers. +{{< /paragraph >}} + +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/inference/rate_limiter_vertex_ai.py" RateLimiterVertexPy >}} +{{< /highlight >}} + +--- + +## Running Example Pipelines with RateLimiter + +Once your Rate Limiter Service is deployed and has an Internal IP, you can run your pipeline pointing to that address. 
+ 
+{{< highlight java >}} +# Get the IP from your RLS deployment +export RLS_ADDRESS=":8081" + +./gradlew :examples:java:exec -DmainClass=org.apache.beam.examples.RateLimiterSimple \ + -Dexec.args="--runner= \ + --rateLimiterAddress=${RLS_ADDRESS} \ + --rateLimiterDomain=mongo_cps" +{{< /highlight >}} + +{{< highlight py >}} +# Get the IP from your RLS deployment +export RLS_ADDRESS=":8081" + +python -m apache_beam.examples.rate_limiter_simple \ + --runner= \ + --rls_address=${RLS_ADDRESS} +{{< /highlight >}} + +## AutoScaler Integration + +The throttling time and signals from the RateLimiter have to be picked up by the autoscaler. This allows the autoscaler to scale down the workers when the pipeline is being throttled by the external service, preventing unnecessary resource usage. + +**Dataflow** currently supports this AutoScaler integration for **Batch RunnerV2**. Note that AutoScaler integration for Streaming mode is a known limitation. diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 0cc197d95fdc..0e87273853bf 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -213,6 +213,7 @@
  • Schema
  • BigQuery ML
  • Grouping elements for efficient external service calls
  • +
  • Rate limiting DoFns and Transforms
  • Cache using a shared object