Skip to content

Commit eb2ae9f

Browse files
committed
[components] Add SlingReplicationCollection component
1 parent a5770fd commit eb2ae9f

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

python_modules/libraries/dagster-components/dagster_components/lib/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
from dagster_components.lib.sling_replication import (
1111
SlingReplicationComponent as SlingReplicationComponent,
1212
)
13+
from dagster_components.lib.sling_replication_collection import (
14+
SlingReplicationCollectionComponent as SlingReplicationCollectionComponent,
15+
)
1316

1417
from dagster_components.lib.pipes_subprocess_script_collection import (
1518
PipesSubprocessScriptCollection as PipesSubprocessScriptCollection,
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
from pathlib import Path
2+
from typing import Any, Iterator, Mapping, Optional, Sequence, Union
3+
4+
from dagster._core.definitions.asset_key import AssetKey
5+
from dagster._core.definitions.assets import AssetsDefinition
6+
from dagster._core.definitions.definitions_class import Definitions
7+
from dagster._core.definitions.events import AssetMaterialization
8+
from dagster._core.definitions.result import MaterializeResult
9+
from dagster_embedded_elt.sling import DagsterSlingTranslator, SlingResource, sling_assets
10+
from dagster_embedded_elt.sling.resources import AssetExecutionContext
11+
from pydantic import BaseModel, Field
12+
from typing_extensions import Self
13+
14+
from dagster_components import Component, ComponentLoadContext
15+
from dagster_components.core.component import (
16+
ComponentGenerateRequest,
17+
TemplatedValueResolver,
18+
component,
19+
)
20+
from dagster_components.core.component_rendering import RenderingScope
21+
from dagster_components.core.dsl_schema import AssetAttributes, AssetSpecProcessor, OpSpecBaseModel
22+
from dagster_components.generate import generate_component_yaml
23+
24+
25+
class SlingReplicationTranslatorParams(BaseModel):
26+
key: Optional[str] = None
27+
group_name: Optional[str] = None
28+
29+
30+
class SlingReplicationParams(BaseModel):
31+
path: str
32+
op: Optional[OpSpecBaseModel] = None
33+
translator: Optional[SlingReplicationTranslatorParams] = RenderingScope(
34+
Field(None), required_scope={"stream_definition"}
35+
)
36+
37+
38+
class SlingReplicationCollectionParams(BaseModel):
39+
sling: Optional[SlingResource] = None
40+
replications: Sequence[SlingReplicationParams]
41+
asset_attributes: Optional[AssetAttributes] = None
42+
43+
44+
class SlingReplicationTranslator(DagsterSlingTranslator):
45+
def __init__(
46+
self,
47+
*,
48+
value_resolver: TemplatedValueResolver,
49+
translator_params: Optional[SlingReplicationTranslatorParams] = None,
50+
):
51+
self.value_resolver = value_resolver
52+
self.translator_params = translator_params
53+
54+
def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
55+
if not self.translator_params or not self.translator_params.key:
56+
return super().get_asset_key(stream_definition)
57+
58+
return self.value_resolver.with_context(stream_definition=stream_definition).resolve(
59+
self.translator_params.key
60+
)
61+
62+
def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
63+
if not self.translator_params or not self.translator_params.group_name:
64+
return super().get_group_name(stream_definition)
65+
return self.value_resolver.with_context(stream_definition=stream_definition).resolve(
66+
self.translator_params.group_name
67+
)
68+
69+
70+
@component(name="sling_replication_collection")
71+
class SlingReplicationCollectionComponent(Component):
72+
params_schema = SlingReplicationCollectionParams
73+
74+
def __init__(
75+
self,
76+
dirpath: Path,
77+
resource: SlingResource,
78+
sling_replications: Sequence[SlingReplicationParams],
79+
asset_attributes: Sequence[AssetSpecProcessor],
80+
):
81+
self.dirpath = dirpath
82+
self.resource = resource
83+
self.sling_replications = sling_replications
84+
self.asset_attributes = asset_attributes
85+
86+
@classmethod
87+
def load(cls, context: ComponentLoadContext) -> Self:
88+
loaded_params = context.load_params(cls.params_schema)
89+
return cls(
90+
dirpath=context.path,
91+
resource=loaded_params.sling or SlingResource(),
92+
sling_replications=loaded_params.replications,
93+
asset_attributes=loaded_params.asset_attributes or [],
94+
)
95+
96+
def build_replication_asset(
97+
self, context: ComponentLoadContext, replication: SlingReplicationParams
98+
) -> AssetsDefinition:
99+
@sling_assets(
100+
name=replication.op.name if replication.op else self.dirpath.stem,
101+
op_tags=replication.op.tags if replication.op else {},
102+
replication_config=self.dirpath / replication.path,
103+
dagster_sling_translator=SlingReplicationTranslator(
104+
value_resolver=context.templated_value_resolver,
105+
translator_params=replication.translator,
106+
),
107+
)
108+
def _asset(context: AssetExecutionContext):
109+
yield from self.execute(context=context, sling=self.resource)
110+
111+
return _asset
112+
113+
def execute(
114+
self, context: AssetExecutionContext, sling: SlingResource
115+
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
116+
yield from sling.replicate(context=context)
117+
118+
def build_defs(self, context: ComponentLoadContext) -> Definitions:
119+
defs = Definitions(
120+
assets=[
121+
self.build_replication_asset(context, replication)
122+
for replication in self.sling_replications
123+
],
124+
)
125+
for transform in self.asset_attributes:
126+
defs = transform.apply(defs)
127+
return defs
128+
129+
@classmethod
130+
def generate_files(cls, request: ComponentGenerateRequest, params: Any) -> None:
131+
generate_component_yaml(request, params)

0 commit comments

Comments
 (0)