-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaudio_hub.py
executable file
·174 lines (150 loc) · 6.59 KB
/
audio_hub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env -S python3 -u
import asyncio, subprocess, datetime, time, json, signal, os # official python packages
import evdev, vlc, alsaaudio # pip installed
import devices # local file require: OPi.GPIO
import dbus_bluez # local file require: dbus_next
import http_server
def main():
    """Create the hub state, publish it as the module-global ``s`` and run its loop.

    The IR key handlers in this file reach the hub through the global ``s``,
    so it has to be assigned before the event loop starts.
    """
    global s
    s = hub = State()
    asyncio.run(hub.loop())
class State:
    """Hub state: selected input, master volume and the internet-radio player.

    The instance is created once in ``main()``, handed to
    ``http_server.run_thread`` and driven by the IR key handlers through
    ``set_action`` and ``change_volume``.
    """

    # Mixer units added/removed per volume step requested by the caller.
    VOLUME_STEP = 3

    def __init__(self):
        """Install signal handlers, build the player, open the mixer and LEDs,
        initialise the devices module and start the HTTP server thread."""
        signal.signal(signal.SIGINT, shutdown_app)
        signal.signal(signal.SIGTERM, shutdown_app)
        # Defined up front: the HTTP thread is started below and may query the
        # state before the first action has selected an input.
        self.input = None
        # Strong references to fire-and-forget tasks (see _spawn).
        self._tasks = set()
        self.__init_radios()
        self.mixer = alsaaudio.Mixer('Master')
        self.update_ui = asyncio.Event()
        self.red_led = devices.System_Led('red', 'trigger', 'mmc0')
        self.green_led = devices.System_Led('green', 'trigger', 'rc-feedback')
        devices.init()
        http_server.run_thread(self)

    async def loop(self):
        """Start the IR readers and background jobs, apply the 'off' action and
        then wait on the IR reader tasks."""
        loops = ir_loops()
        self._spawn(dbus_bluez.init(self.red_led))
        self._spawn(self.__shedule_player_restart())
        self.set_action('off')
        await asyncio.gather(*loops)

    def _spawn(self, coro):
        """Schedule *coro* as a task and keep a reference until it completes.

        The event loop only holds weak references to tasks, so a task nobody
        else references is kept here until its done-callback removes it.
        """
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    def set_action(self, action):
        """Carry out one action.

        action: a radio index (``int`` or decimal string), a member of
        ``devices.dac_inputs``, or one of 'stereo', 'reboot', 'pair'.
        Anything else is printed as unknown and ignored.
        """
        print(f'Action: {action}')
        if isinstance(action, int):
            self.__set_radio(action)
        elif not isinstance(action, str):
            # Neither an index nor a name; do not try string methods on it.
            print(f'Unknown action: {action}')
        elif action.isdecimal():
            # isdecimal(), unlike isdigit(), only accepts what int() can parse.
            self.__set_radio(int(action))
        elif action in devices.dac_inputs:
            self.__set_dac_in(action)
        elif action == 'stereo':
            self._spawn(devices.surround_toggle())
        elif action == 'reboot':
            subprocess.Popen('reboot')
        elif action == 'pair':
            self._spawn(dbus_bluez.enable_pairing())
        else:
            print(f'Unknown action: {action}')

    def set_volume(self, volume):
        """Set the mixer volume to *volume* and flag the UI for refresh."""
        self.mixer.setvolume(volume)
        self.update_ui.set()

    def change_volume(self, step):
        """Move the volume by ``step * VOLUME_STEP``, clamped to 0..100."""
        old_vol = self.get_volume()
        # set_volume() already signals update_ui.
        self.set_volume(max(min(100, old_vol + step * self.VOLUME_STEP), 0))

    def get_volume(self):
        """Return the first channel's volume as reported by the mixer."""
        return self.mixer.getvolume()[0]

    def get_input(self):
        """Return the selected input (radio index or input name), or None if
        nothing has been selected yet."""
        return self.input

    def get_ui_state(self):
        """Return a dict with the current ``input`` and ``volume``."""
        return dict(input=self.input, volume=self.get_volume())

    def __set_radio(self, channel):
        """Switch the aux selector to 'bt' and play radio list item *channel*."""
        self._spawn(devices.set_aux('bt'))
        self.player.play_item_at_index(channel)
        self.input = channel
        self.update_ui.set()

    def __set_dac_in(self, ext_in):
        """Stop the radio player and switch the aux selector to *ext_in*."""
        self.player.stop()
        self._spawn(devices.set_aux(ext_in))
        self.input = ext_in
        self.update_ui.set()

    def __init_radios(self):
        """Create the VLC list player and load the streams from radios.json.

        The file is iterated as a sequence of two-item entries whose second
        item is the stream location.
        """
        instance = vlc.Instance('-A alsa')
        self.player = vlc.MediaListPlayer(instance)
        radios = vlc.MediaList(instance)
        with open('radios.json', encoding='utf-8') as json_file:
            self.radio_list = json.load(json_file)
        for _, stream in self.radio_list:
            radios.add_media(stream)
        self.player.set_media_list(radios)

    async def __shedule_player_restart(self):
        """Reboot the machine at 03:00 four days after start-up.

        If the player is still playing at that time the reboot is postponed
        one hour at a time, at most ten times.
        """
        now = datetime.datetime.now()
        at3 = (now + datetime.timedelta(days=4)).replace(
            hour=3, minute=0, second=0, microsecond=0)
        await asyncio.sleep((at3 - now).total_seconds())
        postpones_left = 10
        while self.player.is_playing() and postpones_left > 0:
            postpones_left -= 1
            await asyncio.sleep(3600)
        self.player.stop()
        subprocess.Popen('reboot')
