Skip to content

CASSJAVA-97: Let users inject an ID for each request and write to the custom payload #2037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: 4.x
Choose a base branch
from

Conversation

SiyaoIsHiding
Copy link
Contributor

@SiyaoIsHiding SiyaoIsHiding commented Apr 16, 2025

No description provided.

@SiyaoIsHiding
Copy link
Contributor Author

I did integration testing with C* OSS 5.0.2. @lukasz-antoniak helped me add a LoggingQueryHandler and set it as the cassandra.custom_query_handler_class.
I developed a client app using this Java driver with the following config

datastax-java-driver.advanced = {
  distributed-tracing.id-generator.class = W3CContextDistributedTraceIdGenerator
  distributed-tracing.custom-payload-with-key = "traceparent"
}

Running this client app, I got

17:03:20.860 [s0-io-5] TRACE InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Writing 00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00 on stream id 0
17:03:20.863 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Request sent on [id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042]
17:03:20.864 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Speculative execution policy returned -1, no next execution
17:03:20.877 [s0-io-5] DEBUG InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Got last response on in-flight stream id 0, completing and releasing
17:03:20.877 [s0-io-5] TRACE InFlightHandler - [s0|id: 0xeefaab65, L:/127.0.0.2:52389 - R:/127.0.0.2:9042] Releasing stream id 0
17:03:20.877 [s0-io-5] TRACE CqlRequestHandler$NodeResponseCallback - [00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00] Got result, completing

And the debug.log at server side got

DEBUG [Native-Transport-Requests-1] 2025-04-15 17:03:20,870 LoggingQueryHandler.java:44 - Processing CQL statement SelectStatement[aggregationSpecFactory=,bindVariables=[],isReversed=false,limit=,orderingComparator=,parameters=org.apache.cassandra.cql3.statements.SelectStatement$Parameters@5d46ec82,perPartitionLimit=,restrictions=StatementRestrictions[clusteringColumnsRestrictions=ClusteringColumnRestrictions[allowFiltering=false,comparator=comparator(),restrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}]],filterRestrictions=IndexRestrictions[customExpressions=[],regularRestrictions=[]],hasRegularColumnsRestrictions=false,isKeyRange=true,nonPrimaryKeyRestrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}],notNullColumns=[],partitionKeyRestrictions=PartitionKeySingleRestrictionSet[comparator=comparator(org.apache.cassandra.db.marshal.UTF8Type),restrictions=RestrictionSet[hasAnn=false,hasContains=false,hasIn=false,hasMultiColumnRestrictions=false,hasOnlyEqualityRestrictions=true,hasSlice=false,restrictions={}]],table=system.local,type=SELECT,usesSecondaryIndexing=false],selection=SimpleSelection{columns=[key, bootstrapped, broadcast_address, broadcast_port, cluster_name, cql_version, data_center, gossip_generation, host_id, listen_address, listen_port, native_protocol_version, partitioner, rack, release_version, rpc_address, rpc_port, schema_version, tokens, truncated_at], columnMapping={ Columns:[key, bootstrapped, broadcast_address, broadcast_port, cluster_name, cql_version, data_center, gossip_generation, host_id, listen_address, listen_port, native_protocol_version, partitioner, rack, release_version, rpc_address, rpc_port, schema_version, tokens, truncated_at], Mappings:{rack:[rack], cql_version:[cql_version], listen_address:[listen_address], release_version:[release_version], data_center:[data_center], broadcast_port:[broadcast_port], broadcast_address:[broadcast_address], partitioner:[partitioner], host_id:[host_id], gossip_generation:[gossip_generation], listen_port:[listen_port], rpc_address:[rpc_address], schema_version:[schema_version], rpc_port:[rpc_port], truncated_at:[truncated_at], cluster_name:[cluster_name], native_protocol_version:[native_protocol_version], tokens:[tokens], key:[key], bootstrapped:[bootstrapped]} }, metadata=[key(system, local), org.apache.cassandra.db.marshal.UTF8Type][bootstrapped(system, local), org.apache.cassandra.db.marshal.UTF8Type][broadcast_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][broadcast_port(system, local), org.apache.cassandra.db.marshal.Int32Type][cluster_name(system, local), org.apache.cassandra.db.marshal.UTF8Type][cql_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][data_center(system, local), org.apache.cassandra.db.marshal.UTF8Type][gossip_generation(system, local), org.apache.cassandra.db.marshal.Int32Type][host_id(system, local), org.apache.cassandra.db.marshal.UUIDType][listen_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][listen_port(system, local), org.apache.cassandra.db.marshal.Int32Type][native_protocol_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][partitioner(system, local), org.apache.cassandra.db.marshal.UTF8Type][rack(system, local), org.apache.cassandra.db.marshal.UTF8Type][release_version(system, local), org.apache.cassandra.db.marshal.UTF8Type][rpc_address(system, local), org.apache.cassandra.db.marshal.InetAddressType][rpc_port(system, local), org.apache.cassandra.db.marshal.Int32Type][schema_version(system, local), org.apache.cassandra.db.marshal.UUIDType][tokens(system, local), org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)][truncated_at(system, local), org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UUIDType,org.apache.cassandra.db.marshal.BytesType)]},table=system.local] with custom payload {traceparent=30302d64353165643230313263316633316234343334663430396535303239346461392d323564613234386430613233393831632d3030}

