Skip to content

Commit

Permalink
101 features: msg properties, msg key, timer msg, trans msg.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliyunmq committed Jun 10, 2019
1 parent 41d890c commit 71e438b
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 33 deletions.
9 changes: 9 additions & 0 deletions MQ/Constants.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Constants
const ERRORS = "Errors";
const MESSAGE_BODY = "MessageBody";
const MESSAGE_TAG = "MessageTag";
const MESSAGE_PROPERTIES = "Properties";
const MESSAGE_ID = "MessageId";
const MESSAGE_BODY_MD5 = "MessageBodyMD5";
const PUBLISH_TIME = "PublishTime";
Expand All @@ -42,6 +43,14 @@ class Constants
const ACK_FAIL = "AckFail";

const TOPIC_NOT_EXIST = "TopicNotExist";

const MESSAGE_PROPERTIES_MSG_KEY = "KEYS";
const MESSAGE_PROPERTIES_TRANS_CHECK_KEY = "__TransCheckT";
const MESSAGE_PROPERTIES_TIMER_KEY = "__STARTDELIVERTIME";

const TRANSACTION_ROLLBACK = "rollback";
const TRANSACTION_COMMIT = "commit";
const TRANSACTION_POP = "pop";
}

?>
2 changes: 1 addition & 1 deletion MQ/Http/HttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function __construct($endPoint, $accessId,
$this->connectTimeout = $config->getConnectTimeout();
$this->securityToken = $securityToken;
$this->endpoint = $endPoint;
$this->agent = "mq-php-sdk/1.0.0(GuzzleHttp/" . \GuzzleHttp\Client::VERSION . " PHP/" . PHP_VERSION . ")";
$this->agent = "mq-php-sdk/1.0.1(GuzzleHttp/" . \GuzzleHttp\Client::VERSION . " PHP/" . PHP_VERSION . ")";
}

