From bc4b99c5749a1078251e3bed46b10e14dfed24f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=98=E5=A4=AE?= Date: Thu, 22 Mar 2018 11:03:38 +0800 Subject: [PATCH] Remove topic concepts from oms --- .../samples/consumer/PushConsumerApp.java | 19 ++++-- .../consumer/StreamingConsumerApp.java | 2 +- .../samples/producer/AnotherProducerApp.java | 36 +++++------ .../samples/producer/ProducerApp.java | 33 +++++----- openmessaging-api/README.md | 3 +- .../main/java/io/openmessaging/Message.java | 12 +--- .../java/io/openmessaging/MessageFactory.java | 13 ---- .../openmessaging/MessagingAccessPoint.java | 35 +++++----- .../io/openmessaging/ResourceManager.java | 64 +++---------------- .../io/openmessaging/observer/OMSEvent.java | 40 ------------ .../observer/OMSExceptionEvent.java | 27 -------- .../io/openmessaging/observer/Observer.java | 36 ----------- .../io/openmessaging/producer/Producer.java | 2 +- .../io/openmessaging/routing/Routing.java | 10 +-- 14 files changed, 82 insertions(+), 250 deletions(-) delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/observer/OMSEvent.java delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/observer/OMSExceptionEvent.java delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/observer/Observer.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java index 51b28c08..68ec77e8 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java @@ -38,7 +38,7 @@ public static void main(String[] args) throws OMSResourceNotExistException { String simpleQueue = "HELLO_QUEUE"; resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue()); - //This queue doesn't has a source topic, so only the message delivered to the queue directly can + //This queue doesn't has a source queue, so only the message delivered to the queue directly can //be consumed by this consumer. consumer.attachQueue(simpleQueue, new MessageListener() { @Override @@ -56,14 +56,23 @@ public void onReceived(Message message, Context context) { //Consume messages from a complex queue. final PushConsumer anotherConsumer = messagingAccessPoint.createPushConsumer(); { - String complexQueue = "QUEUE_HAS_SOURCE_TOPIC"; - String sourceTopic = "SOURCE_TOPIC"; + String complexQueue = "QUEUE_WITH_SOURCE_QUEUE"; + String sourceQueue = "SOURCE_QUEUE"; //Create the complex queue. resourceManager.createQueue("NS_01", complexQueue, OMS.newKeyValue()); - //Create the source topic. - resourceManager.createTopic("NS_01", sourceTopic, OMS.newKeyValue()); + //Create the source queue. + resourceManager.createQueue("NS_01", sourceQueue, OMS.newKeyValue()); + anotherConsumer.attachQueue(complexQueue, new MessageListener() { + @Override + public void onReceived(Message message, Context context) { + //The message sent to the sourceQueue will be delivered to anotherConsumer + System.out.println("Received one message: " + message); + context.ack(); + } + + }); } Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java index 745a402b..5a28dbf6 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java @@ -39,7 +39,7 @@ public static void main(String[] args) { } //Persist the consume offset. - messageIterator.commit(true); + messageIterator.commit(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/AnotherProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/AnotherProducerApp.java index dbedbc28..02c39b03 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/AnotherProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/AnotherProducerApp.java @@ -19,8 +19,8 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; +import io.openmessaging.producer.BatchMessageSender; import io.openmessaging.producer.Producer; -import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; public class AnotherProducerApp { @@ -35,32 +35,32 @@ public static void main(String[] args) { producer.startup(); System.out.println("Producer startup OK"); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - producer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); + BatchMessageSender batchMessageSender = producer.createBatchMessageSender(); - SendResult result = producer.send(producer.createTopicBytesMessage( - "HELLO_TOPIC1", "HELLO_BODY1".getBytes(Charset.forName("UTF-8")))); + batchMessageSender.send(producer.createQueueBytesMessage( + "HELLO_QUEUE1", "HELLO_BODY1".getBytes(Charset.forName("UTF-8")))); - System.out.println("Send first message to topic OK, message id is: " + result.messageId()); - - producer.send(producer.createTopicBytesMessage( - "HELLO_TOPIC2", "HELLO_BODY2".getBytes(Charset.forName("UTF-8"))) + batchMessageSender.send(producer.createQueueBytesMessage( + "HELLO_QUEUE2", "HELLO_BODY2".getBytes(Charset.forName("UTF-8"))) .putUserHeaders("KEY1", 100) .putUserHeaders("KEY2", 200L) .putUserHeaders("KEY3", 3.14) .putUserHeaders("KEY4", "value4") ); - System.out.println("Send second message to topic OK"); - - producer.send(producer.createQueueBytesMessage( + batchMessageSender.send(producer.createQueueBytesMessage( "HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); - System.out.println("send third message to queue OK"); + batchMessageSender.commit(); + + System.out.println("Send a batch of messages OK"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + producer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); } } \ No newline at end of file diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index 4317bdc7..ea0b97d2 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -37,27 +37,18 @@ public static void main(String[] args) { producer.startup(); System.out.println("Producer startup OK"); - //Add a shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - producer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); - //Sync { - SendResult sendResult = producer.send(producer.createTopicBytesMessage( - "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + SendResult sendResult = producer.send(producer.createQueueBytesMessage( + "HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send sync message OK, message id is: " + sendResult.messageId()); } //Async with Promise { - final Future result = producer.sendAsync(producer.createTopicBytesMessage( - "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + final Future result = producer.sendAsync(producer.createQueueBytesMessage( + "HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); final SendResult sendResult = result.get(3000L); System.out.println("Send async message OK, message id is: " + sendResult.messageId()); @@ -65,8 +56,8 @@ public void run() { //Async with FutureListener { - final Future result = producer.sendAsync(producer.createTopicBytesMessage( - "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + final Future result = producer.sendAsync(producer.createQueueBytesMessage( + "HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new FutureListener() { @Override @@ -83,10 +74,18 @@ public void operationFailed(Future promise) { //Oneway { - producer.sendOneway(producer.createTopicBytesMessage( - "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + producer.sendOneway(producer.createQueueBytesMessage( + "HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send oneway message OK"); } + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + producer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); } } diff --git a/openmessaging-api/README.md b/openmessaging-api/README.md index 3e9027ad..999ed6ac 100644 --- a/openmessaging-api/README.md +++ b/openmessaging-api/README.md @@ -1,10 +1,9 @@ ## Server Side * Namespace -* Topic * Queue * Stream * Routing - * Operator + * Expression ## Client Side * Producer diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java index ce7db012..dc4931d8 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/Message.java @@ -131,16 +131,6 @@ interface BuiltinKeys { */ String MessageId = "MessageId"; - /** - * The {@code Topic} header field is the destination which the message is being sent. - *

- * When a message is sent this value is should be set properly. - *

- * When a message is received, its {@code Topic} value must be equivalent to the - * value assigned when it was sent. - */ - String Topic = "Topic"; - /** * The {@code Queue} header field is the destination which the message is being sent. *

@@ -284,6 +274,6 @@ interface BuiltinKeys { */ String RetryReason = "RetryReason"; - String Stream = "Stream"; + String StreamName = "StreamName"; } } \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java b/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java index 6745441f..be7b08df 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java @@ -24,19 +24,6 @@ * */ public interface MessageFactory { - /** - * Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a - * stream of uninterpreted bytes. - *

- * The returned {@code BytesMessage} object only can be sent to the specified topic. - * - * @param topic the target topic to send - * @param body the body data for a message - * @return the created {@code BytesMessage} object - * @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error. - */ - BytesMessage createTopicBytesMessage(String topic, byte[] body); - /** * Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a * stream of uninterpreted bytes. diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java index ca40667e..e184c448 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java @@ -22,17 +22,16 @@ import io.openmessaging.consumer.PushConsumer; import io.openmessaging.consumer.StreamingConsumer; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.observer.Observer; import io.openmessaging.producer.Producer; import java.util.List; /** - * The {@code MessagingAccessPoint} obtained from {@link MessagingAccessPointFactory} is capable of creating {@code + * The {@code MessagingAccessPoint} obtained from {@link OMS} is capable of creating {@code * Producer}, {@code Consumer}, {@code ServiceEndPoint}, and so on.

For example: *

  * MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory.getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");
  * Producer producer = messagingAccessPoint.createProducer();
- * producer.send(producer.createTopicBytesMessage("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ * producer.send(producer.createQueueBytesMessage("HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
  * 
* * @version OMS 1.0 @@ -158,30 +157,30 @@ public interface MessagingAccessPoint extends ServiceLifecycle { ResourceManager getResourceManager(); /** - * Register an observer in an serviceEndPoint object. Whenever serviceEndPoint object publish or bind an service - * object, it will be notified to the list of observer object registered before + * Returns the {@code Producer} list created by the specified {@code MessagingAccessPoint} * - * @param observer observer event object to an serviceEndPoint object + * @return the producer list */ - void addObserver(Observer observer); + List producers(); /** - * Removes the given observer from the list of observer - *

- * If the given observer has not been previously registered (i.e. it was - * never added) then this method call is a no-op. If it had been previously - * added then it will be removed. If it had been added more than once, then - * only the first occurrence will be removed. + * Returns the {@code PushConsumer} list created by the specified {@code MessagingAccessPoint} * - * @param observer The observer to remove + * @return the push consumer list */ - void removeObserver(Observer observer); - - List producers(); - List pushConsumers(); + /** + * Returns the {@code StreamingConsumer} list created by the specified {@code MessagingAccessPoint} + * + * @return the streaming consumer list + */ List streamingConsumers(); + /** + * Returns the {@code PullConsumer} list created by the specified {@code MessagingAccessPoint} + * + * @return the pull consumer list + */ List pullConsumers(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java b/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java index a90ee143..caa02431 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java +++ b/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java @@ -23,14 +23,14 @@ /** * The {@code ResourceManager} is responsible for providing a unified interface of resource management, - * allows the user to manage the topic, queue, namespace resources. + * allows the user to manage the queue, namespace resources. *

* Create, fetch, update and destroy are the four basic functions of {@code ResourceManager}. *

* And the {@code ResourceManager} also supports fetch and update resource attributes dynamically. *

* The attributes of consumer and producer also are treated as {@code Resource}. {@code ResourceManager} - * allows the user to fetch producer and consumer list in a specified topic or queue, + * allows the user to fetch producer and consumer list in a specified queue, * and update their resource attributes dynamically. *

* {@link MessagingAccessPoint#getResourceManager()} is the unique method to obtain a {@code ResourceManager} @@ -85,54 +85,6 @@ public interface ResourceManager extends ServiceLifecycle { */ List listNamespaces(); - /** - * Creates a {@code Topic} resource in the specified namespace with some preset properties. - *

- * - * @param nsName the namespace name - * @param topicName the topic name - * @param attributes the preset properties - */ - void createTopic(String nsName, String topicName, KeyValue attributes); - - /** - * Updates the attributes of a specified topic, all the old attributes will be removed and apply the new - * attributes. - * - * @param nsName the namespace name - * @param topicName the topic name - * @param attributes the new attributes - * @throws OMSResourceNotExistException if the specified topic or namespace is not exists - */ - void setTopicAttributes(String nsName, String topicName, KeyValue attributes) throws OMSResourceNotExistException; - - /** - * Gets the attributes of a specified topic. - * - * @param nsName the namespace name - * @param topicName the topic name - * @return the attributes of namespace - * @throws OMSResourceNotExistException if the specified topic or namespace is not exists - */ - KeyValue getTopicAttributes(String nsName, String topicName) throws OMSResourceNotExistException; - - /** - * Deletes an existing topic resource. - * - * @param nsName the namespace of the existing topic - * @param topicName the topic needs to be deleted - * @throws OMSResourceNotExistException if the specified namespace is not exists - */ - void deleteTopic(String nsName, String topicName) throws OMSResourceNotExistException; - - /** - * Gets the topic list in specified namespace, return a empty list if the namespace is not exists - * - * @param nsName the namespace name - * @return the list of all topics - */ - List listTopics(String nsName); - /** * Creates a {@code Queue} resource in the specified namespace with some preset properties. *

@@ -148,9 +100,9 @@ public interface ResourceManager extends ServiceLifecycle { * attributes. * * @param nsName the namespace name - * @param queueName the topic name + * @param queueName the queue name * @param attributes the new attributes - * @throws OMSResourceNotExistException if the specified topic or namespace is not exists + * @throws OMSResourceNotExistException if the specified queue or namespace is not exists */ void setQueueAttributes(String nsName, String queueName, KeyValue attributes) throws OMSResourceNotExistException; @@ -158,9 +110,9 @@ public interface ResourceManager extends ServiceLifecycle { * Gets the attributes of a specified queue. * * @param nsName the namespace name - * @param queueName the topic name + * @param queueName the queue name * @return the attributes of namespace - * @throws OMSResourceNotExistException if the specified topic or namespace is not exists + * @throws OMSResourceNotExistException if the specified queue or namespace is not exists */ KeyValue getQueueAttributes(String nsName, String queueName) throws OMSResourceNotExistException; @@ -219,7 +171,7 @@ public interface ResourceManager extends ServiceLifecycle { void deleteRouting(String nsName, String routingName) throws OMSResourceNotExistException; /** - * Gets the routing list in specified namespace, return a empty list if the namespace is not exists + * Gets the routing list in specified namespace, return a empty list if the namespace is not exists. * * @param nsName the namespace name * @return the list of all routing @@ -227,7 +179,7 @@ public interface ResourceManager extends ServiceLifecycle { List listRoutings(String nsName); /** - * Message also is a resource, some headers of message can be updated by this method + * Updates some system headers of a message. * * @param messageId the id of message * @param headers the new headers diff --git a/openmessaging-api/src/main/java/io/openmessaging/observer/OMSEvent.java b/openmessaging-api/src/main/java/io/openmessaging/observer/OMSEvent.java deleted file mode 100644 index f619c88f..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/observer/OMSEvent.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging.observer; - -/** - * Observable objects can trigger a {@code OMSEvent}, which will be handled in {@link Observer#onEvent(OMSEvent)} - * - * @version OMS 1.0 - * @since OMS 1.0 - */ -public interface OMSEvent { - /** - * Returns the specified {@code OMSEvent} - * - * @return the type of this event - */ - String type(); - - /** - * Returns the detail message to describe this event - * - * @return the detail message of this event - */ - String detailMessage(); -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/observer/OMSExceptionEvent.java b/openmessaging-api/src/main/java/io/openmessaging/observer/OMSExceptionEvent.java deleted file mode 100644 index 48ca0e0e..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/observer/OMSExceptionEvent.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging.observer; - -/** - * Special {@link OMSEvent} which is abnormal. - * - * @version OMS 1.0 - * @since OMS 1.0 - */ -public interface OMSExceptionEvent extends OMSEvent { -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/observer/Observer.java b/openmessaging-api/src/main/java/io/openmessaging/observer/Observer.java deleted file mode 100644 index 7bd3e580..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/observer/Observer.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging.observer; - -import io.openmessaging.MessagingAccessPoint; - -/** - * A {@code Observer} interface is used to observe the {@code OMSEvent} - * dispatches in observable objects, like {@link MessagingAccessPoint} - * - * @version OMS 1.0 - * @since OMS 1.0 - */ -public interface Observer { - /** - * Handler invoked upon arrival of a {@code OMSEvent} dispatch. - * - * @param event the specified event received - */ - void onEvent(OMSEvent event); -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 9e54f748..ae0f9503 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -35,7 +35,7 @@ * of a {@code MessagingAccessPoint}. An instance of {@code Producer} is * created by calling the {@link MessagingAccessPoint#createProducer()} method. * It provides various {@code send} methods to send a message to a specified destination. - * A destination can be a {@link Message.BuiltinKeys#Topic} or a {@link Message.BuiltinKeys#Queue}. + * A destination should be a {@link Message.BuiltinKeys#Queue}. *

* {@link Producer#send(Message)} means send a message to destination synchronously, * the calling thread will block until the send request complete. diff --git a/openmessaging-api/src/main/java/io/openmessaging/routing/Routing.java b/openmessaging-api/src/main/java/io/openmessaging/routing/Routing.java index 4e247447..41647d17 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/routing/Routing.java +++ b/openmessaging-api/src/main/java/io/openmessaging/routing/Routing.java @@ -21,7 +21,7 @@ import io.openmessaging.ResourceManager; /** - * A {@code Routing} object is responsible for routing the messages from {@code Topic} to {@code Queue}, with + * A {@code Routing} object is responsible for routing the messages from a queue to another queue, with * an expression to filter or compute the source messages. * * @version OMS 1.0 @@ -38,16 +38,16 @@ public interface Routing { KeyValue properties(); /** - * The routing source, naming scheme is {@literal TOPIC::}. - * Messages sent to the source topic will be routing to the destination Queue. + * The routing source, naming scheme is {@literal QUEUE::}. + * Messages sent to the source queue will be routing to the destination Queue. * - * @return the source topic + * @return the source queue */ String source(); /** * The routing destination, naming scheme is {@literal QUEUE::} - * Messages sent to the source topic will be routing to this Queue. + * Messages sent to the source queue will be routing to this Queue. * * @return the destination queue */