The value 30302d64353165643230313263316633316234343334663430396535303239346461392d323564613234386430613233393831632d3030 is the hex of the id 00-d51ed2012c1f31b4434f409e50294da9-25da248d0a23981c-00. This shows the capability of tracing a request across client and server.

// We cannot do statement.getCustomPayload().put() because the default empty map is abstract
// But this will create new Statement instance for every request. We might want to optimize
// this
Map<String, ByteBuffer> existingMap = new HashMap<>(statement.getCustomPayload());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statement is by design immutable. Maybe a nicer way would be to create method StatementBuilder.from(Statement) where you could create builder again based on statement. The code would look like: StatementBuilder.from(statement).addCustomPayload(...).build().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can copy just the payload, not the whole statement:

    Map<String, ByteBuffer> customPayload = statement.getCustomPayload();
    if (!this.customPayloadKey.isEmpty()) {
      customPayload =
          NullAllowingImmutableMap.<String, ByteBuffer>builder()
              .putAll(customPayload)
              .put(
                  this.customPayloadKey,
                  ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8)))
              .build();
    }

Then modify line 307 like so:

       channel
-          .write(message, statement.isTracing(), statement.getCustomPayload(), nodeResponseCallback)
+          .write(message, statement.isTracing(), customPayload, nodeResponseCallback)
           .addListener(nodeResponseCallback);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solves the concurrency problem, but it also means the subsequent setFinalError(statement...), NodeResponseCallback(statement,...), and RequestTracker invocations do not have the statement with the actual custom payload.

