-
Notifications
You must be signed in to change notification settings - Fork 201
/
Copy pathray_video_deduplicator.py
57 lines (47 loc) · 1.92 KB
/
ray_video_deduplicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import hashlib
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)
from ..base_op import OPERATORS
from ..op_fusion import LOADED_VIDEOS
from .ray_basic_deduplicator import RayBasicDeduplicator
# Name under which this operator is registered in the OPERATORS and
# LOADED_VIDEOS registries (see the class decorators below).
OP_NAME = 'ray_video_deduplicator'
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class RayVideoDeduplicator(RayBasicDeduplicator):
    """
    Deduplicator to deduplicate samples at document-level using exact matching
    of videos between documents.

    A sample is fingerprinted by an MD5 digest over the bytes of every packet
    that belongs to a video stream in the sample's videos.
    """

    def __init__(self,
                 backend: str = 'ray_actor',
                 redis_address: str = 'redis://localhost:6379',
                 *args,
                 **kwargs):
        """
        Initialization.

        :param backend: the backend for dedup, either 'ray_actor' or 'redis'
        :param redis_address: the address of redis server
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(backend=backend,
                         redis_address=redis_address,
                         *args,
                         **kwargs)

    def calculate_hash(self, sample, context=False):
        """
        Compute the video-content hash of a sample.

        :param sample: the sample to hash; its videos are looked up under
            ``self.video_key``
        :param context: forwarded to ``load_data_with_context`` when loading
            the videos
        :return: ``RayBasicDeduplicator.EMPTY_HASH_VALUE`` when the sample
            has no videos, otherwise the hex MD5 digest of all video-stream
            packets, fed in the iteration order of the loaded videos
        """
        # Samples without any video share one sentinel hash value.
        if self.video_key not in sample or not sample[self.video_key]:
            return RayBasicDeduplicator.EMPTY_HASH_VALUE

        # load videos
        loaded_video_keys = sample[self.video_key]
        sample, videos = load_data_with_context(sample, context,
                                                loaded_video_keys, load_video)

        # compute hash
        md5_hash = hashlib.md5()
        try:
            for key in videos:
                # consider the multi stream of video in one container
                for packet in videos[key].demux():
                    # Only video packets contribute to the hash; packets of
                    # other stream types are skipped.
                    if packet.stream.type == 'video':
                        md5_hash.update(bytes(packet))
        finally:
            # Always release the opened videos, even if demuxing or hashing
            # fails part-way, so a bad file does not leak open containers.
            # NOTE(review): videos are closed regardless of `context`;
            # confirm no later op expects them to stay open in the context.
            for key in videos:
                close_video(videos[key])

        return md5_hash.hexdigest()