Skip to content

Commit

Permalink
Adding pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
poffo authored Oct 16, 2024
1 parent 3adca6a commit 01924fd
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
67 changes: 67 additions & 0 deletions mastering-carol/sql-pipelines-manifest-v2/businesspartner.csql
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
-- pk: businesspartner_id

CREATE TEMP FUNCTION rejection_rules(arr ANY TYPE) AS (
(SELECT [
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo businesspartner_id é inválido." as mdmErrorMsg,
((record.businesspartner_id is Null) or (record.businesspartner_id = '')) as mdmActive)
,
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo erp_id é inválido." as mdmErrorMsg,
((record.erp_id is null) or (record.erp_id = '')) as mdmActive)
,
STRUCT( "RELATIONSHIP_CONSTRAINT" as mdmStage,
"O campo _orgid não existe na ST mapping." as mdmErrorMsg,
((record._orgid is Null) or (record._orgid = '')) as mdmActive)
,
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo businesspartnergroup_id é inválido." as mdmErrorMsg,
((record.field1 is Null) or (record.field1 = '')) as mdmActive)
,
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo alias é inválido." as mdmErrorMsg,
((record.alias is Null) or (record.alias = '')) as mdmActive)
,
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo deleted é inválido." as mdmErrorMsg,
(record.deleted is Null) as mdmActive)
] FROM UNNEST(arr) AS record)
);

with sa2_mdbusinesspartner as (
SELECT
stg.mdmTenantId as stgTenantId,
stg.a2_cgc as _orgid,
--`labs-app-mdm-production.a_techfin`.hash(stg.a2_cod, 'mdbusinesspartner_supplier') as businesspartner_id,
`labs-app-mdm-production.a_techfin`.hash(stg.protheus_pk, "mdbusinesspartner_supplier") As businesspartner_id,--Mantem compativel a PK, com a que esta sendo geradda no DM antigo
stg.A2_NREDUZ as alias,
`labs-app-mdm-production.a_techfin`.hash(stg.a2_cod, 'mdaddress_supplier') as addres_id,
`labs-app-mdm-production.a_techfin`.hash(stg.a2_cod, 'mdbusinesspartnergroup_supplier') as businesspartnergroup_id,
stg.a2_cod as protheus_id,
false as deleted,
stg.a2_cod as erp_id
--metadata-v2--
from (select * except(ranking) from (select row_number() over (partition by mdmTenantId, a2_cod, a2_cgc ORDER BY mdmCounterForEntity DESC, mdmLastUpdated DESC) ranking, * from stg_protheus_carol_sa2
--timestamp-- WHERE _ingestionDatetime > SAFE.DATETIME(TIMESTAMP_MICROS(SAFE_CAST({{start_from}} AS INT64)))
) where ranking = 1 ) as stg
),
mdbusinesspartner as (
select * from sa2_mdbusinesspartner
)

SELECT *,
(EXISTS(SELECT 1 FROM UNNEST(mdmErrors) WHERE mdmActive = true)) AS mdmDeleted
FROM
(
select dm.* except(alias, stgTenantId),
REGEXP_REPLACE(dm.alias, r'([^\p{ASCII}]+)', '') AS alias,
(rejection_rules(ARRAY((SELECT AS STRUCT
dm.businesspartner_id,
dm.erp_id,
dm._orgid,
dm.businesspartnergroup_id as field1,
dm.alias,
dm.deleted
)))) AS mdmErrors
from mdbusinesspartner dm
)
66 changes: 66 additions & 0 deletions mastering-carol/sql-pipelines-manifest-v2/organization.csql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
-- pk: uuid

CREATE TEMP FUNCTION rejection_rules(arr ANY TYPE) AS (
(SELECT [
STRUCT( "REJECTION_RULE" as mdmStage,
"O campo uuid é inválido." as mdmErrorMsg,
(record.uuid is Null) as mdmActive)
] FROM UNNEST(arr) AS record)
);

with company_fndorganization as (
SELECT
stg.mdmTenantId as stgTenantId,
stg.uuid as uuid,
stg.description as name,
stg.description as alias,
org.federalid as federalid,
stg.mdmCounterForEntity as _counter,
`labs-app-mdm-production.a_techfin`.mdmIdStaging(stg.mdmConnectorId, 'company_organization', CONCAT('{"emitente":{"path":"', lower(path), '"}}')) as mdmStagingRecord
--metadataNoId-v2--
from (select * except(ranking) from (select row_number() over (partition by mdmTenantId, mdmId ORDER BY mdmCounterForEntity DESC, mdmLastUpdated DESC) ranking, * from stg_protheus_carol_company
--timestamp-- WHERE _ingestionDatetime > SAFE.DATETIME(TIMESTAMP_MICROS(SAFE_CAST({{start_from}} AS INT64)))
) where ranking = 1 ) as stg
left join (select * except(ranking) from (select *, row_number() over (partition by mdmTenantId, uuid ORDER BY mdmCounterForEntity DESC, mdmLastUpdated DESC) ranking from stg_protheus_carol_organization) where ranking = 1) as org
on org.mdmTenantId = stg.mdmTenantId
and org.uuid = stg.uuid
),
organization_fndorganization as (
SELECT
stg.mdmTenantId as stgTenantId,
stg.uuid as uuid,
stg.name as name,
stg.alias as alias,
stg.federalid as federalid,
stg.mdmCounterForEntity as _counter,
`labs-app-mdm-production.a_techfin`.mdmIdStaging(stg.mdmConnectorId, 'organization_map', CONCAT('{"pk":{"uuid":"', lower(stg.uuid), '"}}')) as mdmStagingRecord
--metadataNoId-v2--
from (select * except(ranking) from (select row_number() over (partition by mdmTenantId, mdmId ORDER BY mdmCounterForEntity DESC, mdmLastUpdated DESC) ranking, * from stg_protheus_carol_organization
--timestamp-- WHERE _ingestionDatetime > SAFE.DATETIME(TIMESTAMP_MICROS(SAFE_CAST({{start_from}} AS INT64)))
) where ranking = 1 ) as stg
),
organization as (
select *
from company_fndorganization stg
union all
select *
from organization_fndorganization stg
)

SELECT *,
(EXISTS(SELECT 1 FROM UNNEST(mdmErrors) WHERE mdmActive = true)) AS mdmDeleted
FROM
(
select dm.* except(mdmStagingRecord,_counter, name, alias, stgTenantId),
dm.mdmStagingRecord as __mdmId,
(case length(trim(coalesce(dm.name,""))) -- adding a default value instead of rejecting it. Environment tenant70ead8d42e8111eba8f40a586461440e got a lot of rejections because of this validation.
when 0 then "<em branco>"
else dm.name
end) as name,
(case length(trim(coalesce(dm.alias,""))) -- adding a default value instead of rejecting it. Environment tenant70ead8d42e8111eba8f40a586461440e got a lot of rejections because of this validation.
when 0 then "<em branco>"
else dm.alias
end) as alias,
rejection_rules(ARRAY((SELECT AS STRUCT dm.uuid))) AS mdmErrors
from (select * except(ranking) from (select row_number() over (partition by stgTenantId, uuid ORDER BY _counter DESC) ranking, * from organization) where ranking = 1 ) dm
)

0 comments on commit 01924fd

Please sign in to comment.