Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions .github/workflows/test-xds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Test xDS

on: [push, pull_request]

permissions:
contents: read

env:
CONSOLE_OUTPUT: XTerm

jobs:
test:
name: ${{matrix.ruby}} on ${{matrix.os}}
runs-on: ${{matrix.os}}-latest
continue-on-error: ${{matrix.experimental}}

strategy:
matrix:
os:
- ubuntu

ruby:
- "3.3"
- "3.4"
- "4.0"

experimental: [false]

steps:
- uses: actions/checkout@v6

- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{matrix.ruby}}
bundler-cache: true

- name: Run tests
timeout-minutes: 15
env:
RUBY_VERSION: ${{matrix.ruby}}
run: docker compose -f xds/docker-compose.yaml up --exit-code-from tests
1 change: 1 addition & 0 deletions async-grpc.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = ">= 3.3"

spec.add_dependency "async", ">= 2.38.0"
spec.add_dependency "async-http"
spec.add_dependency "protocol-http", "~> 0.60"
spec.add_dependency "protocol-grpc", "~> 0.11.0"
Expand Down
116 changes: 116 additions & 0 deletions bake/async/grpc/xds.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025-2026, by Samuel Williams.

# Generate Ruby protobuf classes from Envoy .proto files
# @parameter proto_dir [String] Directory containing .proto files (default: "proto")
# @parameter output_dir [String] Output directory for generated Ruby files (default: "lib")
def generate_protos(proto_dir: "proto", output_dir: "lib")
require "fileutils"

proto_dir = File.expand_path(proto_dir)
output_dir = File.expand_path(output_dir)

# Core discovery service files (most important)
discovery_files = [
"envoy/service/discovery/v3/discovery.proto",
"envoy/service/discovery/v3/ads.proto"
]

# Core config files needed for discovery
config_files = [
"envoy/config/core/v3/base.proto",
"envoy/config/core/v3/address.proto",
"envoy/config/core/v3/config_source.proto",
"envoy/config/cluster/v3/cluster.proto",
"envoy/config/endpoint/v3/endpoint.proto"
]

# Google protobuf well-known types
google_files = [
"google/protobuf/any.proto",
"google/protobuf/duration.proto",
"google/protobuf/timestamp.proto",
"google/protobuf/struct.proto",
"google/protobuf/empty.proto",
"google/protobuf/wrappers.proto",
"google/rpc/status.proto"
]

all_files = discovery_files + config_files + google_files

# Create output directories
FileUtils.mkdir_p(output_dir)

# Generate Ruby code
all_files.each do |proto_file|
full_path = File.join(proto_dir, proto_file)
next unless File.exist?(full_path)

Console.info{"Generating #{proto_file}..."}

system(
"protoc",
"--ruby_out=#{output_dir}",
"--proto_path=#{proto_dir}",
"--proto_path=#{File.join(proto_dir, 'google')}",
full_path,
out: File::NULL,
err: File::NULL
) or begin
Console.warn{"Failed to generate #{proto_file} (may have missing dependencies)"}
end
end

# Count generated files
generated = Dir.glob(File.join(output_dir, "**/*_pb.rb")).count

Console.info{"Generated #{generated} protobuf Ruby files in #{output_dir}"}
end

# Generate all protobuf files (including optional dependencies)
# This will attempt to generate all .proto files, even if some fail
# @parameter proto_dir [String] Directory containing .proto files (default: "proto")
# @parameter output_dir [String] Output directory for generated Ruby files (default: "lib")
def generate_all_protos(proto_dir: "proto", output_dir: "lib")
require "fileutils"

proto_dir = File.expand_path(proto_dir)
output_dir = File.expand_path(output_dir)

# Find all .proto files
proto_files = Dir.glob(File.join(proto_dir, "**/*.proto"))

Console.info{"Found #{proto_files.count} .proto files"}

# Generate each file
success_count = 0
fail_count = 0

proto_files.each do |proto_file|
relative_path = proto_file.sub(/^#{proto_dir}\//, "")

Console.debug{"Generating #{relative_path}..."}

if system(
"protoc",
"--ruby_out=#{output_dir}",
"--proto_path=#{proto_dir}",
"--proto_path=#{File.join(proto_dir, 'google')}",
proto_file,
out: File::NULL,
err: File::NULL
)
success_count += 1
else
fail_count += 1
Console.debug{"Failed: #{relative_path}"}
end
end

# Count generated files
generated = Dir.glob(File.join(output_dir, "**/*_pb.rb")).count

Console.info{"Generated #{generated} protobuf Ruby files (#{success_count} succeeded, #{fail_count} failed)"}
end
Empty file removed code.md
Empty file.
9 changes: 6 additions & 3 deletions lib/async/grpc/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ def call(request)
# @parameter metadata [Hash] Custom metadata headers
# @parameter timeout [Numeric | Nil] Optional timeout in seconds
# @parameter encoding [String | Nil] Optional compression encoding
# @parameter initial [Object | Array] Optional initial message(s) to send with the request body for bidirectional streaming (avoids deadlock when server waits for first message)
# @yields {|input, output| ...} Block for streaming calls
# @returns [Object | Protocol::GRPC::Body::ReadableBody] Response message or readable body for streaming
# @raises [ArgumentError] If method is unknown or streaming type is invalid
# @raises [Protocol::GRPC::Error] If the gRPC call fails
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, &block)
def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, &block)
rpc = service.class.lookup_rpc(method)
raise ArgumentError, "Unknown method: #{method}" unless rpc

