-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlight_controller
executable file
·238 lines (195 loc) · 8.38 KB
/
light_controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/python
""" Build light controller """
import os
current_dir = os.path.dirname(os.path.realpath(__file__))
import sys
sys.path.append(current_dir)
import signal
import getopt
from lib import daemonize
from lib import logger
from lib import list_utils
from config import json_config
# Config file path used by run() when -c/--config is not given.
default_config_file = 'config.json'
# Active light driver object; assigned by run() and read by the signal
# handler so it can be stopped on SIGTERM/SIGINT. None until created.
light = None
def _handle_signals(signum, stack):
    """Signal handler: stop the light driver on SIGTERM or SIGINT.

    signum -- number of the signal being delivered.
    stack  -- current stack frame (unused).

    Any other signal, or a signal arriving before the light driver has
    been created, is ignored.
    """
    if signum not in (signal.SIGTERM, signal.SIGINT):
        return
    if light is None:
        return
    light.stop()
def print_usage(prog_cmd):
    """Print the command-line help text to standard output.

    prog_cmd -- program name shown in the "Usage:" line.
    """
    usage_lines = (
        "Usage: %s [options]" % prog_cmd,
        "Options:",
        "-b --daemonize Run in the background.",
        "-l --syslog Log output to syslog.",
        "-c <file> --config <file> Config file, default \"%s\"." % default_config_file,
        "-h --help Print this help page.",
    )
    for text in usage_lines:
        print(text)
def _copy_optional(conf, keys):
    """Return a new dict with only those of `keys` that are present in `conf`."""
    picked = {}
    for key in keys:
        if key in conf:
            picked[key] = conf[key]
    return picked