private function addRequiredHeaders(BaseRequest &$request)
Expand Down
21 changes: 19 additions & 2 deletions MQ/MQClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function __construct($endPoint, $accessId,
* @param string $instanceId: instance id
* @param string $topicName: the topic name
*
* @return MQProducer $topic: the Producer instance
* @return MQProducer: the Producer instance
*/
public function getProducer($instanceId, $topicName)
{
Expand All @@ -51,6 +51,23 @@ public function getProducer($instanceId, $topicName)
return new MQProducer($this->client, $instanceId, $topicName);
}

/**
* Returns a Transaction Producer reference for publish message to topic
*
* @param string $instanceId: instance id
* @param string $topicName: the topic name
* @param string $groupId: the group id
*
* @return MQTransProducer: the Transaction Producer instance
*/
public function getTransProducer($instanceId, $topicName, $groupId)
{
if ($topicName == NULL || $topicName == "") {
throw new InvalidArgumentException(400, "TopicName is null or empty");
}
return new MQTransProducer($this->client, $instanceId, $topicName, $groupId);
}

/**
* Returns a Consumer reference for consume and ack message to topic
*
Expand All @@ -59,7 +76,7 @@ public function getProducer($instanceId, $topicName)
* @param string $consumer: the consumer name / ons cid
* @param string $messageTag: filter tag for consumer. If not empty, only consume the message which's messageTag is equal to it.
*
* @return MQConsumer $topic: the Producer instance
* @return MQConsumer: the Consumer instance
*/
public function getConsumer($instanceId, $topicName, $consumer, $messageTag = NULL)
{
Expand Down
11 changes: 7 additions & 4 deletions MQ/MQProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

class MQProducer
{
private $instanceId;
private $topicName;
private $client;
protected $instanceId;
protected $topicName;
protected $client;

function __construct(HttpClient $client, $instanceId = NULL, $topicName)
{
Expand All @@ -35,7 +35,10 @@ public function getTopicName()
public function publishMessage(TopicMessage $topicMessage)
{

$request = new PublishMessageRequest($this->instanceId, $this->topicName, $topicMessage->getMessageBody(), $topicMessage->getMessageTag());
$request = new PublishMessageRequest(
$this->instanceId, $this->topicName, $topicMessage->getMessageBody(),
$topicMessage->getProperties(), $topicMessage->getMessageTag()
);
$response = new PublishMessageResponse();
return $this->client->sendRequest($request, $response);
}
Expand Down
99 changes: 99 additions & 0 deletions MQ/MQTransProducer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php
namespace MQ;

use MQ\Exception\InvalidArgumentException;
use MQ\Http\HttpClient;
use MQ\Requests\ConsumeMessageRequest;
use MQ\Requests\AckMessageRequest;
use MQ\Responses\AckMessageResponse;
use MQ\Responses\ConsumeMessageResponse;

class MQTransProducer extends MQProducer
{
private $groupId;

function __construct(HttpClient $client, $instanceId = NULL, $topicName, $groupId)
{
if (empty($groupId)) {
throw new InvalidArgumentException(400, "GroupId is null");
}
parent::__construct($client, $instanceId, $topicName);
$this->groupId = $groupId;
}

/**
* consume transaction half message
*
* @param $numOfMessages: consume how many messages once, 1~16
* @param $waitSeconds: if > 0, means the time(second) the request holden at server if there is no message to consume.
* If <= 0, means the server will response back if there is no message to consume.
* It's value should be 1~30
*
* @return Message
*
* @throws TopicNotExistException if queue does not exist
* @throws MessageNotExistException if no message exists
* @throws InvalidArgumentException if the argument is invalid
* @throws MQException if any other exception happends
*/
public function consumeHalfMessage($numOfMessages, $waitSeconds = -1)
{
if ($numOfMessages < 0 || $numOfMessages > 16) {
throw new InvalidArgumentException(400, "numOfMessages should be 1~16");
}
if ($waitSeconds > 30) {
throw new InvalidArgumentException(400, "numOfMessages should less then 30");
}
$request = new ConsumeMessageRequest($this->instanceId, $this->topicName, $this->groupId, $numOfMessages, $this->messageTag, $waitSeconds);
$request->setTrans(Constants::TRANSACTION_POP);
$response = new ConsumeMessageResponse();
return $this->client->sendRequest($request, $response);
}

/**
* commit transaction message
*
* @param $receiptHandle:
* $receiptHandle, which is got from consumeHalfMessage or publishMessage
*
* @return AckMessageResponse
*
* @throws TopicNotExistException if queue does not exist
* @throws ReceiptHandleErrorException if the receiptHandle is invalid
* @throws InvalidArgumentException if the argument is invalid
* @throws AckMessageException if any message not deleted
* @throws MQException if any other exception happends
*/
public function commit($receiptHandle)
{
$request = new AckMessageRequest($this->instanceId, $this->topicName, $this->groupId, array($receiptHandle));
$request->setTrans(Constants::TRANSACTION_COMMIT);
$response = new AckMessageResponse();
return $this->client->sendRequest($request, $response);
}


/**
* rollback transaction message
*
* @param $receiptHandle:
* $receiptHandle, which is got from consumeHalfMessage or publishMessage
*
* @return AckMessageResponse
*
* @throws TopicNotExistException if queue does not exist
* @throws ReceiptHandleErrorException if the receiptHandle is invalid
* @throws InvalidArgumentException if the argument is invalid
* @throws AckMessageException if any message not deleted
* @throws MQException if any other exception happends
*/
public function rollback($receiptHandle)
{
$request = new AckMessageRequest($this->instanceId, $this->topicName, $this->groupId, array($receiptHandle));
$request->setTrans(Constants::TRANSACTION_ROLLBACK);
$response = new AckMessageResponse();
return $this->client->sendRequest($request, $response);
}
}

?>
31 changes: 28 additions & 3 deletions MQ/Model/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class Message
{
use MessagePropertiesForConsume;

public function __construct($messageId, $messageBodyMD5, $messageBody, $publishTime, $nextConsumeTime, $firstConsumeTime, $consumedTimes, $receiptHandle, $messageTag)
public function __construct($messageId, $messageBodyMD5, $messageBody, $publishTime, $nextConsumeTime,
$firstConsumeTime, $consumedTimes, $receiptHandle, $messageTag, $properties)
{
$this->messageId = $messageId;
$this->messageBodyMD5 = $messageBodyMD5;
Expand All @@ -19,6 +20,7 @@ public function __construct($messageId, $messageBodyMD5, $messageBody, $publishT
$this->consumedTimes = $consumedTimes;
$this->receiptHandle = $receiptHandle;
$this->messageTag = $messageTag;
$this->properties = $properties;
}

static public function fromXML(\XMLReader $xmlReader)
Expand All @@ -32,6 +34,7 @@ static public function fromXML(\XMLReader $xmlReader)
$consumedTimes = NULL;
$receiptHandle = NULL;
$messageTag = NULL;
$properties = NULL;

while ($xmlReader->read())
{
Expand Down Expand Up @@ -102,6 +105,24 @@ static public function fromXML(\XMLReader $xmlReader)
$messageTag = $xmlReader->value;
}
break;
case Constants::MESSAGE_PROPERTIES:
$xmlReader->read();
if ($xmlReader->nodeType == \XMLReader::TEXT)
{
$propertiesString = $xmlReader->value;
if ($propertiesString != NULL)
{
$kvArray = explode("|", $propertiesString);
foreach ($kvArray as $kv)
{
$kAndV = explode(":", $kv);
if (sizeof($kAndV) == 2)
{
$properties[$kAndV[0]] = $kAndV[1];
}
}
}
}
break;
}
break;
Expand All @@ -117,7 +138,9 @@ static public function fromXML(\XMLReader $xmlReader)
$firstConsumeTime,
$consumedTimes,
$receiptHandle,
$messageTag);
$messageTag,
$properties
);
return $message;
}
break;
Expand All @@ -133,7 +156,9 @@ static public function fromXML(\XMLReader $xmlReader)
$firstConsumeTime,
$consumedTimes,
$receiptHandle,
$messageTag);
$messageTag,
$properties
);

