Skip to content

refactor common sources to work with dlt+ #624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sources/asana_dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
"""

import typing as t
from typing import Any, Iterable
from typing import Any, Iterable, Sequence

import dlt
from dlt.common.typing import TDataItem
from dlt.extract.resource import DltResource

from .helpers import get_client
from .settings import (
Expand All @@ -28,7 +29,7 @@


@dlt.source
def asana_source() -> Any: # should be Sequence[DltResource]:
def asana_source(access_token: str = dlt.secrets.value) -> Sequence[DltResource]:
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can just add the access token here, even though its not used, so that it shows up in the config
or remove it from the resources too and pass it down? but that seems less elegant

The main function that runs all the other functions to fetch data from Asana.
Returns:
Expand Down
13 changes: 7 additions & 6 deletions sources/bing_webmaster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@
"""

import time
from typing import Iterable, Iterator, List, Sequence
from typing import Dict, Iterator, List, Sequence

import dlt
from dlt.common import logger
from dlt.common.typing import DictStrAny, DictStrStr
from dlt.common.typing import DictStrAny
from dlt.sources import DltResource

from .helpers import get_stats_with_retry, parse_response


@dlt.source(name="bing_webmaster")
def source(
site_urls: List[str] = None, site_url_pages: Iterable[DictStrStr] = None
site_urls: List[str] = dlt.config.value,
site_url_pages: List[Dict[str, str]] = dlt.config.value,
) -> Sequence[DltResource]:
"""
A dlt source for the Bing Webmaster api.
It groups resources for the APIs which return organic search traffic statistics
Args:
site_urls: List[str]: A list of site_urls, e.g, ["dlthub.com", "dlthub.de"]. Use this if you need the weekly traffic per site_url and page
site_url_pages: Iterable[DictStrStr]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query
site_url_pages: Iterable[Dict[str, str]]: A list of pairs of site_url and page. Use this if you need the weekly traffic per site_url, page, and query
Returns:
Sequence[DltResource]: A sequence of resources that can be selected from including page_stats and page_query_stats.
"""
Expand Down Expand Up @@ -70,7 +71,7 @@ def page_stats(
table_name="bing_page_query_stats",
)
def page_query_stats(
site_url_pages: Iterable[DictStrStr],
site_url_pages: List[Dict[str, str]],
api_key: str = dlt.secrets.value,
) -> Iterator[Iterator[DictStrAny]]:
"""
Expand All @@ -80,7 +81,7 @@ def page_query_stats(
https://learn.microsoft.com/en-us/dotnet/api/microsoft.bing.webmaster.api.interfaces.iwebmasterapi.getpagequerystats

Args:
site_url_page (Iterable[DictStrStr]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
site_url_page (List[Dict[str,str]]): Iterable of site_url and pages to retrieve statistics for. Can be result of a SQL query, a parsed sitemap, etc.
Yields:
Iterator[Dict[str, Any]]: An iterator over list of organic traffic statistics.
"""
Expand Down
4 changes: 3 additions & 1 deletion sources/chess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

@dlt.source(name="chess")
def source(
players: List[str], start_month: str = None, end_month: str = None
players: List[str] = dlt.config.value,
start_month: str = None,
end_month: str = None,
) -> Sequence[DltResource]:
"""
A dlt source for the chess.com api. It groups several resources (in this case chess.com API endpoints) containing
Expand Down
4 changes: 2 additions & 2 deletions sources/github/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

@dlt.source
def github_reactions(
owner: str,
name: str,
owner: str = dlt.config.value,
name: str = dlt.config.value,
access_token: str = dlt.secrets.value,
items_per_page: int = 100,
max_items: Optional[int] = None,
Expand Down
4 changes: 2 additions & 2 deletions sources/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ def fetch_props(

@dlt.resource
def hubspot_events_for_objects(
object_type: THubspotObjectType,
object_ids: List[str],
object_type: THubspotObjectType = dlt.config.value,
object_ids: List[str] = dlt.config.value,
api_key: str = dlt.secrets.value,
start_date: pendulum.DateTime = STARTDATE,
) -> DltResource:
Expand Down
5 changes: 1 addition & 4 deletions sources/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
standalone=True,
)
def kafka_consumer(
topics: Union[str, List[str]],
topics: List[str] = dlt.config.value,
credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
msg_processor: Optional[
Callable[[Message], Dict[str, Any]]
Expand Down Expand Up @@ -60,9 +60,6 @@ def kafka_consumer(
Yields:
Iterable[TDataItem]: Kafka messages.
"""
if not isinstance(topics, list):
topics = [topics]

if isinstance(credentials, Consumer):
consumer = credentials
elif isinstance(credentials, KafkaCredentials):
Expand Down
4 changes: 2 additions & 2 deletions sources/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def custom_msg_processor(msg: confluent_kafka.Message) -> Dict[str, Any]:
"data": msg.value().decode("utf-8"),
}

data = kafka_consumer("books", msg_processor=custom_msg_processor)
data = kafka_consumer(["books"], msg_processor=custom_msg_processor)

info = pipeline.run(data)
print(info)
Expand All @@ -63,7 +63,7 @@ def load_starting_from_date() -> None:
)

from_date = datetime(2023, 12, 15)
data = kafka_consumer("books", start_from=from_date)
data = kafka_consumer(["books"], start_from=from_date)

info = pipeline.run(data)
print(info)
Expand Down
2 changes: 1 addition & 1 deletion sources/pipedrive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def parsed_mapping(


@dlt.resource(primary_key="id", write_disposition="merge")
def leads(
def leads(
pipedrive_api_key: str = dlt.secrets.value,
update_time: dlt.sources.incremental[str] = dlt.sources.incremental(
"update_time", "1970-01-01 00:00:00"
Expand Down
Loading