Skip to content

Commit

Permalink
Propagate context to server RPC handlers (#2059)
Browse files Browse the repository at this point in the history
Motivation:

The interceptors API has a context which, at the moment, only includes
the name of the RPC. This information is generally useful so should be
propagated to the server handler too. Information on the context can
also be extended later to include things like the identity of the remote
peer, or info about the state of the RPC.

Modifications:

- Rename 'ServerInterceptorContext' to 'ServerContext'
- Make the transport the source of the context and have that provide it
  to the 'listen' method. Propagate this through the server stack to the
  generated stubs.
- Update code generator to include the context
- Update generated code
- Update services

Results:

RPC handlers have a context provided by a transport
  • Loading branch information
glbrntt authored Sep 19, 2024
1 parent 78da46d commit 7789f1e
Show file tree
Hide file tree
Showing 43 changed files with 1,133 additions and 432 deletions.
98 changes: 76 additions & 22 deletions Examples/v2/echo/Generated/echo.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,28 @@ extension GRPCCore.ServiceDescriptor {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
internal protocol Echo_EchoStreamingServiceProtocol: GRPCCore.RegistrableRPCService {
/// Immediately returns an echo of a request.
func get(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func get(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>

/// Splits a request into words and returns each word in a stream of messages.
func expand(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func expand(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>

/// Collects a stream of messages and returns them concatenated when the caller closes.
func collect(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func collect(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>

/// Streams back messages as they are received in an input stream.
func update(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func update(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
}

/// Conformance to `GRPCCore.RegistrableRPCService`.
Expand All @@ -107,32 +119,44 @@ extension Echo_Echo.StreamingServiceProtocol {
forMethod: Echo_Echo.Method.Get.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Echo_EchoRequest>(),
serializer: GRPCProtobuf.ProtobufSerializer<Echo_EchoResponse>(),
handler: { request in
try await self.get(request: request)
handler: { request, context in
try await self.get(
request: request,
context: context
)
}
)
router.registerHandler(
forMethod: Echo_Echo.Method.Expand.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Echo_EchoRequest>(),
serializer: GRPCProtobuf.ProtobufSerializer<Echo_EchoResponse>(),
handler: { request in
try await self.expand(request: request)
handler: { request, context in
try await self.expand(
request: request,
context: context
)
}
)
router.registerHandler(
forMethod: Echo_Echo.Method.Collect.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Echo_EchoRequest>(),
serializer: GRPCProtobuf.ProtobufSerializer<Echo_EchoResponse>(),
handler: { request in
try await self.collect(request: request)
handler: { request, context in
try await self.collect(
request: request,
context: context
)
}
)
router.registerHandler(
forMethod: Echo_Echo.Method.Update.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Echo_EchoRequest>(),
serializer: GRPCProtobuf.ProtobufSerializer<Echo_EchoResponse>(),
handler: { request in
try await self.update(request: request)
handler: { request, context in
try await self.update(
request: request,
context: context
)
}
)
}
Expand All @@ -141,33 +165,63 @@ extension Echo_Echo.StreamingServiceProtocol {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
internal protocol Echo_EchoServiceProtocol: Echo_Echo.StreamingServiceProtocol {
/// Immediately returns an echo of a request.
func get(request: GRPCCore.ServerRequest.Single<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Single<Echo_EchoResponse>
func get(
request: GRPCCore.ServerRequest.Single<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Single<Echo_EchoResponse>

/// Splits a request into words and returns each word in a stream of messages.
func expand(request: GRPCCore.ServerRequest.Single<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func expand(
request: GRPCCore.ServerRequest.Single<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>

/// Collects a stream of messages and returns them concatenated when the caller closes.
func collect(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Single<Echo_EchoResponse>
func collect(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Single<Echo_EchoResponse>

/// Streams back messages as they are received in an input stream.
func update(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
func update(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse>
}

/// Partial conformance to `Echo_EchoStreamingServiceProtocol`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension Echo_Echo.ServiceProtocol {
internal func get(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.get(request: GRPCCore.ServerRequest.Single(stream: request))
internal func get(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.get(
request: GRPCCore.ServerRequest.Single(stream: request),
context: context
)
return GRPCCore.ServerResponse.Stream(single: response)
}

internal func expand(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.expand(request: GRPCCore.ServerRequest.Single(stream: request))
internal func expand(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.expand(
request: GRPCCore.ServerRequest.Single(stream: request),
context: context
)
return response
}

internal func collect(request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.collect(request: request)
internal func collect(
request: GRPCCore.ServerRequest.Stream<Echo_EchoRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Echo_EchoResponse> {
let response = try await self.collect(
request: request,
context: context
)
return GRPCCore.ServerResponse.Stream(single: response)
}
}
Expand Down
12 changes: 8 additions & 4 deletions Examples/v2/echo/Subcommands/Serve.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,24 @@ struct Serve: AsyncParsableCommand {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
struct EchoService: Echo_EchoServiceProtocol {
func get(
request: ServerRequest.Single<Echo_EchoRequest>
request: ServerRequest.Single<Echo_EchoRequest>,
context: ServerContext
) async throws -> ServerResponse.Single<Echo_EchoResponse> {
return ServerResponse.Single(message: .with { $0.text = request.message.text })
}

func collect(
request: ServerRequest.Stream<Echo_EchoRequest>
request: ServerRequest.Stream<Echo_EchoRequest>,
context: ServerContext
) async throws -> ServerResponse.Single<Echo_EchoResponse> {
let messages = try await request.messages.reduce(into: []) { $0.append($1.text) }
let joined = messages.joined(separator: " ")
return ServerResponse.Single(message: .with { $0.text = joined })
}

func expand(
request: ServerRequest.Single<Echo_EchoRequest>
request: ServerRequest.Single<Echo_EchoRequest>,
context: ServerContext
) async throws -> ServerResponse.Stream<Echo_EchoResponse> {
return ServerResponse.Stream { writer in
let parts = request.message.text.split(separator: " ")
Expand All @@ -72,7 +75,8 @@ struct EchoService: Echo_EchoServiceProtocol {
}

func update(
request: ServerRequest.Stream<Echo_EchoRequest>
request: ServerRequest.Stream<Echo_EchoRequest>,
context: ServerContext
) async throws -> ServerResponse.Stream<Echo_EchoResponse> {
return ServerResponse.Stream { writer in
for try await message in request.messages {
Expand Down
27 changes: 21 additions & 6 deletions Examples/v2/hello-world/Generated/helloworld.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ extension GRPCCore.ServiceDescriptor {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
internal protocol Helloworld_GreeterStreamingServiceProtocol: GRPCCore.RegistrableRPCService {
/// Sends a greeting
func sayHello(request: GRPCCore.ServerRequest.Stream<Helloworld_HelloRequest>) async throws -> GRPCCore.ServerResponse.Stream<Helloworld_HelloReply>
func sayHello(
request: GRPCCore.ServerRequest.Stream<Helloworld_HelloRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Helloworld_HelloReply>
}

/// Conformance to `GRPCCore.RegistrableRPCService`.
Expand All @@ -72,8 +75,11 @@ extension Helloworld_Greeter.StreamingServiceProtocol {
forMethod: Helloworld_Greeter.Method.SayHello.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Helloworld_HelloRequest>(),
serializer: GRPCProtobuf.ProtobufSerializer<Helloworld_HelloReply>(),
handler: { request in
try await self.sayHello(request: request)
handler: { request, context in
try await self.sayHello(
request: request,
context: context
)
}
)
}
Expand All @@ -83,14 +89,23 @@ extension Helloworld_Greeter.StreamingServiceProtocol {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
internal protocol Helloworld_GreeterServiceProtocol: Helloworld_Greeter.StreamingServiceProtocol {
/// Sends a greeting
func sayHello(request: GRPCCore.ServerRequest.Single<Helloworld_HelloRequest>) async throws -> GRPCCore.ServerResponse.Single<Helloworld_HelloReply>
func sayHello(
request: GRPCCore.ServerRequest.Single<Helloworld_HelloRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Single<Helloworld_HelloReply>
}

/// Partial conformance to `Helloworld_GreeterStreamingServiceProtocol`.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension Helloworld_Greeter.ServiceProtocol {
internal func sayHello(request: GRPCCore.ServerRequest.Stream<Helloworld_HelloRequest>) async throws -> GRPCCore.ServerResponse.Stream<Helloworld_HelloReply> {
let response = try await self.sayHello(request: GRPCCore.ServerRequest.Single(stream: request))
internal func sayHello(
request: GRPCCore.ServerRequest.Stream<Helloworld_HelloRequest>,
context: GRPCCore.ServerContext
) async throws -> GRPCCore.ServerResponse.Stream<Helloworld_HelloReply> {
let response = try await self.sayHello(
request: GRPCCore.ServerRequest.Single(stream: request),
context: context
)
return GRPCCore.ServerResponse.Stream(single: response)
}
}
Expand Down
3 changes: 2 additions & 1 deletion Examples/v2/hello-world/Subcommands/Serve.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ struct Serve: AsyncParsableCommand {
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
struct Greeter: Helloworld_GreeterServiceProtocol {
func sayHello(
request: ServerRequest.Single<Helloworld_HelloRequest>
request: ServerRequest.Single<Helloworld_HelloRequest>,
context: ServerContext
) async throws -> ServerResponse.Single<Helloworld_HelloReply> {
var reply = Helloworld_HelloReply()
let recipient = request.message.name.isEmpty ? "stranger" : request.message.name
Expand Down
Loading

0 comments on commit 7789f1e

Please sign in to comment.