Skip to content

Commit

Permalink
New test case: RPC handler when lazy response data throws an error
Browse files Browse the repository at this point in the history
This reproduces a bug where an exception raised in a lazy sequence
attached to a protobuf response causes the request handler to crash and
time out.

Signed-off-by: Ryan Sundberg <[email protected]>
  • Loading branch information
sundbry committed Mar 19, 2022
1 parent 89b9d29 commit 13ed799
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 15 deletions.
45 changes: 35 additions & 10 deletions test/test/protojure/grpc_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
[example.hello.Greeter :as greeter]
[protojure.test.grpc.TestService.server :as test.server]
[protojure.test.grpc.TestService.client :as test.client]
[protojure.test.grpc :refer [new-ShouldThrowResponse]]
[protojure.internal.grpc.client.providers.http2.jetty :as jetty]
[crypto.random :as crypto]
[criterium.core :as criterium])
Expand Down Expand Up @@ -183,8 +184,20 @@
{:body {:msg (str "Hello, " auth)}}))

(ShouldThrow
[_ request]
(throw (ex-info "This is supposed to fail" {})))
[_ {{test-case :case} :grpc-params}]
{:body
(case test-case
0 (throw (ex-info "This is supposed to fail" {}))
; Non-lazy seq
1 {:numbers
(mapv (fn [_]
(throw (ex-info "This is also supposed to fail (1)" {})))
(range 3))}
; Lazy seq
2 {:numbers
(map (fn [n]
(throw (ex-info "This is also supposed to fail (2)" {})))
(range 3))})})

