Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create basic happy and sad path tests for broker connections, #326. #327

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 79 additions & 45 deletions wres-events/src/wres/events/broker/BrokerConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public class BrokerConnectionFactory implements Supplier<Connection>
/** The maximum number of times a message can be resent on failure. */
private final Integer maximumMessageRetries;

/** Context that maps JMS objects to names. */
private final Context context;

/** A connection factory. */
private final ConnectionFactory connectionFactory;

/** The connection properties. */
private final Properties properties;

/**
* <p>Returns an instance of a factory, which is created with supplied properties and a default number of message
* retries, {@link #DEFAULT_MAXIMUM_MESSAGE_RETRIES}.
Expand Down Expand Up @@ -120,7 +120,18 @@ public Destination getDestination( String name ) throws NamingException
{
Objects.requireNonNull( name );

return ( Destination ) this.context.lookup( name );
String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( this.properties );
BrokerUtilities.testConnectionProperty( connectionPropertyName, this.properties );
String connectionUrl = this.properties.getProperty( connectionPropertyName );
Context context = BrokerConnectionFactory.getContextFromProperties( connectionUrl, this.properties );
try
{
return ( Destination ) context.lookup( name );
}
finally
{
BrokerConnectionFactory.closeContext( context );
}
}

/**
Expand Down Expand Up @@ -212,26 +223,55 @@ static void testConnection( Properties properties, int retries )
}
finally
{
// Close any connection that succeeded
if ( Objects.nonNull( connection ) )
{
LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.",
connectionUrl );

try
{
connection.close();
}
catch ( JMSException e )
{
LOGGER.error( "Failed to close an attempted connection during the instantiation of a "
+ "connection factory." );
}
}
BrokerConnectionFactory.closeConnection( connection, connectionUrl );
BrokerConnectionFactory.closeContext( localContext );
}
}
}

/**
* Closes a connection.
*
* @param connection the connection, possibly null
* @param connectionUrl the connection URL for logging
*/
private static void closeConnection( Connection connection, String connectionUrl )
{
// Close any connection that succeeded
if ( Objects.nonNull( connection ) )
{
LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.",
connectionUrl );

try
{
connection.close();
}
catch ( JMSException e )
{
LOGGER.error( "Failed to close an attempted connection during the instantiation of a "
+ "connection factory. The correction URL is: {}.", connectionUrl );
}
}
}

/**
* Closes the supplied context.
* @param context the context
*/

private static void closeContext( Context context )
{
try
{
context.close();
}
catch ( NamingException e )
{
LOGGER.error( "Failed to close the context for a connection." );
}
}

/**
* Returns the context from the properties.
* @param connectionUrl the connection string to help with exception messaging
Expand Down Expand Up @@ -313,41 +353,35 @@ private BrokerConnectionFactory( Properties properties,
}

this.maximumMessageRetries = maximumMessageRetries;
this.properties = properties;

// The connection property name
String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties );

// Set any variables that depend on the (possibly adjusted) properties
try
{
LOGGER.debug( "Creating a connection factory with these properties: {}.", properties );

// Test
String connectionString = properties.getProperty( connectionPropertyName );
this.context = new InitialContext( properties );
this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString,
this.context,
connectionPropertyName );
LOGGER.debug( "Creating a connection factory with these properties: {}.", properties );

LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.",
connectionPropertyName,
connectionString );
// Test
String connectionString = properties.getProperty( connectionPropertyName );
Context context = BrokerConnectionFactory.getContextFromProperties( connectionString, properties );
this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString,
context,
connectionPropertyName );

BrokerConnectionFactory.testConnection( properties,
BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES );
LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.",
connectionPropertyName,
connectionString );

if ( LOGGER.isInfoEnabled() )
{
LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.",
this,
connectionPropertyName,
properties.getProperty( connectionPropertyName ) );
}
}
catch ( NamingException e )
BrokerConnectionFactory.testConnection( properties,
BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES );

if ( LOGGER.isInfoEnabled() )
{
throw new CouldNotLoadBrokerConfigurationException( "Unable to load the expected broker configuration.",
e );
LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.",
this,
connectionPropertyName,
properties.getProperty( connectionPropertyName ) );
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wres.events.broker;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -38,17 +39,17 @@ class BrokerConnectionFactoryTest
void testMessageRouting() throws IOException, NamingException, JMSException, InterruptedException
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );

// Create and start the broker, clean up on completion
try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) )
try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) )
{
BrokerConnectionFactory factory = BrokerConnectionFactory.of( properties, 2 );
Topic evaluationTopic = (Topic) factory.getDestination( "evaluation" );
Topic evaluationStatusTopic = (Topic) factory.getDestination( "status" );
Topic statisticsTopic = (Topic) factory.getDestination( "statistics" );
Topic evaluationTopic = ( Topic ) factory.getDestination( "evaluation" );
Topic evaluationStatusTopic = ( Topic ) factory.getDestination( "status" );
Topic statisticsTopic = ( Topic ) factory.getDestination( "statistics" );

String evaluationId = "1234567";

// Application/JMS-level message selection based on correlation id
String messageSelector = "JMSCorrelationID = '" + evaluationId + "'";

Expand All @@ -69,8 +70,8 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int

// Listen for evaluation messages
MessageListener evaluationListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am an evaluation message!", textMessage.getText() );
Expand All @@ -79,15 +80,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received an evaluation message {}", textMessage );
evaluationConsumerCount.countDown();
};

// Listen for evaluation status messages
MessageListener evaluationStatusListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am an evaluation status message!", textMessage.getText() );
Expand All @@ -96,15 +97,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received an evaluation status message {}", textMessage );
evaluationStatusConsumerCount.countDown();
};

// Listen for statistics messages
MessageListener evaluationStatisticsListener = message -> {
TextMessage textMessage = (TextMessage) message;
TextMessage textMessage = ( TextMessage ) message;

try
{
assertEquals( "I am a statistics message!", textMessage.getText() );
Expand All @@ -113,7 +114,7 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
{
throw new IllegalStateException( e );
}

LOGGER.info( "Received a statistics message {}", textMessage );
statisticsConsumerCount.countDown();
};
Expand Down Expand Up @@ -148,5 +149,22 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int
}
}

@Test
void testConnectionSucceedsWhenPropertiesAreCorrect()
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );
assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) );
}

@Test
void testConnectionFailsWhenPropertiesAreIncorrect()
{
Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" );

String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties );
// Replace resolved property with url/port that is guaranteed not to have a broker running
properties.put( connectionPropertyName, "amqp://localhost:-1" );
assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) );
}

}
Loading