Commit 55cce51

Merge pull request #50 from klemengit/master
Multiprocessing in shell, fixing temp_files for analysis resume
2 parents 1464087 + bbc8f73 commit 55cce51

File tree

6 files changed: +78 -28 lines changed

  .gitignore
  docs/source/quick_start/basic_usage.rst
  pyidi/methods/_directional_lucas_kanade.py
  pyidi/methods/_lucas_kanade.py
  pyidi/methods/idi_method.py
  pyidi/tools.py

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -126,4 +126,5 @@ server_user_id.txt
 /temp
 
 data_synthetic_pyidi_analysis/
-temp_file/
+temp_file/
+data/data_*_pyidi_analysis/analysis_*/

docs/source/quick_start/basic_usage.rst

Lines changed: 8 additions & 0 deletions
@@ -71,6 +71,14 @@ The method can be configured using:
 
     sof.configure(...)
 
+.. note::
+
+    Some of the methods enable the multiprocessing option. By setting the number of processes, the
+    points are divided into groups and each group is processed in a separate process.
+
+    A caveat is that when using the multiprocessing option in a shell (not jupyter notebook), the
+    code must be run in a ``if __name__ == '__main__':`` block.
+
 
 Get displacement
 ----------------
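
To make the note concrete, here is a minimal sketch of a script that uses the multiprocessing option from a shell. The class and method names (VideoReader, LucasKanade, set_points, configure, get_displacements) and the file/point values are assumptions based on this repository's documented API, not part of the commit:

# run_analysis.py -- minimal sketch of the guard described in the note above.
# Names (VideoReader, LucasKanade, set_points, configure, get_displacements)
# follow the pyidi documentation; the input file and points are illustrative.
import numpy as np
import pyidi

def main():
    video = pyidi.VideoReader('speckle_video.cih')            # hypothetical input file
    points = np.array([[100, 120], [100, 140], [100, 160]])   # illustrative (row, column) points
    lk = pyidi.LucasKanade(video)
    lk.set_points(points)
    lk.configure(roi_size=(9, 9), processes=4)                # points are split across 4 worker processes
    displacements = lk.get_displacements()
    print(displacements.shape)

if __name__ == '__main__':
    # Without this guard, each spawned worker process re-imports the script and
    # would start the whole analysis again instead of handling only its share of points.
    main()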

pyidi/methods/_directional_lucas_kanade.py

Lines changed: 7 additions & 2 deletions
@@ -37,7 +37,7 @@ class DirectionalLucasKanade(IDIMethod):
     def configure(
         self, roi_size=(9, 9), dij = (1,0), pad=(2,2), max_nfev=20,
         tol=1e-8, int_order=3, verbose=1, show_pbar=True,
-        processes=1, resume_analysis=True, reference_image=0,
+        processes=1, resume_analysis=False, reference_image=0,
         frame_range='full', use_numba=False
     ):
         """
@@ -135,7 +135,7 @@ def _set_frame_range(self):
             if self.frame_range[1] <= self.video.N:
                 self.stop_time = self.frame_range[1]
             else:
-                raise ValueError(f'frame_range can only go to end of video - index {self.video.N}')
+                raise ValueError(f'frame_range can only go to end of video - up to index {self.video.N}. selected range was: {self.frame_range}')
         else:
             raise ValueError(f'Wrong frame_range definition.')
 
@@ -209,6 +209,11 @@ def calculate_displacements(self, **kwargs):
         # Time iteration.
         len_of_task = len(range(self.start_time, self.stop_time, self.step_time))
         for ii, i in enumerate(progress_bar(self.start_time, self.stop_time, self.step_time, show_pbar=self.show_pbar)):
+
+            # if resuming analysis and completed points are available, skip those points
+            if self.resume_analysis and hasattr(self, "completed_points") and self.completed_points > ii:
+                continue
+
             ii = ii + 1
 
             # Iterate over points.
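
The skip added to the time loop above (and mirrored in _lucas_kanade.py below) can be illustrated in isolation. This is a simplified sketch, not the pyidi code, with completed_points standing in for the last time index recovered from the process log:

def iterate_time_steps(start, stop, step, resume_analysis=False, completed_points=None):
    processed = []
    for ii, i in enumerate(range(start, stop, step)):
        # Skip time steps that a previous, interrupted run already finished.
        if resume_analysis and completed_points is not None and completed_points > ii:
            continue
        processed.append(i)  # the real method computes displacements for frame i here
    return processed

# A run whose log records 50 completed time steps continues with the 51st:
print(iterate_time_steps(1, 101, 1, resume_analysis=True, completed_points=50)[:3])  # [51, 52, 53]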

pyidi/methods/_lucas_kanade.py

Lines changed: 7 additions & 2 deletions
@@ -35,7 +35,7 @@ class LucasKanade(IDIMethod):
     def configure(
         self, roi_size=(9, 9), pad=2, max_nfev=20,
         tol=1e-8, int_order=3, verbose=1, show_pbar=True,
-        processes=1, resume_analysis=True, reference_image=0, frame_range='full'
+        processes=1, resume_analysis=False, reference_image=0, frame_range='full'
     ):
         """
         Displacement identification based on Lucas-Kanade method,
@@ -120,7 +120,7 @@ def _set_frame_range(self):
             if self.frame_range[1] <= self.video.N:
                 self.stop_time = self.frame_range[1]
             else:
-                raise ValueError(f'frame_range can only go to end of video - index {self.video.N}')
+                raise ValueError(f'frame_range can only go to end of video - up to index {self.video.N}. selected range was: {self.frame_range}')
         else:
             raise ValueError('Wrong frame_range definition.')
 
@@ -193,6 +193,11 @@ def calculate_displacements(self):
         # Time iteration.
         len_of_task = len(range(self.start_time, self.stop_time, self.step_time))
         for ii, i in enumerate(progress_bar(self.start_time, self.stop_time, self.step_time)):
+
+            # if resuming analysis and completed points are available, skip those points
+            if self.resume_analysis and hasattr(self, "completed_points") and self.completed_points > ii:
+                continue
+
             ii = ii + 1
 
             # Iterate over points.
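
From the user side, resuming is now opt-in because resume_analysis defaults to False. A hedged usage sketch (the API names are assumed from the docs; the file and points must match the interrupted run):

import numpy as np
import pyidi

video = pyidi.VideoReader('speckle_video.cih')             # illustrative file name
points = np.array([[100, 120], [100, 140], [100, 160]])    # must match the interrupted run
lk = pyidi.LucasKanade(video)
lk.set_points(points)
lk.configure(resume_analysis=True)                         # reload temp_file/ logs and continue
displacements = lk.get_displacements()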

pyidi/methods/idi_method.py

Lines changed: 37 additions & 22 deletions
@@ -10,6 +10,7 @@
 
 from ..selection import SubsetSelection
 from ..video_reader import VideoReader
+from ..tools import setup_logger
 
 class IDIMethod:
     """Common functions for all methods.
@@ -25,6 +26,8 @@ def __init__(self, video: VideoReader, *args, **kwargs):
         self.process_number = 0
         self.configure(*args, **kwargs)
 
+        # self.logger = setup_logger("pyidi", 10)
+
         # Set the temporary directory
         self.temp_dir = os.path.join(self.video.root, 'temp_file')
         self.settings_filename = os.path.join(self.temp_dir, 'settings.pkl')
@@ -101,19 +104,25 @@ def create_temp_files(self, init_multi=False):
         if not init_multi:
             token = f'{self.process_number:0>3.0f}'
 
-            self.process_log = os.path.join(temp_dir, 'process_log_' + token + '.txt')
+            self.process_log = os.path.join(temp_dir, 'process_log_' + token + '.json')
             self.points_filename = os.path.join(temp_dir, 'points.pkl')
             self.disp_filename = os.path.join(temp_dir, 'disp_' + token + '.pkl')
 
+            log = {
+                "input_file": self.video.input_file,
+                "token": token,
+                "points_filename": self.points_filename,
+                "disp_filename": self.disp_filename,
+                "disp_shape": (self.points.shape[0], self.N_time_points, 2),
+                "start_frame": self.start_time,
+                "stop_frame": self.stop_time,
+                "step_frame": self.step_time,
+                "analysis_run": {
+                    f"run {self.analysis_run}": {}
+                },
+            }
             with open(self.process_log, 'w', encoding='utf-8') as f:
-                f.writelines([
-                    f'input_file: {self.video.input_file}\n',
-                    f'token: {token}\n',
-                    f'points_filename: {self.points_filename}\n',
-                    f'disp_filename: {self.disp_filename}\n',
-                    f'disp_shape: {(self.points.shape[0], self.N_time_points, 2)}\n',
-                    f'analysis_run <{self.analysis_run}>:'
-                ])
+                json.dump(log, f, indent=4)
 
         if not self.points.shape[0]:
             raise Exception("Points not set. Please set the points before running the analysis.")
@@ -135,16 +144,15 @@ def update_log(self, last_time):
         :type last_time: int
         """
         with open(self.process_log, 'r', encoding='utf-8') as f:
-            log = f.readlines()
+            log = json.load(f)
 
-        log_entry = f'analysis_run <{self.analysis_run}>: finished: {datetime.datetime.now()}\tlast time point: {last_time}'
-        if f'<{self.analysis_run}>' in log[-1]:
-            log[-1] = log_entry
-        else:
-            log.append('\n' + log_entry)
+        log['analysis_run'][f"run {self.analysis_run}"] = {
+            'finished': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+            'last_time_point': last_time
+        }
 
         with open(self.process_log, 'w', encoding='utf-8') as f:
-            f.writelines(log)
+            json.dump(log, f, indent=4)
 
     def resume_temp_files(self):
         """Reload the settings written in the temporary files.
@@ -155,19 +163,26 @@ def resume_temp_files(self):
         temp_dir = self.temp_dir
         token = f'{self.process_number:0>3.0f}'
 
-        self.process_log = os.path.join(temp_dir, 'process_log_' + token + '.txt')
+        self.process_log = os.path.join(temp_dir, 'process_log_' + token + '.json')
         self.disp_filename = os.path.join(temp_dir, 'disp_' + token + '.pkl')
 
         with open(self.process_log, 'r', encoding='utf-8') as f:
-            log = f.readlines()
+            log = json.load(f)
+
+        shape = tuple([int(_) for _ in log['disp_shape']])
 
-        shape = tuple([int(_) for _ in log[4].replace(' ', '').split(':')[1].replace('(', '').replace(')', '').split(',')])
-
         self.temp_disp = np.memmap(self.disp_filename, dtype=np.float64, mode='r+', shape=shape)
         self.displacements = np.array(self.temp_disp).copy()
 
-        self.start_time = int(log[-1].replace(' ', '').rstrip().split('\t')[1].split(':')[1]) + 1
-        self.analysis_run = int(log[-1].split('<')[1].split('>')[0]) + 1
+        self.start_time = log['start_frame']
+        self.stop_time = log['stop_frame']
+        self.step_time = log['step_frame']
+
+        last_analysis_run = int(list(log['analysis_run'].keys())[-1].split(' ')[-1])
+        self.completed_points = int(log['analysis_run'][f"run {last_analysis_run}"]['last_time_point'])
+
+        self.analysis_run = last_analysis_run + 1
+        self.N_time_points = len(range(self.start_time, self.stop_time, self.step_time))
 
     def temp_files_check(self):
         """Checking the settings of computation.

pyidi/tools.py

Lines changed: 17 additions & 1 deletion
@@ -10,6 +10,9 @@
 from tqdm import tqdm
 import numba as nb
 
+import logging
+import logging.handlers
+
 class ManualROI:
     """Manual ROI selection."""
 
@@ -335,7 +338,20 @@ def get_gradient(image):
     return Gx[1:-1], Gy[:, 1:-1]
 
 
-
+def setup_logger(logger_name, level="DEBUG", backup_count=1):
+    # Set up logging
+    logger = logging.getLogger(logger_name)
+    logger.setLevel(level)
+    # Set up a rotating file handler to manage log file size
+    file_handler = logging.handlers.RotatingFileHandler(
+        f'{logger_name}.log', maxBytes=1024 * 1024, backupCount=backup_count  # 1 MB per file, keep 5 backups
+    )
+    file_handler.setLevel(level)
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    file_handler.setFormatter(formatter)
+    logger.addHandler(file_handler)
+
+    return logger
 
 
 
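
The new setup_logger helper is not yet wired in (the call in IDIMethod.__init__ is still commented out), but it can be used directly. A small sketch, assuming the package is importable as pyidi:

from pyidi.tools import setup_logger

logger = setup_logger("pyidi", level="INFO")   # writes to ./pyidi.log, rotated at roughly 1 MB
logger.info("Starting displacement analysis with 4 processes")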
