diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh index 8329491111a9..1499155e96fd 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh @@ -30,14 +30,17 @@ kubectl apply -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-cm.yml # init shenyu sync SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd") #SYNC_ARRAY=("websocket" "nacos") -MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos") +MIDDLEWARE_SYNC_ARRAY=("etcd" "nacos") for sync in ${SYNC_ARRAY[@]}; do echo -e "------------------\n" kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml - + kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-zookeeper.yml kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml - sleep 30s + # rely on zookeeper + kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml + sleep 20s + echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml" # shellcheck disable=SC2199 # shellcheck disable=SC2076 @@ -69,10 +72,12 @@ for sync in ${SYNC_ARRAY[@]}; do exit 1 fi kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml + kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-zookeeper.yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml + kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml # shellcheck disable=SC2199 # shellcheck disable=SC2076 if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml new file mode 100644 index 000000000000..a324c1b66797 --- /dev/null +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/shenyu-kafka.yml @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: shenyu-kafka + labels: + app: shenyu-kafka +spec: + replicas: 1 + selector: + matchLabels: + app: shenyu-kafka + strategy: {} + template: + metadata: + labels: + app: shenyu-kafka + spec: + containers: + - image: apache/kafka:latest + name: shenyu-kafka + ports: + - containerPort: 9092 + name: TCP + env: + - name: KAFKA_BROKER_ID + value: "0" + - name: KAFKA_ZOOKEEPER_CONNECT + value: "shenyu-zookeeper:2181/kafka" + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092" + - name: KAFKA_ADVERTISED_LISTENERS + value: "PLAINTEXT://shenyu-kafka:9092" + restartPolicy: Always +status: {} + +--- +apiVersion: v1 +kind: Service +metadata: + name: shenyu-kafka + labels: + app: shenyu-kafka +spec: + type: NodePort + selector: + app: shenyu-kafka + ports: + - port: 9092 + targetPort: 9092 + name: TCP \ No newline at end of file diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml index 2a3141908e2b..6c7a5baba517 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml @@ -32,5 +32,10 @@ rocketmq-client 4.9.3 + + org.apache.kafka + kafka-clients + 3.4.0 + diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java index 256aca6f1109..3ccc140b4514 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java @@ -20,6 +20,10 @@ import com.google.common.collect.Lists; import io.restassured.http.Method; import org.apache.commons.collections.CollectionUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -37,7 +41,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists; @@ -47,10 +54,14 @@ public class DividePluginCases implements ShenYuScenarioProvider { - private static final String NAMESERVER = "http://localhost:31876"; + private static final String ROCKETMQ_NAMESERVER = "http://localhost:31876"; + + private static final String KAFKA_BROKER = "localhost:9092"; private static final String CONSUMERGROUP = "shenyu-plugin-logging-rocketmq"; + private static final String KAFKA_CONSUMER = "shenyu-plugin-logging-kafka"; + private static final String TOPIC = "shenyu-access-logging"; private static final String TEST = "/http/order/findById?id=123"; @@ -61,7 +72,8 @@ public class DividePluginCases implements ShenYuScenarioProvider { public List get() { return Lists.newArrayList( testDivideHello(), - testRocketMQHello() + testRocketMQHello(), + testKafkaHello() ); } @@ -105,7 +117,7 @@ private ShenYuScenarioSpec testRocketMQHello() { Thread.sleep(1000 * 30); request.request(Method.GET, "/http/order/findById?id=23"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMERGROUP); - consumer.setNamesrvAddr(NAMESERVER); + consumer.setNamesrvAddr(ROCKETMQ_NAMESERVER); consumer.subscribe(TOPIC, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(final List msgs, final ConsumeConcurrentlyContext consumeConcurrentlyContext) { @@ -136,4 +148,65 @@ public ConsumeConcurrentlyStatus consumeMessage(final List msgs, fin // .deleteWaiting(notExists(TEST)).build()) .build(); } + + private ShenYuScenarioSpec testKafkaHello() { + return ShenYuScenarioSpec.builder() + .name("testKafkaHello") + .beforeEachSpec( + ShenYuBeforeEachSpec.builder() + .addSelectorAndRule( + newSelectorBuilder("selector", Plugin.LOGGING_ROCKETMQ) + .name("1") + .matchMode(MatchMode.OR) + .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) + .build(), + newRuleBuilder("rule") + .name("1") + .matchMode(MatchMode.OR) + .conditionList(newConditions(Condition.ParamType.URI, Condition.Operator.STARTS_WITH, "/http")) + .build() + ) + .checker(exists(TEST)) + .build() + ) + .caseSpec( + ShenYuCaseSpec.builder() + .add(request -> { + AtomicBoolean isLog = new AtomicBoolean(false); + try { + Thread.sleep(1000 * 30); + request.request(Method.GET, "/http/order/findById?id=23"); + KafkaConsumer consumer = defaultKafkaConsumer(); + consumer.subscribe(Collections.singletonList(TOPIC)); + LOG.info("kafka consumer start, isLog: isLog.get():{}", isLog.get()); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); + LOG.info("kafka consumer fetch count: {}", records.count()); + records.forEach(record -> { + String value = record.value(); + LOG.info("kafka msg:{}", value); + if (value.contains("/http/order/findById?id=23")) { + isLog.set(true); + } + }); + consumer.commitSync(); + LOG.info("isLog.get():{}", isLog.get()); + Assertions.assertTrue(isLog.get()); + } catch (Exception e) { + LOG.error("error", e); + Assertions.assertTrue(isLog.get()); + } + }) + .build() + ) + .build(); + } + + private KafkaConsumer defaultKafkaConsumer() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER); + return new KafkaConsumer<>(consumerProperties); + } } diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java index 309ff66bc58e..b6104e5d35eb 100644 --- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java +++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java @@ -101,6 +101,11 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData, gatewayClient::getMetaDataCache, adminClient); WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules, gatewayClient::getRuleCache, adminClient); LOG.info("start loggingRocketMQ plugin"); + enableLoggingRocketMQPlugin(adminClient, gatewayClient); + enableLoggingKafkaPlugin(adminClient, gatewayClient); + } + + private void enableLoggingRocketMQPlugin(AdminClient adminClient, GatewayClient gatewayClient) throws Exception { MultiValueMap formData = new LinkedMultiValueMap<>(); formData.add("id", "29"); formData.add("name", "loggingRocketMQ"); @@ -112,6 +117,18 @@ void setup(final AdminClient adminClient, final GatewayClient gatewayClient) thr WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.rocketmq"); } + private void enableLoggingKafkaPlugin(AdminClient adminClient, GatewayClient gatewayClient) throws Exception { + MultiValueMap formData = new LinkedMultiValueMap<>(); + formData.add("id", "33"); + formData.add("name", "loggingKafka"); + formData.add("enabled", "true"); + formData.add("role", "Logging"); + formData.add("sort", "180"); + formData.add("config", "{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\": \"localhost:9092\"}"); + adminClient.changePluginStatus("29", formData); + WaitDataSync.waitGatewayPluginUse(gatewayClient, "org.apache.shenyu.plugin.logging.kafka"); + } + @ShenYuScenario(provider = DividePluginCases.class) void testDivide(final GatewayClient gateway, final CaseSpec spec) { spec.getVerifiers().forEach(verifier -> verifier.verify(gateway.getHttpRequesterSupplier().get()));