def run():
    """Run the build light controller from the command line.

    Steps, in order:

    1. Parse the options -h/--help, -b/--daemonize, -l/--syslog and
       -c/--config from sys.argv. Usage errors print the help text and
       exit with -1; --help prints the help text and exits with 1.
    2. Switch logging to syslog when requested or when daemonizing, then
       daemonize if -b was given.
    3. Install the SIGTERM/SIGINT handlers.
    4. Load the JSON config, create the light driver and the optional
       sound player, and import the monitor/poller modules matching the
       configured API type. Any failure here is traced and the process
       exits with -1.
    5. Write the pid file /var/run/<program name>.pid (a failure is
       logged but is not fatal).
    6. Build translator, monitor and poller, start the light driver and
       poll until the light driver is no longer alive.
    7. Remove the pid file and log termination.

    The created light driver is stored in the module-level `light` so the
    signal handler can stop it.
    """
    global light

    background = False
    forcesyslog = False
    config_file = default_config_file

    dlogger = logger.Logger(os.path.basename(sys.argv[0]))

    try:
        opts, _ = getopt.getopt(sys.argv[1:], "hblc:",
                                ["help", "daemonize", "syslog", "config="])
    except getopt.error as why:
        print("Error: getopt error: %s" % why)
        print_usage(sys.argv[0])
        sys.exit(-1)

    try:
        for opt in opts:
            if opt[0] in ("-h", "--help"):
                print_usage(sys.argv[0])
                sys.exit(1)
            if opt[0] in ("-b", "--daemonize"):
                background = True
                continue
            if opt[0] in ("-l", "--syslog"):
                forcesyslog = True
                continue
            if opt[0] in ("-c", "--config"):
                config_file = opt[1]
                continue
            # Option accepted by getopt but not handled above.
            print_usage(sys.argv[0])
            sys.exit(-1)
    except ValueError as why:
        # No option currently converts its argument; this only guards
        # against a bad value should such an option be added.
        print("Error: bad parameter \"%s\" for option %s: %s" % (opt[1], opt[0], why))
        print_usage(sys.argv[0])
        sys.exit(-1)

    if forcesyslog:
        logger.Logger.use_syslog = True

    if background:
        logger.Logger.use_syslog = True
        daemonize.createDaemon()

    signal.signal(signal.SIGTERM, _handle_signals)
    signal.signal(signal.SIGINT, _handle_signals)

    # --- Phase 1: load configuration and import the required drivers ---
    try:
        if not os.path.isfile(config_file):
            dlogger.log("ERROR: config file %s not found.", config_file)
            sys.exit(-1)

        conf = json_config.JsonConfig(config_file)
        api_conf = conf.get_api_config()
        light_conf = conf.get_light_config()
        sound_conf = conf.get_sound_config()
        jobs = conf.get_jobs()
        poll_interval_seconds = 3
        splayer = None

        # Driver modules are imported lazily so only the configured one
        # has to be importable.
        light_type = light_conf['type']
        if light_type == 'adafruit_lpd8806':
            dlogger.log("Light type is Adafruit LPD8806 LED strip.")
            from lights import adafruit_lpd8806 as strand_driver
        elif light_type == 'epistar_lpd8806':
            dlogger.log("Light type is Epistar LPD8806 LED strip.")
            from lights import epistar_lpd8806 as strand_driver
        else:
            # Report the offending 'type' value (the original looked up a
            # non-existent 'strand' key here).
            raise json_config.ConfigError(str(light_type) + ' light type not supported')
        from lights import job2ledstrip
        light = strand_driver.Strand(
            **_copy_optional(light_conf, ('num_leds', 'spidev', 'simulate_mode')))

        if 'enabled' in sound_conf and sound_conf['enabled']:
            from sounds import sound_player
            sound_kwargs = {
                'sound_clips_directory': os.path.join(current_dir, 'sounds'),
            }
            if 'sound_clips_dir' in sound_conf:
                sound_kwargs['sound_clips_directory'] = sound_conf['sound_clips_dir']
            if 'bin_path' in sound_conf:
                sound_kwargs['player_bin'] = sound_conf['bin_path']
            splayer = sound_player.SoundPlayer(**sound_kwargs)

        api_type = api_conf['type']
        if api_type == 'jenkins_direct':
            dlogger.log("API is Jenkins API.")
            from monitors import jenkins_monitor
            from pollers import jenkins_poller
        elif api_type == 'jenkins_aws_sqs':
            dlogger.log("API is Jenkins via AWS SQS.")
            from monitors import jenkins_aws_sqs_monitor
            from pollers import aws_sqs_poller
        else:
            # ConfigError lives in json_config; the bare name is not in scope.
            raise json_config.ConfigError(str(api_type) + ' API type not supported')

        if 'pollrate_s' in api_conf:
            if api_conf['pollrate_s'] != 0:
                poll_interval_seconds = api_conf['pollrate_s']
                dlogger.log("Polling API once every %s seconds.", str(poll_interval_seconds))
            else:
                # None is later passed to light.join() as the timeout.
                poll_interval_seconds = None
                dlogger.log("NOT Polling API.")
    except Exception as e:
        logger.print_trace(e)
        sys.exit(-1)

    # write pid file
    pidfilename = "/var/run/%s.pid" % os.path.basename(sys.argv[0])
    try:
        with open(pidfilename, "w") as pidfile:
            pidfile.write("%d\n" % os.getpid())
    except IOError as e:
        dlogger.log("ERROR: unable to write pid file %s: %s", pidfilename, str(e))

    dlogger.log("Starting light controller")

    # --- Phase 2: wire up translator/monitor/poller and run the loop ---
    try:
        if api_type == 'jenkins_direct':
            # Poll Jenkins API directly
            translator = job2ledstrip.Job2LedStrip(jobs, light)
            monitor = jenkins_monitor.JenkinsMonitor(jobs, translator, sound_player=splayer)
            poller = jenkins_poller.JenkinsPoller(api_conf['url'], monitor)
        elif api_type == 'jenkins_aws_sqs':
            # Poll Jenkins via AWS SQS
            first_job_as_trigger = True
            if 'first_job_as_trigger' in api_conf:
                first_job_as_trigger = api_conf['first_job_as_trigger']

            if first_job_as_trigger:
                # The first job is excluded from the jobs shown on the strip.
                displayed_jobs = list(list_utils.remove_first_item(jobs))
            else:
                displayed_jobs = jobs
            translator = job2ledstrip.Job2LedStrip(displayed_jobs, light)
            monitor = jenkins_aws_sqs_monitor.JenkinsAwsSqsMonitor(
                jobs, translator, first_job_as_trigger, sound_player=splayer)

            for required in ('sqs_region', 'sqs_queue_name'):
                if required not in api_conf:
                    raise json_config.ConfigError(required + ' not configured')
            poller_kwargs = _copy_optional(
                api_conf,
                ('sqs_region', 'sqs_queue_name',
                 'aws_access_key_id', 'aws_secret_access_key'))
            poller_kwargs['monitor'] = monitor
            poller = aws_sqs_poller.AwsSqsPoller(**poller_kwargs)

        # start the light driver
        light.daemon = True
        light.start()

        # Poll, then wait up to the poll interval for the light driver to
        # finish; leave the loop once it is no longer alive.
        while True:
            poller.poll()
            light.join(poll_interval_seconds)
            if not light.isAlive():
                break
    except Exception as e:
        logger.print_trace(e)

    # Best-effort cleanup: the pid file may never have been written.
    try:
        os.unlink(pidfilename)
    except OSError:
        pass

    dlogger.log("Terminated light controller")
# Script entry point: start the controller only when executed directly,
# not when this file is imported as a module.
if __name__ == "__main__":
    run()