(Async
[_ request]
Expand Down Expand Up @@ -262,6 +275,16 @@
[code & body]
`(-check-throw ~code #(do ~@body)))

(defn is-grpc-error? [grpc-status response-promise]
"Asserts that response promise returns a specific gRPC error code, and
returns the exception."
(let [ex (is (thrown? Exception @response-promise))
cause (.getCause ex)
code (grpc.status/get-code grpc-status)]
(is (some? cause))
(is (= code (:status (ex-data cause))))
cause))

;;-----------------------------------------------------------------------------
;; Scaletest Assemblies
;;-----------------------------------------------------------------------------
Expand Down Expand Up @@ -680,18 +703,20 @@
(deftest test-grpc-exception
(let [client @(grpc.http2/connect {:uri (str "http://localhost:" (:port @test-env))})]
(testing "Check that exceptions thrown on the server propagate back to the client"
(is (thrown? java.util.concurrent.ExecutionException
@(test.client/ShouldThrow client {})))
(try
@(test.client/ShouldThrow client {})
(assert false) ;; we should never get here
(catch java.util.concurrent.ExecutionException e
(let [{:keys [status]} (ex-data (.getCause e))]
(is (= status 13))))))
(is-grpc-error? :internal (test.client/ShouldThrow client {})))
(testing "Check that we can still connect even after exceptions have been received"
(is (-> @(test.client/Async client {}) :msg (= "Hello, Async"))))
(grpc/disconnect client)))

(deftest test-grpc-sequence-exception
(let [client @(grpc.http2/connect {:uri (str "http://localhost:" (:port @test-env))})]
(testing "Check that exceptions thrown in sequence mappers on the server propagate back to the client"
(testing "Non-lazy: throws in mapv"
(is-grpc-error? :internal (test.client/ShouldThrow client {:case 1})))
(testing "Lazy: throws in map"
(is-grpc-error? :internal (test.client/ShouldThrow client {:case 2}))))
(grpc/disconnect client)))

(deftest test-grpc-async
(testing "Check that async processing functions correctly"
(let [client @(grpc.http2/connect {:uri (str "http://localhost:" (:port @test-env))})]
Expand Down
102 changes: 102 additions & 0 deletions test/test/protojure/test/grpc.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
(declare cis->BigPayload)
(declare ecis->BigPayload)
(declare new-BigPayload)
(declare cis->ShouldThrowRequest)
(declare ecis->ShouldThrowRequest)
(declare new-ShouldThrowRequest)
(declare cis->ShouldThrowResponse)
(declare ecis->ShouldThrowResponse)
(declare new-ShouldThrowResponse)

;;----------------------------------------------------------------------------------
;;----------------------------------------------------------------------------------
Expand Down Expand Up @@ -378,3 +384,99 @@

(def ^:protojure.protobuf.any/record BigPayload-meta {:type "protojure.test.grpc.BigPayload" :decoder pb->BigPayload})

;-----------------------------------------------------------------------------
; ShouldThrowRequest
;-----------------------------------------------------------------------------
(defrecord ShouldThrowRequest-record [case]
pb/Writer
(serialize [this os]
(serdes.core/write-Int32 1 {:optimize true} (:case this) os))
pb/TypeReflection
(gettype [this]
"protojure.test.grpc.ShouldThrowRequest"))

(s/def :protojure.test.grpc.ShouldThrowRequest/case int?)
(s/def ::ShouldThrowRequest-spec (s/keys :opt-un [:protojure.test.grpc.ShouldThrowRequest/case ]))
(def ShouldThrowRequest-defaults {:case 0 })

(defn cis->ShouldThrowRequest
"CodedInputStream to ShouldThrowRequest"
[is]
(->> (tag-map ShouldThrowRequest-defaults
(fn [tag index]
(case index
1 [:case (serdes.core/cis->Int32 is)]

[index (serdes.core/cis->undefined tag is)]))
is)
(map->ShouldThrowRequest-record)))

(defn ecis->ShouldThrowRequest
"Embedded CodedInputStream to ShouldThrowRequest"
[is]
(serdes.core/cis->embedded cis->ShouldThrowRequest is))

(defn new-ShouldThrowRequest
"Creates a new instance from a map, similar to map->ShouldThrowRequest except that
it properly accounts for nested messages, when applicable.
"
[init]
{:pre [(if (s/valid? ::ShouldThrowRequest-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::ShouldThrowRequest-spec init))))]}
(-> (merge ShouldThrowRequest-defaults init)
(map->ShouldThrowRequest-record)))

(defn pb->ShouldThrowRequest
"Protobuf to ShouldThrowRequest"
[input]
(cis->ShouldThrowRequest (serdes.stream/new-cis input)))

(def ^:protojure.protobuf.any/record ShouldThrowRequest-meta {:type "protojure.test.grpc.ShouldThrowRequest" :decoder pb->ShouldThrowRequest})

;-----------------------------------------------------------------------------
; ShouldThrowResponse
;-----------------------------------------------------------------------------
(defrecord ShouldThrowResponse-record [numbers]
pb/Writer
(serialize [this os]
(serdes.complex/write-repeated serdes.core/write-Int32 1 (:numbers this) os))
pb/TypeReflection
(gettype [this]
"protojure.test.grpc.ShouldThrowResponse"))

(s/def :protojure.test.grpc.ShouldThrowResponse/numbers (s/every int?))
(s/def ::ShouldThrowResponse-spec (s/keys :opt-un [:protojure.test.grpc.ShouldThrowResponse/numbers ]))
(def ShouldThrowResponse-defaults {:numbers [] })

(defn cis->ShouldThrowResponse
"CodedInputStream to ShouldThrowResponse"
[is]
(->> (tag-map ShouldThrowResponse-defaults
(fn [tag index]
(case index
1 [:numbers (serdes.complex/cis->packablerepeated tag serdes.core/cis->Int32 is)]

[index (serdes.core/cis->undefined tag is)]))
is)
(map->ShouldThrowResponse-record)))

(defn ecis->ShouldThrowResponse
"Embedded CodedInputStream to ShouldThrowResponse"
[is]
(serdes.core/cis->embedded cis->ShouldThrowResponse is))

(defn new-ShouldThrowResponse
"Creates a new instance from a map, similar to map->ShouldThrowResponse except that
it properly accounts for nested messages, when applicable.
"
[init]
{:pre [(if (s/valid? ::ShouldThrowResponse-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::ShouldThrowResponse-spec init))))]}
(-> (merge ShouldThrowResponse-defaults init)
(map->ShouldThrowResponse-record)))

(defn pb->ShouldThrowResponse
"Protobuf to ShouldThrowResponse"
[input]
(cis->ShouldThrowResponse (serdes.stream/new-cis input)))

(def ^:protojure.protobuf.any/record ShouldThrowResponse-meta {:type "protojure.test.grpc.ShouldThrowResponse" :decoder pb->ShouldThrowResponse})

4 changes: 2 additions & 2 deletions test/test/protojure/test/grpc/TestService/client.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@
output (async/chan 1)
desc {:service "protojure.test.grpc.TestService"
:method "ShouldThrow"
:input {:f com.google.protobuf/new-Empty :ch input}
:output {:f com.google.protobuf/pb->Empty :ch output}
:input {:f protojure.test.grpc/new-ShouldThrowRequest :ch input}
:output {:f protojure.test.grpc/pb->ShouldThrowResponse :ch output}
:metadata metadata}]
(-> (send-unary-params input params)
(p/then (fn [_] (invoke-unary client desc output)))))))
Expand Down
2 changes: 1 addition & 1 deletion test/test/protojure/test/grpc/TestService/server.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@
{:pkg "protojure.test.grpc" :service "TestService" :method "AsyncEmpty" :method-fn AsyncEmpty-dispatch :server-streaming true :client-streaming false :input com.google.protobuf/pb->Empty :output com.google.protobuf/new-Empty}
{:pkg "protojure.test.grpc" :service "TestService" :method "Metadata" :method-fn Metadata-dispatch :server-streaming false :client-streaming false :input com.google.protobuf/pb->Empty :output new-SimpleResponse}
{:pkg "protojure.test.grpc" :service "TestService" :method "ReturnErrorStreaming" :method-fn ReturnErrorStreaming-dispatch :server-streaming true :client-streaming false :input pb->ErrorRequest :output com.google.protobuf/new-Empty}
{:pkg "protojure.test.grpc" :service "TestService" :method "ShouldThrow" :method-fn ShouldThrow-dispatch :server-streaming false :client-streaming false :input com.google.protobuf/pb->Empty :output com.google.protobuf/new-Empty}])
{:pkg "protojure.test.grpc" :service "TestService" :method "ShouldThrow" :method-fn ShouldThrow-dispatch :server-streaming false :client-streaming false :input pb->ShouldThrowRequest :output new-ShouldThrowResponse}])
12 changes: 10 additions & 2 deletions test/test/resources/grpctest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,25 @@ message BigPayload {
bytes data = 2;
}

message ShouldThrowRequest {
int32 case = 1;
}

message ShouldThrowResponse {
repeated int32 numbers = 1;
}

service TestService {
rpc ClientCloseDetect (CloseDetectRequest) returns (stream google.protobuf.Any);
rpc ServerCloseDetect (google.protobuf.Empty) returns (stream google.protobuf.Any);
rpc FlowControl (FlowControlRequest) returns (stream FlowControlPayload);
rpc Metadata (google.protobuf.Empty) returns (SimpleResponse);
rpc ShouldThrow (google.protobuf.Empty) returns (google.protobuf.Empty);
rpc ShouldThrow (ShouldThrowRequest) returns (ShouldThrowResponse);
rpc Async (google.protobuf.Empty) returns (SimpleResponse);
rpc AllEmpty(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc AsyncEmpty(google.protobuf.Empty) returns (stream google.protobuf.Empty);
rpc DeniedStreamer(google.protobuf.Empty) returns (stream google.protobuf.Empty);
rpc ReturnError(ErrorRequest) returns (google.protobuf.Empty);
rpc ReturnErrorStreaming(ErrorRequest) returns (stream google.protobuf.Empty);
rpc BandwidthTest(BigPayload) returns (BigPayload);
}
}

0 comments on commit 13ed799

Please sign in to comment.