7
7
from celery import Celery
8
8
from celery .result import ResultSet
9
9
10
- from worker .worker import process_video_part
10
+ from worker .worker import process_part
11
11
12
12
app = Celery ('core' , backend = 'amqp' , broker = 'amqp://' )
13
- WORKERS = 10
14
13
15
14
16
- # Video and audio
17
15
def get_total_workers ():
18
- return WORKERS
16
+ return 10
19
17
20
18
21
19
def get_file_length (input_path ):
@@ -37,26 +35,25 @@ def get_file_length(input_path):
37
35
raise Exception ("File length not found {}" .format (input_path ))
38
36
39
37
40
- # Video
41
- @app .task (name = 'core.process_video' )
42
- def process_video (input_path , output_format ):
43
- video_length = get_file_length (input_path )
44
- part_length = int (video_length / WORKERS )
38
+ @app .task (name = 'core.process_file' )
39
+ def process_file (is_video , input_path , output_format ):
40
+ file_length = get_file_length (input_path )
41
+ part_length = int (file_length / get_total_workers ())
45
42
46
- output_part_paths = convert_video ( input_path , output_format , video_length , part_length )
47
- output_path = concatenate_video_parts (input_path , output_format , output_part_paths )
43
+ output_part_paths = convert_file ( is_video , input_path , output_format , file_length , part_length )
44
+ output_path = concatenate_parts (input_path , output_format , output_part_paths )
48
45
49
46
return output_path
50
47
51
48
52
- def convert_video ( input_path , output_format , video_length , part_length ):
49
+ def convert_file ( is_video , input_path , output_format , video_length , part_length ):
53
50
rs = ResultSet ([])
54
51
55
- for i in range (WORKERS ):
52
+ for i in range (get_total_workers () ):
56
53
start_at = i * part_length
57
- stop_at = start_at + part_length if i != WORKERS - 1 else video_length
54
+ stop_at = start_at + part_length if i != get_total_workers () - 1 else video_length
58
55
print ("worker {} will process from {}s to {}s" .format (i + 1 , start_at , stop_at ))
59
- rs .add (process_video_part .delay (input_path , output_format , start_at , stop_at ))
56
+ rs .add (process_part .delay (is_video , input_path , output_format , start_at , stop_at ))
60
57
61
58
return rs .get ()
62
59
@@ -73,7 +70,7 @@ def create_parts_list(input_name, output_part_paths):
73
70
return parts_list_path
74
71
75
72
76
- def concatenate_video_parts (input_path , output_format , output_part_paths ):
73
+ def concatenate_parts (input_path , output_format , output_part_paths ):
77
74
base_name = os .path .basename (input_path )
78
75
input_name = os .path .splitext (base_name )[0 ]
79
76
@@ -90,15 +87,3 @@ def concatenate_video_parts(input_path, output_format, output_part_paths):
90
87
os .remove (output_part_path )
91
88
92
89
return output_path
93
-
94
-
95
- # Audio
96
- @app .task (name = 'core.process_audio' )
97
- def process_audio (input_path , output_format ):
98
- audio_length = get_file_length (input_path )
99
- part_length = int (audio_length / WORKERS )
100
-
101
- output_part_paths = convert_audio (input_path , output_format , audio_length , part_length )
102
- output_path = concatenate_audio_parts (input_path , output_format , output_part_paths )
103
-
104
- return output_path
0 commit comments