Skip to content

Commit

Permalink
Remove topic concepts from oms
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouxinyu committed Mar 22, 2018
1 parent f4d80ea commit bc4b99c
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static void main(String[] args) throws OMSResourceNotExistException {
String simpleQueue = "HELLO_QUEUE";
resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue());

//This queue doesn't has a source topic, so only the message delivered to the queue directly can
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
//be consumed by this consumer.
consumer.attachQueue(simpleQueue, new MessageListener() {
@Override
Expand All @@ -56,14 +56,23 @@ public void onReceived(Message message, Context context) {
//Consume messages from a complex queue.
final PushConsumer anotherConsumer = messagingAccessPoint.createPushConsumer();
{
String complexQueue = "QUEUE_HAS_SOURCE_TOPIC";
String sourceTopic = "SOURCE_TOPIC";
String complexQueue = "QUEUE_WITH_SOURCE_QUEUE";
String sourceQueue = "SOURCE_QUEUE";

//Create the complex queue.
resourceManager.createQueue("NS_01", complexQueue, OMS.newKeyValue());
//Create the source topic.
resourceManager.createTopic("NS_01", sourceTopic, OMS.newKeyValue());
//Create the source queue.
resourceManager.createQueue("NS_01", sourceQueue, OMS.newKeyValue());

anotherConsumer.attachQueue(complexQueue, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
//The message sent to the sourceQueue will be delivered to anotherConsumer
System.out.println("Received one message: " + message);
context.ack();
}

});
}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static void main(String[] args) {
}

//Persist the consume offset.
messageIterator.commit(true);
messageIterator.commit();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.BatchMessageSender;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;

public class AnotherProducerApp {
Expand All @@ -35,32 +35,32 @@ public static void main(String[] args) {
producer.startup();
System.out.println("Producer startup OK");

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
BatchMessageSender batchMessageSender = producer.createBatchMessageSender();

SendResult result = producer.send(producer.createTopicBytesMessage(
"HELLO_TOPIC1", "HELLO_BODY1".getBytes(Charset.forName("UTF-8"))));
batchMessageSender.send(producer.createQueueBytesMessage(
"HELLO_QUEUE1", "HELLO_BODY1".getBytes(Charset.forName("UTF-8"))));

System.out.println("Send first message to topic OK, message id is: " + result.messageId());

producer.send(producer.createTopicBytesMessage(
"HELLO_TOPIC2", "HELLO_BODY2".getBytes(Charset.forName("UTF-8")))
batchMessageSender.send(producer.createQueueBytesMessage(
"HELLO_QUEUE2", "HELLO_BODY2".getBytes(Charset.forName("UTF-8")))
.putUserHeaders("KEY1", 100)
.putUserHeaders("KEY2", 200L)
.putUserHeaders("KEY3", 3.14)
.putUserHeaders("KEY4", "value4")
);

System.out.println("Send second message to topic OK");

producer.send(producer.createQueueBytesMessage(
batchMessageSender.send(producer.createQueueBytesMessage(
"HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

System.out.println("send third message to queue OK");
batchMessageSender.commit();

System.out.println("Send a batch of messages OK");

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,27 @@ public static void main(String[] args) {
producer.startup();
System.out.println("Producer startup OK");

//Add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));

//Sync
{
SendResult sendResult = producer.send(producer.createTopicBytesMessage(
"HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
SendResult sendResult = producer.send(producer.createQueueBytesMessage(
"HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
}

//Async with Promise
{
final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage(
"HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
final Future<SendResult> result = producer.sendAsync(producer.createQueueBytesMessage(
"HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

final SendResult sendResult = result.get(3000L);
System.out.println("Send async message OK, message id is: " + sendResult.messageId());
}

//Async with FutureListener
{
final Future<SendResult> result = producer.sendAsync(producer.createTopicBytesMessage(
"HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
final Future<SendResult> result = producer.sendAsync(producer.createQueueBytesMessage(
"HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

result.addListener(new FutureListener<SendResult>() {
@Override
Expand All @@ -83,10 +74,18 @@ public void operationFailed(Future<SendResult> promise) {

//Oneway
{
producer.sendOneway(producer.createTopicBytesMessage(
"HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
producer.sendOneway(producer.createQueueBytesMessage(
"HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

System.out.println("Send oneway message OK");
}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
}
}
3 changes: 1 addition & 2 deletions openmessaging-api/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
## Server Side
* Namespace
* Topic
* Queue
* Stream
* Routing
* Operator
* Expression

## Client Side
* Producer
Expand Down
12 changes: 1 addition & 11 deletions openmessaging-api/src/main/java/io/openmessaging/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,6 @@ interface BuiltinKeys {
*/
String MessageId = "MessageId";

/**
* The {@code Topic} header field is the destination which the message is being sent.
* <p>
* When a message is sent this value is should be set properly.
* <p>
* When a message is received, its {@code Topic} value must be equivalent to the
* value assigned when it was sent.
*/
String Topic = "Topic";

/**
* The {@code Queue} header field is the destination which the message is being sent.
* <p>
Expand Down Expand Up @@ -284,6 +274,6 @@ interface BuiltinKeys {
*/
String RetryReason = "RetryReason";

String Stream = "Stream";
String StreamName = "StreamName";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,6 @@
*
*/
public interface MessageFactory {
/**
* Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a
* stream of uninterpreted bytes.
* <p>
* The returned {@code BytesMessage} object only can be sent to the specified topic.
*
* @param topic the target topic to send
* @param body the body data for a message
* @return the created {@code BytesMessage} object
* @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error.
*/
BytesMessage createTopicBytesMessage(String topic, byte[] body);

/**
* Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a
* stream of uninterpreted bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.observer.Observer;
import io.openmessaging.producer.Producer;
import java.util.List;

/**
* The {@code MessagingAccessPoint} obtained from {@link MessagingAccessPointFactory} is capable of creating {@code
* The {@code MessagingAccessPoint} obtained from {@link OMS} is capable of creating {@code
* Producer}, {@code Consumer}, {@code ServiceEndPoint}, and so on. <p> For example:
* <pre>
* MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory.getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");
* Producer producer = messagingAccessPoint.createProducer();
* producer.send(producer.createTopicBytesMessage("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
* producer.send(producer.createQueueBytesMessage("HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
* </pre>
*
* @version OMS 1.0
Expand Down Expand Up @@ -158,30 +157,30 @@ public interface MessagingAccessPoint extends ServiceLifecycle {
ResourceManager getResourceManager();

/**
* Register an observer in an serviceEndPoint object. Whenever serviceEndPoint object publish or bind an service
* object, it will be notified to the list of observer object registered before
* Returns the {@code Producer} list created by the specified {@code MessagingAccessPoint}
*
* @param observer observer event object to an serviceEndPoint object
* @return the producer list
*/
void addObserver(Observer observer);
List<Producer> producers();

/**
* Removes the given observer from the list of observer
* <p>
* If the given observer has not been previously registered (i.e. it was
* never added) then this method call is a no-op. If it had been previously
* added then it will be removed. If it had been added more than once, then
* only the first occurrence will be removed.
* Returns the {@code PushConsumer} list created by the specified {@code MessagingAccessPoint}
*
* @param observer The observer to remove
* @return the push consumer list
*/
void removeObserver(Observer observer);

List<Producer> producers();

List<PushConsumer> pushConsumers();

/**
* Returns the {@code StreamingConsumer} list created by the specified {@code MessagingAccessPoint}
*
* @return the streaming consumer list
*/
List<StreamingConsumer> streamingConsumers();

/**
* Returns the {@code PullConsumer} list created by the specified {@code MessagingAccessPoint}
*
* @return the pull consumer list
*/
List<PullConsumer> pullConsumers();
}
Loading

0 comments on commit bc4b99c

Please sign in to comment.