return $message;
}
Expand Down
41 changes: 41 additions & 0 deletions MQ/Model/TopicMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace MQ\Model;


use MQ\Constants;
use MQ\Traits\MessagePropertiesForPublish;

class TopicMessage
Expand All @@ -14,4 +15,44 @@ public function __construct($messageBody)
{
$this->messageBody = $messageBody;
}

public function putProperty($key, $value)
{
if ($key === NULL || $value === NULL || $key === "" || $value === "")
{
return;
}
$this->properties[$key . ""] = $value . "";
}

/**
* 设置消息KEY,如果没有设置,则消息的KEY为RequestId
*
* @param $key
*/
public function setMessageKey($key)
{
$this->putProperty(Constants::MESSAGE_PROPERTIES_MSG_KEY, $key);
}

/**
* 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递。
* 如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者
*
* @param $timeInMillis
*/
public function setStartDeliverTime($timeInMillis)
{
$this->putProperty(Constants::MESSAGE_PROPERTIES_TIMER_KEY, $timeInMillis);
}

/**
* 在消息属性中添加第一次消息回查的最快时间,单位秒,并且表征这是一条事务消息
* 范围: 10~300
* @param $timeInSeconds
*/
public function setTransCheckImmunityTime($timeInSeconds)
{
$this->putProperty(Constants::MESSAGE_PROPERTIES_TRANS_CHECK_KEY, $timeInSeconds);
}
}
10 changes: 10 additions & 0 deletions MQ/Requests/AckMessageRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class AckMessageRequest extends BaseRequest
private $topicName;
private $receiptHandles;
private $consumer;
private $trans;

public function __construct($instanceId, $topicName, $consumer, array $receiptHandles)
{
Expand All @@ -33,6 +34,11 @@ public function getConsumer()
return $this->consumer;
}

function setTrans($trans)
{
$this->trans = $trans;
}

public function generateBody()
{
$xmlWriter = new \XMLWriter;
Expand All @@ -54,6 +60,10 @@ public function generateQueryString()
if ($this->instanceId != null && $this->instanceId != "") {
$params["ns"] = $this->instanceId;
}
if ($this->trans != NULL)
{
$params["trans"] = $this->trans;
}
return http_build_query($params);
}
}
Expand Down
16 changes: 14 additions & 2 deletions MQ/Requests/ConsumeMessageRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ConsumeMessageRequest extends BaseRequest
private $messageTag;
private $numOfMessages;
private $waitSeconds;
private $trans;

public function __construct($instanceId, $topicName, $consumer, $numOfMessages, $messageTag = NULL, $waitSeconds = NULL)
{
Expand Down Expand Up @@ -66,20 +67,31 @@ public function generateBody()
return NULL;
}

function setTrans($trans)
{
$this->trans = $trans;
}

public function generateQueryString()
{
$params = array("numOfMessages" => $this->numOfMessages);
$params["consumer"] = $this->consumer;
if ($this->instanceId != null && $this->instanceId != "") {
if ($this->instanceId != NULL && $this->instanceId != "")
{
$params["ns"] = $this->instanceId;
}
if ($this->waitSeconds != NULL)
{
$params["waitseconds"] = $this->waitSeconds;
}
if ($this->messageTag != NULL) {
if ($this->messageTag != NULL)
{
$params["tag"] = $this->messageTag;
}
if ($this->trans != NULL)
{
$params["trans"] = $this->trans;
}
return http_build_query($params);
}
}
Expand Down
Loading

0 comments on commit 71e438b

Please sign in to comment.