-
Notifications
You must be signed in to change notification settings - Fork 12
/
schema_transformation.py
415 lines (329 loc) · 17.4 KB
/
schema_transformation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# -*- coding: utf-8 -*-
"""Functions for handling schema updates within any yoda-metadata file."""
__copyright__ = 'Copyright (c) 2018-2023, Utrecht University'
__license__ = 'GPLv3, see LICENSE'
__all__ = ['rule_batch_transform_vault_metadata',
'rule_batch_vault_metadata_correct_orcid_format',
'rule_batch_vault_metadata_schema_report',
'rule_get_transformation_info',
'api_transform_metadata']
import json
import os
import re
import time
import genquery
import session_vars
import meta
import schema
import schema_transformations
from util import *
def execute_transformation(ctx, metadata_path, transform, keep_metadata_backup=True):
    """Apply a transformation function to a metadata file and persist the result.

    :param ctx:                  Combined type of a ctx and rei struct
    :param metadata_path:        Path of the metadata JSON to transform
    :param transform:            Transformation function to apply
    :param keep_metadata_backup: Keep a backup copy (research space only)

    :raises AssertionError: If the path belongs to neither a research nor a vault group
    """
    parent_coll, _data_name = os.path.split(metadata_path)
    group_name = metadata_path.split('/')[3]

    transformed = transform(ctx, jsonutil.read(ctx, metadata_path))

    if group_name.startswith('research-'):
        # Research space: optionally keep a timestamped backup, then overwrite in place.
        # (make_metadata_backup is only relevant for research.)
        if keep_metadata_backup:
            backup_path = '{}/transformation-backup[{}].json'.format(parent_coll, str(int(time.time())))
            data_object.copy(ctx, metadata_path, backup_path)
        jsonutil.write(ctx, metadata_path, transformed)
    elif group_name.startswith('vault-'):
        # Vault space: never overwrite; write a new timestamped metadata object,
        # copy the parent's ACLs onto it, and record the action in provenance.
        new_path = '{}/yoda-metadata[{}].json'.format(parent_coll, str(int(time.time())))
        jsonutil.write(ctx, new_path, transformed)
        copy_acls_from_parent(ctx, new_path, "default")
        ctx.rule_provenance_log_action("system", parent_coll, "updated metadata schema")
        log.write(ctx, "Transformed %s" % (new_path))
    else:
        raise AssertionError()
@api.make()
def api_transform_metadata(ctx, coll, keep_metadata_backup=True):
    """Transform a yoda-metadata file in the given collection to the active schema.

    :param ctx:                  Combined type of a ctx and rei struct
    :param coll:                 Collection whose metadata should be transformed
    :param keep_metadata_backup: Whether to keep a backup of the original metadata

    :returns: None on success, api.Error otherwise
    """
    metadata_path = meta.get_collection_metadata_path(ctx, coll)

    # Guard clause: only JSON metadata files can be transformed.
    if not metadata_path.endswith('.json'):
        return api.Error('no_metadata', 'No metadata file found')

    log.write(ctx, 'Transforming JSON metadata in the research space: <{}>'.format(metadata_path))
    transform = get(ctx, metadata_path)
    if transform is None:
        return api.Error('undefined_transformation', 'No transformation found')

    execute_transformation(ctx, metadata_path, transform, keep_metadata_backup)
    return None
def get(ctx, metadata_path, metadata=None):
    """Find a transformation that can be executed on the given metadata JSON.

    :param ctx:           Combined type of a ctx and rei struct
    :param metadata_path: Path to metadata JSON
    :param metadata:      Optional metadata object

    :returns: Transformation function on success, or None if no transformation was found
    """
    # Ideally, we would check that the metadata is valid in its current
    # schema before claiming that we can transform it...
    try:
        current_schema = schema.get_schema_id(ctx, metadata_path, metadata=metadata)
        active_schema = schema.get_active_schema_id(ctx, metadata_path)
        return schema_transformations.get(current_schema, active_schema)
    except (KeyError, error.UUError):
        # No transformation is registered for this schema pair, or the
        # schema ids could not be determined.
        return None
# TODO: @rule.make
def rule_get_transformation_info(rule_args, callback, rei):
    """Check if a yoda-metadata.json transformation is possible and if so, retrieve transformation description.

    :param rule_args: [0] JSON path
                      [1] Transformation possible? true|false
                      [2] human-readable description of the transformation
    :param callback:  Callback to rule Language
    :param rei:       The rei struct
    """
    json_path = rule_args[0]

    # Default output: no transformation available.
    rule_args[1:3] = 'false', ''

    transform = get(callback, json_path)
    if transform is not None:
        # Fix: the original called `transformation_html`, which is not defined
        # anywhere in this module; `html` (defined below) renders a transformation
        # function's docstring as the human-readable description.
        rule_args[1:3] = 'true', html(transform)
def copy_acls_from_parent(ctx, path, recursive_flag):
    """
    When inheritance is missing we need to copy ACLs when introducing new data in vault package.

    :param ctx:            Combined type of a ctx and rei struct
    :param path:           Path of object that needs the permissions of parent
    :param recursive_flag: Either "default" for no recursion or "recursive"
    """
    parent = os.path.dirname(path)

    # Map iRODS collection access names onto the ACL levels msi.set_acl accepts.
    access_to_level = {
        "own": "own",
        "read object": "read",
        "modify object": "write",
    }

    acl_rows = genquery.row_iterator(
        "COLL_ACCESS_NAME, COLL_ACCESS_USER_ID",
        "COLL_NAME = '" + parent + "'",
        genquery.AS_LIST, ctx
    )
    for access_name, raw_user_id in acl_rows:
        user_name = user.name_from_id(ctx, int(raw_user_id))

        # iRODS keeps ACLs for deleted users in the iCAT database (https://github.com/irods/irods/issues/7778),
        # so we need to skip ACLs referring to users that no longer exist.
        if user_name == "":
            continue

        level = access_to_level.get(access_name)
        if level is not None:
            log.write(ctx, "iiCopyACLsFromParent: granting " + level + " to <" + user_name + "> on <" + path + "> with recursiveFlag <" + recursive_flag + ">")
            msi.set_acl(ctx, recursive_flag, level, user_name, path)
# TODO: @rule.make
def rule_batch_transform_vault_metadata(rule_args, callback, rei):
    """
    Transform all metadata JSON files in the vault to the active schema.

    :param rule_args: [0] First COLL_ID to check - initial = 0
                      [1] Batch size, <= 256
                      [2] Pause between checks (float)
                      [3] Delay between batches in seconds
    :param callback: Callback to rule Language
    :param rei: The rei struct
    """
    coll_id = int(rule_args[0])
    # NOTE(review): `batch` is parsed here but never used to limit the query or
    # the loop below — confirm whether batching was intended to be enforced.
    batch = int(rule_args[1])
    pause = float(rule_args[2])
    delay = int(rule_args[3])
    rods_zone = session_vars.get_map(rei)["client_user"]["irods_zone"]
    # Check one batch of metadata schemas.
    # Find all research and vault collections, ordered by COLL_ID.
    iter = genquery.row_iterator(
        "ORDER(COLL_ID), COLL_NAME",
        "COLL_NAME like '/%s/home/vault-%%' AND DATA_NAME like 'yoda-metadata%%json' AND COLL_ID >= '%d'" % (rods_zone, coll_id),
        genquery.AS_LIST, callback)
    # Check each collection in batch.
    for row in iter:
        coll_id = int(row[0])
        coll_name = row[1]
        path_parts = coll_name.split('/')
        # Only process collections that are directly beneath the apex
        # vault collection, e.g. /zoneName/home/vault-foo/data-package[123],
        # since metadata in the original part of the data package should not
        # be processed.
        if not re.match(r"^\/[^\/]+\/home\/[^\/]+\/[^\/]+$", coll_name):
            continue
        try:
            # Get vault package path (first five path components: '', zone,
            # 'home', vault group, data package).
            vault_package = '/'.join(path_parts[:5])
            metadata_path = meta.get_latest_vault_metadata_path(callback, vault_package)
            log.write(callback, "[METADATA] Checking whether metadata needs to be transformed: " + metadata_path)
            if metadata_path != '':
                transform = get(callback, metadata_path)
                if transform is not None:
                    log.write(callback, "[METADATA] Executing transformation for: " + metadata_path)
                    execute_transformation(callback, metadata_path, transform)
        except Exception as e:
            # Best-effort: an error on one package must not stop the batch run.
            log.write(callback, "[METADATA] Exception occurred during schema transformation of %s: %s" % (coll_name, str(type(e)) + ":" + str(e)))
        # Sleep briefly between checks.
        time.sleep(pause)
        # The next collection to check must have a higher COLL_ID.
        coll_id += 1
    else:
        # All done.
        # NOTE(review): this `else` belongs to the `for` and runs whenever the
        # loop finishes without `break`. Since the loop contains no `break`,
        # coll_id is always reset to 0 here and the delayExec branch below looks
        # unreachable — confirm against genquery.row_iterator batching semantics.
        coll_id = 0
        log.write(callback, "[METADATA] Finished updating metadata.")
    if coll_id != 0:
        # Check the next batch after a delay.
        callback.delayExec(
            "<INST_NAME>irods_rule_engine_plugin-irods_rule_language-instance</INST_NAME><PLUSET>%ds</PLUSET>" % delay,
            "rule_batch_transform_vault_metadata('%d', '%d', '%f', '%d')" % (coll_id, batch, pause, delay),
            "")
# TODO: @rule.make
def rule_batch_vault_metadata_correct_orcid_format(rule_args, callback, rei):
    """
    Correct ORCID person identifier with invalid format in metadata JSON files in the vault.

    :param rule_args: [0] First COLL_ID to check - initial = 0
                      [1] Batch size, <= 256
                      [2] Pause between checks (float)
                      [3] Delay between batches in seconds
                      [4] Dry-run mode ('true' or 'false'; everything else is considered 'false')
    :param callback: Callback to rule Language
    :param rei: The rei struct
    """
    coll_id = int(rule_args[0])
    batch = int(rule_args[1])
    pause = float(rule_args[2])
    delay = int(rule_args[3])
    dryrun_mode = rule_args[4] == "true"
    rods_zone = session_vars.get_map(rei)["client_user"]["irods_zone"]

    # Check one batch of metadata schemas.
    # Find all vault collections, ordered by COLL_ID.
    iter = genquery.row_iterator(
        "ORDER(COLL_ID), COLL_NAME",
        "COLL_NAME like '/%s/home/vault-%%' AND COLL_NAME not like '%%/original' AND DATA_NAME like 'yoda-metadata%%json' AND COLL_ID >= '%d'" % (rods_zone, coll_id),
        genquery.AS_LIST, callback)

    # Check each collection in batch.
    for row in iter:
        coll_id = int(row[0])
        coll_name = row[1]
        path_parts = coll_name.split('/')

        try:
            # Get vault package path.
            vault_package = '/'.join(path_parts[:5])
            metadata_path = meta.get_latest_vault_metadata_path(callback, vault_package)
            if metadata_path != '':
                metadata = jsonutil.read(callback, metadata_path)

                # We only need to transform metadata with schemas that do not constrain ORCID format.
                # The schema id lives in the first entry of the "links" list. Fix: default to
                # [{}] instead of {} so that metadata without a "links" key is skipped
                # gracefully; the previous default made `{}[0]` raise KeyError and logged
                # every such package as an exception.
                schema_url = metadata.get("links", [{}])[0].get("href", "")
                schema_id = schema_url.replace("https://yoda.uu.nl/schemas/", "").replace("/metadata.json", "")
                if schema_id not in ['core-1', 'core-2', 'default-1', 'default-2', 'default-3', 'hptlab-1', 'teclab-1', 'dag-0', 'vollmer-0']:
                    log.write(callback, "Skipping data package '%s' for ORCID transformation because license '%s' is excluded."
                              % (vault_package, schema_id))
                    continue

                # Correct the incorrect ORCID(s) if possible.
                # result is a dict containing 'data_changed' and 'metadata'.
                result = transform_orcid(callback, metadata)

                # In order to minimize changes within the vault only save a new metadata.json
                # if there actually has been at least one ORCID correction.
                if result['data_changed'] and not dryrun_mode:
                    # ORCID(s) have been adjusted. Save the changes in the same manner as
                    # execute_transformation for vault packages.
                    coll, data = os.path.split(metadata_path)
                    new_path = '{}/yoda-metadata[{}].json'.format(coll, str(int(time.time())))
                    log.write(callback, 'TRANSFORMING in vault <{}> -> <{}>'.format(metadata_path, new_path))
                    jsonutil.write(callback, new_path, result['metadata'])
                    copy_acls_from_parent(callback, new_path, "default")
                    callback.rule_provenance_log_action("system", coll, "updated person identifier metadata")
                    log.write(callback, "Transformed ORCIDs for: %s" % (new_path))
                elif result['data_changed']:
                    log.write(callback, "Would have transformed ORCIDs for: %s if dry run mode was disabled." % (metadata_path))
        except Exception as e:
            # Best-effort: an error on one package must not stop the batch run.
            log.write(callback, "Exception occurred during ORCID transformation of %s: %s" % (coll_name, str(type(e)) + ":" + str(e)))

        # Sleep briefly between checks.
        time.sleep(pause)

        # The next collection to check must have a higher COLL_ID.
        coll_id += 1
    else:
        # All done (for/else: runs because the loop contains no break).
        coll_id = 0
        log.write(callback, "[METADATA] Finished correcting ORCID's within vault metadata.")

    if coll_id != 0:
        # Check the next batch after a delay.
        # Fix: re-invocation must forward all five rule arguments; the dry-run
        # flag was previously dropped, which would make the re-entered rule fail
        # with IndexError on rule_args[4].
        callback.delayExec(
            "<INST_NAME>irods_rule_engine_plugin-irods_rule_language-instance</INST_NAME><PLUSET>%ds</PLUSET>" % delay,
            "rule_batch_vault_metadata_correct_orcid_format('%d', '%d', '%f', '%d', '%s')" % (coll_id, batch, pause, delay, rule_args[4]),
            "")
def transform_orcid(ctx, m):
    """
    Transform all present ORCIDs into the correct format, if possible.

    :param ctx: Combined type of a callback and rei struct
    :param m:   Metadata to transform

    :returns: Dict with indication whether data has changed and transformed JSON object with regard to ORCID
    """
    data_changed = False

    # Only Creators and Contributors hold Person identifiers that can hold ORCIDs.
    for pi_holder in ['Creator', 'Contributor']:
        for holder in m.get(pi_holder, []):
            for pi in holder.get('Person_Identifier', []):
                if pi.get('Name_Identifier_Scheme', None) != 'ORCID':
                    continue

                original_orcid = pi.get('Name_Identifier', None)
                # Fix: a Person_Identifier entry without a Name_Identifier value
                # previously crashed re.search with a TypeError (None argument);
                # there is nothing to correct, so skip it.
                if original_orcid is None:
                    continue

                # If incorrect ORCID format => try to correct.
                if not re.search("^(https://orcid.org/)[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{3}[0-9X]$", original_orcid):
                    corrected_orcid = correctify_orcid(original_orcid)
                    # Only if an actual correction took place change the value and mark this data as 'changed'.
                    if corrected_orcid is None:
                        log.write(ctx, "Warning: unable to automatically fix ORCID '%s'" % (original_orcid))
                    elif corrected_orcid != original_orcid:
                        log.write(ctx, "Updating ORCID '%s' to '%s'" % (original_orcid, corrected_orcid))
                        pi['Name_Identifier'] = corrected_orcid
                        data_changed = True

    return {'metadata': m, 'data_changed': data_changed}
def correctify_orcid(org_orcid):
    """Correct illformatted ORCIDs. Returns None if value cannot be fixed.

    :param org_orcid: ORCID string as found in the metadata

    :returns: Canonical 'https://orcid.org/...' URL, or None if uncorrectable
    """
    # Get rid of all spaces.
    orcid = org_orcid.replace(' ', '')

    # Upper-case X.
    # Fix: operate on the space-stripped value; the original re-applied the
    # replacement to `org_orcid`, silently discarding the space removal above.
    orcid = orcid.replace('x', 'X')

    # The last part should hold a valid id like e.g.: 1234-1234-1234-123X.
    # If not, it is impossible to correct it to the valid ORCID format.
    orcs = orcid.split('/')
    if not re.search("^[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{3}[0-9X]$", orcs[-1]):
        return None

    return "https://orcid.org/{}".format(orcs[-1])
def html(f):
    """Get a human-readable HTML description of a transformation function.

    The text is derived from the function's docstring.

    :param f: Transformation function

    :returns: Human-readable HTML description of a transformation function
    """
    # Docstring paragraphs are separated by blank lines; collapse each
    # paragraph's internal whitespace and wrap it in a <p> element.
    paragraphs = re.split('\n{2,}', f.__doc__)
    rendered = []
    for paragraph in paragraphs:
        collapsed = re.sub('\s+', ' ', paragraph).strip()
        rendered.append('<p>{}</p>'.format(collapsed))
    description = '\n'.join(rendered)

    # Strip the machine-readable :param/:returns annotations from the text.
    return re.sub('((:param).*)|((:returns:).*)', ' ', description)
@rule.make(inputs=[], outputs=[0])
def rule_batch_vault_metadata_schema_report(ctx):
    """Show vault metadata schema about each data package in vault

    :param ctx: Combined type of a callback and rei struct

    :returns: JSON-encoded dictionary, where each key is a vault data package path.
              Values are dictionaries with keys "schema" (contains the short name of the schema
              (e.g. 'default-3', as per the information in the metadata file, or empty if no metadata
              schema could be found), and match_schema (contains a boolean value that indicates whether
              the metadata matches the JSON schema). match_schema only has a meaning if a metadata schema
              could be found.
    """
    report = {}
    cached_schemas = {}

    # Find all vault collections
    vault_rows = genquery.row_iterator(
        "COLL_NAME",
        "COLL_NAME like '/%s/home/vault-%%' AND COLL_NAME not like '%%/original' AND COLL_NAME NOT LIKE '%%/original/%%' AND DATA_NAME like 'yoda-metadata%%json'" %
        (user.zone(ctx)),
        genquery.AS_LIST, ctx)

    for vault_row in vault_rows:
        coll_name = vault_row[0]
        try:
            outcome = meta.vault_metadata_matches_schema(ctx, coll_name, cached_schemas, "Vault metadata schema report", True)
        except Exception as e:
            # Keep going: one broken package must not abort the whole report.
            log.write(ctx, "Error processing collection {}: {}".format(coll_name, str(e)))
            continue
        if outcome:
            report[coll_name] = outcome

    return json.dumps(report)