Skip to content

Commit

Permalink
refactor: run_workflow approach & changed reverse ingestion approach …
Browse files Browse the repository at this point in the history
…similar to automation runner;
  • Loading branch information
keshavmohta09 committed Feb 14, 2025
1 parent 8b4c74a commit 84aaf5e
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 123 deletions.
2 changes: 1 addition & 1 deletion ingestion/operators/docker/run_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import yaml

from metadata.automations.runner import execute
from metadata.automations.execute_runner import execute
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
Expand Down
13 changes: 13 additions & 0 deletions ingestion/src/metadata/automations/collate_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run the Automation Workflow for Collate
"""
49 changes: 49 additions & 0 deletions ingestion/src/metadata/automations/execute_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run the Automation Workflow
"""
from functools import singledispatch
from typing import Any

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata


@singledispatch
def run_workflow(request: Any, *_, **__) -> Any:
    """Fallback dispatcher for automation workflow requests.

    Concrete request types register their own handlers through
    ``run_workflow.register``; reaching this default implementation means
    no handler exists for ``type(request)``.
    """
    raise NotImplementedError(
        f"Workflow runner not implemented for {type(request)}"
    )


def execute(encrypted_automation_workflow: AutomationWorkflow) -> Any:
    """Resolve and run an automation workflow.

    The incoming workflow only carries the server connection and the
    workflow name; the full (decrypted) workflow is fetched back from the
    server and dispatched to the ``run_workflow`` handler registered for
    its request type.
    """
    # Importing these modules registers their run_workflow implementations
    # with the singledispatch entrypoint above.
    # pylint: disable=import-outside-toplevel
    import metadata.automations.collate_runner
    import metadata.automations.runner

    # Building the client also instantiates the Secrets Manager.
    ometa_client = OpenMetadata(
        config=encrypted_automation_workflow.openMetadataServerConnection
    )

    automation_workflow = ometa_client.get_by_name(
        entity=AutomationWorkflow, fqn=encrypted_automation_workflow.name.root
    )

    return run_workflow(
        automation_workflow.request, automation_workflow, ometa_client
    )
