diff --git a/controllers/activemqartemis_work_queue_test.go b/controllers/activemqartemis_work_queue_test.go index 2f49c7930..1d19d94ba 100644 --- a/controllers/activemqartemis_work_queue_test.go +++ b/controllers/activemqartemis_work_queue_test.go @@ -112,7 +112,6 @@ var _ = Describe("work queue", func() { Expect(k8sClient.Create(ctx, jaasSecret)).Should(Succeed()) brokerCrd.Spec.DeploymentPlan.ExtraMounts.Secrets = []string{jaasSecret.Name} - // org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer By("deploying custom logging") loggingConfigMapName := brokerCrd.Name + "-logging-config" loggingData := make(map[string]string) @@ -172,16 +171,13 @@ var _ = Describe("work queue", func() { "securityRoles.#.control-plane.consume=true", "securityRoles.#.control-plane.send=true", - // with properties update - can have this static with dns - "# federate the address, publish on N goes to [0..N]", + "# federate the queue in both directions", "broker-0.AMQPConnections.target.uri=tcp://${CR_NAME}-ss-1.${CR_NAME}-hdls-svc:61616", "broker-1.AMQPConnections.target.uri=tcp://${CR_NAME}-ss-0.${CR_NAME}-hdls-svc:61616", - // how to use TLS and sni here? "# speed up mesh formation", "AMQPConnections.target.retryInterval=1000", - // feature - service account "AMQPConnections.target.user=control-plane", "AMQPConnections.target.password=passwd", "AMQPConnections.target.autostart=true", @@ -211,23 +207,6 @@ var _ = Describe("work queue", func() { By("provisioning the broker") Expect(k8sClient.Create(ctx, &brokerCrd)).Should(Succeed()) - createdBrokerCrd := &brokerv1beta1.ActiveMQArtemis{} - createdBrokerCrdKey := types.NamespacedName{ - Name: brokerCrd.Name, - Namespace: defaultNamespace, - } - - By("verifying broker started") - Eventually(func(g Gomega) { - - g.Expect(k8sClient.Get(ctx, createdBrokerCrdKey, createdBrokerCrd)).Should(Succeed()) - if verbose { - fmt.Printf("\nStatus:%v", createdBrokerCrd.Status) - } - g.Expect(meta.IsStatusConditionTrue(createdBrokerCrd.Status.Conditions, brokerv1beta1.ReadyConditionType)).Should(BeTrue()) - - }, existingClusterTimeout*5, existingClusterInterval).Should(Succeed()) - By("provisioning loadbalanced service for this CR, for use within the cluster via dns") svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ @@ -240,7 +219,7 @@ var _ = Describe("work queue", func() { }, Spec: corev1.ServiceSpec{ Selector: map[string]string{ - selectors.LabelResourceKey: createdBrokerCrd.Name, + selectors.LabelResourceKey: brokerCrd.Name, }, Ports: []corev1.ServicePort{ { @@ -253,6 +232,36 @@ var _ = Describe("work queue", func() { Expect(k8sClient.Create(ctx, svc)).Should(Succeed()) + createdBrokerCrd := &brokerv1beta1.ActiveMQArtemis{} + createdBrokerCrdKey := types.NamespacedName{ + Name: brokerCrd.Name, + Namespace: defaultNamespace, + } + + By("verifying broker started") + Eventually(func(g Gomega) { + + g.Expect(k8sClient.Get(ctx, createdBrokerCrdKey, createdBrokerCrd)).Should(Succeed()) + if verbose { + fmt.Printf("\nStatus:%v", createdBrokerCrd.Status) + } + g.Expect(meta.IsStatusConditionTrue(createdBrokerCrd.Status.Conditions, brokerv1beta1.ReadyConditionType)).Should(BeTrue()) + + }, existingClusterTimeout*5, existingClusterInterval).Should(Succeed()) + + By("verifying out service has two endpoints so our consumers will get distributed") + Eventually(func(g Gomega) { + + endpoints := &corev1.Endpoints{} + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, endpoints)).Should(Succeed()) + if verbose { + fmt.Printf("\nEndpoints:%v", endpoints.Subsets) + } + g.Expect(len(endpoints.Subsets)).Should(BeNumerically("==", 1)) + g.Expect(len(endpoints.Subsets[0].Addresses)).Should(BeNumerically("==", 2)) + + }, existingClusterTimeout*5, existingClusterInterval).Should(Succeed()) + By("provisioning an app, publisher and consumers, using the broker image to access the artemis client from within the cluster") deploymentTemplate := func(name string, replicas int32, command []string) appsv1.Deployment { appLables := map[string]string{"app": name} @@ -322,11 +331,11 @@ var _ = Describe("work queue", func() { lines := strings.Split(string(body), "\n") var done = false - if !routedNonZeroCheck && verbose { + if verbose { fmt.Printf("\nStart Metrics for JOBS on %v with Headers %v \n", ordinal, resp.Header) } for _, line := range lines { - if !routedNonZeroCheck && verbose { + if verbose { fmt.Printf("%s\n", line) }