Spring Cloud Stream Binder for Solace PubSub+

An implementation of Spring’s Cloud Stream Binder for integrating with Solace PubSub+ message brokers. The Spring Cloud Stream Binder project provides a higher-level abstraction towards messaging that standardizes the development of distributed message-based systems.

  • Spring Cloud Stream consumer bindings with Solace PubSub+ Binder v5.x and later require Solace PubSub+ Broker version 10.2.1 or newer. The Native Message NACK feature, on which Solace consumer bindings depend, was introduced in Solace PubSub+ Broker version 10.2.1.

  • Spring Cloud Stream producer bindings with Solace PubSub+ Binder v5.x and later are compatible with PubSub+ Broker versions prior to 10.2.1.

  • If you are upgrading from an older version to Solace Binder v5.x or later, see the Solace PubSub+ Binder 5.0 Migration Guide for details.

Overview

The Solace implementation of the Spring Cloud Stream Binder maps the following concepts from Spring to Solace:

  • Destinations to topics/subscriptions

    • Producer bindings always send messages to topics

  • Consumer groups to durable queues

    • A consumer group’s queue is subscribed to its destination subscription (default)

    • Consumer bindings always receive messages from queues

  • Anonymous consumer groups to temporary queues (When no group is specified; used for SCS Publish-Subscribe Model)

In Solace, the above setup is called topic-to-queue mapping. So a typical message flow would then appear as follows:

  1. Producer bindings publish messages to their destination topics

  2. Each consumer group’s queue receives the messages published to its destination topic

  3. The PubSub+ broker distributes messages in a round-robin fashion to each consumer binding for a particular consumer group

    ℹ️
    Round-robin distribution only occurs if the consumer group’s queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages.

Because consumer bindings always consume from queues, Assured Delivery must be enabled on the Solace PubSub+ Message VPN being used (it is enabled automatically when using Solace Cloud). Additionally, the client username’s client profile must be allowed to send and receive guaranteed messages.
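As a rough illustration of the message flow above, the following plain-Java sketch models how one consumer group's queue hands messages to its consumer bindings. It is a toy model, not binder or broker code: the class and method names are hypothetical, and treating the first listed consumer as the only receiver on an exclusive queue is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a consumer group's queue; not part of the binder.
final class TopicToQueueSketch {

    /**
     * Distributes the queued messages to the group's consumers.
     * Non-exclusive queue: round-robin across all consumers.
     * Exclusive queue: only one consumer (assumed here to be the first) receives messages.
     */
    static Map<String, List<String>> deliver(List<String> messages,
                                             List<String> consumers,
                                             boolean exclusive) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = exclusive
                    ? consumers.get(0)
                    : consumers.get(i % consumers.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }
}
```

With three messages and two consumers, the non-exclusive case alternates between the consumers, while the exclusive case delivers everything to a single consumer.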

For brevity, this document assumes that you have a basic understanding of the Spring Cloud Stream project. If not, please refer to Spring’s documentation. This document focuses solely on components unique to Solace.

Spring Cloud Stream Binder

This project extends the Spring Cloud Stream Binder project. If you are new to Spring Cloud Stream, check out their documentation.

The following is a brief excerpt from that document:

Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

Using it in your Application

Updating your build

The releases from this project are hosted in Maven Central.

The easiest way to get started is to include the spring-cloud-starter-stream-solace in your application.

Here is how to include the spring cloud stream starter in your project using Gradle and Maven.

Using it with Gradle

// Solace Spring Cloud Stream Binder
implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:5.4.0")

Using it with Maven

<!-- Solace Spring Cloud Stream Binder -->
<dependency>
  <groupId>com.solace.spring.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-solace</artifactId>
  <version>5.4.0</version>
</dependency>

Creating a Simple Solace Binding

Starting in Spring Cloud Stream version 3, the recommended way to define bindings and binding names is to use the functional approach, which uses Spring Cloud Function. You can learn more in the Spring Cloud Function support and Functional Binding Names sections of the reference guide.

Given this example app:

@SpringBootApplication
public class SampleAppApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleAppApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

An applicable Solace configuration file may look like:

spring:
  cloud:
    function:
      definition: uppercase
    stream:
      bindings:
        uppercase-in-0:
          destination: queuename
          group: myconsumergroup
          binder: solace-broker
        uppercase-out-0:
          destination: uppercase/topic
          binder: solace-broker
      binders:
        solace-broker:
          type: solace
          environment:
            solace: # (1)
              java:
                host: tcp://localhost:55555
                msgVpn: default
                clientUsername: default
                clientPassword: default
                connectRetries: -1
                reconnectRetries: -1
#                apiProperties:
#                  ssl_trust_store: <path_to_trust_store>
#                  ssl_trust_store_password: <trust_store_password>
#                  ssl_validate_certificate: true
  1. The latter half of this configuration, where the Solace session is configured, originates from the JCSMP Spring Boot Auto-Configuration project. See Solace Session Properties for more info.

For more samples see Solace Spring Cloud Samples repository.

For step-by-step instructions, refer to the Solace Spring Cloud Stream tutorial and check out the blogs.

Configuration Options

Solace Binder Configuration Options

Configuration of the Solace Spring Cloud Stream Binder is done through Spring Boot’s externalized configuration. This is where users can control the binder’s configuration options as well as the Solace Java API properties.

For general binder configuration options and properties, refer to the Spring Cloud Stream Reference Documentation.

Solace Session Properties

The binder’s Solace session is configurable using properties prefixed by solace.java or spring.cloud.stream.binders.<binder-name>.environment.solace.java.

This binder leverages the JCSMP Spring Boot Auto-Configuration project to configure its session. See the JCSMP Spring Boot Auto-Configuration documentation for more info on how to configure these properties.

See Creating a Simple Solace Binding for a simple example of how to configure a session for this binder.

💡

Additional session properties not available under the usual solace.java prefix can be set using solace.java.apiProperties.<property>, where <property> is the name of a JCSMPProperties constant (e.g. ssl_trust_store).

See JCSMP Spring Boot Auto-Configuration documentation for more info about solace.java.apiProperties.

💡

The Solace session can be configured to use OAuth2 authentication. See JCSMP Spring Boot: Using OAuth2 Authentication Scheme for more info.

Solace Consumer Properties

The following properties are available for Solace consumers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.consumer. where bindingName looks something like functionName-in-0 as defined in Functional Binding Names.

See SolaceCommonProperties and SolaceConsumerProperties for the most up-to-date list.

endpointType

Specifies whether the endpoint that messages are consumed from is a queue or a topic_endpoint.

When set to topic_endpoint, the binder provisions a topic endpoint for the consumer group’s endpoint instead of a queue.

Default: queue

provisionDurableQueue

Whether to provision durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.

Default: true
See: Generated Queue Name Syntax

addDestinationAsSubscriptionToQueue

Whether to add the destination as a subscription to the queue during provisioning.

Default: true

selector

If specified, enables client applications to choose which messages they are interested in receiving, as determined by the messages’ header field and property values.

A selector has a conditional expression syntax that is a subset of SQL92. A selector can be used with a queue or a topic endpoint subscription.

queueNameExpression

A SpEL expression for creating the consumer group’s queue name.

Default: "'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Queue Name Syntax

⚠️
Modifying this can cause naming conflicts between the queue names of consumer groups.

⚠️
While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.

queueAccessType

Access type for the consumer group queue.

Default: 0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values

queuePermission

Permissions for the consumer group queue.

queueDiscardBehaviour

If specified, whether to notify sender if a message fails to be enqueued to the consumer group queue.

Default: null

queueMaxMsgRedelivery

Sets the maximum message redelivery count on the consumer group queue. (Zero means retry forever).

Default: null

queueMaxMsgSize

Maximum message size for the consumer group queue.

Default: null

queueQuota

Message spool quota for the consumer group queue.

Default: null

queueRespectsMsgTtl

Whether the consumer group queue respects Message TTL.

Default: null

queueAdditionalSubscriptions

An array of additional topic subscriptions to be applied on the consumer group queue.
These subscriptions may also contain wildcards.

Default: String[0]
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Stream consumer groups.

polledConsumerWaitTimeInMillis

Maximum wait time for polled consumers to receive a message from their consumer group queue.
Only applicable when batchMode is false.

Default: 100

transacted

When set to true, messages will be received using local transactions.

Default: false

ℹ️
The maximum transaction size is 256 messages.
The size of the transaction is controlled by the batched message’s size. See Batch Consumers for more info.

batchMaxSize

The maximum number of messages per batch.
Only applicable when batchMode is true.

Default: 255

batchWaitStrategy

The waiting strategy for accumulating batches.
Only applicable when batchMode is true.

Default: respect_timeout

ℹ️
The waiting strategy works alongside the batchMaxSize option.

respect_timeout

Adheres to the batchTimeout consumer config option.

immediate

Immediately collects the batch once no more messages are available on the endpoint.

batchTimeout

The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of 0 means wait forever.
Only applicable when batchMode is true.

Default: 5000

autoBindErrorQueue

Whether to automatically create a durable error queue to which messages will be republished when message processing failures are encountered. Only applies once all internal retries have been exhausted.

Default: false

💡
Your ACL Profile must allow for publishing to this queue if you decide to use autoBindErrorQueue.

provisionErrorQueue

Whether to provision durable queues for error queues when autoBindErrorQueue is true. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.

errorQueueNameExpression

A SpEL expression for creating the error queue’s name.

Default: "'scst/error/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Error Queue Name Syntax

⚠️
Modifying this can cause naming conflicts between the error queue names.

⚠️
While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.

errorQueueMaxDeliveryAttempts

Maximum number of attempts to send a failed message to the error queue. When all delivery attempts have been exhausted, the failed message will be requeued.

Default: 3

errorQueueAccessType

Access type for the error queue.

Default: 0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values

errorQueuePermission

Permissions for the error queue.

errorQueueDiscardBehaviour

If specified, whether to notify sender if a message fails to be enqueued to the error queue.

Default: null

errorQueueMaxMsgRedelivery

Sets the maximum message redelivery count on the error queue. (Zero means retry forever).

Default: null

errorQueueMaxMsgSize

Maximum message size for the error queue.

Default: null

errorQueueQuota

Message spool quota for the error queue.

Default: null

errorQueueRespectsMsgTtl

Whether the error queue respects Message TTL.

Default: null

errorMsgDmqEligible

The eligibility for republished messages to be moved to a Dead Message Queue.

Default: null

errorMsgTtl

The number of milliseconds before republished messages are discarded or moved to a Dead Message Queue.

Default: null

headerExclusions

The list of headers to exclude when converting a consumed Solace message to a Spring message.

Default: Empty List<String>

Solace Producer Properties

The following properties are available for Solace producers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.producer. where bindingName looks something like functionName-out-0 as defined in Functional Binding Names.

See SolaceCommonProperties and SolaceProducerProperties for the most up-to-date list.

destinationType

Specifies whether the configured destination is a topic or a queue.

When set to topic, the destination name is a topic subscription added on a queue.

When set to queue, the producer binds to a queue matching the destination name. The queue can be auto-provisioned with provisionDurableQueue=true. However, none of the naming prefix and queue name generation options apply: the queue is provisioned using the destination name exactly as given.

Default: topic

headerExclusions

The list of headers to exclude from the published message. Excluding Solace message headers is not supported.

Default: Empty List<String>

nonserializableHeaderConvertToString

When set to true, irreversibly convert non-serializable headers to strings. An exception is thrown otherwise.

Default: false

Non-serializable headers should have a meaningful toString() implementation. Otherwise enabling this feature may result in potential data loss.

transacted

When set to true, messages will be delivered using local transactions.

Default: false

⚠️
A transacted producer cannot be used by multiple threads.

ℹ️
The maximum transaction size is 256 messages.
The size of the transaction is 1 when the binding receives a regular Spring message. Otherwise, if it receives a batched message, then the transaction size is equal to the batch size.

provisionDurableQueue

Whether to provision durable queues for non-anonymous consumer groups or queue destinations. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.

Default: true
See: Generated Queue Name Syntax

addDestinationAsSubscriptionToQueue

Whether to add the destination as a subscription to the queue during provisioning.

Default: true

ℹ️
Does not apply when destinationType=queue.

queueNameExpression

A SpEL expression for creating the consumer group’s queue name.

Default: "'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: Generated Queue Name Syntax

⚠️
Modifying this can cause naming conflicts between the queue names of consumer groups.

⚠️
While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.

queueNameExpressionsForRequiredGroups

A mapping of required consumer groups to queue name SpEL expressions.

By default, queueNameExpression will be used to generate a required group’s queue name if it isn’t specified within this configuration option.

Default: Empty Map<String, String>
See: Generated Queue Name Syntax

⚠️
Modifying this can cause naming conflicts between the queue names of consumer groups.

⚠️
While the default SpEL expression will consistently return a value adhering to Generated Queue Name Syntax, directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.

queueAccessType

Access type for binder provisioned queues.

Default: 0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values

queuePermission

Permissions for binder provisioned queues.

queueDiscardBehaviour

Queue discard behaviour for binder provisioned queues. Whether to notify sender if a message fails to be enqueued to the endpoint. A null value means use the appliance default.

Default: null

queueMaxMsgRedelivery

Sets the maximum message redelivery count for binder provisioned queues. (Zero means retry forever).

Default: null

queueMaxMsgSize

Maximum message size for binder provisioned queues.

Default: null

queueQuota

Message spool quota for binder provisioned queues.

Default: null

queueRespectsMsgTtl

Whether the binder provisioned queues respect Message TTL.

Default: null

queueAdditionalSubscriptions

A mapping of required consumer groups to arrays of additional topic subscriptions to be applied on each consumer group’s queue.
These subscriptions may also contain wildcards.

Default: Empty Map<String,String[]>
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Stream consumer groups.

ℹ️
Does not apply when destinationType=queue.

Solace Connection Health-Check Properties

These properties configure the Solace connection’s health indicator and are set under the solace.health-check.connection prefix.

reconnectAttemptsUntilDown

The number of session reconnect attempts until the health goes DOWN. This happens regardless of whether the underlying session is actually still reconnecting. Setting this to 0 disables this feature.

This feature operates independently of the PubSub+ session reconnect feature. If PubSub+ session reconnect is configured to retry fewer times than the value given to this property, then this feature effectively does nothing.

Default: 0
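The threshold behavior described above can be pictured with a small, self-contained sketch. This is not the binder's health indicator: the class, the method names, and the RECONNECTING status are hypothetical, and whether DOWN is reported on the N-th attempt or only after it is an assumption (the sketch reports DOWN on the N-th attempt).

```java
// Hypothetical model of the reconnectAttemptsUntilDown threshold; not binder code.
final class ReconnectHealthSketch {

    enum Status { UP, RECONNECTING, DOWN }

    private final int reconnectAttemptsUntilDown;
    private int attempts;
    private Status status = Status.UP;

    ReconnectHealthSketch(int reconnectAttemptsUntilDown) {
        this.reconnectAttemptsUntilDown = reconnectAttemptsUntilDown;
    }

    /** Called once for every session reconnect attempt. */
    Status onReconnecting() {
        attempts++;
        // A threshold of 0 disables the feature, so the health never goes DOWN here.
        boolean enabled = reconnectAttemptsUntilDown > 0;
        status = (enabled && attempts >= reconnectAttemptsUntilDown)
                ? Status.DOWN
                : Status.RECONNECTING;
        return status;
    }

    /** Called when the session has reconnected; resets the attempt counter. */
    Status onReconnected() {
        attempts = 0;
        status = Status.UP;
        return status;
    }
}
```

Note that the session itself may still be reconnecting while this model already reports DOWN, which mirrors the independence from the session reconnect feature described above.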

Solace Message Headers

Solace-defined Spring headers to get/set Solace metadata from/to Spring Message headers.

⚠️
solace_ is a header space reserved for Solace-defined headers. Creating new solace_-prefixed headers is not supported. Doing so may cause unexpected side-effects in future versions of this binder.

🔥
Refer to each header’s documentation for their expected usage scenario. Using headers outside of their intended type and access-control is not supported.

ℹ️

Header inheritance applies to Solace message headers in processor message handlers:

When the non-void handler method returns, if the return value is already a Message, that Message becomes the payload. However, when the return value is not a Message, the new Message is constructed with the return value as the payload while inheriting headers from the input Message minus the headers defined or filtered by SpringIntegrationProperties.messageHandlerNotPropagatedHeaders.

Solace Headers

These headers are to get/set Solace message properties.

💡
Use SolaceHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.

| Header Name | Type | Access | Description |
|---|---|---|---|
| solace_applicationMessageId | String | Read/Write | The message ID (a string for an application-specific message identifier). This is the JMSMessageID header field if publishing/consuming to/from JMS. |
| solace_applicationMessageType | String | Read/Write | The application message type. This is the JMSType header field if publishing/consuming to/from JMS. |
| solace_correlationId | String | Read/Write | The correlation ID. |
| solace_deliveryCount | Integer | Read | The number of times the message has been delivered. Note that, while the Delivery Count feature is in controlled availability, Enable Client Delivery Count must be enabled on the queue and consumer bindings may need to be restarted after Enable Client Delivery Count is turned on. |
| solace_destination | Destination | Read | The destination this message was published to. |
| solace_discardIndication | Boolean | Read | Whether one or more messages have been discarded prior to the current message. |
| solace_dmqEligible | Boolean | Read/Write | Whether the message is eligible to be moved to a Dead Message Queue. |
| solace_expiration | Long | Read/Write | The UTC time (in milliseconds, from midnight, January 1, 1970 UTC) when the message is supposed to expire. |
| solace_httpContentEncoding | String | Read/Write | The HTTP content encoding header value from interaction with an HTTP client. |
| solace_isReply | Boolean | Read/Write | Indicates whether this message is a reply. |
| solace_priority | Integer | Read/Write | Priority value in the range of 0–255, or -1 if it is not set. |
| solace_receiveTimestamp | Long | Read | The receive timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| solace_redelivered | Boolean | Read | Indicates if the message has been delivered by the broker to the API before. |
| solace_replicationGroupMessageId | ReplicationGroupMessageId | Read | Specifies a Replication Group Message ID as a replay start location. |
| solace_replyTo | Destination | Read/Write | The replyTo destination for the message. |
| solace_senderId | String | Read/Write | The Sender ID for the message. |
| solace_senderTimestamp | Long | Read/Write | The send timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| solace_sequenceNumber | Long | Read/Write | The sequence number. |
| solace_timeToLive | Long | Read/Write | The number of milliseconds before the message is discarded or moved to a Dead Message Queue. |
| solace_userData | byte[] | Read/Write | When an application sends a message, it can optionally attach application-specific data along with the message, such as user data. |

Solace Binder Headers

These headers are to get/set Solace Spring Cloud Stream Binder properties.

These can be used for:

  • Getting/Setting Solace Binder metadata

  • Directive actions for the binder when producing/consuming messages

💡
Use SolaceBinderHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.

| Header Name | Type | Access | Default Value | Description |
|---|---|---|---|---|
| solace_scst_batchedHeaders | List<Map<String, Object>> | Read | | Only applicable when batchMode is true. The consolidated list of message headers for a batch of messages where the headers for each payload element is in this list’s corresponding index. |
| solace_scst_confirmCorrelation | CorrelationData | Write | | A CorrelationData instance for messaging confirmations. |
| solace_scst_messageVersion | Integer | Read | 1 | A static number set by the publisher to indicate the Spring Cloud Stream Solace message version. |
| solace_scst_nullPayload | Boolean | Read | | Present and true to indicate when the PubSub+ message payload was null. |
| solace_scst_partitionKey | String | Write | | The partition key for PubSub+ partitioned queues. |
| solace_scst_serializedPayload | Boolean | Internal Binder Use Only | | Is true if a Solace Spring Cloud Stream binder has serialized the payload before publishing it to a broker. Is undefined otherwise. |
| solace_scst_serializedHeaders | String | Internal Binder Use Only | | A JSON String array of header names where each entry indicates that that header’s value was serialized by a Solace Spring Cloud Stream binder before publishing it to a broker. |
| solace_scst_serializedHeadersEncoding | String | Internal Binder Use Only | "base64" | The encoding algorithm used to encode the headers indicated by solace_scst_serializedHeaders. |
| solace_scst_targetDestinationType | String | Write | | Only applicable when scst_targetDestination is set. topic: specifies that the dynamic destination is a topic. queue: specifies that the dynamic destination is a queue. When absent, the binding’s configured destination-type is used. |

Native Payload Types

Below are the payload types natively supported by this binder (before/after Content Type Negotiation):

| Payload Type | PubSub+ Message Type | Notes |
|---|---|---|
| byte[] | Binary Message | Basic PubSub+ payload type. |
| String | Text Message | Basic PubSub+ payload type. |
| SDTStream | Stream Message | Basic PubSub+ payload type. |
| SDTMap | Map Message | Basic PubSub+ payload type. |
| String | XML-Content Message | Basic PubSub+ payload type. Only available for consumption. |
| Serializable | Bytes Message | This is not a basic payload type supported by the PubSub+ broker, but is one defined and coordinated by this binder. Publishing: when a Serializable payload which doesn’t satisfy any of the basic PubSub+ payload types is given to the binder to publish, the binder will serialize this payload to a byte[] and set the user property, solace_scst_serializedPayload, to true. Consuming: when the binder consumes a binary message which has the solace_scst_serializedPayload user property set to true, the binder will deserialize the binary attachment. |

💡

Typically, the Spring Cloud Stream framework will convert a published payload into a byte[] before giving it to the binder, in which case this binder will publish a binary message.

If this occurs, but you wish to publish other message types, then one option is to set useNativeEncoding=true on your producer (but read the caveats carefully before enabling this feature), and have your message handler return a payload of one of this binder’s supported native payload types; e.g. return Message<SDTStream> to publish a stream message.

See Content Type Negotiation for more info on how Spring Cloud Stream converts payloads and other options to control message conversion.
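To make the Serializable handling described under Native Payload Types more concrete, here is a minimal sketch of the publish and consume steps using only the JDK. It assumes standard Java serialization and models user properties as a plain Map; the class and method names are hypothetical and this is not the binder's actual code. Only the solace_scst_serializedPayload property name comes from this document.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;

// Hypothetical illustration of the serialized-payload convention; not binder code.
final class SerializedPayloadSketch {

    static final String SERIALIZED_PAYLOAD = "solace_scst_serializedPayload";

    /** Publishing side: serialize the payload to a byte[] and flag it in the user properties. */
    static byte[] toWire(Serializable payload, Map<String, Object> userProperties) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
            out.flush();
            userProperties.put(SERIALIZED_PAYLOAD, true);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Consuming side: deserialize only when the flag is set, else return the raw bytes. */
    static Object fromWire(byte[] attachment, Map<String, ?> userProperties) {
        if (!Boolean.TRUE.equals(userProperties.get(SERIALIZED_PAYLOAD))) {
            return attachment;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(attachment))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }
}
```

The flag is what lets a consumer tell an ordinary binary message apart from one whose attachment is a serialized object.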

Empty Payload VS Null Payload

Spring messages can’t contain null payloads. However, message handlers can differentiate between null payloads and empty payloads by looking at the solace_scst_nullPayload header. The binder adds the solace_scst_nullPayload header when a Solace message with a null payload is consumed from the wire. When that is the case, the binder sets the Spring message’s payload to a null-equivalent payload, which is one of the following: empty byte[], empty String, empty SDTMap, or empty SDTStream.

ℹ️
Applications can’t differentiate between null payloads and empty payloads when consuming binary messages or XML-content messages from the wire. This is because Solace always converts empty payloads to null payloads when those message types are published.
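A handler-side check for this distinction might look like the sketch below. It models message headers as a plain Map so that it runs without Spring on the classpath; the class and method names are hypothetical. In a real handler the header would be read from the Spring Message headers, preferably via the SolaceBinderHeaders class rather than a hardcoded name.

```java
import java.util.Map;

// Hypothetical helper showing how a handler could tell null payloads from empty ones.
final class NullPayloadSketch {

    // Header name as documented above.
    static final String NULL_PAYLOAD = "solace_scst_nullPayload";

    /** True only when the header is present and true. */
    static boolean wasNullOnWire(Map<String, ?> headers) {
        return Boolean.TRUE.equals(headers.get(NULL_PAYLOAD));
    }

    /** Classifies a text payload as null, empty, or regular. */
    static String describe(String payload, Map<String, ?> headers) {
        if (wasNullOnWire(headers)) {
            return "null payload";
        }
        return payload.isEmpty() ? "empty payload" : "payload: " + payload;
    }
}
```

Because the binder substitutes a null-equivalent payload, the header must be checked before inspecting the payload itself.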

Generated Queue Name Syntax

By default, generated consumer group queue names have the following form:

<prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>

prefix

A static prefix scst.

familiarity-modifier

Indicates the durability of the consumer group (wk for well-known or an for anonymous).

group

The consumer group name.

destination-encoding

Indicates the encoding scheme used to encode the destination in the queue name (currently only plain is supported).

encoded-destination

The encoded destination as per <destination-encoding>.

The queueNameExpression property’s default SpEL expression conforms to the above format. However, users can provide any valid SpEL expression to generate custom queue names. Valid expressions are evaluated against the following context:

| Context Variable | Description |
|---|---|
| destination | The binding’s destination name. |
| group | The binding’s consumer group name. |
| isAnonymous | Indicates whether the consumer is an anonymous consumer group. |
| properties.solace | The configured Solace binding properties. |
| properties.spring | The configured Spring binding properties. |

Generated Error Queue Name Syntax

By default, generated error queue names have the following form:

<prefix>/error/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>

The definition of each segment of the error queue name matches that in Generated Queue Name Syntax, with the following exceptions:

group

The consumer group name.

The errorQueueNameExpression property’s default SpEL expression conforms to the above format. Users can provide any valid SpEL expression in order to generate custom error queue names using the same evaluation context as described in Generated Queue Name Syntax.
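For readers who want to predict the generated names, the default queue and error queue formats can be mirrored in plain Java. The sketch below re-implements the documented default SpEL expressions by hand; it does not evaluate SpEL, the class and method names are hypothetical, and it assumes a non-null group name. Since the default expressions are subject to change, treat it as an illustration only.

```java
// Hypothetical re-implementation of the documented default naming formats.
final class QueueNameSketch {

    /** Mirrors the default queueNameExpression. */
    static String queueName(String group, String destination, boolean isAnonymous) {
        return "scst/" + segments(group, destination, isAnonymous);
    }

    /** Mirrors the default errorQueueNameExpression. */
    static String errorQueueName(String group, String destination, boolean isAnonymous) {
        return "scst/error/" + segments(group, destination, isAnonymous);
    }

    private static String segments(String group, String destination, boolean isAnonymous) {
        return (isAnonymous ? "an/" : "wk/")       // familiarity-modifier
                + group.trim() + "/"               // group (assumed non-null)
                + "plain/"                         // destination-encoding
                + destination.trim().replaceAll("[*>]", "_"); // wildcards become underscores
    }
}
```

For example, a well-known group myGroup bound to sensor/*/temp/> maps to scst/wk/myGroup/plain/sensor/_/temp/_ under this scheme.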

Consumer Concurrency

Configure Spring Cloud Stream’s concurrency consumer property to enable concurrent message consumption for a particular consumer binding.

Note that there are a few limitations:

  1. concurrency > 1 is not supported for exclusive queues.

  2. concurrency > 1 is not supported for consumer bindings which are a part of anonymous consumer groups.

  3. concurrency > 1 is ignored for polled consumers.

  4. concurrency > 1 is not supported with auto-provisioned topic endpoints.

  5. Setting provisionDurableQueue to false disables endpoint configuration validation, which means that point 1 cannot be validated. In this scenario, it is the developer’s responsibility to ensure that point 1 is followed.
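The limitations above can be summarized as a small pre-flight check. The following sketch is a hypothetical helper for reasoning about a configuration, not something the binder exposes; its name, parameters, and message strings are all made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check encoding limitations 1 to 4 above; not a binder API.
final class ConcurrencyCheckSketch {

    static List<String> check(int concurrency,
                              boolean exclusiveQueue,
                              boolean anonymousGroup,
                              boolean polledConsumer,
                              boolean autoProvisionedTopicEndpoint) {
        List<String> findings = new ArrayList<>();
        if (concurrency <= 1) {
            return findings; // the limitations only concern concurrency > 1
        }
        if (exclusiveQueue) {
            findings.add("not supported: exclusive queue");
        }
        if (anonymousGroup) {
            findings.add("not supported: anonymous consumer group");
        }
        if (polledConsumer) {
            findings.add("ignored: polled consumer");
        }
        if (autoProvisionedTopicEndpoint) {
            findings.add("not supported: auto-provisioned topic endpoint");
        }
        return findings;
    }
}
```

Such a check is most useful when provisionDurableQueue is false, since in that case the exclusive-queue condition has to be verified by the developer.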

Batched Messaging

Batch Consumers

Batch consumers can be enabled by setting spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode to true, in which case batched messages may be consumed as follows:

@Bean
Consumer<Message<List<Payload>>> input() {
	return batchMsg -> { // (1)
		List<Payload> batchedPayloads = batchMsg.getPayload();
		List<Map<String, Object>> batchedHeaders = (List<Map<String, Object>>) batchMsg.getHeaders().get(SolaceBinderHeaders.BATCHED_HEADERS); // (2)

		for (int i = 0; i < batchedPayloads.size(); i++) {
			Payload payload = batchedPayloads.get(i);
			Map<String, Object> headers = batchedHeaders.get(i);
			// Process individual message payload and its headers
		}
	};
}
  1. A batch of messages is really just a single Spring Message whose payload is a list of individual message payloads.

  2. The solace_scst_batchedHeaders message header contains the consolidated list of message headers for each of the individual messages in the batch.

💡
Transacted Batch Consumers

By default, batched messages are non-transacted (i.e. transacted is set to false). In this mode, a batch created by this binder is fundamentally a collection of standalone messages that have no relationship to each other.

When transacted is set to true, a local transaction is used to process the batched message. The batch of messages is then automatically committed (or is rolled back on errors) when the message handler returns.

💡
Resolving Batch Message Conversion Issues

If the Spring Cloud Stream framework fails to convert the batch message, consider setting one of the following consumer config options:

See Content Type Negotiation for more info on how Spring Cloud Stream converts payloads and other options to control message conversion.

See Native Payload Types for more info regarding this binder’s natively supported payload types.

To create a batch of messages, the binder consumes messages from the PubSub+ broker until either the maximum batch size or the timeout is reached. The binder then composes the batch message and sends it to the consumer handler for processing. This batching behavior can be configured using the batchMaxSize, batchWaitStrategy, and batchTimeout consumer config options.
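The interplay of batchMaxSize, batchWaitStrategy, and batchTimeout can be approximated with a small stand-alone model. The sketch below pulls from an in-memory BlockingQueue instead of a broker, and its class and method names are hypothetical. How the immediate strategy behaves when no message is available at all is an assumption here (the sketch returns an empty batch); the binder's real collector may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of batch accumulation; not the binder's implementation.
final class BatchCollectorSketch {

    enum WaitStrategy { RESPECT_TIMEOUT, IMMEDIATE }

    static <T> List<T> collect(BlockingQueue<T> source, int batchMaxSize,
                               long batchTimeoutMillis, WaitStrategy strategy) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeoutMillis);
        try {
            while (batch.size() < batchMaxSize) {
                T next;
                if (strategy == WaitStrategy.IMMEDIATE) {
                    next = source.poll();  // never wait: stop once nothing is available
                } else if (batchTimeoutMillis == 0) {
                    next = source.take();  // a timeout of 0 means wait forever
                } else {
                    long remaining = deadline - System.nanoTime();
                    next = remaining > 0 ? source.poll(remaining, TimeUnit.NANOSECONDS) : null;
                }
                if (next == null) {
                    break;                 // timed out or drained: use what was received
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return a partial batch
        }
        return batch;
    }
}
```

With five queued messages and a maximum size of three, the first call returns three messages right away; a later call returns the remaining two, either immediately or once the timeout elapses, depending on the strategy.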

Batch Producers

Similar to batch consumers, batched messages may also be published through the producer binding:

@Bean
Supplier<Message<List<Payload>>> output() {
	return () -> {
		List<Payload> batchedPayloads = new ArrayList<>();
		List<Map<String, Object>> batchedHeaders = new ArrayList<>();

		for (int i = 0; i < 100; i++) {
			// Create batched message contents
			batchedPayloads.add(new Payload(i));
			batchedHeaders.add(Map.of("my-header", "my-header-value"));
		}

		// construct batched message
		return MessageBuilder.withPayload(batchedPayloads)
				.setHeader(SolaceBinderHeaders.BATCHED_HEADERS, batchedHeaders)
				.build();
	};
}

The producer binding looks for the solace_scst_batchedHeaders message header to determine whether the supplied Spring message is a batched Spring message or a regular one.

If the producer binding detects that it has received a batched Spring message, then it will individually publish each item in the batch.
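
The detect-then-split behavior described above can be sketched with plain Java collections. This is an illustration under assumptions, not the binder's code: BatchSplitter and Outbound are hypothetical names, and the handling of a non-list payload or of mismatched list sizes is assumed, since the text above does not specify it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Conceptual model only; hypothetical names, not part of the binder.
final class BatchSplitter {
    // Header name as given in the text above.
    static final String BATCHED_HEADERS = "solace_scst_batchedHeaders";

    /** One message to publish: a payload plus its own headers. */
    static final class Outbound {
        final Object payload;
        final Map<String, Object> headers;

        Outbound(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    @SuppressWarnings("unchecked")
    static List<Outbound> split(Object payload, Map<String, Object> headers) {
        Object batched = headers.get(BATCHED_HEADERS);
        if (!(batched instanceof List) || !(payload instanceof List)) {
            // Assumption: without a batched-headers list, publish as a regular message.
            return Collections.singletonList(new Outbound(payload, headers));
        }
        List<Object> payloads = (List<Object>) payload;
        List<Map<String, Object>> perMessageHeaders = (List<Map<String, Object>>) batched;
        if (payloads.size() != perMessageHeaders.size()) {
            // Assumption: a size mismatch is treated as a caller error.
            throw new IllegalArgumentException("payload and header lists differ in size");
        }
        List<Outbound> out = new ArrayList<>(payloads.size());
        for (int i = 0; i < payloads.size(); i++) {
            // Item i of the batch is published with header map i.
            out.add(new Outbound(payloads.get(i), perMessageHeaders.get(i)));
        }
        return out;
    }
}
```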

ℹ️
Publishing Batched Messages using Transacted Producer Bindings

When transacted=true, the size of the transaction is equal to the size of the batched Spring message.

Partitioning

ℹ️

The Solace PubSub+ broker supports partitioning natively.

The partitioning abstraction as described in the Spring Cloud Stream documentation is not supported.

To publish messages that are intended for partitioned queues, you must provide a partition key by setting the solace_scst_partitionKey message header (accessible through the SolaceBinderHeaders.PARTITION_KEY constant).

For example:

public class MyMessageBuilder {
    public Message<String> buildMeAMessage() {
        return MessageBuilder.withPayload("payload")
            .setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key")
            .build();
    }
}

As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue.

See Partitioned Queues for more.

Manual Message Acknowledgment

ℹ️
Manual message acknowledgment is not supported for consumers where transacted is set to true.

Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:

public void consume(Message<?> message) {
    AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message); // (1)
    acknowledgmentCallback.noAutoAck(); // (2)
    try {
        AckUtils.accept(acknowledgmentCallback); // (3)
    } catch (SolaceAcknowledgmentException e) {} // (4)
}
  1. Get the message’s acknowledgement callback header

  2. Disable auto-acknowledgement

  3. Acknowledge the message with the ACCEPT status

  4. Handle any acknowledgment exceptions

Refer to the AckUtils documentation and AcknowledgmentCallback documentation for more info on these objects.

💡
If manual acknowledgement is to be done outside of the message handler’s thread, then make sure auto-acknowledgement is disabled within the message handler’s thread and not an external one. Otherwise, the binder will auto-acknowledge the message when the message handler returns.

For each acknowledgement status, the binder will perform the following actions:

| Status | Action |
|---|---|
| ACCEPT | Acknowledge the message. |
| REJECT | If autoBindErrorQueue is true, republish the message onto the error queue and ACCEPT it. Otherwise, for consumers in either a defined or an anonymous consumer group, signal the Solace broker to discard the message from the queue. Refer to Failed Consumer Message Error Handling for more info. |
| REQUEUE | For consumers in either a defined or an anonymous consumer group, signal the Solace broker to requeue and redeliver the message. The message will be redelivered until it is ACCEPTed or the message’s max redelivery count is exceeded. Refer to Message Redelivery for more info. |
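
The mapping from acknowledgement status to binder action can be summarized in a small decision function. This is a minimal sketch for illustration: AckActions, Status, and Action are hypothetical names that mirror the descriptions above, not types from the binder or from Spring.

```java
// Conceptual model only; hypothetical names, not part of the binder.
final class AckActions {
    private AckActions() {}

    enum Status { ACCEPT, REJECT, REQUEUE }

    enum Action {
        ACKNOWLEDGE,                          // ACCEPT
        REPUBLISH_TO_ERROR_QUEUE_THEN_ACCEPT, // REJECT with autoBindErrorQueue=true
        DISCARD,                              // REJECT without an error queue
        REDELIVER                             // REQUEUE
    }

    /** Returns the action described above for the given status and setting. */
    static Action actionFor(Status status, boolean autoBindErrorQueue) {
        switch (status) {
            case ACCEPT:
                return Action.ACKNOWLEDGE;
            case REJECT:
                return autoBindErrorQueue
                        ? Action.REPUBLISH_TO_ERROR_QUEUE_THEN_ACCEPT
                        : Action.DISCARD;
            case REQUEUE:
                return Action.REDELIVER;
            default:
                throw new IllegalArgumentException("Unknown status: " + status);
        }
    }
}
```

Only REJECT depends on autoBindErrorQueue; ACCEPT and REQUEUE behave the same way regardless of that setting.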

Acknowledgements may throw SolaceAcknowledgmentException depending on the current state of the consumer. In particular, when acknowledging asynchronously, catch SolaceAcknowledgmentException around the acknowledgment call and handle it accordingly.

Example:
(refer to Message Redelivery for background info)

A SolaceAcknowledgmentException with cause IllegalStateException may be thrown when trying to asynchronously ACCEPT a message while the consumer flow is closed. In this particular case, the message that failed to ACCEPT will be redelivered, so the exception can be caught and ignored if you have no business logic to revert.

ℹ️
Manual acknowledgements do not support any application-internal error handling strategies (i.e. retry template, error channel forwarding, etc). Also, throwing an exception in the message handler will always acknowledge the message in some way, regardless of whether auto-acknowledgment is disabled.
💡

When acknowledging messages asynchronously, acknowledge them in a timely manner. Otherwise, message consumption is likely to stall once the number of unacknowledged messages reaches the consumer queue’s configured "Maximum Delivered Unacknowledged Messages per Flow".

This property can be configured for dynamically created queues by using queue templates. However note that as per our documentation, anonymous consumer group queues (i.e. temporary queues) will not match a queue template’s name filter. Only the queue template defined in the client profile’s "Copy Settings From Queue Template" setting will apply to those.

Dynamic Producer Destinations

Spring Cloud Stream has a reserved message header called scst_targetDestination (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header.

For this binder’s implementation of this header, the target destination defines the exact Solace topic or queue to which a message will be sent. That is, no post-processing is done.

This binder also adds a reserved message header called solace_scst_targetDestinationType (retrievable via SolaceBinderHeaders.TARGET_DESTINATION_TYPE), which allows you to override the configured producer destination-type.

public class MyMessageBuilder {
    public Message<String> buildMeAMessage() {
        return MessageBuilder.withPayload("payload")
            .setHeader(BinderHeaders.TARGET_DESTINATION, "some-dynamic-destination") // (1)
            .setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic")         // (2)
            .build();
    }
}
  1. This message will be sent to the some-dynamic-destination topic, ignoring the producer’s configured destination.

  2. Optionally, the configured producer destination-type can be overridden.

ℹ️
These two headers are cleared from the message before it is sent to the message broker. If the consumer side needs this information, attach it to the message payload.
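
The override-and-clear behavior of the two headers can be sketched with plain Java maps. This is an illustrative model under assumptions, not the binder's code: DestinationResolver and Resolved are hypothetical names, and only the two header names are taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only; hypothetical names, not part of the binder.
final class DestinationResolver {
    private DestinationResolver() {}

    // Header names as given in the text above.
    static final String TARGET_DESTINATION = "scst_targetDestination";
    static final String TARGET_DESTINATION_TYPE = "solace_scst_targetDestinationType";

    /** Where a message goes, plus the headers that travel with it. */
    static final class Resolved {
        final String destination;
        final String destinationType;
        final Map<String, Object> outboundHeaders;

        Resolved(String destination, String destinationType, Map<String, Object> outboundHeaders) {
            this.destination = destination;
            this.destinationType = destinationType;
            this.outboundHeaders = outboundHeaders;
        }
    }

    static Resolved resolve(Map<String, Object> headers,
                            String configuredDestination,
                            String configuredType) {
        Object target = headers.get(TARGET_DESTINATION);
        Object type = headers.get(TARGET_DESTINATION_TYPE);
        // The header value is used as-is when present; otherwise fall back to configuration.
        String destination = target != null ? target.toString() : configuredDestination;
        String destinationType = type != null ? type.toString() : configuredType;
        // Both reserved headers are removed before the message is sent.
        Map<String, Object> outbound = new HashMap<>(headers);
        outbound.remove(TARGET_DESTINATION);
        outbound.remove(TARGET_DESTINATION_TYPE);
        return new Resolved(destination, destinationType, outbound);
    }
}
```

Because the reserved headers never reach the broker in this model, a consumer that needs to know the dynamic destination has to find it in the payload.
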
ℹ️
Dynamic Producer Destinations with StreamBridge

This binder does not support the usage of StreamBridge’s dynamic destination feature, which automatically creates and caches unknown output bindings on-the-fly.

Instead, set the scst_targetDestination message header and send the message to a pre-defined output binding:

public void sendMessage(StreamBridge streamBridge, String myDynamicDestination, Message<?> message) {
  Message<?> messageWithDestination = MessageBuilder.fromMessage(message)
      .setHeader(BinderHeaders.TARGET_DESTINATION, myDynamicDestination)
      .build();
  streamBridge.send("some-pre-defined-output-binding", messageWithDestination);
}

Then in your application’s configuration file, configure your predefined output binding:

spring.cloud.stream.output-bindings=some-pre-defined-output-binding

Failed Consumer Message Error Handling

The Spring Cloud Stream framework already provides a number of application-internal reprocessing strategies for messages that fail during consumption. You can read more about them in the Spring Cloud Stream documentation.

However, after all internal error handling strategies have been exhausted, the Solace implementation of the binder will either:

  • Redeliver the failed message (default)

  • Republish the message to another queue (an error queue) for an external application/binding to process

Message Redelivery

A simple error handling strategy in which failed messages are redelivered from the consumer group’s queue. This is very similar to simply enabling the retry template (setting maxAttempts to a value greater than 1), but allows for the failed messages to be re-processed by the message broker.

The internal implementation of redelivery changed in Solace Binder v5.0.0. Previously, redelivery was initiated by rebinding consumer flows. As of v5.0.0, the Solace API instead leverages the Solace broker’s native NACK (Negative Acknowledgement) capabilities.

Here is what happens under the hood when this is triggered:

  1. Suppose the current message is marked for 'REQUEUE'. The Solace broker then discards the ACKs of any subsequent messages that are already spooled on the client side, even if the binder has already ACCEPTed them.

  2. The Solace broker redelivers all messages, starting with the one marked for 'REQUEUE', provided that the message’s max redelivery count has not been exceeded.

The redelivery may result in message duplication, and the application should be designed to handle this.
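
One common way to tolerate such duplicates is an idempotent handler that remembers which messages it has already processed. The following is a minimal sketch of that general technique, not something the binder provides: IdempotentHandler is a hypothetical name, the messageId is assumed to be a stable application-level identifier that you supply, and the in-memory set is unbounded and not shared across application instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// General deduplication sketch; hypothetical names, not part of the binder.
final class IdempotentHandler<T> {
    // IDs of messages that were processed successfully (unbounded in this sketch).
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Processes the payload unless a message with the same ID was already
     * processed. Returns true if processed, false if skipped as a duplicate.
     */
    boolean handle(String messageId, T payload) {
        if (!processedIds.add(messageId)) {
            return false; // redelivered copy of an already processed message
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the ID so a redelivered copy is retried.
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

A production version would typically bound or expire the set of IDs, or keep it in a store shared by all consumers in the group.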

Error Queue Republishing

ℹ️
Error queue republishing is not supported for consumers where transacted is set to true.

First, note that an Error Queue is different from a Dead Message Queue (DMQ). A DMQ captures failed messages that are re-routed as a consequence of Solace PubSub+ messaging features, such as TTL expiration or exceeding a message’s max redelivery count. An Error Queue, by contrast, captures re-routed messages that were successfully consumed from the message broker but could not be processed by the application.

An Error Queue can be provisioned for a particular consumer group by setting the autoBindErrorQueue consumer config option to true. This Error Queue is simply another durable queue which is named as per the Generated Error Queue Name Syntax section. And like the queues used for consumer groups, its endpoint properties can be configured by means of any consumer properties whose names begin with "errorQueue".

ℹ️

Error Queues should not be used with anonymous consumer groups.

Since the names of anonymous consumer groups, and in turn the name of their would-be Error Queues, are randomly generated at runtime, it would provide little value to create bindings to these Error Queues because of their unpredictable naming and temporary existence. Also, your environment will be polluted with orphaned Error Queues whenever these consumers rebind.

Consumer Bindings Pause/Resume

The Solace binder supports pausing and resuming consumer bindings. See Spring Cloud Stream documentation to learn how to pause and resume consumer bindings.

ℹ️
There is no guarantee that the effect of pausing a binding will be instantaneous: messages already in-flight or being processed by the binder may still be delivered after the call to pause returns.

Failed Producer Message Error Handling

By default, asynchronous producer errors aren’t handled by the framework. Producer error channels can be enabled using the errorChannelEnabled producer config option.

Beyond that, this binder also supports using a Future to wait for publish confirmations. See Publisher Confirmations for more info.

Publisher Confirmations

For each message you can create a new CorrelationData instance and set it as the value of your message’s SolaceBinderHeaders.CONFIRM_CORRELATION header.

ℹ️
CorrelationData can be extended to add more correlation info. The SolaceBinderHeaders.CONFIRM_CORRELATION header is not reflected in the actual message published to the broker.

Now using CorrelationData.getFuture().get(), you can wait for a publish acknowledgment from the broker. If the publish failed, then this future will throw an exception.

For example:

@Autowired
private StreamBridge streamBridge;

public void send(String payload, long timeout, TimeUnit unit) {
    CorrelationData correlationData = new CorrelationData();
    Message<String> message = MessageBuilder.withPayload(payload)
            .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData)
            .build();

    streamBridge.send("output-destination", message);

    try {
        correlationData.getFuture().get(timeout, unit);
        // Do success logic
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // Do failure logic
    }
}
ℹ️
CorrelationData with Batched Messages

When using Batch Producers, the SolaceBinderHeaders.CONFIRM_CORRELATION header must be set at the root of the batched message, and not in the SolaceBinderHeaders.BATCHED_HEADERS header.

The CorrelationData.getFuture() will be resolved:

  • Successfully once all messages in the batch have been successfully delivered to the destination.

  • Failed upon the first delivery error encountered while publishing the batch of messages.
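
These two resolution rules (succeed when every message is confirmed, fail on the first error) can be modeled with the standard CompletableFuture API. This is a conceptual sketch, not the binder's implementation: BatchConfirm and combine are hypothetical names, and modeling each per-message confirmation as a CompletableFuture<Void> is an assumption made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Conceptual model only; hypothetical names, not part of the binder.
final class BatchConfirm {
    private BatchConfirm() {}

    /**
     * Returns a future that completes normally once every per-message
     * confirmation has succeeded, and completes exceptionally as soon as
     * the first per-message confirmation fails.
     */
    static CompletableFuture<Void> combine(List<CompletableFuture<Void>> perMessage) {
        CompletableFuture<Void> batch = new CompletableFuture<>();
        for (CompletableFuture<Void> confirmation : perMessage) {
            confirmation.whenComplete((ignored, error) -> {
                if (error != null) {
                    // Fail fast: do not wait for the remaining confirmations.
                    batch.completeExceptionally(error);
                }
            });
        }
        // allOf only completes after all inputs complete, so it is used for the success path.
        CompletableFuture.allOf(perMessage.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, error) -> {
                    if (error == null) {
                        batch.complete(null);
                    }
                });
        return batch;
    }
}
```

CompletableFuture.allOf alone would not be enough here, because it waits for every input to finish before reporting a failure.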

Solace Binder Health Indicator

Solace binders can report health statuses via the Spring Boot Actuator health endpoint. To enable this feature, add Spring Boot Actuator to the classpath. To manually disable this feature, set management.health.binders.enabled=false.

| Health Status | Description |
|---|---|
| UP | Status indicating that the binder is functioning as expected. |
| RECONNECTING | Status indicating that the binder is actively trying to reconnect to the message broker. This is a custom health status. It isn’t included in the health severity order list (management.endpoint.health.status.order) and returns the default HTTP status code of 200. To customize these, see Writing Custom HealthIndicators. |
| DOWN | Status indicating that the binder has suffered an unexpected failure. For instance, the binder may have exhausted all reconnection attempts. User intervention is likely required. |

Solace Binder Metrics

Leveraging Spring Metrics, the Solace PubSub+ binder exposes the following metrics:

| Name | Type | Tags | Description |
|---|---|---|---|
| solace.message.size.payload | DistributionSummary (Base Units: bytes) | name: <bindingName> | Message payload size. This is the payload size of the messages received (if name is a consumer binding) or published (if name is a producer binding) from/to a PubSub+ broker. |
| solace.message.size.total | DistributionSummary (Base Units: bytes) | name: <bindingName> | Total message size. This is the total size of the messages received (if name is a consumer binding) or published (if name is a producer binding) from/to a PubSub+ broker. |

Resources

For more information about Spring Cloud Streams try these resources:

For more information about Solace technology in general please visit these resources: