Skip to content

Commit 503f789

Browse files
committed
Add enricher
1 parent ac7f216 commit 503f789

File tree

14 files changed

+365
-85
lines changed

14 files changed

+365
-85
lines changed

LICENSE

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
MIT License
22

3-
Copyright (c) 2019 Adam Kariv, Viderum Inc.
3+
Copyright (c) 2019 Adam Kariv
44

55
Permission is hereby granted, free of charge, to any person obtaining a copy
66
of this software and associated documentation files (the "Software"), to deal

dgp/core/base_enricher.py

+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
from hashlib import md5
2+
3+
from dataflows import Flow, PackageWrapper
4+
from dataflows import load, concatenate, join, set_type, checkpoint,\
5+
dump_to_path
6+
7+
from .config import Config
8+
from .context import Context
9+
from ..genera.consts import CONFIG_MODEL_MAPPING, RESOURCE_NAME
10+
11+
12+
class BaseEnricher:
    """Common base for enrichment steps.

    ``prepare`` runs once at construction time; the remaining hooks are
    overridden by subclasses to opt into the pipeline.
    """

    def __init__(self, config: Config):
        # Keep a handle on the shared config, then run one-time setup.
        self.config = config
        self.prepare()

    def prepare(self):
        """One-time setup hook; no-op by default."""
        pass

    def test(self):
        """Report whether this enricher applies; inactive by default."""
        return False

    def preflow(self):
        """Flow to run before the main load step, or None."""
        return None

    def postflow(self):
        """Flow to run after the main load step, or None."""
        return None
31+
class ColumnTypeTester(BaseEnricher):
    """Enricher guard activated by the column types present in the mapping.

    Subclasses are expected to define:
      REQUIRED_COLUMN_TYPES   - column types that must all be mapped
      PROHIBITED_COLUMN_TYPES - column types that must not be mapped
    """

    def test(self):
        # Collect every columnType that appears in the model mapping.
        mapped = {
            entry['columnType']
            for entry in self.config.get(CONFIG_MODEL_MAPPING)
            if 'columnType' in entry
        }
        # Check required types first (and bail out) before touching the
        # prohibited list, preserving the original short-circuit order.
        if not all(ct in mapped for ct in self.REQUIRED_COLUMN_TYPES):
            return False
        return not any(ct in mapped for ct in self.PROHIBITED_COLUMN_TYPES)
49+
def rename_last_resource(name):
    """Return a dataflows step that renames the package's last resource."""

    def func(package: PackageWrapper):
        resources = package.pkg.descriptor['resources']
        # Mutate the descriptor in place before re-emitting it.
        resources[-1]['name'] = name
        last_index = len(resources) - 1

        yield package.pkg

        for index, resource in enumerate(iter(package)):
            # The renamed (last) resource is yielded via its raw iterator
            # (.it); every other resource passes through untouched.
            yield resource.it if index == last_index else resource

    return func
65+
66+
class DatapackageJoiner(BaseEnricher):
    """Enricher that joins fields from a reference datapackage into the
    output resource.

    Subclasses configure:
      REF_DATAPACKAGE          - URL/path of the reference datapackage
      REF_KEY_FIELDS           - join key fields on the reference side
      REF_FETCH_FIELDS         - fields to copy over from the reference
      SOURCE_KEY_FIELDS        - join key fields on the source side
      TARGET_FIELD_COLUMNTYPES - columnTypes assigned to the copied fields
    """

    def prepare(self):
        # Cache the reference datapackage locally, keyed by a hash of its
        # location so distinct references get distinct directories.
        self.ref_hash = md5(self.REF_DATAPACKAGE.encode('utf8')).hexdigest()
        self.key = self.__class__.__name__
        Flow(
            load(self.REF_DATAPACKAGE),
            rename_last_resource(self.ref_hash),
            dump_to_path('.enrichments/{}'.format(self.ref_hash)),
            checkpoint(self.ref_hash),
        ).process()
        print('DONE PREPARING', self.key)

    def preflow(self):
        # Load the cached reference data and reduce it to just the key and
        # fetch fields, in a resource named after this enricher.
        wanted = self.REF_KEY_FIELDS + self.REF_FETCH_FIELDS
        return Flow(
            load('.enrichments/{}/datapackage.json'.format(self.ref_hash)),
            concatenate(
                fields={fieldname: [] for fieldname in wanted},
                target=dict(
                    name=self.key,
                    path=self.key+'.csv'
                ),
                resources=self.ref_hash
            ),
        )

    def postflow(self):
        # Target field names are the columnTypes with ':' made filename-safe.
        target_field_names = [ct.replace(':', '-') for ct in self.TARGET_FIELD_COLUMNTYPES]
        fields = {
            target_name: dict(name=fetch_field)
            for target_name, fetch_field
            in zip(target_field_names, self.REF_FETCH_FIELDS)
        }
        # Join the reference fields onto the output resource...
        steps = [
            join(
                self.key, self.REF_KEY_FIELDS,
                RESOURCE_NAME, self.SOURCE_KEY_FIELDS,
                fields
            ),
        ]
        # ...then assign each copied field its columnType.
        steps.extend(
            set_type(target_name,
                     resources=RESOURCE_NAME,
                     columnType=columntype)
            for target_name, columntype
            in zip(target_field_names, self.TARGET_FIELD_COLUMNTYPES)
        )
        return Flow(*steps)
127+
128+
def enrichments_flow(config: Config, context: Context, *classes):
    """Assemble a Flow combining every applicable enricher.

    Each class in *classes* is instantiated with *config* (running its
    prepare() side effects) and kept only if its test() passes.  The
    resulting flow is: all preflows, then a load of the intermediate
    datapackage recorded on *context*, then all postflows.
    """
    enrichers = [cls(config) for cls in classes]
    active = [enricher for enricher in enrichers if enricher.test()]

    # Only truthy (non-None) flows are kept, mirroring the hook defaults.
    steps = [flow for flow in (e.preflow() for e in active) if flow]
    steps.append(load(context.enricher_dir))
    steps.extend(flow for flow in (e.postflow() for e in active) if flow)

    return Flow(*steps)

dgp/core/context.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import logging
44

55
from .config import Config
6-
from ..genera.consts import CONFIG_SKIP_ROWS
7-
from ..taxonomies import TaxonomyRegistry
6+
from ..genera.consts import CONFIG_SKIP_ROWS, CONFIG_TAXONOMY_ID
7+
from ..taxonomies import TaxonomyRegistry, Taxonomy
88

99

1010
def trimmer(extended_rows):
1111
for row_number, headers, row in extended_rows:
12-
if headers is not None:
12+
if headers:
1313
row = row[:len(headers)]
1414
if len(row) < len(headers):
1515
continue
@@ -22,12 +22,13 @@ def __init__(self, config: Config, taxonomies: TaxonomyRegistry):
2222
self.config = config
2323
self.taxonomies: TaxonomyRegistry = taxonomies
2424
self._stream = None
25+
self.enricher_dir = None
2526

2627
def _structure_params(self):
2728
skip_rows = self.config.get(CONFIG_SKIP_ROWS) if CONFIG_SKIP_ROWS in self.config else None
2829
return dict(
2930
headers=skip_rows + 1 if skip_rows is not None else None,
30-
ignore_blank_headers=(skip_rows or 0) > 0, # Temporary hack as tabulator is kind of limited here
31+
ignore_blank_headers=True, #(skip_rows or 0) > 0, # Temporary hack as tabulator is kind of limited here
3132
post_parse=[trimmer]
3233
)
3334

@@ -45,3 +46,8 @@ def stream(self):
4546
logging.exception('Failed to open URL')
4647
raise
4748
return self._stream
49+
50+
@property
51+
def taxonomy(self) -> Taxonomy:
52+
if CONFIG_TAXONOMY_ID in self.config:
53+
return self.taxonomies.get(self.config[CONFIG_TAXONOMY_ID])

dgp/genera/consts.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
RESOURCE_NAME = 'out'
2+
13
CONFIG_URL = 'source.path'
24
CONFIG_FORMAT = 'source.format'
35
CONFIG_FORMAT_ = (CONFIG_FORMAT, 'File Format')

dgp/genera/enrich/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .enricher import EnricherDGP

dgp/genera/enrich/enricher.py

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from dataflows import Flow, dump_to_path
2+
3+
from ...core import BaseDataGenusProcessor
4+
from ..consts import CONFIG_URL
5+
6+
7+
class EnricherDGP(BaseDataGenusProcessor):
    """DGP stage that dumps the processed data to a per-source enrichment
    directory and records that location on the shared context."""

    def preflow(self):
        # Bug fix: as committed, `def preflow(self):` had no body at all
        # (the next line was `def flow(self):`), which is a SyntaxError.
        # Make it an explicit no-op returning None.
        # NOTE(review): presumably a real preflow was intended here —
        # confirm against the pipeline design.
        return None

    def flow(self):
        # Derive a stable directory name from the source URL so repeated
        # runs of the same source reuse the same enrichment output.
        config_hash = self.config._calc_hash(CONFIG_URL)
        enricher_dir = '.enrichments/{}'.format(config_hash)
        # Downstream consumers (e.g. enrichments_flow) load this path.
        self.context.enricher_dir = '{}/datapackage.json'.format(enricher_dir)
        return Flow(
            dump_to_path(enricher_dir),
        )

dgp/genera/load/analyzers/file_format/xls_format.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ class XLSFormatAnalyzer(BaseAnalyzer):
1010

1111
def run(self):
1212
if self.config[CONFIG_FORMAT].startswith('xls'):
13-
self.config[CONFIG_SHEET] = 0
13+
self.config.setdefault(CONFIG_SHEET, 0)
1414
self.config[CONFIG_FORCE_STRINGS] = True
15+

dgp/genera/load/loader.py

+9-10
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
from dataflows import Flow, load, printer, checkpoint, \
2-
dump_to_path, stream, PackageWrapper
1+
from dataflows import Flow, load, PackageWrapper
32

43
from ...core import BaseDataGenusProcessor, Required, Validator
54
from .analyzers import FileFormatDGP, StructureDGP
6-
from ..consts import *
5+
from ..consts import CONFIG_URL, CONFIG_MODEL_EXTRA_FIELDS, CONFIG_TAXONOMY_CT,\
6+
CONFIG_MODEL_MAPPING, CONFIG_TAXONOMY_ID, RESOURCE_NAME
77

88

99
class LoaderDGP(BaseDataGenusProcessor):
1010

1111
PRE_CHECKS = Validator(
1212
Required(CONFIG_URL, 'Source data URL or path')
1313
)
14-
14+
1515
def init(self):
1616
self.steps = self.init_classes([
1717
FileFormatDGP,
@@ -26,7 +26,7 @@ def func(package: PackageWrapper):
2626
columnTypes = self.config[CONFIG_TAXONOMY_CT]
2727
descriptor['columnTypes'] = columnTypes
2828

29-
resource = descriptor['resources'][0]
29+
resource = descriptor['resources'][-1]
3030
resource['path'] = 'out.csv'
3131
resource['format'] = 'csv'
3232
resource['mediatype'] = 'text/csv'
@@ -41,7 +41,7 @@ def func(package: PackageWrapper):
4141
if self.config[CONFIG_MODEL_EXTRA_FIELDS]:
4242
for kind, field, *value in self.config[CONFIG_MODEL_EXTRA_FIELDS]:
4343
for entry in self.config[CONFIG_MODEL_MAPPING]:
44-
if entry['name'] == field:
44+
if entry['name'] == field:
4545
if kind == 'constant':
4646
entry['constant'] = value[0]
4747
elif kind == 'normalize':
@@ -70,7 +70,7 @@ def func(package: PackageWrapper):
7070
# Our own additions
7171
descriptor['taxonomyId'] = self.config[CONFIG_TAXONOMY_ID]
7272

73-
yield package.pkg
73+
yield package.pkg
7474
yield from package
7575

7676
return func
@@ -80,9 +80,8 @@ def flow(self):
8080
structure_params = self.context._structure_params()
8181
source = self.config._unflatten()['source']
8282
return Flow(
83-
load(source.pop('path'), validate=False, **source, **structure_params),
83+
load(source.pop('path'), validate=False, name=RESOURCE_NAME,
84+
**source, **structure_params),
8485
# printer(),
8586
self.create_fdp(),
8687
)
87-
88-

dgp/genera/simple.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,24 @@
33
from ..core import BaseDataGenusProcessor
44
from .load import LoaderDGP
55
from .transform import TransformDGP
6+
from .enrich import EnricherDGP
67

78

89
class SimpleDGP(BaseDataGenusProcessor):
910

10-
def init(self, post_load_flow=None, post_transform_flow=None):
11+
def init(self,
12+
post_load_flow=None,
13+
post_transform_flow=None):
14+
1115
self.steps = self.init_classes([
1216
LoaderDGP,
1317
TransformDGP,
18+
EnricherDGP,
1419
])
1520
self.post_flows = [
1621
post_load_flow,
1722
post_transform_flow,
23+
None
1824
]
1925

2026
def flow(self):
@@ -26,5 +32,6 @@ def flow(self):
2632
flow = self.post_flows[i]
2733
if flow:
2834
flows.append(flow)
35+
else:
36+
break
2937
return Flow(*flows)
30-

0 commit comments

Comments
 (0)