Skip to content

Commit

Permalink
Init
Browse files Browse the repository at this point in the history
  • Loading branch information
gdarko committed Jul 5, 2022
0 parents commit 3416701
Show file tree
Hide file tree
Showing 10 changed files with 576 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
vendor/
composer.json
composer.lock
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Messages

Abstractions for queue brokering packages like RabbitMQ, etc used in Microservices.

### Installation

```
composer require ignitekit/messages
```


23 changes: 23 additions & 0 deletions config.sample.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

/*******************************/
/* exchanges Configuration */
/*******************************/

return [
'default' => 'RabbitMQ',
'adapters' => [
'RabbitMQ' => [
'hostname' => 'localhost',
'username' => 'guest',
'password' => 'guest',
'port' => 5672,
'settings' => [
'passive' => 0,
'durable' => 1,
'exclusive' => 0,
'auto_delete' => 0
]
]
]
];
68 changes: 68 additions & 0 deletions src/Abstracts/BaseAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

namespace IgniteKit\Messages\Abstracts;

use IgniteKit\Messages\Message;
use IgniteKit\Messages\Exchange;

abstract class BaseAdapter {

/**
* List of declared exchanges
* @var Exchange[]
*/
protected $exchanges;

/**
* The queue connection
* @var BaseConnection
*/
protected $connection;


/**
* The constructor
*
* @param BaseConnection $connection
*/
public function __construct( BaseConnection $connection ) {
$this->connection = $connection;
}

/**
* Create a queue
*
* @return mixed
*/
abstract public function create( Exchange $exchange );

/**
* Dispatch a message
*
* @param Exchange $exchange
* @param Message $message
* @param array $config
*
* @return void
*/
abstract public function send( Exchange $exchange, Message $message, $config = array() );

/**
* Receive a message
*
* @param Exchange $exchange
* @param callable $callback
* @param array $config
*
* @return void
*/
abstract public function receive( Exchange $exchange, callable $callback, $config = array() );

/**
* The connection instance
* @return BaseConnection
*/
public function get_connection() {
return $this->connection;
}
}
62 changes: 62 additions & 0 deletions src/Abstracts/BaseConnection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace IgniteKit\Messages\Abstracts;

abstract class BaseConnection {

protected $user;
protected $pass;
protected $host;
protected $port;

/**
* The constructor
*
* @param $user
* @param $pass
* @param $host
* @param $port
*/
public function __construct( $user, $pass, $host, $port, $connect = false ) {
$this->user = $user;
$this->pass = $pass;
$this->host = $host;
$this->port = $port;

if ( $connect ) {
$this->open();
}
}

/**
* The destructor
*/
public function __destruct() {
$this->close();
}

/**
* Create the connection
* @return mixed
*/
abstract public function open();

/**
* Close the active connection
* @return void
*/
abstract public function close();

/**
* Returns the connection
* @return mixed
*/
abstract public function get();

/**
* Check if connected
* @return bool
*/
abstract public function is_connected();

}
116 changes: 116 additions & 0 deletions src/Adapters/RabbitMQ/RabbitMQAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php

namespace IgniteKit\Messages\Adapters\RabbitMQ;

use IgniteKit\Messages\Abstracts\BaseAdapter;
use IgniteKit\Messages\Message;
use IgniteKit\Messages\Exchange;

use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQAdapter extends BaseAdapter {

/**
* The connection
* @var RabbitMQConnection
*/
protected $connection;

/**
* Declare an exchange
*
* @return bool
*/
public function create( Exchange $exchange ) {

if ( isset( $this->exchanges[ $exchange->get_name() ] ) ) {
return false;
}

$this->exchanges[ $exchange->get_name() ] = $exchange;

$this->connection
->get()
->channel()
->queue_declare(
$exchange->get_name(),
$exchange->get_config( 'passive', false ),
$exchange->get_config( 'durable', true ),
$exchange->get_config( 'exclusive', false ),
$exchange->get_config( 'auto_delete', false )
);

return true;
}

/**
* Dispatch a message
*
* @param Exchange $exchange
* @param Message $message
* @param array $config
*
* @return void
*/
public function send( Exchange $exchange, Message $message, $config = array() ) {

if ( ! isset( $this->exchanges[ $exchange->get_name() ] ) ) {
$this->create( $exchange );
}

$amq_message = new AMQPMessage( $message->get_body() );

$channel = $this->connection
->get()
->channel();

$channel->basic_publish( $amq_message, '', $exchange->get_name() );
}

/**
* Dispatch a message and close connection
*
* @param Exchange $exchange
* @param Message $message
* @param array $config
*
* @return void
*/
public function send_close( Exchange $exchange, Message $message, $config = array() ) {
$this->send( $exchange, $message, $config );
$this->connection->close();
}

/**
* Receive a message
*
* @param Exchange $exchange
* @param callable $callback
* @param array $config
*
* @return void
*/
public function receive( Exchange $exchange, callable $callback, $config = array() ) {

if ( ! isset( $this->exchanges[ $exchange->get_name() ] ) ) {
$this->create( $exchange );
}

$channel = $this->connection->get()->channel();

$channel->basic_consume(
$exchange->get_name(),
$exchange->get_config( 'consumer_tag', '' ),
$exchange->get_config( 'no_local', false ),
$exchange->get_config( 'no_ack', true ),
$exchange->get_config( 'exclusive', false ),
$exchange->get_config( 'nowait', false ),
$callback
);

while ( $channel->is_open() ) {
$channel->wait();
}

}
}
63 changes: 63 additions & 0 deletions src/Adapters/RabbitMQ/RabbitMQConnection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace IgniteKit\Messages\Adapters\RabbitMQ;

use IgniteKit\Messages\Abstracts\BaseConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConnection extends BaseConnection {

/**
* The connection instance
* @var AMQPStreamConnection
*/
protected $instance;

/**
* Create the connection
* @return mixed
*/
public function open() {
$this->instance = new AMQPStreamConnection(
$this->host,
$this->port,
$this->user,
$this->pass
);

return $this->instance->isConnected();
}

/**
* Close the active connection
* @return void
*/
public function close() {
if ( ! $this->is_connected() ) {
return;
}

try {
$this->instance->channel()->close();
$this->instance->close();
} catch ( \Exception $e ) {
return;
}
}

/**
* Returns the connection
* @return AMQPStreamConnection
*/
public function get() {
return $this->instance;
}

/**
* Check if connected
* @return bool
*/
public function is_connected() {
return $this->instance && $this->instance->isConnected();
}
}
Loading

0 comments on commit 3416701

Please sign in to comment.