Skip to content

Commit c02eefc

Browse files
Merge pull request #80 from googleinterns/main
update most recent version
2 parents 4b4a46d + a804cbc commit c02eefc

40 files changed

+4269
-3293
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"level1/s0.csv": 1607310026831041}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"level0/s0.csv": 1607310020479006.0, "level0/s1.csv": 1607310936975548.0}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"level1/s0.csv": 1607310021218054.0}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"raw_file": "ClockworkMapTests-testMapWear__2020-12-07T03:00:20.471Z.csv", "levels": {"names": ["level0", "level1"], "level0": {"names": ["level0/s0.csv", "level0/s1.csv"], "frequency": 0.00010911451500060223, "number": 195335}, "level1": {"names": ["level1/s0.csv"], "frequency": 1.0909496393179725e-06, "number": 1953}}, "raw_number": 195335, "start": 1607310020479006.0, "end": 1607311810662466.0}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"level1/s0.csv": 1607310021084773.0}

backend/app.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ runtime: python37
99
env: standard
1010

1111
manual_scaling:
12-
instances: 1
12+
instances: 10
1313

1414
# Set App Engine instance class (defaults to F1)
1515
# See https://cloud.google.com/appengine/docs/standard/#instance_classes

backend/cron.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ cron:
22
- description: "Scan new files to downsample"
33
url: /downsample
44
target: api
5-
schedule: every 10 mins
5+
schedule: every 5 mins

backend/data_fetcher.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""A module for fetching from multiple-level preprocessing."""
1616

1717
import utils
18-
18+
import time
1919
from level_slices_reader import LevelSlices
2020
from metadata import Metadata
2121

@@ -77,9 +77,17 @@ def fetch(self, strategy, number_records, timespan_start, timespan_end):
7777
}
7878
]
7979
"""
80+
81+
prevTime = time.time()
82+
print("fetch data starts", prevTime)
83+
8084
self._metadata = Metadata(
8185
self._preprocess_dir, bucket=self._preprocess_bucket)
8286
self._metadata.load()
87+
88+
diff = time.time() - prevTime
89+
prevTime = time.time()
90+
print("meta data done", diff)
8391

8492
if timespan_start is None:
8593
timespan_start = self._metadata['start']
@@ -100,6 +108,10 @@ def fetch(self, strategy, number_records, timespan_start, timespan_end):
100108
target_level = self._metadata['levels'][self._metadata['levels']
101109
['names'][target_level_index]]
102110

111+
diff = time.time() - prevTime
112+
prevTime = time.time()
113+
print("target level located",diff)
114+
103115
level_metadata = Metadata(
104116
self._preprocess_dir, strategy, utils.get_level_name(
105117
target_level_index), bucket=self._preprocess_bucket)
@@ -115,15 +127,62 @@ def fetch(self, strategy, number_records, timespan_start, timespan_end):
115127
self._preprocess_dir,
116128
utils.get_level_name(target_level_index),
117129
single_slice, strategy) for single_slice in target_slices_names]
130+
131+
diff = time.time() - prevTime
132+
prevTime = time.time()
133+
print("all slice found", diff)
134+
135+
target_slice_paths_min = [utils.get_slice_path(
136+
self._preprocess_dir,
137+
utils.get_level_name(target_level_index),
138+
single_slice, 'min') for single_slice in target_slices_names]
139+
140+
target_slice_paths_max = [utils.get_slice_path(
141+
self._preprocess_dir,
142+
utils.get_level_name(target_level_index),
143+
single_slice, 'max') for single_slice in target_slices_names]
144+
145+
diff = time.time() - prevTime
146+
prevTime = time.time()
147+
print("min max slice found", diff)
118148

119149
# Reads records and downsamples.
120150
target_slices = LevelSlices(
121151
target_slice_paths, self._preprocess_bucket)
152+
153+
154+
122155
target_slices.read(timespan_start, timespan_end)
123-
number_target_records = target_slices.get_records_count()
124156

157+
diff = time.time() - prevTime
158+
prevTime = time.time()
159+
print("main file read", diff)
160+
161+
target_slices_min = LevelSlices(
162+
target_slice_paths_min, self._preprocess_bucket)
163+
164+
target_slices_max = LevelSlices(
165+
target_slice_paths_max, self._preprocess_bucket)
166+
target_slices_min.read(timespan_start, timespan_end)
167+
target_slices_max.read(timespan_start, timespan_end)
168+
169+
diff = time.time() - prevTime
170+
prevTime = time.time()
171+
print("min max file read", diff)
172+
173+
minList = target_slices_min.get_min()
174+
maxList = target_slices_max.get_max()
175+
176+
diff = time.time() - prevTime
177+
prevTime = time.time()
178+
print("min max get", diff)
179+
number_target_records = target_slices.get_records_count()
125180
target_slices.downsample(strategy, max_records=number_records)
126-
downsampled_data = target_slices.format_response()
181+
downsampled_data = target_slices.format_response(minList, maxList)
182+
183+
diff = time.time() - prevTime
184+
prevTime = time.time()
185+
print("dowmsample finished", diff)
127186
number_result_records = target_slices.get_records_count()
128187

