Skip to content

Commit

Permalink
[NO-ISSUE] harden queue federation test by verifying presence of endp…
Browse files Browse the repository at this point in the history
…oints before deploying consumers to ensure distribution
  • Loading branch information
gtully committed May 30, 2024
1 parent 0434b9c commit 51a73b4
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions controllers/activemqartemis_work_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ var _ = Describe("work queue", func() {
Expect(k8sClient.Create(ctx, jaasSecret)).Should(Succeed())
brokerCrd.Spec.DeploymentPlan.ExtraMounts.Secrets = []string{jaasSecret.Name}

// org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer
By("deploying custom logging")
loggingConfigMapName := brokerCrd.Name + "-logging-config"
loggingData := make(map[string]string)
Expand Down Expand Up @@ -172,16 +171,13 @@ var _ = Describe("work queue", func() {
"securityRoles.#.control-plane.consume=true",
"securityRoles.#.control-plane.send=true",

// with properties update - can have this static with dns
"# federate the address, publish on N goes to [0..N]",
"# federate the queue in both directions",
"broker-0.AMQPConnections.target.uri=tcp://${CR_NAME}-ss-1.${CR_NAME}-hdls-svc:61616",
"broker-1.AMQPConnections.target.uri=tcp://${CR_NAME}-ss-0.${CR_NAME}-hdls-svc:61616",
// how to use TLS and sni here?

"# speed up mesh formation",
"AMQPConnections.target.retryInterval=1000",

// feature - service account
"AMQPConnections.target.user=control-plane",
"AMQPConnections.target.password=passwd",
"AMQPConnections.target.autostart=true",
Expand Down Expand Up @@ -211,23 +207,6 @@ var _ = Describe("work queue", func() {
By("provisioning the broker")
Expect(k8sClient.Create(ctx, &brokerCrd)).Should(Succeed())

createdBrokerCrd := &brokerv1beta1.ActiveMQArtemis{}
createdBrokerCrdKey := types.NamespacedName{
Name: brokerCrd.Name,
Namespace: defaultNamespace,
}

By("verifying broker started")
Eventually(func(g Gomega) {

g.Expect(k8sClient.Get(ctx, createdBrokerCrdKey, createdBrokerCrd)).Should(Succeed())
if verbose {
fmt.Printf("\nStatus:%v", createdBrokerCrd.Status)
}
g.Expect(meta.IsStatusConditionTrue(createdBrokerCrd.Status.Conditions, brokerv1beta1.ReadyConditionType)).Should(BeTrue())

}, existingClusterTimeout*5, existingClusterInterval).Should(Succeed())

By("provisioning loadbalanced service for this CR, for use within the cluster via dns")
svc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
Expand All @@ -240,7 +219,7 @@ var _ = Describe("work queue", func() {
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
selectors.LabelResourceKey: createdBrokerCrd.Name,
selectors.LabelResourceKey: brokerCrd.Name,
},
Ports: []corev1.ServicePort{
{
Expand All @@ -253,6 +232,36 @@ var _ = Describe("work queue", func() {

Expect(k8sClient.Create(ctx, svc)).Should(Succeed())

createdBrokerCrd := &brokerv1beta1.ActiveMQArtemis{}
createdBrokerCrdKey := types.NamespacedName{
Name: brokerCrd.Name,
Namespace: defaultNamespace,
}

By("verifying broker started")
Eventually(func(g Gomega) {

g.Expect(k8sClient.Get(ctx, createdBrokerCrdKey, createdBrokerCrd)).Should(Succeed())
if verbose {
fmt.Printf("\nStatus:%v", createdBrokerCrd.Status)
}
g.Expect(meta.IsStatusConditionTrue(createdBrokerCrd.Status.Conditions, brokerv1beta1.ReadyConditionType)).Should(BeTrue())

}, existingClusterTimeout*5, existingClusterInterval).Should(Succeed())

By("verifying out service has two endpoints so our consumers will get distributed")
Eventually(func(g Gomega) {

endpoints := &corev1.Endpoints{}
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, endpoints)).Should(Succeed())
if verbose {
fmt.Printf("\nEndpoints:%v", endpoints.Subsets)
}
g.Expect(len(endpoints.Subsets)).Should(BeNumerically("==", 1))
g.Expect(len(endpoints.Subsets[0].Addresses)).Should(BeNumerically("==", 2))

}, existingClusterTimeout*5, existingClusterInterval).Should(Succeed())

By("provisioning an app, publisher and consumers, using the broker image to access the artemis client from within the cluster")
deploymentTemplate := func(name string, replicas int32, command []string) appsv1.Deployment {
appLables := map[string]string{"app": name}
Expand Down Expand Up @@ -322,11 +331,11 @@ var _ = Describe("work queue", func() {
lines := strings.Split(string(body), "\n")

var done = false
if !routedNonZeroCheck && verbose {
if verbose {
fmt.Printf("\nStart Metrics for JOBS on %v with Headers %v \n", ordinal, resp.Header)
}
for _, line := range lines {
if !routedNonZeroCheck && verbose {
if verbose {
fmt.Printf("%s\n", line)
}

Expand Down

0 comments on commit 51a73b4

Please sign in to comment.