Skip to content
This repository was archived by the owner on Aug 13, 2021. It is now read-only.

Commit 1de9bd2

Browse files
jaklinger (Joel Klinger)
and authored
[271] GtR general ES pipeline (#286)
* make sure conf dir is empty * simplified es config * added orm es config reader * modified setup_es to pick up new es config * swapped es_mode for boolean * aliases now consistent with config * aliases now automatically located * added endpoint field to estasks * added endpoint field to sql2estasks * changed branch name * mappings build * updated docs * updated docs * updated docs * added docstrings * pruned deprecated schema transformations * updated fos fieldname on arxlive * unified data set schema transformations * restructured directory * refactored references to schema_transformation * refactored references to schema_transformation * slimmed down transformations, and included entity_type * pruned ontology * tidied schemas * consistency tests * reverted unrelated json file * added dynamic strict to settings * removed index.json in favour of a single defaults file * harmonised name fieldsofstudy across arxiv * using soft alias until a future PR to minimise changes * added novelty back in * sorted json * sorted json * sorted json * changed schema_transformor to use new simpler mapping * removed to/from keys * new null syntax mapping implemented * cleaned and sorted json * adding temporary eurito-dev index to avoid conflating es7 compatibility issues * adding temporary eurito-dev index to avoid conflating es7 compatibility issues * testing es7 on cordis only * testing es7 on cordis only * testing es7 on cordis only * changes to make cordis es7 run * eurito-dev iteration * compatibility issues between arxlive and eurito arxiv * sorted json * pycountry change no longer assumes not null country * needed to split pathstub args * removed redundant es mappings * empty gtr transformation * [267] Pool ES mappings across datasets (#280) * changed branch name * mappings build * updated docs * updated docs * updated docs * added docstrings * added dynamic strict to settings * removed index.json in favour of a single defaults file * using soft alias until a future PR to 
minimise changes * cleaned and sorted json * [267] Tidy & slim schema transformations (#281) * pruned deprecated schema transformations * updated fos fieldname on arxlive * unified data set schema transformations * restructured directory * refactored references to schema_transformation * refactored references to schema_transformation * slimmed down transformations, and included entity_type * pruned ontology * tidied schemas * consistency tests * reverted unrelated json file * harmonised name fieldsofstudy across arxiv * added novelty back in * sorted json * sorted json * sorted json Co-authored-by: Joel Klinger <[email protected]> Co-authored-by: Joel Klinger <[email protected]> * patched out es config setup from tests * removed redundant tests * fixed json formatting * fixed bad table name (NB table was empty anyway) * fixed bad table name (NB table was empty anyway) * gtr ontology * none included for testing * added schema transformation * picked up bug in test * gtr ontology is self consistent * added gtr mapping * added gtr to config * fixed merge conflicts * fixed merge conflicts * changed json field names * instiutes are now analyzed and text * sorted and cleaned json * added gtr batchable * empty test commit * couple of tests * tidied json * added schema module to reqs, finished tests * set up root task * moved to es7 image * removed standard token filter, as it is deprecated in es6.5 then removed in es7 * removed start/end dates since theyre empty * misalignment between batchable keys and field names * fixed mapping and removed outcomes due to mapping explosion * removed seconds from fund date fields * tidied json * added none value edgecase to str truncation * Update elasticsearchplus.py Co-authored-by: Joel Klinger <[email protected]>
1 parent 5229238 commit 1de9bd2

File tree

11 files changed

+512
-24
lines changed

11 files changed

+512
-24
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
"""
2+
run.py (GtR general)
3+
--------------------
4+
5+
Transfer pre-collected GtR data from MySQL to Elasticsearch.
6+
"""
7+
8+
from ast import literal_eval
9+
import boto3
10+
import json
11+
import logging
12+
import os
13+
from datetime import datetime as dt
14+
15+
from nesta.core.luigihacks.elasticsearchplus import ElasticsearchPlus
16+
from nesta.core.luigihacks.luigi_logging import set_log_level
17+
from nesta.core.orms.orm_utils import db_session, get_mysql_engine
18+
from nesta.core.orms.orm_utils import load_json_from_pathstub
19+
from nesta.core.orms.orm_utils import object_to_dict, get_class_by_tablename
20+
from nesta.core.orms.gtr_orm import Base, Projects, LinkTable, OrganisationLocation
21+
from collections import defaultdict, Counter
22+
23+
24+
def default_pop(dictobj, key, default=None):
    """Pop the key from the dict-like object. If the key doesn't exist, return a default.

    Args:
        dictobj (dict-like): A dict-like object to modify.
        key (hashable): A key to pop from the dict-like object.
        default: Any value to be returned as default, should the key not exist.
            Defaults to a *fresh* empty dict per call.
    Returns:
        value: Either the value stored at the key, or the default value.
    """
    # NB: the default was previously the literal `{}` in the signature, the
    # classic mutable-default-argument pitfall: every caller would receive
    # (and could mutate) the very same shared dict instance.
    if default is None:
        default = {}
    try:
        default = dictobj.pop(key)
    except KeyError:
        pass
    return default
39+
40+
41+
def truncate_if_str(value, n):
    """Truncate a value if it's a string, otherwise return the value itself.

    Args:
        value: Object to truncate, if it's a string.
        n (int): Number of chars after which to truncate.
    Returns:
        truncated: A truncated string, otherwise the original value itself.
    """
    # isinstance (rather than an exact `type(...) is str` check) also
    # covers str subclasses, which slice identically.
    return value[:n] if isinstance(value, str) else value


def extract_funds(gtr_funds):
    """Extract and deduplicate funding information.

    Args:
        gtr_funds (list of dict): Raw GtR funding information for a single project.
    Returns:
        _gtr_funds (list of dict): Deduplicated GtR funding information,
            ready for ingestion to ES.
    """
    key_fields = ('start_date', 'end_date', 'category',
                  'amount', 'currencyCode')
    funds = {}
    for fund in gtr_funds:
        # Drop the PK, and truncate the raw 'start'/'end' values to
        # 10 chars (i.e. an ISO date) under new field names.
        row = {k: v for k, v in fund.items() if k != 'id'}
        row['start_date'] = truncate_if_str(row.pop('start'), 10)
        row['end_date'] = truncate_if_str(row.pop('end'), 10)
        # Rows sharing the composite key overwrite one another: dedupe.
        composite_key = tuple(row[k] for k in key_fields)
        funds[composite_key] = row
    return list(funds.values())
70+
71+
72+
def get_linked_rows(session, links):
    """Pull rows out of the database from various tables,
    as indicated by the link table.

    Args:
        session (SqlAlchemy session): Open session from which to query the database.
        links (dict): Mapping of table name to a list of PKs in that table.
    Returns:
        rows (dict): Mapping of table name to a list of rows of data from that table.
    """
    prefix = 'gtr_outcomes_'
    linked_rows = defaultdict(list)
    for table_name, pks in links.items():
        # GtR outcomes are only *counted* for now, since indexing them in
        # full leads to a mapping explosion in Elasticsearch.
        if table_name.startswith('gtr_outcomes'):
            outcome_type = table_name[len(prefix):]
            linked_rows['gtr_outcomes'].extend([outcome_type] * len(pks))
            continue
        orm = get_class_by_tablename(Base, table_name)
        query = session.query(orm).filter(orm.id.in_(pks))
        linked_rows[table_name].extend(object_to_dict(obj)
                                       for obj in query.all())
    return linked_rows
94+
95+
96+
def reformat_row(row, linked_rows, locations):
    """Prepare raw data for ingestion to ES.

    Args:
        row (dict): Row of data.
        linked_rows (dict): Mapping of table name to a list of rows of data from that table.
        locations (dict): Mapping of organisation id to location data.
    Returns:
        row (dict): Reformatted row of data.
    """
    # General info: funding, outcome counts, topics and institutes
    gtr_funds = default_pop(linked_rows, 'gtr_funds')
    row['_json_funding_project'] = extract_funds(gtr_funds)
    row['_json_outcomes_project'] = dict(Counter(linked_rows['gtr_outcomes']))
    row['_terms_topics_project'] = [topic['text']
                                    for topic in linked_rows['gtr_topic']
                                    if topic['text'] != 'Unclassified']
    organisations = linked_rows['gtr_organisations']
    row['_terms_institutes_project'] = [org['name'] for org in organisations]
    org_ids = [org['id'] for org in organisations]
    row['_terms_instituteIds_project'] = org_ids

    # Geographic info, via the organisation --> location lookup
    matched = [loc for org_id, loc in locations.items() if org_id in org_ids]
    row['_terms_countries_project'] = [loc['country_name'] for loc in matched]
    row['_terms_iso2_project'] = [loc['country_alpha_2'] for loc in matched]
    row['_terms_continent_project'] = [loc['continent'] for loc in matched]

    # Only emit a coordinate when both ordinates are present
    coordinates = []
    for loc in matched:
        lat, lon = loc['latitude'], loc['longitude']
        if lat is not None and lon is not None:
            coordinates.append({'lat': float(lat), 'lon': float(lon)})
    row['_coordinate_institutes_project'] = coordinates
    return row
129+
130+
131+
def get_project_links(session, project_ids):
    """Generate the look-up table of table_name to object ids, by project id,
    as a preparatory stage for retrieving the "rows" by id from each
    table_name, by project id.

    Args:
        session (SqlAlchemy session): Open session from which to query the database.
        project_ids (list-like): List of project ids to extract linked entities from.
    Returns:
        linked_rows (dict): Mapping of project id to {table name: list of row ids}.
    """
    project_links = defaultdict(lambda: defaultdict(list))
    query = session.query(LinkTable).filter(LinkTable.project_id.in_(project_ids))
    for link in map(object_to_dict, query.all()):
        project_links[link['project_id']][link['table_name']].append(link['id'])
    return project_links
147+
148+
149+
def get_org_locations(session):
    """Retrieve look-up of all organisation ids to location metadata.

    Args:
        session (SqlAlchemy session): Open session from which to query the database.
    Returns:
        locations (nested dict): Mapping of organisation id to location metadata.
    """
    locations = {}
    for obj in session.query(OrganisationLocation).all():
        metadata = object_to_dict(obj)
        org_id = metadata.pop('id')  # remaining fields are the location data
        locations[org_id] = metadata
    return locations
162+
163+
164+
def run():
    """Batchable entry point: transfer the pre-collected GtR projects listed
    in the S3 batch file from MySQL to Elasticsearch."""
    test = literal_eval(os.environ["BATCHPAR_test"])  # noqa: F841 (currently unused; kept for parity with other batchables)
    bucket = os.environ['BATCHPAR_bucket']
    batch_file = os.environ['BATCHPAR_batch_file']
    db_name = os.environ["BATCHPAR_db_name"]
    es_host = os.environ['BATCHPAR_outinfo']
    es_port = int(os.environ['BATCHPAR_out_port'])
    es_index = os.environ['BATCHPAR_out_index']
    entity_type = os.environ["BATCHPAR_entity_type"]
    aws_auth_region = os.environ["BATCHPAR_aws_auth_region"]

    # database setup
    logging.info('Retrieving engine connection')
    engine = get_mysql_engine("BATCHPAR_config", "mysqldb",
                              db_name)

    # es setup
    logging.info('Connecting to ES')
    strans_kwargs = {'filename': 'gtr.json', 'ignore': ['id']}
    es = ElasticsearchPlus(hosts=es_host,
                           port=es_port,
                           aws_auth_region=aws_auth_region,
                           no_commit=("AWSBATCHTEST" in
                                      os.environ),
                           entity_type=entity_type,
                           strans_kwargs=strans_kwargs,
                           null_empty_str=True,
                           coordinates_as_floats=True,
                           listify_terms=True,
                           do_sort=False,
                           ngram_fields=['textBody_abstract_project',
                                         'textBody_potentialImpact_project',
                                         'textBody_techAbstract_project'])

    # collect file
    logging.info('Retrieving project ids')
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket, batch_file)
    # Use the public StreamingBody.read() API rather than reaching into the
    # private `_raw_stream` attribute, which is an implementation detail.
    project_ids = json.loads(obj.get()['Body'].read())
    logging.info(f"{len(project_ids)} project IDs "
                 "retrieved from s3")

    # Pull each project row plus its linked entities, reformat and index
    logging.info('Processing rows')
    with db_session(engine) as session:
        locations = get_org_locations(session)
        project_links = get_project_links(session, project_ids)
        for count, obj in enumerate((session.query(Projects)
                                     .filter(Projects.id.in_(project_ids))
                                     .all())):
            row = object_to_dict(obj)
            links = default_pop(project_links, row['id'])
            linked_rows = get_linked_rows(session, links)
            row = reformat_row(row, linked_rows, locations)
            es.index(index=es_index, id=row.pop('id'), body=row)
            if not count % 1000:
                logging.info(f"{count} rows loaded to "
                             "elasticsearch")
222+
223+
224+
if __name__ == "__main__":
    set_log_level()
    if 'BATCHPAR_outinfo' not in os.environ:
        # Local/debug mode: fabricate the batch parameters that AWS Batch
        # would otherwise inject via BATCHPAR_* environment variables.
        from nesta.core.orms.orm_utils import setup_es
        from nesta.core.luigihacks.misctools import find_filepath_from_pathstub
        es, es_config = setup_es(production=False, endpoint='general',
                                 dataset='gtr', drop_and_recreate=True)
        environ = {'config': find_filepath_from_pathstub('mysqldb.config'),
                   'batch_file': '',
                   'db_name': 'dev',
                   'bucket': 'nesta-production-intermediate',
                   'outinfo': es_config['host'],
                   'out_port': es_config['port'],
                   'out_index': es_config['index'],
                   'aws_auth_region': 'eu-west-2',
                   'entity_type': 'project',
                   'test': "True"}
        for k, v in environ.items():
            # os.environ values must be strings; es_config values such as
            # the port may be ints, so coerce explicitly to avoid TypeError.
            os.environ[f'BATCHPAR_{k}'] = str(v)

    logging.info('Starting...')
    run()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import pytest
2+
from unittest import mock
3+
from schema import Schema
4+
5+
from nesta.core.batchables.general.gtr.run import extract_funds
6+
from nesta.core.batchables.general.gtr.run import get_linked_rows
7+
from nesta.core.batchables.general.gtr.run import reformat_row
8+
from nesta.core.batchables.general.gtr.run import get_project_links
9+
from nesta.core.batchables.general.gtr.run import get_org_locations
10+
11+
# Dotted-path template for mock.patch-ing names inside the module under test
PATH='nesta.core.batchables.general.gtr.run.{}'
12+
13+
14+
15+
@pytest.fixture
def gtr_funds():
    """400 raw funding rows that contain exactly 3 unique funding records."""
    def fund(end, amount):
        return {'start': '1 Dec 2020', 'end': end,
                'category': 'Ingoings', 'amount': amount,
                'currencyCode': '$$$s'}
    return [fund('2 Dec 2020', 10000),
            fund('2 Dec 2020', 10000),  # duplicate of the first row
            fund('2 Dec 2021', 10000),
            fund('2 Dec 2021', 100)] * 100
26+
27+
@pytest.fixture
def links():
    """Link-table mapping: two outcome tables (7 ids in total) interleaved
    with two ordinary tables."""
    return {'gtr_outcomes_outcomeA': [1, 2, 3, 4],
            'gtr_aTable': [2, 3, 4],
            'gtr_outcomes_outcomeB': [1, 3, 4],
            'gtr_anotherTable': [1, 4]}
33+
34+
@pytest.fixture
def link_table_rows():
    """Raw link-table rows, covering two projects (ids 1 and 21)."""
    raw = [(1, 'table_1', 23),
           (21, 'table_2', 432),
           (1, 'table_1', 32),
           (21, 'table_1', 12)]
    return [{'project_id': pid, 'table_name': table, 'id': row_id}
            for pid, table, row_id in raw]
40+
41+
@pytest.fixture
def flattened_link_table_rows():
    """Expected get_project_links output for the link_table_rows fixture:
    grouped first by project id, then by table name."""
    return {1: {'table_1': [23, 32]},
            21: {'table_1': [12], 'table_2': [432]}}
45+
46+
def test_extract_funds(gtr_funds):
    """The 400 raw rows collapse to the 3 unique records, each with 5 fields."""
    deduplicated = extract_funds(gtr_funds)
    assert len(deduplicated) == 3
    for record in deduplicated:
        assert len(record) == 5
50+
51+
52+
@mock.patch(PATH.format('get_class_by_tablename'))
@mock.patch(PATH.format('object_to_dict'))
def test_get_linked_rows(mocked_obj_to_dict, mocked_get_class, links):
    """Outcome tables are pooled under 'gtr_outcomes'; others fetched in full."""
    session = mock.MagicMock()
    # One batch of (mocked) rows per non-outcome table, in iteration order
    session.query().filter().all.side_effect = [
        [None] * len(ids) for table_name, ids in links.items()
        if not table_name.startswith('gtr_outcomes')
    ]
    results = get_linked_rows(session, links)
    assert set(results.keys()) == {'gtr_aTable', 'gtr_anotherTable',
                                   'gtr_outcomes'}
    assert len(results['gtr_aTable']) == len(links['gtr_aTable'])
    assert len(results['gtr_anotherTable']) == len(links['gtr_anotherTable'])
    assert len(results['gtr_outcomes']) == 7  # total outcome ids
    assert type(results['gtr_outcomes']) == list
65+
66+
67+
@mock.patch(PATH.format('extract_funds'), return_value=['the funds!'])
def test_reformat_row(mocked_extract_funds):
    """Every ES-ready field is derived from the linked rows and locations."""
    linked_rows = {'gtr_funds': None,  # consumed by the mocked extract_funds
                   'gtr_topic': [{'text': 'one topic'}, {'text': 'Unclassified'},
                                 {'text': 'a topic'}, {'text': 'another topic'}],
                   'gtr_outcomes': ['some outcomes', 'some outcomes',
                                    'some other outcomes'],
                   'gtr_organisations': [{'id': 'first org',
                                          'name': 'an org name'}]}
    locations = {'first org': {'country_name': 'Japan',
                               'country_alpha_2': 'jp',
                               'continent': 'Asia',
                               'latitude': 1000, 'longitude': 200},
                 'second org': {'country_name': 'Peru',
                                'country_alpha_2': 'pe',
                                'continent': 'South America',
                                'latitude': -1000, 'longitude': -200}}
    expected = {'something': 'value',
                '_json_funding_project': ['the funds!'],
                '_json_outcomes_project': {'some outcomes': 2,
                                           'some other outcomes': 1},
                '_terms_topics_project': ['one topic', 'a topic',
                                          'another topic'],
                '_terms_institutes_project': ['an org name'],
                '_terms_instituteIds_project': ['first org'],
                '_terms_countries_project': ['Japan'],
                '_terms_iso2_project': ['jp'],
                '_terms_continent_project': ['Asia'],
                '_coordinate_institutes_project': [{'lat': 1000, 'lon': 200}]}
    assert reformat_row({'something': 'value'}, linked_rows,
                        locations) == expected
92+
93+
94+
@mock.patch(PATH.format('LinkTable'))
@mock.patch(PATH.format('object_to_dict'))
def test_get_project_links(mocked_otd, mocked_LinkTable, link_table_rows,
                           flattened_link_table_rows):
    """Link-table rows are grouped by project id, then by table name."""
    mocked_otd.side_effect = link_table_rows
    session = mock.MagicMock()
    session.query().filter().all.return_value = [None] * len(link_table_rows)
    project_links = get_project_links(session, None)
    Schema(project_links).validate(flattened_link_table_rows)
102+
103+
104+
@mock.patch(PATH.format('object_to_dict'))
def test_get_org_locations(mocked_otd, link_table_rows):
    """Each row's 'id' becomes the key; the remaining fields the value.

    NB: mock.patch passes the mock into the test, so the signature must
    accept `mocked_otd` (it previously didn't, making the test un-runnable),
    and the mock needs a side_effect so real dicts flow through.
    """
    # Feed copies, since get_org_locations pops 'id' from each row and we
    # compare against the (unmutated) fixture afterwards.
    mocked_otd.side_effect = [dict(row) for row in link_table_rows]
    session = mock.MagicMock()
    session.query().all.return_value = [None] * len(link_table_rows)
    locations = get_org_locations(session)
    for row in link_table_rows:
        row = dict(row)
        assert locations[row.pop('id')] == row
4 Bytes
Binary file not shown.

nesta/core/config/luigi.cfg

0 Bytes
Binary file not shown.

nesta/core/luigihacks/elasticsearchplus.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,18 @@ def _null_empty_str(row):
293293
_row[k] = None
294294
return _row
295295

296+
297+
# Double underscore: a very private method
def __floatify_coord(coord):
    """Return a copy of `coord` with 'lat'/'lon' cast to float,
    or None if either ordinate is missing."""
    floated = deepcopy(coord)
    if floated['lat'] is None or floated['lon'] is None:
        return None
    floated['lat'] = float(floated['lat'])
    floated['lon'] = float(floated['lon'])
    return floated
306+
307+
296308
def _coordinates_as_floats(row):
297309
"""Ensure coordinate data are always floats.
298310
@@ -307,13 +319,13 @@ def _coordinates_as_floats(row):
307319
continue
308320
if v is None:
309321
continue
310-
if v['lat'] is None or v['lon'] is None:
311-
_row[k] = None
312-
continue
313-
_row[k]['lat'] = float(v['lat'])
314-
_row[k]['lon'] = float(v['lon'])
322+
if type(v) is list:
323+
_row[k] = [__floatify_coord(coord) for coord in v]
324+
else:
325+
_row[k] = __floatify_coord(v)
315326
return _row
316327

328+
317329
def _country_lookup():
318330
"""Extract country/nationality --> iso2 code lookup
319331
from a public json file.

0 commit comments

Comments
 (0)