-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathoffline_microstate.py
161 lines (132 loc) · 4.63 KB
/
offline_microstate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
#coding:utf-8
"""
Author: Arnaud Desvachez --<[email protected]>
Purpose: Offline protocol for BrainHack microtstate project
Created: 15.10.2019
"""
import sys
import cv2
import time
import os
import multiprocessing as mp
import pygame.mixer as pgmixer
from importlib import import_module
from builtins import input
import neurodecode.utils.pycnbi_utils as pu
import neurodecode.triggers.pyLptControl as pyLptControl
from neurodecode import logger
from neurodecode.utils import q_common as qc
from neurodecode.protocols.viz_bars import BarVisual
from neurodecode.triggers.trigger_def import trigger_def
from neurodecode.gui.streams import redirect_stdout_to_queue
#----------------------------------------------------------------------
def check_config(cfg):
"""
Ensure that the config file contains the parameters
"""
critical_vars = {
'COMMON': ['TRIGGER_DEVICE',
'TRIGGER_FILE',
'SCREEN_SIZE',
'START_VOICE',
'END_VOICE'],
}
optional_vars = {
'GLOBAL_TIME': 2 * 60,
'SCREEN_POS': (0, 0),
'GLASS_USE':False,
}
for key in critical_vars['COMMON']:
if not hasattr(cfg, key):
logger.error('%s is a required parameter' % key)
raise RuntimeError
for key in optional_vars:
if not hasattr(cfg, key):
setattr(cfg, key, optional_vars[key])
logger.warning('Setting undefined parameter %s=%s' % (key, getattr(cfg, key)))
#----------------------------------------------------------------------
def run(cfg, state=mp.Value('i', 1), queue=None):
"""
Offline protocol
"""
# visualizer
keys = {'left':81, 'right':83, 'up':82, 'down':84, 'pgup':85, 'pgdn':86,
'home':80, 'end':87, 'space':32, 'esc':27, ',':44, '.':46, 's':115, 'c':99,
'[':91, ']':93, '1':49, '!':33, '2':50, '@':64, '3':51, '#':35}
redirect_stdout_to_queue(logger, queue, 'INFO')
# Wait the recording to start (GUI)
while state.value == 2: # 0: stop, 1:start, 2:wait
pass
# Protocol runs if state equals to 1
if not state.value:
sys.exit(-1)
global_timer = qc.Timer(autoreset=False)
# Init trigger communication
cfg.tdef = trigger_def(cfg.TRIGGER_FILE)
trigger = pyLptControl.Trigger(state, cfg.TRIGGER_DEVICE)
if trigger.init(50) == False:
logger.error('\n** Error connecting to trigger device.')
raise RuntimeError
# Preload the starting voice
pgmixer.init()
pgmixer.music.load(cfg.START_VOICE)
# Init feedback
viz = BarVisual(cfg.GLASS_USE, screen_pos=cfg.SCREEN_POS, screen_size=cfg.SCREEN_SIZE)
viz.fill()
viz.put_text('Close your eyes and relax')
viz.update()
# PLay the start voice
pgmixer.music.play()
# Wait a key press
key = 0xFF & cv2.waitKey(0)
if key == keys['esc'] or not state.value:
sys.exit(-1)
viz.fill()
viz.put_text('Recording in progress')
viz.update()
#----------------------------------------------------------------------
# Main
#----------------------------------------------------------------------
trigger.signal(cfg.tdef.INIT)
while state.value == 1 and global_timer.sec() < cfg.GLOBAL_TIME:
key = cv2.waitKey(1)
if key == keys['esc']:
with state.get_lock():
state.value = 0
trigger.signal(cfg.tdef.END)
# Remove the text
viz.fill()
viz.put_text('Recording is finished')
viz.update()
# Ending voice
pgmixer.music.load(cfg.END_VOICE)
pgmixer.music.play()
time.sleep(5)
# Close cv2 window
viz.finish()
# ----------------------------------------------------------------------
def load_config(cfg_file):
"""
Dynamic loading of a config file.
Format the lib to fit the previous developed neurodecode code if subject specific file (not for the templates).
cfg_file: tuple containing the path and the config file name.
"""
cfg_file = os.path.split(cfg_file)
sys.path.append(cfg_file[0])
cfg_module = import_module(cfg_file[1].split('.')[0])
return cfg_module
#----------------------------------------------------------------------
def batch_run(cfg_module):
"""
For batch script
"""
cfg = load_config(cfg_module)
check_config(cfg)
run(cfg)
if __name__ == '__main__':
if len(sys.argv) < 2:
cfg_module = input('Config module name? ')
else:
cfg_module = sys.argv[1]
batch_run(cfg_module)