Expand All @@ -141,7 +142,7 @@ def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding:
when :client_streaming
client_streaming_call(path, headers, request_class, response_class, encoding, &block)
when :bidirectional
bidirectional_call(path, headers, request_class, response_class, encoding, &block)
bidirectional_call(path, headers, request_class, response_class, encoding, initial: initial, &block)
else
raise ArgumentError, "Unknown streaming type: #{streaming}"
end
Expand Down Expand Up @@ -273,14 +274,16 @@ def client_streaming_call(path, headers, request_class, response_class, encoding
# @parameter request_class [Class] Request message class
# @parameter response_class [Class] Response message class
# @parameter encoding [String | Nil] Compression encoding
# @parameter initial [Object | Array | Nil] Optional initial message(s) to send with the request body (avoids deadlock when server waits for first message)
# @yields {|input, output| ...} Block to handle bidirectional streaming
# @returns [Protocol::GRPC::Body::ReadableBody] Readable body for streaming messages
# @raises [Protocol::GRPC::Error] If the gRPC call fails
def bidirectional_call(path, headers, request_class, response_class, encoding, &block)
def bidirectional_call(path, headers, request_class, response_class, encoding, initial: nil, &block)
body = Protocol::GRPC::Body::WritableBody.new(
encoding: encoding,
message_class: request_class
)
Array(initial).each{|message| body.write(message)}

http_request = Protocol::HTTP::Request["POST", path, headers, body]
response = call(http_request)
Expand Down
5 changes: 3 additions & 2 deletions lib/async/grpc/stub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ def method_missing(method_name, *args, **options, &block)
# Extract request from args (first positional argument):
request = args.first

# Extract metadata, timeout, encoding from options:
# Extract metadata, timeout, encoding, initial from options:
metadata = options.delete(:metadata) || {}
timeout = options.delete(:timeout)
encoding = options.delete(:encoding)
initial = options.delete(:initial)

# Delegate to client.invoke with PascalCase method name (for interface lookup):
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding, &block)
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding, initial: initial, &block)
else
super
end
Expand Down
37 changes: 37 additions & 0 deletions lib/async/grpc/xds.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025-2026, by Samuel Williams.

# Load order matters - Context must be loaded before Client
require_relative "xds/resource_cache"
require_relative "xds/resources"
require_relative "xds/ads_stream"
require_relative "xds/discovery_client"
require_relative "xds/health_checker"
require_relative "xds/load_balancer"
require_relative "xds/context"
require_relative "xds/client"

module Async
module GRPC
# xDS (Discovery Service) support for dynamic service discovery and configuration
#
# Provides dynamic service discovery and load balancing for gRPC clients
# using the xDS (Discovery Service) protocol.
#
# @example Basic usage
# require "async/grpc/xds"
#
# bootstrap = {
# "xds_servers" => [{"server_uri" => "xds-control-plane:18000"}],
# "node" => {"id" => "client-1", "cluster" => "test"}
# }
#
# xds_client = Async::GRPC::XDS::Client.new("myservice", bootstrap: bootstrap)
# stub = xds_client.stub(MyServiceInterface, "myservice")
# response = stub.say_hello(request)
module XDS
end
end
end
70 changes: 70 additions & 0 deletions lib/async/grpc/xds/ads_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025-2026, by Samuel Williams.

require "async"
require "async/grpc/client"
require "envoy/service/discovery/v3/aggregated_discovery_service"
require "envoy/service/discovery/v3/discovery_pb"
require "envoy/config/core/v3/base_pb"

module Async
module GRPC
module XDS
# Encapsulates a single ADS (Aggregated Discovery Service) bidirectional stream.
# Owns the stream lifecycle and delegates events to a delegate object.
class ADSStream
# Interface for ADSStream delegates. Implement these methods to receive stream events.
module Delegate
# Called when a DiscoveryResponse is received from the server.
# @parameter response [Envoy::Service::Discovery::V3::DiscoveryResponse] The discovery response
# @parameter stream [ADSStream] The stream instance; use stream.send(request) to send ACKs or new requests
def discovery_response(response, stream)
end
end

def initialize(client, node, delegate:)
@client = client
@node = node
@delegate = delegate
@body = nil
end

# Send a DiscoveryRequest on the stream. Call from within discovery_response to send ACKs.
# @parameter request [Envoy::Service::Discovery::V3::DiscoveryRequest] The request to send
def send(request)
@body&.write(request)
end

# Run the ADS stream. Blocks until the stream completes or errors.
# @parameter initial [Object | Array | Nil] Initial message(s) to send (defaults to node-only request if nil/empty)
def run(initial: nil)
service = Envoy::Service::Discovery::V3::AggregatedDiscoveryService.new(
"envoy.service.discovery.v3.AggregatedDiscoveryService"
)

initial = Array(initial).any? ? initial : [Envoy::Service::Discovery::V3::DiscoveryRequest.new(node: @node)]

@client.invoke(service, :StreamAggregatedResources, nil, initial: initial) do |body, readable_body|
@body = body
@delegate.stream_opened(self) if @delegate.respond_to?(:stream_opened)

begin
readable_body.each do |response|
@delegate.discovery_response(response, self)
end
ensure
@delegate.stream_closed(self) if @delegate.respond_to?(:stream_closed)
@body = nil
end
end
rescue => error
@delegate.stream_error(self, error) if @delegate.respond_to?(:stream_error)
Console.error(self, "Failed while streaming updates!", exception: error)
raise
end
end
end
end
end
Loading
Loading