# IR Device + Event Loop ----------------------------------------------------
# Short alias for evdev's event-code module; the handlers below read its
# KEY_* / EV_KEY constants and its KEY code -> name table through it.
keys = evdev.ecodes
def ir_key_pressed(key):
    """Map a single remote key press to a hub action.

    key: the evdev key code of the pressed button. Volume keys move the
    volume one step, the digit keys KEY_1..KEY_0 select radio index 0..9,
    and the remaining mapped keys select an input or toggle stereo.
    Unmapped keys are only logged.
    """
    # .get(): a code missing from the name table must not raise inside a log line.
    print(f'Remote key pressed: {keys.KEY.get(key, key)}')
    if key == keys.KEY_VOLUMEDOWN:
        s.change_volume(-1)
    elif key == keys.KEY_VOLUMEUP:
        s.change_volume(+1)
    elif keys.KEY_1 <= key <= keys.KEY_0:
        # Digit key codes are consecutive, so the offset is the radio index.
        s.set_action(key - keys.KEY_1)
    elif key == keys.KEY_BLUETOOTH:
        s.set_action('bt')
    elif key == keys.KEY_VOICECOMMAND:
        s.set_action('bt')
    elif key == keys.KEY_PC:
        s.set_action('pc')
    elif key == keys.KEY_PAGEUP:
        s.set_action('pc')
    elif key == keys.KEY_TV:
        s.set_action('tv')
    elif key == keys.KEY_PAGEDOWN:
        s.set_action('tv')
    elif key == keys.KEY_POWER:
        s.set_action('off')
    elif key == keys.KEY_MUTE:
        s.set_action('stereo')
def ir_key_hold(key):
    """Handle a key reported as held: volume keys move the volume two steps.

    key: the evdev key code being held. Other keys are only logged.
    """
    # .get(): a code missing from the name table must not raise inside a log line.
    print(f'Remote key hold: {keys.KEY.get(key, key)}')
    if key == keys.KEY_VOLUMEDOWN:
        s.change_volume(-2)
    elif key == keys.KEY_VOLUMEUP:
        s.change_volume(+2)
def ir_key_long(key):
    """Handle a detected long press: Bluetooth key pairs, power key reboots.

    key: the evdev key code that was held. Other keys are only logged.
    """
    # .get(): a code missing from the name table must not raise inside a log line.
    print(f'Remote key long hold: {keys.KEY.get(key, key)}')
    if key == keys.KEY_BLUETOOTH:
        s.set_action('pair')
    elif key == keys.KEY_POWER:
        s.set_action('reboot')
def ir_loops():
    """Open all input devices and start a reader task for each known receiver.

    Devices whose name is not one of the two recognised receivers are closed
    again. Uses ``asyncio.create_task``, so a running event loop is required.

    Returns the set of created ``ir_loop`` tasks.
    """
    receiver_names = (
        'sunxi-ir',
        'HAOBO Technology USB Composite Device Keyboard',
    )
    print("Searching for event devices")
    candidates = [evdev.InputDevice(node) for node in evdev.list_devices()]
    readers = set()
    for candidate in candidates:
        if candidate.name not in receiver_names:
            candidate.close()
            continue
        print('Found: ', candidate.name)
        readers.add(asyncio.create_task(ir_loop(candidate)))
    return readers
def test_long_press(key):
'''Due to buggy RC transmiter (no hold option) hack to detect long press buttons'''
now = time.time()
if not hasattr(test_long_press, 'done'):
test_long_press.done = False
test_long_press.last_key = keys.KEY_POWER
test_long_press.first_press = now
test_long_press.last_press = now
if key == test_long_press.last_key:
if (now - test_long_press.last_press) < 0.5:
test_long_press.last_press = now
if (now - test_long_press.first_press) > 5:
if test_long_press.done == False:
test_long_press.done = True
ir_key_long(key)
return
test_long_press.done = False
test_long_press.first_press = now
test_long_press.last_press = now
test_long_press.last_key = key
async def ir_loop(device):
    """Read events from one input device and feed key events to the handlers.

    device: an opened evdev input device. Every EV_KEY event is passed to
    ``test_long_press``; key-down events go to ``ir_key_pressed`` and hold
    events to ``ir_key_hold``. An ``OSError`` while reading is logged and the
    application is shut down through ``shutdown_app('restart')``.
    """
    # EV_KEY event values in evdev: 0 = key up, 1 = key down, 2 = key hold.
    KEY_DOWN = 1
    KEY_HOLD = 2
    try:
        async for event in device.async_read_loop():
            if event.type != keys.EV_KEY:
                continue
            test_long_press(event.code)
            if event.value == KEY_DOWN:
                ir_key_pressed(event.code)
            elif event.value == KEY_HOLD:
                ir_key_hold(event.code)
    except OSError as error:
        print('---------- ERROR USB GLITCH? ---------------')
        print(error)
        print('TODO: wait, re-initiate ',device.name)
        shutdown_app('restart')
#--------------------------------------------------------------------------------
def shutdown_app(signum, frame=None):
    """Shut down the Bluetooth and device layers, then end the process at once.

    Registered as the SIGINT/SIGTERM handler (``signum`` is then the signal
    number, ``frame`` is unused) and also called directly with the string
    'restart'. The exit code is 121 for 'restart' and 0 otherwise.
    """
    if signum == 'restart':
        exit_code = 121
    else:
        exit_code = 0
    dbus_bluez.exit()
    try:
        devices.end()
    except OSError as error:
        print('Error when stopping GPIO: ',error)
    # os._exit ends the process immediately with the chosen exit code.
    os._exit(exit_code)
# Script entry point: start the hub only when this file is run directly.
if __name__ == '__main__':
    main()