[IMPORT] [MONITORING] feat: move dataset selection to the fieldmapping step #3289

Merged
26 changes: 13 additions & 13 deletions backend/geonature/core/gn_synthese/imports/actions.py
@@ -89,6 +89,7 @@ def check_transient_data(task, logger, imprt: TImports):
field_name: fields[field_name]
for field_name, source_field in imprt.fieldmapping.items()
if source_field.get("column_src", None) in imprt.columns
or source_field.get("default_value", None) is not None
}
init_rows_validity(imprt)
task.update_state(state="PROGRESS", meta={"progress": 0.05})
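
This first hunk changes which mapped fields are selected for the checks: a field is now kept when its source column is present in the uploaded file or when the mapping provides a default value for it. A minimal, self-contained sketch of that rule with hypothetical mapping data (in the real code the selected values are BibFields objects looked up from `fields`, not the mapping entries themselves):

fieldmapping = {
    "date_min": {"column_src": "date_debut"},
    "id_dataset": {"default_value": 42},      # no column in the file, default supplied by the mapping
    "altitude_max": {"column_src": "alt_max"},
}
columns = ["date_debut"]  # columns actually present in the uploaded file

selected = {
    name: spec
    for name, spec in fieldmapping.items()
    if spec.get("column_src", None) in columns or spec.get("default_value", None) is not None
}
print(sorted(selected))  # ['date_min', 'id_dataset']; altitude_max is dropped
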
@@ -246,7 +247,6 @@ def update_batch_progress(batch, step):
entity,
fields["id_nomenclature_blurring"],
fields["id_dataset"],
fields["unique_dataset_id"],
)
if current_app.config["IMPORT"]["CHECK_REF_BIBLIO_LITTERATURE"]:
check_nomenclature_source_status(
@@ -276,15 +276,18 @@ def update_batch_progress(batch, step):
if "unique_id_sinp" in selected_fields:
check_duplicate_uuid(imprt, entity, selected_fields["unique_id_sinp"])
if current_app.config["IMPORT"]["PER_DATASET_UUID_CHECK"]:
whereclause = Synthese.id_dataset == imprt.id_dataset
check_existing_uuid(
imprt,
entity,
selected_fields["unique_id_sinp"],
id_dataset_field=fields["id_dataset"],
)
else:
whereclause = sa.true()
check_existing_uuid(
imprt,
entity,
selected_fields["unique_id_sinp"],
whereclause=whereclause,
)
check_existing_uuid(
imprt,
entity,
selected_fields["unique_id_sinp"],
)
if imprt.fieldmapping.get(
"unique_id_sinp_generate",
current_app.config["IMPORT"]["DEFAULT_GENERATE_MISSING_UUID"],
@@ -334,6 +337,7 @@ def import_data_to_destination(imprt: TImports) -> None:
fields["the_geom_local"],
fields["the_geom_point"],
fields["id_area_attachment"], # XXX sure?
fields["id_dataset"],
}
if imprt.fieldmapping.get(
"unique_id_sinp_generate",
@@ -358,14 +362,11 @@ def import_data_to_destination(imprt: TImports) -> None:
):
insert_fields |= {field}

insert_fields -= {fields["unique_dataset_id"]} # Column only used for filling `id_dataset`

select_stmt = (
sa.select(
*[transient_table.c[field.dest_field] for field in insert_fields],
sa.literal(source.id_source),
sa.literal(source.module.id_module),
sa.literal(imprt.id_dataset),
sa.literal(imprt.id_import),
sa.literal("I"),
)
@@ -375,7 +376,6 @@ def import_data_to_destination(imprt: TImports) -> None:
names = [field.dest_field for field in insert_fields] + [
"id_source",
"id_module",
"id_dataset",
"id_import",
"last_action",
]
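
The net effect of the remaining hunks is that id_dataset is no longer injected into the INSERT as a single literal taken from the import, but selected per row from the transient table like any other destination field (and unique_dataset_id, which only served to fill it, is dropped from the insert list). A simplified sketch with hypothetical table and column names, not the module's actual schema:

import sqlalchemy as sa

transient = sa.table("t_imports_synthese", sa.column("date_min"), sa.column("id_dataset"))
dest = sa.table("synthese", *[sa.column(c) for c in ("date_min", "id_dataset", "id_import", "last_action")])

select_stmt = sa.select(
    transient.c.date_min,
    transient.c.id_dataset,  # per-row value resolved during the checks, not sa.literal(imprt.id_dataset)
    sa.literal(123),         # would be imprt.id_import
    sa.literal("I"),
)
insert_stmt = dest.insert().from_select(
    ["date_min", "id_dataset", "id_import", "last_action"], select_stmt
)
print(insert_stmt)
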
8 changes: 8 additions & 0 deletions backend/geonature/core/imports/checks/dataframe/cast.py
@@ -74,6 +74,14 @@ def convert_to_uuid(value):
return None


def is_valid_uuid(value):
try:
uuid_obj = UUID(value)
except Exception as e:
return False
return str(uuid_obj) == value


def convert_to_integer(value):
try:
return int(value)
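
A quick usage sketch of the new is_valid_uuid helper above (restated so the snippet runs on its own): only the canonical lower-case, hyphenated form round-trips through str(UUID(...)), so undashed or upper-case variants, and non-string values, are rejected.

from uuid import UUID

def is_valid_uuid(value):
    try:
        uuid_obj = UUID(value)
    except Exception:
        return False
    return str(uuid_obj) == value

print(is_valid_uuid("8c6a9cf1-7a30-4c5d-9e2b-0f1a2b3c4d5e"))  # True: canonical form
print(is_valid_uuid("8C6A9CF17A304C5D9E2B0F1A2B3C4D5E"))      # False: parses, but not canonical
print(is_valid_uuid("not-a-uuid"))                            # False: UUID() raises ValueError
print(is_valid_uuid(None))                                    # False: TypeError is caught
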
138 changes: 76 additions & 62 deletions backend/geonature/core/imports/checks/dataframe/core.py
@@ -12,7 +12,7 @@
from geonature.core.imports.models import BibFields, TImports

from .utils import dataframe_check, error_replace

from .cast import is_valid_uuid

__all__ = ["check_required_values", "check_counts", "check_datasets"]

@@ -194,7 +194,7 @@ def check_datasets(
) -> Set[str]:
"""
Check if datasets exist and are authorized for the user and import.

It also fills the id_field based on the content of uuid_field.
Parameters
----------
imprt : TImports
@@ -222,72 +222,86 @@

"""
updated_cols = set()
uuid_col = uuid_field.dest_field
id_col = id_field.dest_field
uuid_col = uuid_field.source_column
uuid = df[uuid_col].unique().tolist()

# check uuid format
valid_uuid_mask = df[uuid_col].apply(lambda x: is_valid_uuid(x))
invalid_uuid_mask = ~valid_uuid_mask
if invalid_uuid_mask.any():
yield {
"error_code": ImportCodeError.INVALID_UUID,
"column": uuid_field.name_field,
"invalid_rows": df[invalid_uuid_mask],
}

if uuid_col in df:
has_uuid_mask = df[uuid_col].notnull()
uuid = df.loc[has_uuid_mask, uuid_col].unique().tolist()
filtered_ds_mask = valid_uuid_mask
uuid = df[filtered_ds_mask][uuid_col].unique().tolist()

datasets = {
ds.unique_dataset_id.hex: ds
for ds in TDatasets.query.filter(TDatasets.unique_dataset_id.in_(uuid))
.options(sa.orm.joinedload(TDatasets.nomenclature_data_origin))
.options(sa.orm.raiseload("*"))
.all()
# check dataset existence
datasets = {
str(ds.unique_dataset_id): ds
for ds in TDatasets.query.filter(TDatasets.unique_dataset_id.in_(uuid))
.options(sa.orm.joinedload(TDatasets.nomenclature_data_origin))
.options(sa.orm.raiseload("*"))
.all()
}
valid_ds_mask = df[uuid_col].isin(datasets.keys())
invalid_ds_mask = ~valid_ds_mask & filtered_ds_mask
if invalid_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_FOUND,
"column": uuid_field.name_field,
"invalid_rows": df[invalid_ds_mask],
}
valid_ds_mask = df[uuid_col].isin(datasets.keys())
invalid_ds_mask = has_uuid_mask & ~valid_ds_mask
if invalid_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_FOUND,
"column": uuid_field.name_field,
"invalid_rows": df[invalid_ds_mask],
}

inactive_dataset = [uuid for uuid, ds in datasets.items() if not ds.active]
inactive_dataset_mask = df[uuid_col].isin(inactive_dataset)
if inactive_dataset_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_ACTIVE,
"column": uuid_field.name_field,
"invalid_rows": df[inactive_dataset_mask],
}

# Warning: we only check the permissions of the first author, but there is currently only one author per import.
authorized_datasets = {
ds.unique_dataset_id.hex: ds
for ds in db.session.execute(
TDatasets.filter_by_creatable(
user=imprt.authors[0], module_code=module_code, object_code=object_code
)
.where(TDatasets.unique_dataset_id.in_(uuid))
.options(sa.orm.raiseload("*"))
)
.scalars()
.all()

filtered_ds_mask = filtered_ds_mask & valid_ds_mask
uuid = df[filtered_ds_mask][uuid_col].unique().tolist()

# check dataset active status
active_ds = [uuid for uuid, ds in datasets.items() if ds.active]
active_ds_mask = df[uuid_col].isin(active_ds)
inactive_ds_mask = ~active_ds_mask & filtered_ds_mask

if inactive_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_ACTIVE,
"column": uuid_field.name_field,
"invalid_rows": df[inactive_ds_mask],
}
authorized_ds_mask = df[uuid_col].isin(authorized_datasets.keys())
unauthorized_ds_mask = valid_ds_mask & ~authorized_ds_mask
if unauthorized_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_AUTHORIZED,
"column": uuid_field.name_field,
"invalid_rows": df[unauthorized_ds_mask],
}

if authorized_ds_mask.any():
df.loc[authorized_ds_mask, id_col] = df[authorized_ds_mask][uuid_col].apply(
lambda uuid: authorized_datasets[uuid].id_dataset
filtered_ds_mask = filtered_ds_mask & active_ds_mask
uuid = df[filtered_ds_mask][uuid_col].unique().tolist()

# check dataset authorized
# Warning: we only check the permissions of the first author, but there is currently only one author per import.
authorized_datasets = {
str(ds.unique_dataset_id): ds
for ds in db.session.execute(
TDatasets.filter_by_creatable(
user=imprt.authors[0], module_code=module_code, object_code=object_code
)
updated_cols = {id_col}

else:
has_uuid_mask = pd.Series(False, index=df.index)
.where(TDatasets.unique_dataset_id.in_(uuid))
.options(sa.orm.raiseload("*"))
)
.scalars()
.all()
}
authorized_ds_mask = active_ds_mask & df[uuid_col].isin(authorized_datasets.keys())
unauthorized_ds_mask = ~authorized_ds_mask & filtered_ds_mask
if unauthorized_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_AUTHORIZED,
"column": uuid_field.name_field,
"invalid_rows": df[unauthorized_ds_mask],
}
filtered_ds_mask = filtered_ds_mask & authorized_ds_mask

if (~has_uuid_mask).any():
# Set id_dataset from import for empty cells:
df.loc[~has_uuid_mask, id_col] = imprt.id_dataset
# compute id_col based on uuid_col
if filtered_ds_mask.any():
id_col = id_field.dest_field
df.loc[filtered_ds_mask, id_col] = df[filtered_ds_mask][uuid_col].apply(
lambda uuid: authorized_datasets[uuid].id_dataset
)
updated_cols = {id_col}

return updated_cols
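
The rewritten check_datasets applies a cascade of masks: the set of candidate rows (filtered_ds_mask) is narrowed at each stage (valid UUID format, dataset exists, dataset active, dataset authorized), an error batch is yielded for the rows rejected at that stage, and only the surviving rows get id_dataset filled from their UUID. A self-contained pandas sketch of this pattern with hypothetical data (the format-check stage is skipped and the placeholder keys are not real UUIDs):

import pandas as pd

df = pd.DataFrame({"src_dataset_uuid": ["aaa", "bbb", "ccc", "ddd"]})
existing = {"aaa": 1, "bbb": 2, "ccc": 3}   # uuid -> id_dataset of datasets found in the database
active = {"aaa", "bbb"}                      # datasets flagged active
authorized = {"aaa"}                         # datasets the import author may use

errors = {}
filtered = df["src_dataset_uuid"].isin(existing)              # stage: dataset exists
errors["DATASET_NOT_FOUND"] = df[~filtered]

active_mask = df["src_dataset_uuid"].isin(active)
errors["DATASET_NOT_ACTIVE"] = df[filtered & ~active_mask]    # stage: dataset active
filtered &= active_mask

auth_mask = df["src_dataset_uuid"].isin(authorized)
errors["DATASET_NOT_AUTHORIZED"] = df[filtered & ~auth_mask]  # stage: dataset authorized
filtered &= auth_mask

df.loc[filtered, "id_dataset"] = df.loc[filtered, "src_dataset_uuid"].map(existing)
print(df)
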
27 changes: 18 additions & 9 deletions backend/geonature/core/imports/checks/sql/extra.py
@@ -241,7 +241,7 @@ def check_existing_uuid(
imprt: TImports,
entity: Entity,
uuid_field: BibFields,
whereclause: Any = sa.true(),
id_dataset_field: BibFields = None,
skip=False,
):
"""
@@ -254,25 +254,34 @@
entity : Entity
The entity to check.
uuid_field : BibFields
The field to check.
whereclause : BooleanClause
The WHERE clause to apply to the check.
The field to check.
id_dataset_field : BibFields
If defined, the UUID existence is checked within the given dataset; otherwise, it is checked globally.

skip: Boolean
Raise SKIP_EXISTING_UUID instead of EXISTING_UUID and set row validity to None (do not import)
"""
transient_table = imprt.destination.get_transient_table()
dest_table = entity.get_destination_table()
error_type = "SKIP_EXISTING_UUID" if skip else "EXISTING_UUID"

whereclause = sa.and_(
transient_table.c[uuid_field.dest_field] == dest_table.c[uuid_field.dest_field],
transient_table.c[entity.validity_column].is_(True),
)

if id_dataset_field:
whereclause = whereclause & (
transient_table.c[id_dataset_field.dest_field]
== dest_table.c[id_dataset_field.dest_field]
)

report_erroneous_rows(
imprt,
entity,
error_type=error_type,
error_column=uuid_field.name_field,
whereclause=sa.and_(
transient_table.c[uuid_field.dest_field] == dest_table.c[uuid_field.dest_field],
transient_table.c[entity.validity_column].is_(True),
whereclause,
),
whereclause=whereclause,
level_validity_mapping={"ERROR": False, "WARNING": None},
)
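
check_existing_uuid now builds its WHERE clause internally: the UUID equality and row-validity conditions are always present, and the dataset equality is AND-ed in only when id_dataset_field is passed (the PER_DATASET_UUID_CHECK case). A sketch of that composition with simplified, hypothetical table and column names:

import sqlalchemy as sa

transient = sa.table(
    "t_imports_synthese",
    sa.column("unique_id_sinp"), sa.column("id_dataset"), sa.column("valid"),
)
dest = sa.table("synthese", sa.column("unique_id_sinp"), sa.column("id_dataset"))

def build_whereclause(per_dataset: bool):
    clause = sa.and_(
        transient.c.unique_id_sinp == dest.c.unique_id_sinp,
        transient.c.valid.is_(True),
    )
    if per_dataset:
        clause = clause & (transient.c.id_dataset == dest.c.id_dataset)
    return clause

print(build_whereclause(per_dataset=True))   # ... AND t_imports_synthese.id_dataset = synthese.id_dataset
print(build_whereclause(per_dataset=False))
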

9 changes: 2 additions & 7 deletions backend/geonature/core/imports/checks/sql/nomenclature.py
@@ -176,9 +176,7 @@ def check_nomenclature_exist_proof(
)


def check_nomenclature_blurring(
imprt, entity, blurring_field, id_dataset_field, uuid_dataset_field
):
def check_nomenclature_blurring(imprt, entity, blurring_field, id_dataset_field):
"""
Raise an error if blurring not set.
Required if the dataset is private.
@@ -196,10 +194,7 @@ def check_nomenclature_blurring(imprt, entity, blurring_field, id_dataset_field):
error_type=ImportCodeError.CONDITIONAL_MANDATORY_FIELD_ERROR,
error_column=blurring_field.name_field,
whereclause=sa.and_(
sa.or_(
transient_table.c[id_dataset_field.name_field] == TDatasets.id_dataset,
transient_table.c[uuid_dataset_field.name_field] == TDatasets.unique_dataset_id,
),
transient_table.c[id_dataset_field.name_field] == TDatasets.id_dataset,
TDatasets.id_nomenclature_data_origin == id_nomenclature_private,
transient_table.c[blurring_field.dest_field] == None,
),
6 changes: 0 additions & 6 deletions backend/geonature/core/imports/config_schema.py
@@ -18,12 +18,6 @@
"show": True,
"filter": True,
},
{
"prop": "dataset.dataset_name",
"name": "Jeu de données",
"show": True,
"filter": False,
},
{
"prop": "statistics_rows",
"name": "Lignes importées",
5 changes: 0 additions & 5 deletions backend/geonature/core/imports/models.py
@@ -29,7 +29,6 @@
from geonature.utils.celery import celery_app
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.core.gn_commons.models import TModules
from geonature.core.gn_meta.models import TDatasets
from pypnnomenclature.models import BibNomenclaturesTypes
from pypnusershub.db.models import User

@@ -319,8 +318,6 @@ def get_instance_permissions(self, scopes, user=None):
@serializable(
fields=[
"authors.nom_complet",
"dataset.dataset_name",
"dataset.active",
"destination.code",
"destination.label",
"destination.statistics_labels",
@@ -352,7 +349,6 @@ class TImports(InstancePermissionMixin, db.Model):
detected_encoding = db.Column(db.Unicode, nullable=True)
# import_table = db.Column(db.Unicode, nullable=True)
full_file_name = db.Column(db.Unicode, nullable=True)
id_dataset = db.Column(db.Integer, ForeignKey("gn_meta.t_datasets.id_dataset"), nullable=True)
date_create_import = db.Column(db.DateTime, default=datetime.now)
date_update_import = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
date_end_import = db.Column(db.DateTime, nullable=True)
@@ -372,7 +368,6 @@ class TImports(InstancePermissionMixin, db.Model):
)
loaded = db.Column(db.Boolean, nullable=False, default=False)
processed = db.Column(db.Boolean, nullable=False, default=False)
dataset = db.relationship(TDatasets, lazy="joined")
source_file = deferred(db.Column(db.LargeBinary))
columns = db.Column(ARRAY(db.Unicode))
# keys are target names, values are source names