129188
if number_target_records == 0:
@@ -146,6 +205,8 @@ def _binary_search(self, data_list, value, reverse=False):
146205
Returns:
147206
An int of index for the result.
148207
"""
208+
print(data_list)
209+
149210
if not data_list:
150211
return -1
151212

backend/level_slices_reader.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ def __init__(self, filenames, bucket=None):
2727
self._filenames = filenames
2828
self._bucket = bucket
2929
self._records = defaultdict(list)
30+
self._minList = defaultdict(float)
31+
self._maxList = defaultdict(float)
3032

3133
def read(self, start, end):
3234
"""Reads and loads records from a set of slices, only records in the range
@@ -75,7 +77,7 @@ def downsample(self, strategy, downsample_factor=1, max_records=None):
7577
self._records[channel], strategy, downsample_factor)
7678
return self._records
7779

78-
def format_response(self):
80+
def format_response(self, minList: defaultdict(float), maxList: defaultdict(float)):
7981
"""Gets current data in dict type for http response.
8082
8183
Returns:
@@ -85,6 +87,30 @@ def format_response(self):
8587
for channel in self._records.keys():
8688
response.append({
8789
'name': channel,
88-
'data': [[record[0], record[1]] for record in self._records[channel]]
90+
'data': [[record[0], record[1]] for record in self._records[channel]],
91+
'min': minList[channel],
92+
'max': maxList[channel],
8993
})
9094
return response
95+
96+
def get_min(self):
    """Computes the per-channel minimum record value.

    Scans each channel's loaded records (lists of [time, value] pairs —
    assumed from the record[1] indexing; confirm against the slice reader)
    and stores the smallest value in self._minList, keyed by channel name.

    Returns:
        A defaultdict mapping each channel name to its minimum value.
        Channels with no records are skipped (previously this raised
        IndexError on channelData[0]).
    """
    if self._records is not None:
        for channel, channel_data in self._records.items():
            if not channel_data:
                # Skip empty channels instead of indexing into them.
                continue
            # Use the builtin min() over the value column; the original
            # hand-rolled loop also shadowed the builtin name `min`.
            self._minList[channel] = min(record[1] for record in channel_data)
    return self._minList
106+
107+
def get_max(self):
    """Computes the per-channel maximum record value.

    Scans each channel's loaded records (lists of [time, value] pairs —
    assumed from the record[1] indexing; confirm against the slice reader)
    and stores the largest value in self._maxList, keyed by channel name.

    Returns:
        A defaultdict mapping each channel name to its maximum value.
        Channels with no records are skipped (previously this raised
        IndexError on channelData[0]).
    """
    if self._records is not None:
        for channel, channel_data in self._records.items():
            if not channel_data:
                # Skip empty channels instead of indexing into them.
                continue
            # Bug fix: the original stored results in self._minList and
            # returned self._minList, clobbering the minimums computed by
            # get_min(); maxima belong in self._maxList. Also replaces the
            # hand-rolled scan that shadowed the builtin name `max`.
            self._maxList[channel] = max(record[1] for record in channel_data)
    return self._maxList

backend/multiple_level_preprocess.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,22 @@ def _raw_preprocess(self, number_per_slice):
151151
level_slice = LevelSlice(
152152
slice_name, bucket=self._preprocess_bucket)
153153
raw_slice = raw_data.read_next_slice()
154-
print(raw_slice)
155-
if isinstance(raw_slice, str):
156-
return raw_slice
157-
level_slice.save(raw_slice)
158-
raw_start_times.append(raw_slice[0][0])
159-
160-
slice_index += 1
161-
record_count += len(raw_slice)
162-
if timespan_start == -1:
163-
timespan_start = raw_slice[0][0]
164-
timespan_end = raw_slice[-1][0]
154+
if len(raw_slice) > 0 and len(raw_slice[0]) > 0:
155+
print(raw_slice)
156+
if isinstance(raw_slice, str):
157+
return raw_slice
158+
level_slice.save(raw_slice)
159+
raw_start_times.append(raw_slice[0][0])
165160

161+
slice_index += 1
162+
record_count += len(raw_slice)
163+
if timespan_start == -1:
164+
timespan_start = raw_slice[0][0]
165+
timespan_end = raw_slice[-1][0]
166+
else:
167+
print('Invalid Slice: ', raw_slice)
168+
slice_index += 1
169+
record_count += len(raw_slice)
166170
self._metadata['raw_number'] = record_count
167171
self._metadata['start'] = timespan_start
168172
self._metadata['end'] = timespan_end

0 commit comments

Comments
 (0)