Map<String, ByteBuffer> existingMap = new HashMap<>(statement.getCustomPayload());
existingMap.put(
this.customPayloadKey, ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8)));
statement = statement.setCustomPayload(existingMap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overriding custom payload here is not thread-safe. If client application executes the same statement instance multiple times concurrently (not a good use-case, but still possible), we do not guarantee how this map will be changed. Maybe indeed, there is no other way than make a shallow copy of the statement. Will think about it.

  /**
   * Sets the custom payload to use for execution.
   *
   * <p>All the driver's built-in statement implementations are immutable, and return a new instance
   * from this method. However custom implementations may choose to be mutable and return the same
   * instance.
   *
   * <p>Note that it's your responsibility to provide a thread-safe map. This can be achieved with a
   * concurrent or immutable implementation, or by making it effectively immutable (meaning that
   * it's never modified after being set on the statement).
   */
  @NonNull
  @CheckReturnValue
  SelfT setCustomPayload(@NonNull Map<String, ByteBuffer> newCustomPayload);

@SiyaoIsHiding SiyaoIsHiding marked this pull request as ready for review April 17, 2025 08:01
@absurdfarce absurdfarce changed the title CASSJAVA97: Let users inject an ID for each request and write to the custom payload CASSJAVA-97: Let users inject an ID for each request and write to the custom payload Apr 29, 2025
…ution info does not have actual custom payload
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
String query = "SELECT * FROM system.local";
ResultSet rs = session.execute(query);
ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you inject for individual CQL request though?
Did i miss this kind of test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test "should_use_customized_request_id_generator". Do you think it answers your question?

* @param hashCode the hashcode of the CqlRequestHandler
* @return a unique identifier for the session request
*/
String getSessionRequestId(@NonNull Request statement, @NonNull String sessionName, int hashCode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface seems a bit too connected to the default impl of RequestIdGenerator. It makes sense to pass the hash code of the relevant CqlRequestHandler given that implementation but is that parameter going to be generally usable?

I'd almost prefer to see the complete CqlRequestHandler passed here rather than just a hash code. That way if other implementers want to pull other values out of the handler (or even provider their own custom handlers with additional info available) they have an easy way to do so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This getSessionRequestId is invoked in CqlRequestHandler's constructor. If we pass the CqlRequestHandler in, the object will not be initialized yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rename the parameter to salt (similar to cryptography, an integer that just provides uniqueness of IDs)?

* @return a unique identifier for the node request
*/
String getNodeRequestId(
@NonNull Request statement, @NonNull String sessionRequestId, int executionCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here I guess; execution count feels very tied to how the default request ID generator works. Is there a way we can generalize this a bit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this parameter makes sense. Within one session, we can retry sending the same request due to retry policy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but that doesn't mean execution count is relevant to all implementations. It also begs the question of whether other things can/should be included for all implementations.

More generally, I'd argue it's inclusion here is primarily a function of the necessity of implementing the current log prefix as a request ID generator... which I'm not sure is a good idea (more on that elsewhere).


Usage:
* Inject ID generator: set the desired `RequestIdGenerator` in `advanced.request-id.generator.class`.
The default implementation generates the session request ID as `{session_name}|{hash_code}`, and node request ID as `{session_name}|{hash_code}|{execution_count}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really explain what {hash_code} or {execution_count} mean here

# add the request id to the custom payload with the given key
# if empty, the request id will not be added to the custom payload
custom-payload-with-key = ""
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit - elsewhere we have a space before the opening brace {

Comment on lines +28 to +29
- Session request ID: an identifier for an entire session.execute() call
- Node request ID: an identifier for the execution of a CQL statement against a particular node. There can be one or more node requests for a single session request, due to retries or speculative executions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retries and speculative executions are often against different nodes than the original request, might prefer another name here like "Request Attempt ID".

Currently the server has no way to know whether a given request is a retry, this feature could help us provide a metric for original requests vs. retries on the server, which would be pretty cool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the name "node request" v.s. "session request" from c# opentelemetry feature.

db.operation.name The type name of the operation being executed. Session_Request({RequestType}) for session level calls and Node_Request({RequestType}) for node level calls

And Lukasz's outstanding request tracker interface PR.
Do you think we should align with their naming?

*
* <p>Value-type: {@link String}
*/
REQUEST_ID_CUSTOM_PAYLOAD_KEY("advanced.request-id.custom-payload-with-key");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) naming: I find "custom-payload-key" clearer, you're already using that naming elsewhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the enum name, or the typesafe config path?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @aratno is referring to the TypeSafe name @SiyaoIsHiding ... "custom-payload-key" rather than "custom-payload-with-key". Assuming that's correct I think he's on to something there.

}
# add the request id to the custom payload with the given key
# if empty, the request id will not be added to the custom payload
custom-payload-with-key = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to disable Request IDs altogether? Seems like at least three possible states are needed:

  1. Disabled Request IDs, no behavior changes on upgrade
  2. Request IDs in driver logs only, not propagated to the server
  3. Request IDs in driver logs and propagated to the server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing driver has a built-in logic to generate the log prefix, which is the same logic as the DefaultRequestIdGenerator. So your no.1 state is the same no.2 state, where the DefaultRequestIdGenerator is used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd actually collapse your second and third cases into one @aratno. I'd also specify the rule a bit differently:

If the client has configured a request ID generator we'll use that to generate a consistent request ID via the log prefix on the client side and the custom payload params delivered to the server. Otherwise we'll preserve the current log prefix on the client side and add nothing to the custom payload.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on board with 2 + 3 being a single case, especially in the near-term, but 1 is different

}
# add the request id to the custom payload with the given key
# if empty, the request id will not be added to the custom payload
custom-payload-with-key = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would a user know what to set this to? Can we come up with a reasonable default that's more likely to be interoperable between C* protocol implementations (C*, Scylla, DSE / Astra, etc)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we really need to choose one to recommend, I think we can recommend traceparent, as it's specified in W3C context propagation protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably this will vary with the implementation, right @aratno? Individaul C* request handlers might want to map this value to some name that makes sense for them. So I guess this would be very implementation-dependent... ?

Side note: it does raise an interesting question for Astra actually. We'd want to automatically set a request ID generator if the user is using Astra... but that's only half the problem. In addition to generating IDs in the expected format we'd also want to make sure the custom payload is being added at the right key for Astra. Hmmm... that's an interesting problem.

@@ -248,6 +259,19 @@ private void sendRequest(
if (result.isDone()) {
return;
}
String nodeRequestId =
this.requestIdGenerator.getNodeRequestId(statement, logPrefix, currentExecutionIndex);
if (!this.customPayloadKey.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not missing else block here?

Copy link
Contributor

@absurdfarce absurdfarce left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies all, I had to retreat to a cave for a bit and ponder some of the questions under consideration here as well as what was nagging me about the original API. I think I landed on a reasonable compromise that can be extended to address most (all?) of the outstanding concerns... but I'm not completely convinced of that yet. Comments welcomed/encouraged.

@@ -139,6 +140,10 @@ default SpeculativeExecutionPolicy getSpeculativeExecutionPolicy(@NonNull String
@NonNull
RequestTracker getRequestTracker();

/** @return The driver's request ID generator; never {@code null}. */
@NonNull
RequestIdGenerator getRequestIdGenerator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to argue this should actually return Optional<RequestIdGenerator>. I think part of the confusion for various other aspects of this ticket come down to (a) an impl which requires the driver to always have a request ID generator and (b) a confusion between a log prefix in the driver and what we're sending as a request ID.

* @return a unique identifier for the node request
*/
String getNodeRequestId(
@NonNull Request statement, @NonNull String sessionRequestId, int executionCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but that doesn't mean execution count is relevant to all implementations. It also begs the question of whether other things can/should be included for all implementations.

More generally, I'd argue it's inclusion here is primarily a function of the necessity of implementing the current log prefix as a request ID generator... which I'm not sure is a good idea (more on that elsewhere).

* @return a unique identifier for the node request
*/
String getNodeRequestId(
@NonNull Request statement, @NonNull String sessionRequestId, int executionCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In related news: how do we not include the node in question when we're generating a node request ID? Requests/Statements can have a node set as state but that's an optional thing a user can set in order to target a specific node; that's not automatically set for every request.

this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
this.requestIdGenerator = context.getRequestIdGenerator();
this.logPrefix =
this.requestIdGenerator.getSessionRequestId(statement, sessionLogPrefix, this.hashCode());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the root cause of my problem with the API. I think we need to clearly distinguish between a log prefix and a request ID. If a user doesn't configure a request ID generator that's totally fine... that means:

  • Nothing is added to custom payload AND
  • The old logic for generating a logPrefix is employed

That means our request ID generator API doesn't have to be retrofitted to support the existing log prefix syntax. It also resolve the issue @aratno has raised elsewhere, specifically "how do we shut this off if we don't want it?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This current implementation of request ID relies on the old log prefix implementation to propagate to other classes, like RequestLogger and InFlightHandler. If we separate request ID with log prefix, how do we propagate request ID?

.build();
// TODO: we are creating a new statement object for every request. We should optimize this.
statement = statement.setCustomPayload(customPayload);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the wrong place to do this. In most cases we haven't even selected the node yet; note that this happens immediately below where we poll the query plan if no node is explicitly set in the request. Assuming we update the request ID generation logic to correctly account for the target node the setting of custom payload fields should happen after we determine which node we're actually sending to.

}
# add the request id to the custom payload with the given key
# if empty, the request id will not be added to the custom payload
custom-payload-with-key = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd actually collapse your second and third cases into one @aratno. I'd also specify the rule a bit differently:

If the client has configured a request ID generator we'll use that to generate a consistent request ID via the log prefix on the client side and the custom payload params delivered to the server. Otherwise we'll preserve the current log prefix on the client side and add nothing to the custom payload.

}
# add the request id to the custom payload with the given key
# if empty, the request id will not be added to the custom payload
custom-payload-with-key = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably this will vary with the implementation, right @aratno? Individaul C* request handlers might want to map this value to some name that makes sense for them. So I guess this would be very implementation-dependent... ?

Side note: it does raise an interesting question for Astra actually. We'd want to automatically set a request ID generator if the user is using Astra... but that's only half the problem. In addition to generating IDs in the expected format we'd also want to make sure the custom payload is being added at the right key for Astra. Hmmm... that's an interesting problem.

*
* <p>Value-type: {@link String}
*/
REQUEST_ID_CUSTOM_PAYLOAD_KEY("advanced.request-id.custom-payload-with-key");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @aratno is referring to the TypeSafe name @SiyaoIsHiding ... "custom-payload-key" rather than "custom-payload-with-key". Assuming that's correct I think he's on to something there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants