Skip to content

Commit

Permalink
refactor: 感谢最菜兄优化 bin/reboot.php,mine-core的amqp队列监听器移动到 App\System\…
Browse files Browse the repository at this point in the history
…Listener 下,升级mine-core
  • Loading branch information
kanyxmo committed Jul 27, 2023
1 parent 7eb2c31 commit b3362d9
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 23 deletions.
124 changes: 124 additions & 0 deletions app/System/Listener/QueueConsumeListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php
/**
* 站内消息队列消费监听器
*/
declare(strict_types=1);
namespace App\System\Listener;

use Mine\Interfaces\ServiceInterface\QueueLogServiceInterface;
use Mine\Amqp\Event\AfterConsume;
use Mine\Amqp\Event\BeforeConsume;
use Mine\Amqp\Event\ConsumeEvent;
use Mine\Amqp\Event\FailToConsume;
use Mine\Amqp\Event\WaitTimeout;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Event\Annotation\Listener;

/**
* 消费队列监听
* Class QueueConsumeListener
* @package Mine\Amqp\Listener
*/
#[Listener]
class QueueConsumeListener implements ListenerInterface
{
/**
* @Message("未消费")
*/
const CONSUME_STATUS_NO = 1;
/**
* @Message("消费中")
*/
const CONSUME_STATUS_DOING = 2;
/**
* @Message("消费成功")
*/
const CONSUME_STATUS_SUCCESS = 3;
/**
* @Message("消费失败")
*/
const CONSUME_STATUS_FAIL = 4;
/**
* @Message("消费重复")
*/
const CONSUME_STATUS_REPEAT = 5;

private QueueLogServiceInterface $service;

public function listen(): array
{
// 返回一个该监听器要监听的事件数组,可以同时监听多个事件
return [
AfterConsume::class,
BeforeConsume::class,
ConsumeEvent::class,
FailToConsume::class,
WaitTimeout::class,
];
}

/**
* @param object $event
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
*/
public function process(object $event): void
{
$this->service = container()->get(QueueLogServiceInterface::class);
if ($event->message) {
$class = get_class($event);
$func = lcfirst(trim(strrchr($class, '\\'),'\\'));
$this->$func($event);
}
}

/**
* Description:消费前
* User:mike
* @param object $event
*/
public function beforeConsume(object $event)
{
$this->service->update(
(int)$event->data['queue_id'],
['consume_status' => self::CONSUME_STATUS_DOING]
);
}

/**
* Description:消费中
* User:mike
* @param object $event
*/
public function consumeEvent(object $event)
{
// TODO...
}

/**
* Description:消费后
* User:mike
* @param object $event
*/
public function afterConsume(object $event)
{
$this->service->update(
(int)$event->data['queue_id'],
['consume_status' => self::CONSUME_STATUS_SUCCESS]
);
}

/**
* Description:消费失败
* User:mike
* @param object $event
*/
public function failToConsume(object $event)
{
$this->service->update(
(int)$event->data['queue_id'], [
'consume_status' => self::CONSUME_STATUS_REPEAT,
'log_content' => $event->throwable ?: $event->throwable->getMessage()
]);
}
}
153 changes: 153 additions & 0 deletions app/System/Listener/QueueProduceListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
<?php
/**
* 站内消息队列生产监听器
*/
declare(strict_types=1);
namespace App\System\Listener;

use App\System\Queue\Producer\MessageProducer;
use Mine\Interfaces\ServiceInterface\QueueMessageServiceInterface;
use Mine\Interfaces\ServiceInterface\QueueLogServiceInterface;
use Hyperf\Context\Context;
use Mine\Amqp\Event\AfterProduce;
use Mine\Amqp\Event\BeforeProduce;
use Mine\Amqp\Event\FailToProduce;
use Mine\Amqp\Event\ProduceEvent;
use Mine\Amqp\Event\WaitTimeout;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Event\Annotation\Listener;

/**
* 生产队列监听
* Class QueueProduceListener
* @package Mine\Amqp\Listener
*/
#[Listener]
class QueueProduceListener implements ListenerInterface
{
/**
* @Message("未生产")
*/
const PRODUCE_STATUS_WAITING = 1;
/**
* @Message("生产中")
*/
const PRODUCE_STATUS_DOING = 2;
/**
* @Message("生产成功")
*/
const PRODUCE_STATUS_SUCCESS = 3;
/**
* @Message("生产失败")
*/
const PRODUCE_STATUS_FAIL = 4;
/**
* @Message("生产重复")
*/
const PRODUCE_STATUS_REPEAT = 5;
private QueueLogServiceInterface $service;

public function listen(): array
{
// 返回一个该监听器要监听的事件数组,可以同时监听多个事件
return [
AfterProduce::class,
BeforeProduce::class,
ProduceEvent::class,
FailToProduce::class,
WaitTimeout::class,
];
}

/**
* @param object $event
* @throws \Psr\Container\ContainerExceptionInterface
* @throws \Psr\Container\NotFoundExceptionInterface
* @throws \Exception
*/
public function process(object $event): void
{
$this->service = container()->get(QueueLogServiceInterface::class);
$class = get_class($event);
$func = lcfirst(trim(strrchr($class, '\\'),'\\'));
$this->$func($event);
}

/**
* Description:生产前
* User:mike, x.mo
* @param object $event
*/
public function beforeProduce(object $event)
{
$queueName = strchr($event->producer->getRoutingKey(), '.', true) . '.queue';

$id = $this->service->save([
'exchange_name' => $event->producer->getExchange(),
'routing_key_name' => $event->producer->getRoutingKey(),
'queue_name' => $queueName,
'queue_content' => $event->producer->payload(),
'delay_time' => $event->delayTime ?? 0,
'produce_status' => self::PRODUCE_STATUS_SUCCESS
]);

$this->setId($id);

$payload = json_decode($event->producer->payload(), true);

if (!isset($payload['queue_id'])) {
$event->producer->setPayload([
'queue_id' => $id, 'data' => $payload
]);
}

$this->service->update($id, [ 'queue_content' => $event->producer->payload() ]);
}

/**
* Description:生产中
* User:mike, x.mo
* @param object $event
*/
public function produceEvent(object $event): void
{
// TODO...
}

/**
* Description:生产后
* User:mike, x.mo
* @param object $event
*/
public function afterProduce(object $event): void
{
// 只针对站内消息
if (isset($event->producer) && $event->producer instanceof MessageProducer) {
container()->get(QueueMessageServiceInterface::class)->save(
json_decode($event->producer->payload(), true)['data']
);
}
}

/**
* Description:生产失败
* User:mike, x.mo
*/
public function failToProduce(object $event): void
{
$this->service->update((int) $this->getId(), [
'produce_status' => self::PRODUCE_STATUS_FAIL,
'log_content' => $event->throwable ?: $event->throwable->getMessage()
]);
}

public function setId(int $id): void
{
Context::set('id', $id);
}

public function getId(): int
{
return Context::get('id', 0);
}
}
98 changes: 81 additions & 17 deletions bin/reboot.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,94 @@
* @Link https://gitee.com/xmo/MineAdmin
*/

/**
declare(strict_types=1);


/*
* 强制重启服务脚本,并清理缓存代理类
*/
$env = isset($argv[1]) ? $argv[1] : 'dev';

$pid = shell_exec(sprintf('cat %s/../runtime/hyperf.pid', __DIR__));
$rebootCmd = sprintf('rm -rf %s/../runtime/container/* && php %s/hyperf.php start > /dev/null 2>/dev/null &', __DIR__, __DIR__);
$httpPort = isset($argv[2]) ? $argv[2] : 9501;
$messagePort = isset($argv[3]) ? $argv[3] : 9502;

if (shell_exec(sprintf('ps -ef | grep -v grep | grep %s', $pid))) {
shell_exec("kill -9 {$pid}");
shell_exec($rebootCmd);
} else {
killHyperfPid();
killHttpPort($httpPort);
killWebsocketPort($messagePort);

startService($env);

function startService($env)
{
echo "启动{$env}服务\n";
if ($env == 'dev'){
$rebootCmd = sprintf('php %s/hyperf.php server:watch > /dev/tty', __DIR__);
shell_exec($rebootCmd);
}else{
$rebootCmd = sprintf('php %s/hyperf.php start > /dev/null', __DIR__);
shell_exec($rebootCmd);
}
}

function killHyperfPid()
{
echo "执行killHyperfPid中\n";
$pid = shell_exec(sprintf('cat %s/../runtime/hyperf.pid', __DIR__));
$rebootCmd = sprintf('rm -rf %s/../runtime/container/*', __DIR__);
// $rebootCmd = sprintf('rm -rf %s/../runtime/container/* && php %s/hyperf.php start > /dev/null 2>/dev/null &', __DIR__, __DIR__);

if (shell_exec(sprintf('ps -ef | grep -v grep | grep %s', $pid))) {
shell_exec("kill -9 {$pid}");
}
echo "执行killHyperfPid完成\n";

echo "执行清理缓存代理中\n";
shell_exec($rebootCmd);
echo "执行清理缓存代理成功\n";
}

// 执行 lsof 命令并查找 9502 端口
$output = shell_exec('lsof -i :9502 | grep LISTEN | awk \'{print $2}\'');
function killWebsocketPort($port = 9502)
{
echo "执行killWebsocketPort中\n";

// 将进程 ID 转换为数组
$pidList = explode("\n", trim($output));
$command = 'lsof -t -i:' . $port;
$output = shell_exec($command);

// 遍历进程 ID 列表并杀死相应进程
foreach ($pidList as $pid) {
if (is_numeric($pid)) {
shell_exec("kill -9 $pid");
echo "进程 $pid 已杀死\n";
if ($output) {
$pidArray = explode("\n", trim($output));

$pidList = array_filter($pidArray, 'strlen');

foreach ($pidList as $pid) {
if (is_numeric($pid)) {
shell_exec("kill -9 {$pid}");
echo __FUNCTION__ . ":{$port}端口进程 {$pid} 已杀死\n";
}
}
}

echo "执行killWebsocketPort完成\n";
}

function killHttpPort($port = 9501)
{
echo "执行killHttpPort中\n";

$command = 'lsof -t -i:' . $port;
$output = shell_exec($command);

if ($output) {
$pidArray = explode("\n", trim($output));

$pidList = array_filter($pidArray, 'strlen');

foreach ($pidList as $pid) {
if (is_numeric($pid)) {
shell_exec("kill -9 {$pid}");
echo __FUNCTION__ . ":{$port}端口进程 {$pid} 已杀死\n";
}
}
}
}

echo "执行killHttpPort完成\n";
}
Loading

0 comments on commit b3362d9

Please sign in to comment.