Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

133 and 134 redis updates #152

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
24 changes: 19 additions & 5 deletions onair/data/telemetry_configs/redis_example_CONFIG.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,25 @@
}
}
},
"redis_subscriptions": [
"state_0",
"state_1",
"state_2"
],

"redis" : [
{
"address": "localhost",
"port": 6379,
"subscriptions": [
"state_0"
]
},
{
"address": "localhost",
"port": 6380,
"subscriptions": [
"state_1",
"state_2"
]
}
],

"order": [
"time",
"state_0.x",
Expand Down
90 changes: 63 additions & 27 deletions onair/data_handling/redis_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,59 +29,95 @@ class DataSource(OnAirDataSource):

def __init__(self, data_file, meta_file, ss_breakdown = False):
super().__init__(data_file, meta_file, ss_breakdown)
self.address = 'localhost'
self.port = 6379
self.db = 0
self.server = None
self.new_data_lock = threading.Lock()
self.new_data = False
self.servers = []
self.currentData = []
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.currentData.append({'headers':self.order,
'data':list('-' * len(self.order))})
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)

def connect(self):
"""Establish connection to REDIS server."""
print_msg('Redis adapter connecting to server...')
self.server = redis.Redis(self.address, self.port, self.db)
for idx, server_config in enumerate(self.server_configs):
server_config_keys = server_config.keys()
if 'address' in server_config_keys:
address = server_config['address']
else:
address = 'localhost'

if 'port' in server_config_keys:
port = server_config['port']
else:
port = 6379

if 'db' in server_config_keys:
db = server_config['db']
else:
db = 0

if self.server.ping():
print_msg('... connected!')
if 'password' in server_config_keys:
password = server_config['password']
else:
password = ''

def subscribe(self, subscriptions):
"""Subscribe to REDIS message channel(s) and launch listener thread."""
if len(subscriptions) != 0 and self.server.ping():
self.pubsub = self.server.pubsub()
#if there are subscriptions in this Redis server configuration's subscription key
if len(server_config['subscriptions']) != 0:
#Create the servers and append them to self.servers list
self.servers.append(redis.Redis(address, port, db, password))

for s in subscriptions:
self.pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s}")
try:
#Ping server to make sure we can connect
self.servers[-1].ping()
print_msg(f'... connected to server # {idx}!')

listen_thread = threading.Thread(target=self.message_listener)
listen_thread.start()
else:
print_msg(f"No subscriptions given!")
#Set up Redis pubsub function for the current server
pubsub = self.servers[-1].pubsub()

for s in server_config['subscriptions']:
pubsub.subscribe(s)
print_msg(f"Subscribing to channel: {s} on server # {idx}")
listen_thread = threading.Thread(target=self.message_listener, args=(pubsub,))
listen_thread.start()

#This except will be hit if self.servers[-1].ping() threw an exception (could not properly ping server)
except:
print_msg(f'Did not connect to server # {idx}. Not setting up subscriptions.', ['FAIL'])

else:
print_msg("No subscriptions given! Redis server not created")

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
self.server_configs = []
configs = extract_meta_data_handle_ss_breakdown(
meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
keys = meta.keys()

# Setup redis server configuration
#Checking if 'redis' exists
if 'redis' in keys:
count_server_config = 0
#Checking if dictionaries within 'redis' key each have a 'subscription' key. Error will be thrown if not.
for server_config in meta['redis']:
redis_config_keys = server_config.keys()
if ('subscriptions' in redis_config_keys) == False:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
f'missing required key \'subscriptions\' from {count_server_config} in key \'redis\'')
count_server_config +=1

#Saving all of Redis dictionaries from JSON file to self.server_configs
self.server_configs = meta['redis']

if 'order' in keys:
self.order = meta['order']
else:
raise ConfigKeyError(f'Config file: \'{meta_data_file}\' ' \
'missing required key \'order\'')

if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
self.subscriptions = []
'missing required key \'order\'')

return configs

Expand Down Expand Up @@ -115,9 +151,9 @@ def has_more(self):
"""Live connection should always return True"""
return True

def message_listener(self):
def message_listener(self, pubsub):
"""Loop for listening for messages on channels"""
for message in self.pubsub.listen():
for message in pubsub.listen():
if message['type'] == 'message':
channel_name = f"{message['channel'].decode()}"
# Attempt to load message as json
Expand Down
70 changes: 70 additions & 0 deletions redis-experiment-publisher-multi-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import redis
import time
import random

# Initialize the Redis connection for server #1
redis_host = "localhost"
redis_port = 6379
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r1 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# Initialize the Redis connection for server #1
redis_host = "localhost"
redis_port = 6380
# When your Redis server requires a password, fill it in here
redis_password = ""
# Connect to Redis
r2 = redis.Redis(host=redis_host,
port=redis_port,
password=redis_password,
decode_responses=True)

# List of channel names
server1_channels = ['state_0']
server2_channels = ['state_1', 'state_2']
# Publish messages on each channel in random order
def publish_messages():
loop_count = 0
inner_loop_count = 0
max_loops = 9
while loop_count < max_loops:
random.shuffle(server1_channels)
for channel in server1_channels:
r1.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

random.shuffle(server2_channels)
for channel in server2_channels:
r2.publish(channel, f'{{"time":{inner_loop_count}, ' \
f'"x":{inner_loop_count+0.1}, ' \
f'"y":{inner_loop_count+0.2}}}')

print(f"Published data to {channel}, " \
f"[{inner_loop_count}, " \
f"{inner_loop_count+0.1}, " \
f"{inner_loop_count+0.2}]")

inner_loop_count += 1
time.sleep(2)

loop_count += 1
print(f"Completed {loop_count} loops")



if __name__ == "__main__":
publish_messages()
Loading
Loading