31 changes: 2 additions & 29 deletions ingestion/src/metadata/automations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run the Automation Workflow
Run the Automation Workflow for OpenMetadata
"""
from functools import singledispatch
from typing import Any

from metadata.automations.execute_runner import run_workflow
from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
)
Expand All @@ -28,32 +27,6 @@
from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init


def execute(encrypted_automation_workflow: AutomationWorkflow) -> Any:
    """Fetch the full automation workflow and dispatch it.

    The encrypted workflow carries only the server connection and the
    workflow name; the complete workflow is retrieved from the server and
    handed to the run_workflow handler matching its request type.
    """
    # Creating the client also instantiates the Secrets Manager.
    client = OpenMetadata(
        config=encrypted_automation_workflow.openMetadataServerConnection
    )

    workflow = client.get_by_name(
        entity=AutomationWorkflow, fqn=encrypted_automation_workflow.name.root
    )

    return run_workflow(workflow.request, workflow, client)


@singledispatch
def run_workflow(request: Any, *_, **__) -> Any:
    """Default entrypoint for automation workflow execution.

    Handlers for specific request types are attached with
    ``run_workflow.register``; any request type without a registered
    handler falls through to this error.
    """
    raise NotImplementedError(
        f"Workflow runner not implemented for {type(request)}"
    )


@run_workflow.register
def _(
request: TestServiceConnectionRequest,
Expand Down
5 changes: 0 additions & 5 deletions ingestion/src/metadata/ingestion/api/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@
PipelineMetadataConfigType,
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.reverseIngestionPipeline import (
ReverseIngestionPipeline,
ReverseIngestionType,
)
from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import (
SearchMetadataConfigType,
SearchServiceMetadataPipeline,
Expand Down Expand Up @@ -162,7 +158,6 @@
StorageMetadataConfigType.StorageMetadata.value: StorageServiceMetadataPipeline,
SearchMetadataConfigType.SearchMetadata.value: SearchServiceMetadataPipeline,
DbtConfigType.DBT.value: DbtPipeline,
ReverseIngestionType.ReverseIngestion.value: ReverseIngestionPipeline,
}

DBT_CONFIG_TYPE_MAP = {
Expand Down
8 changes: 0 additions & 8 deletions ingestion/src/metadata/utils/class_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.reverseIngestionPipeline import (
ReverseIngestionPipeline,
ReverseIngestionType,
)
from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import (
SearchServiceMetadataPipeline,
)
Expand All @@ -86,7 +82,6 @@
ServiceType.Storage.value: "storageService",
# We use test suites as "services" for DQ Ingestion Pipelines
TestSuiteServiceType.TestSuite.value: "testSuite",
ReverseIngestionType.ReverseIngestion.value: "reverseIngestion",
}

SOURCE_CONFIG_TYPE_INGESTION = {
Expand All @@ -102,7 +97,6 @@
StorageServiceMetadataPipeline.__name__: PipelineType.metadata,
SearchServiceMetadataPipeline.__name__: PipelineType.metadata,
TestSuitePipeline.__name__: PipelineType.TestSuite,
ReverseIngestionPipeline.__name__: PipelineType.reverseIngestion,
MetadataToElasticSearchPipeline.__name__: PipelineType.elasticSearchReindex,
DataInsightPipeline.__name__: PipelineType.dataInsight,
DbtPipeline.__name__: PipelineType.dbt,
Expand Down Expand Up @@ -137,8 +131,6 @@ def _get_service_type_from( # pylint: disable=inconsistent-return-statements
) -> ServiceType:
if service_subtype.lower() == "testsuite":
return TestSuiteServiceType.TestSuite
if service_subtype.lower() == "reverseingestion":
return ReverseIngestionType.ReverseIngestion

for service_type in ServiceType:
if service_subtype.lower() in [
Expand Down
3 changes: 1 addition & 2 deletions ingestion/src/metadata/utils/service_spec/service_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class BaseSpec(BaseModel):
lineage_source_class: Optional[str] = None
usage_source_class: Optional[str] = None
sampler_class: Optional[str] = None
reverse_ingestion_source_class: Optional[str] = None

@model_validator(mode="before")
@classmethod
Expand Down Expand Up @@ -95,7 +94,7 @@ def import_source_class(
service_type: ServiceType, source_type: str, from_: str = "ingestion"
) -> Type[Source]:
source_class_type = source_type.split(TYPE_SEPARATOR)[-1]
if source_class_type in ["usage", "lineage", "reverse_ingestion"]:
if source_class_type in ["usage", "lineage"]:
field = f"{source_class_type}_source_class"
else:
field = "metadata_source_class"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from openmetadata_managed_apis.utils.logger import routes_logger
from pydantic import ValidationError

from metadata.automations.runner import execute
from metadata.automations.execute_runner import execute
from metadata.ingestion.api.parser import parse_automation_workflow_gracefully
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package org.openmetadata.service.secrets.converter;

import java.util.List;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.entity.automations.TestServiceConnectionRequest;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.metadataIngestion.ReverseIngestionPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
import org.openmetadata.service.util.JsonUtils;

Expand All @@ -33,7 +33,7 @@ public Object convert(Object object) {

tryToConvertOrFail(
workflow.getRequest(),
List.of(TestServiceConnectionRequest.class, CreateIngestionPipeline.class))
List.of(TestServiceConnectionRequest.class, ReverseIngestionPipeline.class))
.ifPresent(workflow::setRequest);

if (workflow.getOpenMetadataServerConnection() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"$ref": "../../entity/automations/testServiceConnection.json"
},
{
"$ref": "../services/ingestionPipelines/createIngestionPipeline.json"
"$ref": "../../metadataIngestion/reverseIngestionPipeline.json"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"$ref": "testServiceConnection.json"
},
{
"$ref": "../../api/services/ingestionPipelines/createIngestionPipeline.json"
"$ref": "../../metadataIngestion/reverseIngestionPipeline.json"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
"dataInsight",
"elasticSearchReindex",
"dbt",
"application",
"reverseIngestion"
"application"
]
},
"pipelineStatus": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"$id": "https://open-metadata.org/schema/entity/applications/metadataIngestion/reverseIngestionPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "reverseIngestionPipeline",
"javaType": "org.openmetadata.schema.metadataIngestion.ReverseMetadataOperationRequest",
"javaType": "org.openmetadata.schema.metadataIngestion.ReverseIngestionPipeline",
"description": "Apply a set of operations on a service",
"type": "object",
"definitions": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@
},
{
"$ref": "apiServiceMetadataPipeline.json"
},
{
"$ref": "reverseIngestionPipeline.json"
}
]
}
Expand Down Expand Up @@ -92,10 +89,7 @@
}
},
"additionalProperties": false,
"required": [
"type",
"sourceConfig"
]
"required": ["type", "sourceConfig"]
},
"processor": {
"description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",
Expand All @@ -110,9 +104,7 @@
}
},
"additionalProperties": false,
"required": [
"type"
]
"required": ["type"]
},
"stage": {
"description": "Configuration for Stage Component in the OpenMetadata Ingestion Framework.",
Expand All @@ -127,9 +119,7 @@
}
},
"additionalProperties": false,
"required": [
"type"
]
"required": ["type"]
},
"sink": {
"description": "Configuration for Sink Component in the OpenMetadata Ingestion Framework.",
Expand All @@ -144,9 +134,7 @@
}
},
"additionalProperties": false,
"required": [
"type"
]
"required": ["type"]
},
"bulkSink": {
"description": "Configuration for BulkSink Component in the OpenMetadata Ingestion Framework.",
Expand All @@ -161,20 +149,13 @@
}
},
"additionalProperties": false,
"required": [
"type"
]
"required": ["type"]
},
"logLevels": {
"description": "Supported logging levels",
"javaType": "org.openmetadata.schema.metadataIngestion.LogLevels",
"type": "string",
"enum": [
"DEBUG",
"INFO",
"WARN",
"ERROR"
],
"enum": ["DEBUG", "INFO", "WARN", "ERROR"],
"default": "INFO"
},
"workflowConfig": {
Expand All @@ -201,9 +182,7 @@
}
},
"additionalProperties": false,
"required": [
"openMetadataServerConfig"
]
"required": ["openMetadataServerConfig"]
}
},
"properties": {
Expand Down Expand Up @@ -246,29 +225,16 @@
"$ref": "../type/basic.json#/definitions/uuid"
}
},
"required": [
"source",
"workflowConfig"
],
"required": ["source", "workflowConfig"],
"additionalProperties": false
}
},
"oneOf": [
{
"required": [
"name",
"openMetadataWorkflowConfig",
"source",
"sink"
]
"required": ["name", "openMetadataWorkflowConfig", "source", "sink"]
},
{
"required": [
"name",
"openMetadataWorkflowConfig",
"source",
"bulkSink"
]
"required": ["name", "openMetadataWorkflowConfig", "source", "bulkSink"]
}
],
"additionalProperties": false
Expand Down
Loading

0 comments on commit 84aaf5e

Please sign in to comment.