diff --git a/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java new file mode 100644 index 000000000..e9f40a89f --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponse.java @@ -0,0 +1,39 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.iot; + +/** + * Encapsulates a response to an AWS IoT Core MQTT-based service request + */ +public class MqttRequestResponse { + + private String topic; + private byte[] payload; + + private MqttRequestResponse() { + } + + /** + * Gets the MQTT topic that the response was received on. + * + * Different topics map to different types within the + * service model, so we need this value in order to know what to deserialize the payload into. + * + * @return the MQTT topic that the response was received on + */ + public String getTopic() { + return topic; + } + + /** + * Gets the payload of the response that correlates to a submitted request. + * + * @return Payload of the response that correlates to a submitted request. + */ + public byte[] getPayload() { + return payload; + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponseClient.java b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponseClient.java index c337e5dd3..8886469c0 100644 --- a/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponseClient.java +++ b/src/main/java/software/amazon/awssdk/crt/iot/MqttRequestResponseClient.java @@ -10,6 +10,8 @@ import software.amazon.awssdk.crt.mqtt.MqttClientConnection; import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; +import java.util.concurrent.CompletableFuture; + /** * A helper class for AWS service clients that use MQTT as the transport protocol. * @@ -20,7 +22,13 @@ */ public class MqttRequestResponseClient extends CrtResource { - public MqttRequestResponseClient(Mqtt5Client client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) throws CrtRuntimeException { + /** + * MQTT5-based constructor for request-response service clients + * + * @param client MQTT5 client that the request-response client should use as transport + * @param options request-response client configuration options + */ + public MqttRequestResponseClient(Mqtt5Client client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) { acquireNativeHandle(mqttRequestResponseClientNewFrom5( this, client.getNativeHandle(), @@ -30,7 +38,13 @@ public MqttRequestResponseClient(Mqtt5Client client, MqttRequestResponseClientBu )); } - public MqttRequestResponseClient(MqttClientConnection client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) throws CrtRuntimeException { + /** + * MQTT311-based constructor for request-response service clients + * + * @param client MQTT311 client that the request-response client should use as transport + * @param options request-response client configuration options + */ + public MqttRequestResponseClient(MqttClientConnection client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) { acquireNativeHandle(mqttRequestResponseClientNewFrom311( this, client.getNativeHandle(), @@ -40,6 +54,21 @@ public MqttRequestResponseClient(MqttClientConnection client, MqttRequestRespons )); } + /** + * Submits a request to the request-response client. + * + * @param request description of the request to perform + * + * @return future that completes with the result of performing the request + */ + public CompletableFuture submitRequest(RequestResponseOperation request) { + CompletableFuture future = new CompletableFuture<>(); + + mqttRequestResponseClientSubmitRequest(getNativeHandle(), request, future); + + return future; + } + /** * Cleans up the native resources associated with this client. The client is unusable after this call */ @@ -78,4 +107,7 @@ private static native long mqttRequestResponseClientNewFrom311( ) throws CrtRuntimeException; private static native void mqttRequestResponseClientDestroy(long client); + + private static native void mqttRequestResponseClientSubmitRequest(long client, RequestResponseOperation request, CompletableFuture future); + } diff --git a/src/main/java/software/amazon/awssdk/crt/iot/RequestResponseOperation.java b/src/main/java/software/amazon/awssdk/crt/iot/RequestResponseOperation.java new file mode 100644 index 000000000..680352d14 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/iot/RequestResponseOperation.java @@ -0,0 +1,125 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.iot; + +import java.util.ArrayList; + +/** + * Configuration options for an MQTT-based request-response operation. + */ +public class RequestResponseOperation { + + /** + * Builder class for RequestResponseOperation instances + */ + public static class RequestResponseOperationBuilder { + + private RequestResponseOperation request = new RequestResponseOperation(); + + private RequestResponseOperationBuilder() {} + + /** + * Adds a response path to the set of all possible response paths associated with this request. + * + * @param path response paths to associate with this request + * + * @return the builder object + */ + public RequestResponseOperationBuilder withResponsePath(ResponsePath path) { + request.responsePaths.add(path); + return this; + } + + /** + * Adds a topic filter to the set of topic filters that should be subscribed to in order to cover all possible + * response paths. Sometimes using wildcards can cut down on the subscriptions needed; other times that isn't + * possible to do correctly. + * + * @param topicFilter topic filter to subscribe to in order to cover one or more response paths + * + * @return the builder object + */ + public RequestResponseOperationBuilder withSubscription(String topicFilter) { + request.subscriptions.add(topicFilter); + return this; + } + + /** + * Sets the topic to publish the request to once response subscriptions have been established. + * + * @param publishTopic topic to publish the request to once response subscriptions have been established + * + * @return the builder object + */ + public RequestResponseOperationBuilder withPublishTopic(String publishTopic) { + request.publishTopic = publishTopic; + return this; + } + + /** + * Sets the payload to publish to 'publishTopic' in order to initiate the request. + * + * @param payload payload to publish to 'publishTopic' in order to initiate the request + * + * @return the builder object + */ + public RequestResponseOperationBuilder withPayload(byte[] payload) { + request.payload = payload; + return this; + } + + /** + * Sets the correlation token embedded in the request that must be found in a response message. + * + * This can be null to support certain services which don't use correlation tokens. In that case, the client + * only allows one token-less request at a time. + * + * @param correlationToken the correlation token embedded in the request that must be found in a response message + * + * @return the builder object + */ + public RequestResponseOperationBuilder withCorrelationToken(String correlationToken) { + request.correlationToken = correlationToken; + return this; + } + + /** + * Creates a new RequestResponseOperation instance based on the builder's configuration. + * + * @return a new RequestResponseOperation instance based on the builder's configuration + */ + public RequestResponseOperation build() { + return new RequestResponseOperation(request); + } + } + + private ArrayList responsePaths = new ArrayList<>(); + private ArrayList subscriptions = new ArrayList<>(); + private String publishTopic; + private String correlationToken; + private byte[] payload; + + private RequestResponseOperation() { + } + + private RequestResponseOperation(RequestResponseOperation request) { + this.responsePaths.addAll(request.responsePaths); + this.subscriptions.addAll(request.subscriptions); + this.publishTopic = request.publishTopic; + this.correlationToken = request.correlationToken; + this.payload = request.payload; + } + + /** + * Creates a new builder for RequestResponseOperations objects + * + * @return a new builder instance for RequestResponseOperations objects + */ + public static RequestResponseOperationBuilder builder() { + return new RequestResponseOperationBuilder(); + } + +} diff --git a/src/main/java/software/amazon/awssdk/crt/iot/ResponsePath.java b/src/main/java/software/amazon/awssdk/crt/iot/ResponsePath.java new file mode 100644 index 000000000..c2f1330cc --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/iot/ResponsePath.java @@ -0,0 +1,77 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.iot; + +/** + * A response path is a pair of values - MQTT topic and a JSON path - that describe how a response to + * an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each + * one is associated with a separate JSON schema for the response body. + */ +public class ResponsePath { + + /** + * Builder type for ResponsePath instances + */ + public static class ResponsePathBuilder { + + private ResponsePath path = new ResponsePath(); + + private ResponsePathBuilder() {} + + /** + * Fluent setter for the MQTT topic associated with this response path. + * + * @param responseTopic MQTT topic associated with this response path + * + * @return the builder object + */ + public ResponsePathBuilder withResponseTopic(String responseTopic) { + path.responseTopic = responseTopic; + return this; + } + + /** + * Fluent setter for the JSON path for finding correlation tokens within payloads that arrive on this response path's topic. + * + * @param correlationTokenJsonPath JSON path for finding correlation tokens within payloads that arrive on this response path's topic + * + * @return the builder object + */ + public ResponsePathBuilder withCorrelationTokenJsonPath(String correlationTokenJsonPath) { + path.correlationTokenJsonPath = correlationTokenJsonPath; + return this; + } + + /** + * Creates a new ResponsePath instance based on the builder's configuration. + * + * @return a new ResponsePath instance based on the builder's configuration + */ + public ResponsePath build() { + return new ResponsePath(path); + } + } + + private String responseTopic; + private String correlationTokenJsonPath; + + private ResponsePath() { + } + + private ResponsePath(ResponsePath path) { + this.responseTopic = path.responseTopic; + this.correlationTokenJsonPath = path.correlationTokenJsonPath; + } + + /** + * Creates a new builder instance for ResponsePath instances. + * + * @return a new builder instance for ResponsePath instances + */ + public static ResponsePathBuilder builder() { + return new ResponsePathBuilder(); + } +} diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 6e57a92c7..c5601a0d1 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -135,6 +135,16 @@ { "name": "", "parameterTypes": [] + }, + { + "name": "get", + "parameterTypes": [ + "int" + ] + }, + { + "name": "size", + "parameterTypes": [] } ] }, @@ -956,6 +966,54 @@ } ] }, + { + "name": "software.amazon.awssdk.crt.iot.MqttRequestResponse", + "methods": [ + { + "name": "", + "parameterTypes": [] + } + ], + "fields": [ + { + "name": "topic" + }, + { + "name": "payload" + } + ] + }, + { + "name": "software.amazon.awssdk.crt.iot.RequestResponseOperation", + "fields": [ + { + "name": "responsePaths" + }, + { + "name": "subscriptions" + }, + { + "name": "publishTopic" + }, + { + "name": "payload" + }, + { + "name": "correlationToken" + } + ] + }, + { + "name": "software.amazon.awssdk.crt.iot.ResponsePath", + "fields": [ + { + "name": "responseTopic" + }, + { + "name": "correlationTokenJsonPath" + } + ] + }, { "name": "software.amazon.awssdk.crt.mqtt.MqttClientConnection", "methods": [ diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 414c35244..de8cb676b 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -2269,6 +2269,14 @@ static void s_cache_boxed_array_list(JNIEnv *env) { boxed_array_list_properties.list_constructor_id = (*env)->GetMethodID(env, boxed_array_list_properties.list_class, "", "()V"); AWS_FATAL_ASSERT(boxed_array_list_properties.list_constructor_id); + + boxed_array_list_properties.get_method_id = + (*env)->GetMethodID(env, boxed_array_list_properties.list_class, "get", "(I)Ljava/lang/Object;"); + AWS_FATAL_ASSERT(boxed_array_list_properties.get_method_id); + + boxed_array_list_properties.size_method_id = + (*env)->GetMethodID(env, boxed_array_list_properties.list_class, "size", "()I"); + AWS_FATAL_ASSERT(boxed_array_list_properties.size_method_id); } struct java_s3express_credentials_provider_factory_properties s3express_credentials_provider_factory_properties; @@ -2322,6 +2330,69 @@ static void s_cache_s3express_credentials_provider_properties(JNIEnv *env) { AWS_FATAL_ASSERT(s3express_credentials_provider_properties.destroyProvider); } +struct java_response_path_properties response_path_properties; + +static void s_cache_response_path_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/iot/ResponsePath"); + AWS_FATAL_ASSERT(cls); + + response_path_properties.response_path_class = (*env)->NewGlobalRef(env, cls); + + response_path_properties.response_topic_field_id = + (*env)->GetFieldID(env, cls, "responseTopic", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(response_path_properties.response_topic_field_id); + + response_path_properties.correlation_token_json_path_field_id = + (*env)->GetFieldID(env, cls, "correlationTokenJsonPath", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(response_path_properties.correlation_token_json_path_field_id); +} + +struct java_request_response_operation_properties request_response_operation_properties; + +static void s_cache_request_response_operation_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/iot/RequestResponseOperation"); + AWS_FATAL_ASSERT(cls); + + request_response_operation_properties.request_response_operation_class = (*env)->NewGlobalRef(env, cls); + + request_response_operation_properties.response_paths_field_id = + (*env)->GetFieldID(env, cls, "responsePaths", "Ljava/util/ArrayList;"); + AWS_FATAL_ASSERT(request_response_operation_properties.response_paths_field_id); + + request_response_operation_properties.subscriptions_field_id = + (*env)->GetFieldID(env, cls, "subscriptions", "Ljava/util/ArrayList;"); + AWS_FATAL_ASSERT(request_response_operation_properties.subscriptions_field_id); + + request_response_operation_properties.publish_topic_field_id = + (*env)->GetFieldID(env, cls, "publishTopic", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(request_response_operation_properties.publish_topic_field_id); + + request_response_operation_properties.payload_field_id = (*env)->GetFieldID(env, cls, "payload", "[B"); + AWS_FATAL_ASSERT(request_response_operation_properties.payload_field_id); + + request_response_operation_properties.correlation_token_field_id = + (*env)->GetFieldID(env, cls, "correlationToken", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(request_response_operation_properties.correlation_token_field_id); +} + +struct java_mqtt_request_response_properties mqtt_request_response_properties; + +static void s_cache_mqtt_request_response_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/iot/MqttRequestResponse"); + AWS_FATAL_ASSERT(cls); + + mqtt_request_response_properties.mqtt_request_response_class = (*env)->NewGlobalRef(env, cls); + + mqtt_request_response_properties.constructor_method_id = + (*env)->GetMethodID(env, mqtt_request_response_properties.mqtt_request_response_class, "", "()V"); + + mqtt_request_response_properties.topic_field_id = (*env)->GetFieldID(env, cls, "topic", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(mqtt_request_response_properties.topic_field_id); + + mqtt_request_response_properties.payload_field_id = (*env)->GetFieldID(env, cls, "payload", "[B"); + AWS_FATAL_ASSERT(mqtt_request_response_properties.payload_field_id); +} + static void s_cache_java_class_ids(void *user_data) { JNIEnv *env = user_data; s_cache_http_request_body_stream(env); @@ -2427,6 +2498,9 @@ static void s_cache_java_class_ids(void *user_data) { s_cache_mqtt5_outbound_topic_alias_behavior_type(env); s_cache_mqtt5_inbound_topic_alias_behavior_type(env); s_cache_topic_aliasing_options(env); + s_cache_response_path_properties(env); + s_cache_request_response_operation_properties(env); + s_cache_mqtt_request_response_properties(env); } static aws_thread_once s_cache_once_init = AWS_THREAD_ONCE_STATIC_INIT; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 1572b45bb..63b66de9f 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -943,6 +943,9 @@ extern struct java_boxed_list_properties boxed_list_properties; struct java_boxed_array_list_properties { jclass list_class; jmethodID list_constructor_id; + + jmethodID get_method_id; + jmethodID size_method_id; }; extern struct java_boxed_array_list_properties boxed_array_list_properties; @@ -968,6 +971,35 @@ struct java_s3express_credentials_provider_properties { }; extern struct java_s3express_credentials_provider_properties s3express_credentials_provider_properties; +/* ResponsePath */ +struct java_response_path_properties { + jclass response_path_class; + jfieldID response_topic_field_id; + jfieldID correlation_token_json_path_field_id; +}; +extern struct java_response_path_properties response_path_properties; + +/* RequestResponseOperation */ +struct java_request_response_operation_properties { + jclass request_response_operation_class; + + jfieldID response_paths_field_id; + jfieldID subscriptions_field_id; + jfieldID publish_topic_field_id; + jfieldID payload_field_id; + jfieldID correlation_token_field_id; +}; +extern struct java_request_response_operation_properties request_response_operation_properties; + +/* MqttRequestResponse */ +struct java_mqtt_request_response_properties { + jclass mqtt_request_response_class; + jmethodID constructor_method_id; + jfieldID topic_field_id; + jfieldID payload_field_id; +}; +extern struct java_mqtt_request_response_properties mqtt_request_response_properties; + /** * All functions bound to JNI MUST call this before doing anything else. * This caches all JNI IDs the first time it is called. Any further calls are no-op; it is thread-safe. diff --git a/src/native/mqtt_request_response.c b/src/native/mqtt_request_response.c index 69457a080..a82c9bad3 100644 --- a/src/native/mqtt_request_response.c +++ b/src/native/mqtt_request_response.c @@ -212,6 +212,352 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_iot_MqttRequestResponseCl } } +struct aws_request_response_operation_binding { + struct aws_allocator *allocator; + + JavaVM *jvm; + + jobject operation_future; +}; + +static void s_aws_request_response_operation_binding_destroy(struct aws_request_response_operation_binding *binding) { + if (!binding) { + return; + } + + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + + if (env && binding->operation_future) { + (*env)->DeleteGlobalRef(env, binding->operation_future); + } + + aws_jni_release_thread_env(binding->jvm, env); + + aws_mem_release(binding->allocator, binding); +} + +/* +All cursors in these structures are from calls to aws_jni_byte_cursor_from_jstring_acquire and the like which means +non-null ptrs must be released with the appropriate JNI call, hence the cleanup implementation. +*/ + +struct aws_jni_response_path { + jstring java_response_topic; + struct aws_byte_cursor response_topic; + + jstring java_correlation_token_json_path; + struct aws_byte_cursor correlation_token_json_path; +}; + +struct aws_jni_subscription { + jstring java_topic_filter; + struct aws_byte_cursor topic_filter; +}; + +struct aws_request_response_operation_jni_owned_parameters { + struct aws_array_list response_paths; + struct aws_array_list subscriptions; + + jstring java_publish_topic; + struct aws_byte_cursor publish_topic; + + jbyteArray java_payload; + struct aws_byte_cursor payload; + + jstring java_correlation_token; + struct aws_byte_cursor correlation_token; +}; + +static void s_aws_request_response_operation_parameters_init( + struct aws_request_response_operation_jni_owned_parameters *params, + struct aws_allocator *allocator) { + AWS_ZERO_STRUCT(*params); + + aws_array_list_init_dynamic(¶ms->response_paths, allocator, 2, sizeof(struct aws_jni_response_path)); + aws_array_list_init_dynamic(¶ms->subscriptions, allocator, 2, sizeof(struct aws_jni_subscription)); +} + +static void s_aws_request_response_operation_jni_owned_parameters_clean_up( + struct aws_request_response_operation_jni_owned_parameters *params, + JNIEnv *env) { + if (!params) { + return; + } + + for (size_t i = 0; i < aws_array_list_length(¶ms->response_paths); ++i) { + struct aws_jni_response_path response_path; + AWS_ZERO_STRUCT(response_path); + + aws_array_list_get_at(¶ms->response_paths, &response_path, i); + + aws_jni_byte_cursor_from_jstring_release(env, response_path.java_response_topic, response_path.response_topic); + aws_jni_byte_cursor_from_jstring_release( + env, response_path.java_correlation_token_json_path, response_path.correlation_token_json_path); + } + aws_array_list_clean_up(¶ms->response_paths); + + for (size_t i = 0; i < aws_array_list_length(¶ms->subscriptions); ++i) { + struct aws_jni_subscription subscription; + AWS_ZERO_STRUCT(subscription); + + aws_array_list_get_at(¶ms->subscriptions, &subscription, i); + aws_jni_byte_cursor_from_jstring_release(env, subscription.java_topic_filter, subscription.topic_filter); + } + aws_array_list_clean_up(¶ms->subscriptions); + + aws_jni_byte_cursor_from_jstring_release(env, params->java_publish_topic, params->publish_topic); + aws_jni_byte_cursor_from_jbyteArray_release(env, params->java_payload, params->payload); + aws_jni_byte_cursor_from_jstring_release(env, params->java_correlation_token, params->correlation_token); +} + +static int s_aws_request_response_operation_jni_owned_parameters_init_from_jobject( + struct aws_request_response_operation_jni_owned_parameters *params, + struct aws_allocator *allocator, + jobject java_request_response_operation, + JNIEnv *env) { + s_aws_request_response_operation_parameters_init(params, allocator); + + if (!java_request_response_operation) { + aws_jni_throw_runtime_exception( + env, "mqttRequestResponseClientSubmitRequest - request response options are null"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + // responsePaths + jobject java_response_paths = (jobject)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.response_paths_field_id); + jint response_path_count = + (*env)->CallIntMethod(env, java_response_paths, boxed_array_list_properties.size_method_id); + if (response_path_count <= 0) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - response paths is empty"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + for (size_t i = 0; i < (size_t)response_path_count; ++i) { + jobject java_response_path = + (*env)->CallObjectMethod(env, java_response_paths, boxed_array_list_properties.get_method_id, (jint)i); + if (!java_response_path) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - null response path"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + jstring java_response_topic = + (jstring)(*env)->GetObjectField(env, java_response_path, response_path_properties.response_topic_field_id); + if (!java_response_topic) { + aws_jni_throw_runtime_exception( + env, "mqttRequestResponseClientSubmitRequest - null response path response topic"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + struct aws_jni_response_path response_path = { + .java_response_topic = java_response_topic, + .response_topic = aws_jni_byte_cursor_from_jstring_acquire(env, java_response_topic), + }; + + jstring java_correlation_token_json_path = (jstring)(*env)->GetObjectField( + env, java_response_path, response_path_properties.correlation_token_json_path_field_id); + + if (java_correlation_token_json_path) { + response_path.java_correlation_token_json_path = java_correlation_token_json_path; + response_path.correlation_token_json_path = + aws_jni_byte_cursor_from_jstring_acquire(env, java_correlation_token_json_path); + } + + aws_array_list_push_back(¶ms->response_paths, &response_path); + } + + // subscriptions + jobject java_subscriptions = (jobject)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.subscriptions_field_id); + jint subscription_count = + (*env)->CallIntMethod(env, java_subscriptions, boxed_array_list_properties.size_method_id); + if (subscription_count <= 0) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - subscriptions is empty"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + for (size_t i = 0; i < (size_t)subscription_count; ++i) { + jstring java_subscription_topic_filter = + (*env)->CallObjectMethod(env, java_subscriptions, boxed_array_list_properties.get_method_id, (jint)i); + if (!java_subscription_topic_filter) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - null subscription"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + + struct aws_jni_subscription subscription = { + .java_topic_filter = java_subscription_topic_filter, + .topic_filter = aws_jni_byte_cursor_from_jstring_acquire(env, java_subscription_topic_filter), + }; + + aws_array_list_push_back(¶ms->subscriptions, &subscription); + } + + // publishTopic + params->java_publish_topic = (jstring)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.publish_topic_field_id); + if (!params->java_publish_topic) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - publish topic is null"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + params->publish_topic = aws_jni_byte_cursor_from_jstring_acquire(env, params->java_publish_topic); + + // payload + params->java_payload = (jbyteArray)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.payload_field_id); + if (!params->java_payload) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - payload is null"); + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } + params->payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, params->java_payload); + + // correlationToken (optional) + params->java_correlation_token = (jstring)(*env)->GetObjectField( + env, java_request_response_operation, request_response_operation_properties.correlation_token_field_id); + if (params->java_correlation_token) { + params->correlation_token = aws_jni_byte_cursor_from_jstring_acquire(env, params->java_correlation_token); + } + + return AWS_OP_SUCCESS; +} + +static void s_on_request_response_operation_completion( + const struct aws_byte_cursor *response_topic, + const struct aws_byte_cursor *payload, + int error_code, + void *user_data) { + + struct aws_request_response_operation_binding *binding = user_data; + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + if (!env) { + goto done; + } + + jobject java_result = NULL; + + if (error_code == AWS_ERROR_SUCCESS) { + java_result = (*env)->NewObject( + env, + mqtt_request_response_properties.mqtt_request_response_class, + mqtt_request_response_properties.constructor_method_id); + if (java_result != NULL) { + jstring java_topic = aws_jni_string_from_cursor(env, response_topic); + (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.topic_field_id, java_topic); + + jbyteArray java_payload = aws_jni_byte_array_from_cursor(env, payload); + (*env)->SetObjectField(env, java_result, mqtt_request_response_properties.payload_field_id, java_payload); + } + } + + if (java_result != NULL) { + (*env)->CallBooleanMethod( + env, binding->operation_future, completable_future_properties.complete_method_id, java_result); + } else { + int final_error_code = (error_code == AWS_ERROR_SUCCESS) ? AWS_ERROR_UNKNOWN : error_code; + jobject crt_exception = aws_jni_new_crt_exception_from_error_code(env, final_error_code); + + (*env)->CallBooleanMethod( + env, + binding->operation_future, + completable_future_properties.complete_exceptionally_method_id, + crt_exception); + } + +done: + + aws_jni_release_thread_env(binding->jvm, env); + + s_aws_request_response_operation_binding_destroy(binding); +} + +JNIEXPORT void JNICALL + Java_software_amazon_awssdk_crt_iot_MqttRequestResponseClient_mqttRequestResponseClientSubmitRequest( + JNIEnv *env, + jclass jni_class, + jlong jni_mqtt_request_response_client_handle, + jobject java_request, + jobject java_result_future) { + + (void)jni_class; + + struct aws_crt_mqtt_request_response_client_binding *rr_client_binding = + (struct aws_crt_mqtt_request_response_client_binding *)jni_mqtt_request_response_client_handle; + struct aws_mqtt_request_response_client *rr_client = rr_client_binding->client; + if (!rr_client || !java_request || !java_result_future) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest: null parameter"); + return; + } + + JavaVM *jvm = NULL; + jint jvmresult = (*env)->GetJavaVM(env, &jvm); + if (jvmresult != 0) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest: failed to get JVM"); + return; + } + + struct aws_request_response_operation_binding *binding = NULL; + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct aws_request_response_operation_jni_owned_parameters request_params; + if (s_aws_request_response_operation_jni_owned_parameters_init_from_jobject( + &request_params, allocator, java_request, env)) { + s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env); + return; + } + + binding = aws_mem_calloc(allocator, 1, sizeof(struct aws_request_response_operation_binding)); + binding->allocator = allocator; + binding->operation_future = (*env)->NewGlobalRef(env, java_result_future); + binding->jvm = jvm; + + struct aws_mqtt_request_operation_options request_options; + AWS_ZERO_STRUCT(request_options); + + size_t subscription_count = aws_array_list_length(&request_params.subscriptions); + AWS_VARIABLE_LENGTH_ARRAY(struct aws_byte_cursor, subscriptions, subscription_count); + for (size_t i = 0; i < subscription_count; ++i) { + struct aws_jni_subscription subscription; + AWS_ZERO_STRUCT(subscription); + aws_array_list_get_at(&request_params.subscriptions, &subscription, i); + + subscriptions[i] = subscription.topic_filter; + } + + size_t response_path_count = aws_array_list_length(&request_params.response_paths); + AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_request_operation_response_path, response_paths, response_path_count); + for (size_t i = 0; i < response_path_count; ++i) { + struct aws_jni_response_path response_path; + AWS_ZERO_STRUCT(response_path); + aws_array_list_get_at(&request_params.response_paths, &response_path, i); + + response_paths[i].topic = response_path.response_topic; + response_paths[i].correlation_token_json_path = response_path.correlation_token_json_path; + } + + request_options.subscription_topic_filters = subscriptions; + request_options.subscription_topic_filter_count = subscription_count; + request_options.response_paths = response_paths; + request_options.response_path_count = response_path_count; + request_options.publish_topic = request_params.publish_topic; + request_options.serialized_request = request_params.payload; + request_options.correlation_token = request_params.correlation_token; + request_options.completion_callback = s_on_request_response_operation_completion; + request_options.user_data = binding; + + if (aws_mqtt_request_response_client_submit_request(rr_client, &request_options)) { + aws_jni_throw_runtime_exception(env, "mqttRequestResponseClientSubmitRequest - failed to submit request"); + goto error; + } + + goto done; + +error: + + s_aws_request_response_operation_binding_destroy(binding); + +done: + + s_aws_request_response_operation_jni_owned_parameters_clean_up(&request_params, env); +} + #if UINTPTR_MAX == 0xffffffff # if defined(_MSC_VER) # pragma warning(pop) diff --git a/src/test/java/software/amazon/awssdk/crt/test/MqttRequestResponseClientTests.java b/src/test/java/software/amazon/awssdk/crt/test/MqttRequestResponseClientTests.java index 85757bdcb..8b1c45080 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/MqttRequestResponseClientTests.java +++ b/src/test/java/software/amazon/awssdk/crt/test/MqttRequestResponseClientTests.java @@ -5,14 +5,19 @@ package software.amazon.awssdk.crt.test; +import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.io.TlsContext; import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.iot.MqttRequestResponse; import software.amazon.awssdk.crt.iot.MqttRequestResponseClient; import software.amazon.awssdk.crt.iot.MqttRequestResponseClientBuilder; +import software.amazon.awssdk.crt.iot.RequestResponseOperation; +import software.amazon.awssdk.crt.iot.ResponsePath; import software.amazon.awssdk.crt.mqtt.*; import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions; @@ -23,13 +28,68 @@ import software.amazon.awssdk.crt.mqtt5.OnStoppedReturn; import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; - +import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; public class MqttRequestResponseClientTests extends CrtTestFixture { + public enum MqttVersion { + Mqtt5, + Mqtt311 + } + + public class TestContext { + public Mqtt5Client mqtt5Client; + public MqttClientConnection mqtt311Client; + public MqttRequestResponseClient rrClient; + + public TestContext(MqttVersion version, MqttRequestResponseClientBuilder builder) { + if (builder == null) { + builder = createRequestResponseClientBuilder(); + } + + if (version == MqttVersion.Mqtt5) { + try (Mqtt5Client protocolClient = createMqtt5Client(); + MqttRequestResponseClient rrClient = builder.build(protocolClient)) { + this.mqtt5Client = protocolClient; + this.rrClient = rrClient; + this.mqtt5Client.addRef(); + this.rrClient.addRef(); + } + } else { + try (MqttClientConnection protocolClient = createMqtt311Client(); + MqttRequestResponseClient rrClient = builder.build(protocolClient)) { + this.mqtt311Client = protocolClient; + this.rrClient = rrClient; + this.mqtt311Client.addRef(); + this.rrClient.addRef(); + } + } + } + + public void close() { + if (this.rrClient != null) { + rrClient.close(); + } + + if (this.mqtt5Client != null) { + this.mqtt5Client.stop(); + this.mqtt5Client.close(); + } + + if (this.mqtt311Client != null) { + this.mqtt311Client.disconnect(); + this.mqtt311Client.close(); + } + } + } + + public TestContext context; + static final boolean AWS_GRAALVM_CI = System.getProperty("AWS_GRAALVM_CI") != null; static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST"); @@ -134,120 +194,459 @@ static private MqttClientConnection createMqtt311Client() { } } + static public MqttRequestResponseClientBuilder createRequestResponseClientBuilder() { + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); + rrBuilder.withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(30); + + return rrBuilder; + } + + @After + public void cleanupContext() { + if (this.context != null) { + this.context.close(); + } + } + @Test public void CreateDestroyMqtt5() { skipIfNetworkUnavailable(); - try (Mqtt5Client protocolClient = createMqtt5Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(30); - - MqttRequestResponseClient rrClient = rrBuilder.build(protocolClient); - rrClient.close(); - protocolClient.stop(); - } + this.context = new TestContext(MqttVersion.Mqtt5, null); } @Test(expected = CrtRuntimeException.class) public void Mqtt5CreateFailureBadMaxRequestResponseSubscriptions() { skipIfNetworkUnavailable(); - try (Mqtt5Client protocolClient = createMqtt5Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(0) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(30); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(0) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(30); - rrBuilder.build(protocolClient); - } + this.context = new TestContext(MqttVersion.Mqtt5, rrBuilder); } @Test(expected = CrtRuntimeException.class) public void Mqtt5CreateFailureBadMaxStreamingSubscriptions() { skipIfNetworkUnavailable(); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(-1) + .withOperationTimeoutSeconds(30); - try (Mqtt5Client protocolClient = createMqtt5Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(-1) - .withOperationTimeoutSeconds(30); - - rrBuilder.build(protocolClient); - } + this.context = new TestContext(MqttVersion.Mqtt5, rrBuilder); } @Test(expected = CrtRuntimeException.class) public void Mqtt5CreateFailureBadOperationTimeout() { skipIfNetworkUnavailable(); - try (Mqtt5Client protocolClient = createMqtt5Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(-5); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(-5); - rrBuilder.build(protocolClient); - } + this.context = new TestContext(MqttVersion.Mqtt5, rrBuilder); } @Test public void CreateDestroyMqtt311() { skipIfNetworkUnavailable(); - try (MqttClientConnection protocolClient = createMqtt311Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(30); - - MqttRequestResponseClient rrClient = rrBuilder.build(protocolClient); - rrClient.close(); - - protocolClient.disconnect(); - } + this.context = new TestContext(MqttVersion.Mqtt311, null); } @Test(expected = CrtRuntimeException.class) public void Mqtt311CreateFailureBadMaxRequestResponseSubscriptions() { skipIfNetworkUnavailable(); - try (MqttClientConnection protocolClient = createMqtt311Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(0) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(30); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(0) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(30); - rrBuilder.build(protocolClient); - } + this.context = new TestContext(MqttVersion.Mqtt311, rrBuilder); } @Test(expected = CrtRuntimeException.class) public void Mqtt311CreateFailureBadMaxStreamingSubscriptions() { skipIfNetworkUnavailable(); - try (MqttClientConnection protocolClient = createMqtt311Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(-1) - .withOperationTimeoutSeconds(30); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(-1) + .withOperationTimeoutSeconds(30); - rrBuilder.build(protocolClient); - } + this.context = new TestContext(MqttVersion.Mqtt311, rrBuilder); } @Test(expected = CrtRuntimeException.class) public void Mqtt311CreateFailureBadOperationTimeout() { skipIfNetworkUnavailable(); - try (MqttClientConnection protocolClient = createMqtt311Client()) { - MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder(); - rrBuilder.withMaxRequestResponseSubscriptions(4) - .withMaxStreamingSubscriptions(2) - .withOperationTimeoutSeconds(-5); + MqttRequestResponseClientBuilder rrBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(-5); + + this.context = new TestContext(MqttVersion.Mqtt311, rrBuilder); + } - rrBuilder.build(protocolClient); + RequestResponseOperation createGetNamedShadowRequest(String thing, String shadow, boolean withCorrelationToken) { + String acceptedTopic = String.format("$aws/things/%s/shadow/name/%s/get/accepted", thing, shadow); + String rejectedTopic = String.format("$aws/things/%s/shadow/name/%s/get/rejected", thing, shadow); + + RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder() + .withSubscription(String.format("$aws/things/%s/shadow/name/%s/get/+", thing, shadow)) + .withPublishTopic(String.format("$aws/things/%s/shadow/name/%s/get", thing, shadow)); + + if (withCorrelationToken) { + String correlationToken = (UUID.randomUUID()).toString(); + String payloadAsString = String.format("{\"clientToken\":\"%s\"}", correlationToken); + + builder.withCorrelationToken(correlationToken); + builder.withPayload(payloadAsString.getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).withCorrelationTokenJsonPath("clientToken").build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).withCorrelationTokenJsonPath("clientToken").build()); + } else { + builder.withPayload("{}".getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).build()); } + + return builder.build(); + } + + public static String REJECTED_TOPIC_SUBSTRING = "rejected"; + public static String ACCEPTED_TOPIC_SUBSTRING = "accepted"; + public static String NO_SUCH_SHADOW_SUBSTRING = "No shadow exists with name"; + + void doGetNamedShadow(String thing, String shadow, boolean withCorrelationToken, String expectedTopicSubstring, String expectedPayloadSubstring) { + RequestResponseOperation operation = createGetNamedShadowRequest(thing, shadow, withCorrelationToken); + CompletableFuture responseFuture = this.context.rrClient.submitRequest(operation); + MqttRequestResponse response = null; + + try { + response = responseFuture.get(); + } catch (Exception e) { + ; + } + + Assert.assertNotNull(response); + + String payloadAsString = new String(response.getPayload(), StandardCharsets.UTF_8); + Assert.assertTrue(payloadAsString.contains(expectedPayloadSubstring)); + Assert.assertTrue(response.getTopic().contains(expectedTopicSubstring)); + } + + @Test + public void GetNamedShadowFailureNoSuchShadowWithCorrelationMqtt5() { + skipIfNetworkUnavailable(); + this.context = new TestContext(MqttVersion.Mqtt5, null); + doGetNamedShadow("NoSuchThing", "Derp", true, REJECTED_TOPIC_SUBSTRING, NO_SUCH_SHADOW_SUBSTRING); + } + + @Test + public void GetNamedShadowFailureNoSuchShadowWithoutCorrelationMqtt5() { + skipIfNetworkUnavailable(); + this.context = new TestContext(MqttVersion.Mqtt5, null); + doGetNamedShadow("NoSuchThing", "Derp", false, REJECTED_TOPIC_SUBSTRING, NO_SUCH_SHADOW_SUBSTRING); + } + + @Test + public void GetNamedShadowFailureNoSuchShadowWithCorrelationMqtt311() { + skipIfNetworkUnavailable(); + this.context = new TestContext(MqttVersion.Mqtt311, null); + doGetNamedShadow("NoSuchThing", "Derp", true, REJECTED_TOPIC_SUBSTRING, NO_SUCH_SHADOW_SUBSTRING); + } + + @Test + public void GetNamedShadowFailureNoSuchShadowWithoutCorrelationMqtt311() { + skipIfNetworkUnavailable(); + this.context = new TestContext(MqttVersion.Mqtt311, null); + doGetNamedShadow("NoSuchThing", "Derp", false, REJECTED_TOPIC_SUBSTRING, NO_SUCH_SHADOW_SUBSTRING); + } + + RequestResponseOperation createUpdateNamedShadowRequest(String thing, String shadow, boolean withCorrelationToken) { + String acceptedTopic = String.format("$aws/things/%s/shadow/name/%s/update/accepted", thing, shadow); + String rejectedTopic = String.format("$aws/things/%s/shadow/name/%s/update/rejected", thing, shadow); + + RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder() + .withSubscription(acceptedTopic) + .withSubscription(rejectedTopic) + .withPublishTopic(String.format("$aws/things/%s/shadow/name/%s/update", thing, shadow)); + + String desiredChange = "{\"Uff\":\"Dah\"}"; + + if (withCorrelationToken) { + String correlationToken = (UUID.randomUUID()).toString(); + String payloadAsString = String.format("{\"clientToken\":\"%s\",\"state\":{\"desired\":%s}}", correlationToken, desiredChange); + + builder.withCorrelationToken(correlationToken); + builder.withPayload(payloadAsString.getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).withCorrelationTokenJsonPath("clientToken").build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).withCorrelationTokenJsonPath("clientToken").build()); + } else { + builder.withPayload(String.format("{\"state\":{\"desired\":%s}}", desiredChange).getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).build()); + } + + return builder.build(); + } + + void doUpdateNamedShadowSuccess(String thing, String shadow, boolean withCorrelationToken, String expectedTopicSubstring, String expectedPayloadSubstring) { + RequestResponseOperation operation = createUpdateNamedShadowRequest(thing, shadow, withCorrelationToken); + CompletableFuture responseFuture = this.context.rrClient.submitRequest(operation); + MqttRequestResponse response = null; + + try { + response = responseFuture.get(); + } catch (Exception e) { + ; + } + + Assert.assertNotNull(response); + + String payloadAsString = new String(response.getPayload(), StandardCharsets.UTF_8); + Assert.assertTrue(payloadAsString.contains(expectedPayloadSubstring)); + Assert.assertTrue(response.getTopic().contains(expectedTopicSubstring)); + } + + RequestResponseOperation createDeleteNamedShadowRequest(String thing, String shadow, boolean withCorrelationToken) { + String acceptedTopic = String.format("$aws/things/%s/shadow/name/%s/delete/accepted", thing, shadow); + String rejectedTopic = String.format("$aws/things/%s/shadow/name/%s/delete/rejected", thing, shadow); + + RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder() + .withSubscription(acceptedTopic) + .withSubscription(rejectedTopic) + .withPublishTopic(String.format("$aws/things/%s/shadow/name/%s/delete", thing, shadow)); + + if (withCorrelationToken) { + String correlationToken = (UUID.randomUUID()).toString(); + String payloadAsString = String.format("{\"clientToken\":\"%s\"}", correlationToken); + + builder.withCorrelationToken(correlationToken); + builder.withPayload(payloadAsString.getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).withCorrelationTokenJsonPath("clientToken").build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).withCorrelationTokenJsonPath("clientToken").build()); + } else { + builder.withPayload("{}".getBytes(StandardCharsets.UTF_8)); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(acceptedTopic).build()); + builder.withResponsePath(ResponsePath.builder().withResponseTopic(rejectedTopic).build()); + } + + return builder.build(); + } + + void doDeleteNamedShadowSuccess(String thing, String shadow, boolean withCorrelationToken, String expectedTopicSubstring, String expectedPayloadSubstring) { + RequestResponseOperation operation = createDeleteNamedShadowRequest(thing, shadow, true); + CompletableFuture responseFuture = this.context.rrClient.submitRequest(operation); + MqttRequestResponse response = null; + + try { + response = responseFuture.get(); + } catch (Exception e) { + ; + } + + Assert.assertNotNull(response); + + String payloadAsString = new String(response.getPayload(), StandardCharsets.UTF_8); + Assert.assertTrue(payloadAsString.contains(expectedPayloadSubstring)); + Assert.assertTrue(response.getTopic().contains(expectedTopicSubstring)); + } + + public static String METADATA_SUBSTRING = "metadata"; + public static String UFF_SUBSTRING = "Uff"; + public static String VERSION_SUBSTRING = "version"; + + public void doUpdateNamedShadowSuccessTest(MqttVersion version, boolean useCorrelationToken) { + this.context = new TestContext(version, null); + + String thing = (UUID.randomUUID()).toString(); + String shadow = (UUID.randomUUID()).toString(); + + doGetNamedShadow(thing, shadow, useCorrelationToken, REJECTED_TOPIC_SUBSTRING, NO_SUCH_SHADOW_SUBSTRING); + doUpdateNamedShadowSuccess(thing, shadow, useCorrelationToken, ACCEPTED_TOPIC_SUBSTRING, METADATA_SUBSTRING); + doGetNamedShadow(thing, shadow, useCorrelationToken, ACCEPTED_TOPIC_SUBSTRING, UFF_SUBSTRING); + doDeleteNamedShadowSuccess(thing, shadow, useCorrelationToken, ACCEPTED_TOPIC_SUBSTRING, VERSION_SUBSTRING); + } + + @Test + public void UpdateNamedShadowSuccessWithCorrelationMqtt5() { + skipIfNetworkUnavailable(); + doUpdateNamedShadowSuccessTest(MqttVersion.Mqtt5, true); + } + + @Test + public void UpdateNamedShadowSuccessWithoutCorrelationMqtt5() { + skipIfNetworkUnavailable(); + doUpdateNamedShadowSuccessTest(MqttVersion.Mqtt5, false); + } + + @Test + public void UpdateNamedShadowSuccessWithCorrelationMqtt311() { + skipIfNetworkUnavailable(); + doUpdateNamedShadowSuccessTest(MqttVersion.Mqtt311, true); + } + + @Test + public void UpdateNamedShadowSuccessWithoutCorrelationMqtt311() { + skipIfNetworkUnavailable(); + doUpdateNamedShadowSuccessTest(MqttVersion.Mqtt311, false); + } + + public void doBadGetNamedShadowTest(Supplier builderConstructor) { + this.context = new TestContext(MqttVersion.Mqtt5, null); + + RequestResponseOperation operation = builderConstructor.get(); + CompletableFuture responseFuture = this.context.rrClient.submitRequest(operation); + + Assert.assertTrue(false); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNullOperation() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> null); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNullPublishTopic() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withSubscription("hello/world/accepted") + .withResponsePath(ResponsePath.builder().withResponseTopic("hello/world/accepted").build()) + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNoSubscriptions() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withResponsePath(ResponsePath.builder().withResponseTopic("hello/world/accepted").build()) + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNullSubscriptions() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withSubscription(null) + .withResponsePath(ResponsePath.builder().withResponseTopic("hello/world/accepted").build()) + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNoResponsePaths() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withSubscription("hello/world/accepted") + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNullResponsePath() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withResponsePath(null) + .withSubscription("hello/world/accepted") + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureResponsePathNullTopic() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withResponsePath(ResponsePath.builder().withResponseTopic(null).build()) + .withSubscription("hello/world/accepted") + .withPayload("{}".getBytes(StandardCharsets.UTF_8)) + .build(); + }); + } + + @Test(expected = CrtRuntimeException.class) + public void GetNamedShadowFailureNullPayload() { + skipIfNetworkUnavailable(); + doBadGetNamedShadowTest(() -> { + return RequestResponseOperation.builder() + .withPublishTopic("hello/world") + .withSubscription("hello/world/accepted") + .withResponsePath(ResponsePath.builder().withResponseTopic("hello/world/accepted").build()) + .withPayload(null) + .build(); + }); + } + + public void doGetNamedShadowFailureTimeoutTest(MqttVersion version) { + MqttRequestResponseClientBuilder clientBuilder = new MqttRequestResponseClientBuilder() + .withMaxRequestResponseSubscriptions(4) + .withMaxStreamingSubscriptions(2) + .withOperationTimeoutSeconds(2); + this.context = new TestContext(version, clientBuilder); + + String badAcceptedTopic = "hello/world/accepted"; + String badRejectedTopic = "hello/world/rejected"; + String correlationToken = (UUID.randomUUID()).toString(); + String payloadAsString = String.format("{\"clientToken\":\"%s\"}", correlationToken); + + RequestResponseOperation operation = RequestResponseOperation.builder() + .withSubscription(badAcceptedTopic) + .withPublishTopic("uff/dah") + .withCorrelationToken(correlationToken) + .withPayload(payloadAsString.getBytes(StandardCharsets.UTF_8)) + .withResponsePath(ResponsePath.builder().withResponseTopic(badAcceptedTopic).withCorrelationTokenJsonPath("clientToken").build()) + .withResponsePath(ResponsePath.builder().withResponseTopic(badRejectedTopic).withCorrelationTokenJsonPath("clientToken").build()) + .build(); + + CompletableFuture responseFuture = this.context.rrClient.submitRequest(operation); + + try { + responseFuture.get(); + } catch (InterruptedException ie) { + Assert.assertTrue(false); + } catch (ExecutionException ex) { + String message = ex.getMessage(); + Assert.assertTrue(message.contains("timeout")); + } + } + + @Test + public void GetNamedShadowFailureTimeoutMqtt5() { + skipIfNetworkUnavailable(); + + doGetNamedShadowFailureTimeoutTest(MqttVersion.Mqtt5); + } + + @Test + public void GetNamedShadowFailureTimeoutMqtt311() { + skipIfNetworkUnavailable(); + + doGetNamedShadowFailureTimeoutTest(MqttVersion.Mqtt311); } }