The redis_ipc gem provides a simple way to implement Inter-Process Communication (IPC) between any number of Ruby processes via Redis Streams.
- Simple and easy to implement
- Quick and efficient communication between any number of programs/processes
- Thread-safe, process-safe, and load-balanced
- Inherently insecure
  - Any process that has access to the Redis database can add/change/remove data in transit.
  - Do not use RedisIPC in an untrusted environment!
- Extensible
  - Within Ruby, RedisIPC::Stream can be adapted to different use cases. See RedisIPC::Channel
  - Outside of Ruby, any programming language with access to Redis can implement the Redis Stream and Consumer Group design and hook into RedisIPC, allowing for communication between completely different programming languages (see the sketch after this list)
  - With a centralized Redis instance, programs on separate computers can communicate with each other
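To make the Redis Stream and Consumer Group design mentioned above more concrete, here is a minimal sketch that uses the plain redis gem directly. It only illustrates the underlying primitives; the stream/group names and the entry fields ("to", "content") are made up for this example and are not RedisIPC's actual entry format.
require "redis"
require "json"
redis = Redis.new
# Each endpoint reads the shared stream through its own consumer group.
# mkstream: true creates the stream if it does not exist yet
begin
  redis.xgroup(:create, "stream_name", "group_name", "$", mkstream: true)
rescue Redis::CommandError
  # The group already exists
end
# Post an entry onto the shared stream
redis.xadd("stream_name", {"to" => "group_name", "content" => JSON.generate("Hello!")})
# Read new entries as a consumer within the group, then acknowledge them
entries = redis.xreadgroup("group_name", "consumer_1", "stream_name", ">", count: 10) || {}
entries.fetch("stream_name", []).each do |id, fields|
  puts "#{id}: #{fields.inspect}"
  redis.xack("stream_name", "group_name", id)
end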
Add this line to your application's Gemfile:
gem "redis_ipc"
And then execute:
$ bundle
Or install it yourself as:
$ gem install redis_ipc
require "redis_ipc"
# The "stream_name" is the shared stream for all endpoints
# The "group_name" is the unique identifier for this endpoint.
# Processes cannot share the same group at this time
stream = RedisIPC::Stream.new("stream_name", "group_name")
# Any entries sent to our group will come through here
stream.on_request do |entry|
# Do something with entry (RedisIPC::Stream::Entry)
end
# Any exceptions that occurred will come through here
stream.on_error do |exception|
# Do something with exception (StandardError, or similar)
end
# Until this point, this endpoint was not connected to the stream.
# Calling #connect will allow us to send and receive entries on the stream
#
# All of the following options are pre-configured so they do not have to be provided
stream.connect(
# Redis connection options. Host, port, url, etc.
redis_options: {},
# An optional logger instance that can be provided for verbose logs
# Default: nil
logger: nil,
# The number of Redis connections to be allotted for sending entries. This number is added
# to the total number of Redis connections based on consumer/dispatcher pool sizes.
# Redis connections are not held onto for very long, so this number does not need to be huge
# Default: 10
pool_size: 10,
# The max pool size for Redis connections for this stream/group.
# When provided, this option will take precedence over `pool_size` and the related
# calculations mentioned above.
# Default: nil
max_pool_size: nil,
# Options for the ledger
ledger: {
# Controls how long, in seconds, an entry can be in the ledger before it is considered expired
# This also controls how long the thread is blocked when waiting for an entry to be responded to
entry_timeout: 5,
# Controls how often, in seconds, the ledger should clean up and remove expired entries
cleanup_interval: 1
},
# Options for the consumers
consumer: {
# How many consumers should be created
pool_size: 3,
# How often, in seconds, should consumers check for entries
execution_interval: 0.001
},
# Options for the dispatchers
dispatcher: {
# How many dispatchers should be created
pool_size: 2,
# How often, in seconds, should dispatchers check for entries
execution_interval: 0.001
}
)
# Now we can send data
# This will block until "another_group" picks up the entry, or until the timeout is reached, in which case this will raise an exception
stream.send_to_group(content: "Hello!", to: "another_group")
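send_to_group returns a response object. A short sketch of handling it, reusing the fulfilled?/rejected?/value/reason helpers shown in the examples later in this README:
response = stream.send_to_group(content: "Hello!", to: "another_group")
if response.fulfilled?
  # The other group fulfilled the request; its reply is available via #value
  puts response.value
else
  # The request was rejected (or timed out); the explanation is available via #reason
  puts response.reason
end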
Run the following code in a Ruby IRB process. If running this code from a script, make sure the script does not exit unless told to.
require "redis_ipc"
# Setup the basic stream
child_stream = RedisIPC::Stream.new("electric_household", "child")
child_stream.on_error do |exception|
puts exception
end
# When an entry comes in, have a 75% chance of fulfilling the request, and a 25% chance of being a baby
child_stream.on_request do |entry|
if rand > 0.25
fulfill_request(entry, content: "Yes, creators")
else
reject_request(entry, content: "No! <Tantrum initiated>")
end
end
# Connect to the stream
child_stream.connect
Then run this code in another Ruby IRB process, separate from the one above (although it can also work in the same process).
require "redis_ipc"
parent_stream = RedisIPC::Stream.new("electric_household", "parent")
parent_stream.on_error do |exception|
puts exception
end
# This parent isn't very nice
parent_stream.on_request do |entry|
reject_request(entry, content: "No, and because I said so")
end
# Connect to the stream
parent_stream.connect
# Ask the child for something
response = parent_stream.send_to_group(content: "Please do this task for me, child", to: "child")
# Handle the child's reaction
if response.fulfilled?
puts "Child said: #{response.value}"
exit 0
end
# The child is throwing a tantrum
response = parent_stream.send_to_group(content: "Child, I will not ask again", to: "child")
if response.rejected? # Why yes, this is totally reasonable parenting (sarcasm)
puts "The child's reasoning is #{response.reason}"
exit 1
end
RedisIPC::Channel is an event-based implementation of Stream. It still uses RedisIPC::Stream underneath, but instead of accepting any data that is sent to the group, it only allows certain events with predefined data structures. This is incredibly useful for programs that have specific functionality they want to expose to other programs. It can also be thought of as an API.
Let's say you have a website and one worker that will occasionally need to trigger each other's logic. Since they both have access to Redis, this can be solved using Channel.
In the website codebase, you would add a new class that inherits from RedisIPC::Channel and configure it:
class WebChannel < RedisIPC::Channel
# Sets the name of the stream.
# Channels must be on the same stream in order to trigger each other's events
stream "ipc:website"
# The unique identifier for this class.
# This name is used by other channels to trigger events defined below
channel "web"
# Events are similar to API endpoints. Other channels can trigger these events and receive the result
# The params argument is an allowlist that can contain Strings/Symbols (either works)
# Only the keys defined in the params array will be available in the params hash below
event "notifications::create", params: [:title, :message] do
# The params hash is a HashWithIndifferentAccess
# The result of this block is sent back to the channel that triggered this event
Notification.create!(**params)
end
# This event will return `true` back to the channel that triggered this event
event "notifications::delete", params: [:id] do
Notification.delete(params[:id])
true
end
end
# Called somewhere during program lifecycle
WebChannel.connect
The worker will also need a new class created in its own codebase:
class WorkerChannel < RedisIPC::Channel
# Notice this is the same as WebChannel above
stream "ipc:website"
# Since Channel is just an implementation of Stream, .group can also be used instead of .channel
channel "worker"
event("start_job", params: [:id]) { JobSite.start_job(params["id"]) }
# Events do not have to have params
event("active_jobs_report") { Foreman.queue_report("active_jobs").to_h }
end
# Called somewhere during program lifecycle
# .connect is a wrapper for RedisIPC::Stream#connect allowing you to configure the underlying functionality
WorkerChannel.connect(redis_options: {url: ENV["REDIS_URL"]})
Now that both program channels are connected to the same stream, they can communicate with each other via .trigger_event.
# On the website, let's start a job
WebChannel.trigger_event("start_job", target: "worker", params: {id: 1})
# And run a report
response = WebChannel.trigger_event("active_jobs_report", target: "worker")
if response.fulfilled?
puts response.value #=> {worker_1: [{id: 1, status: :running}]}
end
Now that the website and worker are communicating, you find yourself needing a tracker program to trigger web notifications when it starts to track an event. Since Streams support any number of endpoints, so do Channels. Let's add a new class to the tracker program:
class TrackerChannel < RedisIPC::Channel
stream "ipc:website"
channel "tracker"
# Since .connect is a class method, it can be called within the class itself
# This is also showing off adjusting an underlying configuration option in the ledger
connect(ledger: {entry_timeout: 10})
end
# We can now trigger web notifications
response = TrackerChannel.trigger_event(
"notifications::create",
params: {title: "Tracker event", message: "A new event has been tracked"},
target: "web"
)
response.status #=> :fulfilled
Whenever an exception is raised, or an entry is manually rejected as in a Stream (see Timeouts below), the response will be in a rejected state and the reason for the rejection can be accessed using #reason.
class Buggy < RedisIPC::Channel
stream "example"
channel "buggy"
event(:raise_exception) { raise "I told you!" }
connect
end
class Endpoint < RedisIPC::Channel
stream "example"
channel "endpoint"
connect
end
response = Endpoint.trigger_event(:raise_exception, target: "buggy")
response.rejected? #=> true
response.reason #=> "I told you!"
When a Stream or Channel hits the timeout on an outbound request, the resulting response will be rejected with a TimeoutError.
stream = RedisIPC::Stream.new("example", "group")
stream.on_request {}
stream.on_error {}
stream.connect
# This will block until `entry_timeout` is hit. By default, this will take 5 seconds
response = stream.send_to_group(content: "This message will never get there", to: "a_group_that_does_not_exist")
response.state #=> :rejected
response.reason #=> #<RedisIPC::TimeoutError: RedisIPC::TimeoutError>
Contributions are welcome! RedisIPC has a great foundation and has everything I need it to have, but it is far from feature complete. If you find any issues or have suggestions for improvement, please open an issue or create a pull request.
- Clone the repository:
git clone https://github.com/itsthedevman/redis_ipc.git
- Install dependencies:
bundle install
- Run the tests:
bundle exec rspec spec
At the core of RedisIPC is the Stream (RedisIPC::Stream). The Stream class is the abstracted implementation of the underlying Consumer Group functionality. It provides ways to configure, connect, send, and receive data. The Stream does this by hosting any number of Consumers and any number of Dispatchers, along with a Ledger.
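These pieces map directly onto the connect options documented earlier; for example, the following sketch (with arbitrary numbers) configures the consumer and dispatcher pools and the Ledger's entry timeout:
stream = RedisIPC::Stream.new("example", "group")
stream.on_request { |entry| }
stream.on_error { |exception| }
# 3 Consumers, 2 Dispatchers, and a Ledger that expires entries after 5 seconds
stream.connect(
  consumer: {pool_size: 3},
  dispatcher: {pool_size: 2},
  ledger: {entry_timeout: 5}
)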
A Consumer (RedisIPC::Stream::Consumer) is a Ruby implementation of a Redis Consumer; its default job is to watch the stream, read in entries, and process them. The Stream, however, uses a special Consumer, the Ledger Consumer, to handle processing entries instead, for reasons explained below.
A Dispatcher (RedisIPC::Stream::Dispatcher) is another special Consumer that is essentially a bouncer for the Stream. Every entry posted to the Redis Stream is received by every Dispatcher, regardless of the group it belongs to and where the entry is going. Once a Dispatcher receives an entry destined for its group, it dispatches the entry to one of the group's consumers, depending on the state of the entry and which instance it originated from.
The Ledger (RedisIPC::Stream::Ledger) is how the Stream tracks and handles timeouts for outbound entries. This is where Ledger Consumers (RedisIPC::Stream::Ledger::Consumer) come in. When entries are dispatched to them, they check whether the entry is in the Ledger. If it is, the Ledger Consumer passes the entry to the waiting code. Entries that are not in the Ledger are treated as inbound requests, which bubble up to the Stream itself. This functionality, plus statuses, ensures entries are processed correctly without confusion.
Yes! As of 1.0.2, RedisIPC can be safely used in programs that have multiple processes, or instances, running at once. This is managed via an instance ID that is unique to each instance of a Stream/Channel, which allows entries to be properly dispatched to the instance that originally made the request. With this change, all Dispatchers across a group's instances work together to dispatch entries to each other's consumers.
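For example, the same script can be started in multiple processes and each instance will join the same group. This is only a sketch using the public API shown earlier, with made-up stream/group names:
require "redis_ipc"
# Run this exact script in two (or more) processes. Each instance joins the
# "worker" group on the "example" stream; entries sent to "worker" are
# dispatched across the running instances, and each response is routed back
# to the instance that originally made the request.
stream = RedisIPC::Stream.new("example", "worker")
stream.on_request do |entry|
  fulfill_request(entry, content: "Handled by PID #{Process.pid}")
end
stream.on_error do |exception|
  puts exception
end
stream.connect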
The gem is available as open source under the terms of the MIT License.