forked from amazoniabiodiversity/webapp
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathadmin.py
87 lines (75 loc) · 3.34 KB
/
admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# This file is part of VertNet: https://github.com/VertNet/webapp
#
# VertNet is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# VertNet is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with VertNet. If not, see: http://www.gnu.org/licenses
# This module contains request handlers for admin APIs.
import logging
import os
import webapp2
from vertnet.service.model import IndexJob
from google.appengine.api import files, namespace_manager
from google.appengine.ext.webapp.util import run_wsgi_app
from mapreduce import control as mrc
from mapreduce import input_readers
# True when the SERVER_SOFTWARE environment variable starts with
# 'Development'; used below to switch on webapp2 debug mode.
_SERVER_SOFTWARE = os.getenv('SERVER_SOFTWARE', '')
IS_DEV = _SERVER_SOFTWARE.startswith('Development')
# URL routes for the WSGI application defined in this module. Each route
# maps a path to a method on AppHandler using webapp2's
# 'module.Class:method' handler string.
# NOTE(review): the handler strings name module 'app'; confirm that is the
# module that actually hosts AppHandler in the deployed application.
routes = [
    webapp2.Route(
        r'/mr/finalize',
        handler='app.AppHandler:mapreduce_finalize',
        name='mapreduce_finalize',
    ),
    webapp2.Route(
        r'/mr/indexall',
        handler='app.AppHandler:index',
        name='index',
    ),
]
class AppHandler(webapp2.RequestHandler):
    """Request handlers that start and finalize search-indexing MR jobs."""

    def mapreduce_finalize(self):
        """Finalizes indexing MR job by finalizing files on GCS.

        Reads the job id from the 'Mapreduce-Id' request header, loads the
        matching IndexJob in the current namespace, finalizes the file at
        the job's write_path and marks the job as done.

        Requests without the header, or whose id matches no IndexJob, are
        logged and ignored (the handler returns normally so that the caller
        does not see an error status).
        """
        mrid = self.request.headers.get('Mapreduce-Id')
        if not mrid:
            # Without an id there is nothing to look up; do not hand None
            # to the datastore lookup.
            logging.warning(
                'Finalize request is missing the Mapreduce-Id header')
            return

        namespace = namespace_manager.get_namespace()
        job = IndexJob.get_by_id(mrid, namespace=namespace)
        if not job:
            # NOTE(review): index() stores the IndexJob only after
            # start_map() returns, so a callback arriving before that put
            # would land here -- confirm whether that can happen.
            logging.warning(
                'No index job found for mapreduce id %s in namespace %r',
                mrid, namespace)
            return

        logging.info('Finalizing index job for resource %s', job.resource)
        files.finalize(job.write_path)
        job.done = True
        job.put()
        logging.info('Index job finalized for resource %s', job.resource)

    def index(self):
        """Fires off an indexing MR job over files in GCS at supplied path.

        Request parameters:
            path: Required. Used as the job name, as the 'resource' mapper
                parameter and, prefixed with '/gs/', as the input file
                pattern.
            shard_count: Optional integer, default 8.
            processing_rate: Optional integer, default 100.

        A GCS file is created to log failed index puts, the map job is
        started with '/mr/finalize' as its done callback, and an IndexJob
        keyed by the returned mapreduce id is stored in the current
        namespace.

        Responds with HTTP 400 and starts nothing when 'path' is missing
        or empty.
        """
        path = self.request.get('path')
        if not path:
            # An empty path would create the error file and start a job
            # over the bare '/gs/' pattern; reject the request instead.
            logging.error("Index request is missing the 'path' parameter")
            self.error(400)
            self.response.write("Missing required parameter: path")
            return

        shard_count = self.request.get_range('shard_count', default=8)
        processing_rate = self.request.get_range(
            'processing_rate', default=100)

        # Fully qualified name of the input reader class, as a string.
        input_class = '%s.%s' % (
            input_readers.__name__,
            input_readers.FileInputReader.__name__)
        files_pattern = '/gs/%s' % path

        # Create file on GCS to log any failed index puts:
        namespace = namespace_manager.get_namespace()
        filename = '/gs/vn-staging/errors/failures-%s-all.csv' % namespace
        write_path = files.gs.create(
            filename,
            mime_type='text/tab-separated-values',
            acl='public-read')

        mapper_params = {
            "input_reader": dict(
                files=[files_pattern],
                format='lines'),
            "resource": path,
            "write_path": write_path,
            "processing_rate": processing_rate,
            "shard_count": shard_count,
        }
        mrid = mrc.start_map(
            path,
            "vertnet.service.search.build_search_index",
            input_class,
            mapper_params,
            mapreduce_parameters={'done_callback': '/mr/finalize'},
            shard_count=shard_count)

        # Record the job so that mapreduce_finalize can find its write_path.
        IndexJob(
            id=mrid,
            namespace=namespace,
            resource=path,
            write_path=write_path,
            failed_logs=['NONE']).put()
# WSGI application serving the routes above; debug mode follows IS_DEV.
handler = webapp2.WSGIApplication(routes=routes, debug=IS_DEV)
def main():
    """Run the module-level WSGI application via run_wsgi_app."""
    run_wsgi_app(handler)