Commit e62ed8f

Implement Batch Jobs using Google Drive

1 parent 50ca7cc · commit e62ed8f
19 files changed (+481 additions, -98 deletions)

src/api/collections.js

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ export default class Data {
 
     const b = Date.now();
     const pContext = this.context.processingContext();
-    this.ee = await pContext.connectGee(true);
+    this.ee = await pContext.connect(true);
     console.log(`Established connection to GEE for STAC (${Date.now()-b} ms)`);
 
     return num;

src/api/files.js

Lines changed: 1 addition & 2 deletions
@@ -94,8 +94,7 @@ export default class FilesAPI {
         size: newFileStat.size,
         modified: Utils.getISODateTime(newFileStat.mtime)
       });
-    }
-    catch (e) {
+    } catch (e) {
       if (this.context.debug) {
         console.error(e);
       }

src/api/jobs.js

Lines changed: 7 additions & 1 deletion
@@ -54,6 +54,9 @@ export default class JobsAPI {
     if (!req.user._id) {
       throw new Errors.AuthenticationRequired();
     }
+
+    // Update the task status
+    this.context.processingContext(req.user).startTaskMonitor();
   }
 
   async getJobs(req, res) {
@@ -200,6 +203,9 @@ export default class JobsAPI {
     }
 
     const makeStorageUrl = obj => {
+      if (Utils.isUrl(obj.href)) {
+        return obj;
+      }
       obj.href = API.getUrl("/storage/" + job.token + "/" + obj.href);
       return obj;
     };
@@ -246,7 +252,7 @@ export default class JobsAPI {
       if (this.storage.isFieldEditable(key)) {
         switch(key) {
           case 'process': {
-            const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
+            const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, job));
            pg.allowUndefinedParameters(false);
            promises.push(pg.validate());
            break;
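
The guard added to makeStorageUrl means hrefs that are already absolute URLs (for example Google Drive links produced by exported tasks) are passed through untouched, while relative filenames still get rewritten to the driver's /storage/ endpoint. A minimal self-contained sketch of that pattern for Node.js; isAbsoluteUrl and baseUrl below are illustrative stand-ins, not the driver's Utils.isUrl and API.getUrl helpers:

// Illustrative sketch only; names and values are made up.
const baseUrl = "https://example.org/storage/JOB_TOKEN/";

function isAbsoluteUrl(href) {
  try {
    new URL(href);   // throws for relative paths such as "result.tif"
    return true;
  } catch {
    return false;
  }
}

const makeStorageUrl = obj => {
  if (isAbsoluteUrl(obj.href)) {
    return obj;      // e.g. a Google Drive URL from a batch export
  }
  obj.href = baseUrl + obj.href;
  return obj;
};

console.log(makeStorageUrl({ href: "https://drive.google.com/uc?id=abc" }).href);
console.log(makeStorageUrl({ href: "result.tif" }).href);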

src/api/services.js

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ export default class ServicesAPI {
       if (this.storage.isFieldEditable(key)) {
         switch(key) {
           case 'process': {
-            const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user));
+            const pg = new ProcessGraph(req.body.process, this.context.processingContext(req.user, service));
            // ToDo 1.0: Correctly handle service paramaters #79
            pg.allowUndefinedParameters(false);
            promises.push(pg.validate());

src/api/worker/batchjob.js

Lines changed: 78 additions & 27 deletions
@@ -19,40 +19,72 @@ export default async function run(config, storage, user, query) {
     await Promise.all(cleanupTasks);
 
     logger.info("Starting batch job");
+
     await storage.updateJobStatus(query, 'running');
 
-    const context = config.processingContext(user);
+    const jobfolder = storage.getJobFolder(job._id);
+    await fse.ensureDir(path.dirname(jobfolder));
+
+    const context = config.processingContext(user, job);
     const pg = new ProcessGraph(job.process, context, logger);
     await pg.execute();
 
-    const computeTasks = pg.getResults().map(async (datacube) => {
-      const response = await GeeResults.retrieve(context, datacube, logger);
-      const params = datacube.getOutputFormatParameters();
-      const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config);
-      const filepath = storage.getJobFile(job._id, filename);
-      logger.debug("Storing result to: " + filepath);
-      await fse.ensureDir(path.dirname(filepath));
-      await new Promise((resolve, reject) => {
-        const writer = fse.createWriteStream(filepath);
-        response.data.pipe(writer);
-        writer.on('error', reject);
-        writer.on('close', resolve);
-      });
-      return { filepath, datacube };
+    const computeTasks = pg.getResults().map(async (dc) => {
+      const format = config.getOutputFormat(dc.getOutputFormat());
+      const datacube = format.preprocess(GeeResults.BATCH, context, dc, logger);
+
+      if (format.canExport()) {
+        const tasks = await format.export(context.ee, dc, context.getResource());
+        storage.addTasks(job, tasks);
+        context.startTaskMonitor();
+        const filepath = await new Promise((resolve, reject) => {
+          setInterval(async () => {
+            const updatedJob = await storage.getById(job._id, job.user_id);
+            if (!updatedJob) {
+              reject(new Error("Job was deleted"));
+            }
+            if (['canceled', 'error', 'finished'].includes(updatedJob.status)) {
+              // todo: resolve google drive URLs
+              resolve(job.googleDriveResults);
+            }
+          }, 10000);
+        });
+        return { filepath, datacube };
+      }
+      else {
+        const response = await format.retrieve(context.ee, dc);
+        const params = datacube.getOutputFormatParameters();
+        const filename = (params.name || String(Utils.generateHash())) + GeeResults.getFileExtension(datacube, config);
+        const filepath = storage.getJobFile(job._id, filename);
+        await new Promise((resolve, reject) => {
+          const writer = fse.createWriteStream(filepath);
+          response.data.pipe(writer);
+          writer.on('error', reject);
+          writer.on('close', resolve);
+        });
+        return { filepath, datacube };
+      }
     });
 
     await Promise.all(computeTasks);
 
     const results = [];
     for (const task of computeTasks) {
-      results.push(await task);
+      const { filepath, datacube } = await task;
+      if (Array.isArray(filepath)) {
+        filepath.forEach(fp => results.push({ filepath: fp, datacube }));
+      }
+      else {
+        results.push({ filepath, datacube });
+      }
     }
 
     const item = await createSTAC(storage, job, results);
     const stacpath = storage.getJobFile(job._id, 'stac.json');
     await fse.writeJSON(stacpath, item, {spaces: 2});
 
     logger.info("Finished");
+    // todo: set to error is any task failed
     storage.updateJobStatus(query, 'finished');
   } catch(e) {
     logger.error(e);
@@ -78,17 +110,36 @@ async function createSTAC(storage, job, results) {
   let endTime = null;
   const extents = [];
   for(const { filepath, datacube } of results) {
-    const filename = path.basename(filepath);
-    const stat = await fse.stat(filepath);
-    let asset = {
-      href: path.relative(folder, filepath),
-      roles: ["data"],
-      type: Utils.extensionToMediaType(filepath),
-      title: filename,
-      "file:size": stat.size,
-      created: stat.birthtime,
-      updated: stat.mtime
-    };
+    if (!filepath) {
+      continue;
+    }
+
+    let asset;
+    let filename;
+    if (Utils.isUrl(filepath)) {
+      let url = new URL(filepath);
+      console.log(url);
+      filename = path.basename(url.pathname || url.hash.substring(1));
+      asset = {
+        href: filepath,
+        roles: ["data"],
+        // type: Utils.extensionToMediaType(filepath),
+        title: filename
+      };
+    }
+    else {
+      filename = path.basename(filepath);
+      const stat = await fse.stat(filepath);
+      asset = {
+        href: path.relative(folder, filepath),
+        roles: ["data"],
+        type: Utils.extensionToMediaType(filepath),
+        title: filename,
+        "file:size": stat.size,
+        created: stat.birthtime,
+        updated: stat.mtime
+      };
+    }
 
     if (datacube.hasT()) {
       const t = datacube.dimT();
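
In the export branch above, the worker registers the Drive export tasks, starts the task monitor, and then polls the stored job every 10 seconds until the monitor has moved it into a terminal state ('canceled', 'error' or 'finished'), at which point it resolves with job.googleDriveResults. A small standalone sketch of that polling pattern, with hypothetical getStatus/getDriveResults callbacks standing in for storage.getById and the job document, and with the clearInterval and return-after-reject housekeeping that the inline version does not yet have:

// Hypothetical helper, not part of the commit.
function waitForTerminalState(getStatus, getDriveResults, intervalMs = 10000) {
  return new Promise((resolve, reject) => {
    const timer = setInterval(async () => {
      const status = await getStatus();           // e.g. (await storage.getById(...)).status
      if (status === null) {
        clearInterval(timer);                     // stop polling once the promise is settled
        return reject(new Error("Job was deleted"));
      }
      if (['canceled', 'error', 'finished'].includes(status)) {
        clearInterval(timer);
        resolve(getDriveResults());               // e.g. the Google Drive URLs collected by the task monitor
      }
    }, intervalMs);
  });
}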

src/api/worker/sync.js

Lines changed: 5 additions & 1 deletion
@@ -25,7 +25,11 @@ export default async function run(config, user, id, process, log_level) {
   if (pg.getResults().length > 1) {
     logger.warn("Multiple results can't be processed in synchronous mode. Only the result from the result node will be returned.");
   }
-  return await GeeResults.retrieve(context, resultNode.getResult(), logger);
+
+  const dc = resultNode.getResult();
+  const format = config.getOutputFormat(dc.getOutputFormat());
+  const dc2 = format.preprocess(GeeResults.SYNC, context, dc, logger);
+  return await format.retrieve(context.ee, dc2);
 }
 
 export async function getResultLogs(user_id, id, log_level) {

src/api/worker/webservice.js

Lines changed: 4 additions & 2 deletions
@@ -9,7 +9,7 @@ export default async function run(config, storage, user, query, xyz) {
 
   try {
     const rect = storage.calculateXYZRect(...xyz);
-    const context = config.processingContext(user);
+    const context = config.processingContext(user, service);
     // Update user id to the user id, which stored the job.
     // See https://github.com/Open-EO/openeo-earthengine-driver/issues/19
     context.setUserId(service.user_id);
@@ -29,7 +29,9 @@ export default async function run(config, storage, user, query, xyz) {
       dc.setOutputFormat('png');
     }
 
-    return await GeeResults.retrieve(context, dc, logger);
+    const format = config.getOutputFormat(dc.getOutputFormat());
+    const dc2 = format.preprocess(GeeResults.SERVICE, context, dc, logger);
+    return await format.retrieve(context.ee, dc2);
   } catch(e) {
     logger.error(e);
     throw e;
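
With this commit, all three workers (batchjob.js, sync.js and webservice.js) follow the same file-format pipeline: look up the format from the output format name, preprocess the datacube for the current mode (GeeResults.BATCH, SYNC or SERVICE), then either export to Google Drive tasks or retrieve a stream directly. A hedged sketch of that shared shape folded into one helper; the helper name is invented, and the driver itself keeps the three call sites separate:

import GeeResults from "../processes/utils/results.js";

// Hypothetical wrapper around the pattern used by the three workers above.
async function produceResult(mode, config, context, dc, logger) {
  const format = config.getOutputFormat(dc.getOutputFormat());
  const prepared = format.preprocess(mode, context, dc, logger);

  if (mode === GeeResults.BATCH && format.canExport()) {
    // Batch-only path: hand the work over to Earth Engine export tasks (Google Drive).
    return await format.export(context.ee, prepared, context.getResource());
  }
  // Synchronous and web-service path: stream the result back directly.
  return await format.retrieve(context.ee, prepared);
}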

src/formats/bitmap.js

Lines changed: 3 additions & 2 deletions
@@ -3,6 +3,7 @@ import GeeResults from "../processes/utils/results.js";
 import DataCube from "../datacube/datacube.js";
 import Utils from "../utils/utils.js";
 import FileFormat, { EPSGCODE_PARAMETER, SIZE_PARAMETER } from "./fileformat.js";
+import HttpUtils from "../utils/http.js";
 
 export const EPSGCODE_PARAMETER_BITMAP = Object.assign({}, EPSGCODE_PARAMETER);
 EPSGCODE_PARAMETER_BITMAP.default = 4326;
@@ -96,7 +97,7 @@ export default class BitmapLike extends FileFormat {
     return renderer === 'filmstrip';
   }
 
-  preprocess(context, dc, logger) {
+  preprocess(mode, context, dc, logger) {
     const ee = context.ee;
     const parameters = dc.getOutputFormatParameters();
 
@@ -175,7 +176,7 @@ export default class BitmapLike extends FileFormat {
         reject('Download URL provided by Google Earth Engine is empty.');
       }
       else {
-        resolve(url);
+        resolve(HttpUtils.stream(url));
       }
     });
   });

src/formats/fileformat.js

Lines changed: 8 additions & 4 deletions
@@ -16,7 +16,7 @@ export const SIZE_PARAMETER = {
 
 export const SCALE_PARAMETER = {
   type: 'number',
-  description: 'Scale of the image in meters per pixel.',
+  description: 'Scale of the image in meters per pixel. Defaults to native resolution in batch jobs, and 100 otherwise.',
   default: 100,
   minimum: 1
 };
@@ -81,15 +81,19 @@ export default class FileFormat {
     };
   }
 
-  preprocess(context, dc/*, logger*/) {
+  preprocess(mode, context, dc/*, logger*/) {
     return dc;
   }
 
-  async retrieve(/*ee, dc*/) {
+  async retrieve(/*ee, dc */) {
     throw new Error('Not implemented');
   }
 
-  async export(/*ee, dc*/) {
+  canExport() {
+    return false;
+  }
+
+  async export(/*ee, dc */) {
     throw new Error('Not implemented');
   }
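
Since canExport() now defaults to false on the FileFormat base class, existing formats keep going through retrieve(), and only formats that override both canExport() and export() are routed onto the new Google Drive path in batchjob.js. A hedged sketch of what such an opt-in subclass might look like; the class, its description and the datacube accessor are invented, and the ee.batch.Export.image.toDrive call is the Earth Engine JS client's Drive export, which may need different arguments in practice:

import FileFormat from "./fileformat.js";

// Hypothetical format that opts into the export path; not part of the commit.
export default class DriveGeoTiffLike extends FileFormat {

  canExport() {
    return true; // tells batchjob.js to call export() and wait for the task monitor
  }

  async export(ee, dc/*, resource*/) {
    const image = dc.image(); // assumption: the datacube exposes an ee.Image
    const task = ee.batch.Export.image.toDrive({
      image,
      description: "openeo-batch-result",
      fileFormat: "GeoTIFF"
    });
    task.start();
    return [task]; // batchjob.js stores these via storage.addTasks(job, tasks)
  }

}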

src/formats/gif.js

Lines changed: 2 additions & 1 deletion
@@ -1,3 +1,4 @@
+import HttpUtils from "../utils/http.js";
 import Utils from "../utils/utils.js";
 import BitmapLike from "./bitmap.js";
 
@@ -53,7 +54,7 @@ export default class GifFormat extends BitmapLike {
         reject('Download URL provided by Google Earth Engine is empty.');
       }
       else {
-        resolve(url);
+        resolve(HttpUtils.stream(url));
      }
    });
  });
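
Both bitmap-like formats now resolve with HttpUtils.stream(url) instead of the bare download URL, so retrieve() hands the workers a response whose data can be piped straight into a file or HTTP response, matching the write-stream code in batchjob.js. The actual src/utils/http.js is not part of this diff; a minimal sketch of what such a helper could look like, assuming an axios-based HTTP client:

// Illustrative stand-in for src/utils/http.js, not the repository's code.
import axios from "axios";

export default class HttpUtils {

  // Resolves with a response whose `data` property is a readable stream,
  // suitable for response.data.pipe(writer) as done in the batch worker.
  static async stream(url) {
    return await axios({
      method: "get",
      url,
      responseType: "stream"
    });
  }

}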
