Prohibitively long discovery times #15

Closed
anden-akkio opened this issue Jan 18, 2024 · 2 comments
Comments

@anden-akkio

We're seeing extremely long --discover times (anywhere from 5 to 25 minutes) when running this tap against a BigQuery setup with a large number of tables. We're running it via meltano invoke <tap name> --discover.

We're using this tap in an embedded ETL context where we give our customers a structured, step-by-step flow: they enter their credential details, then they see their datasets, then they see the tables within the dataset they select, and so on. Discovery is how we drive that flow with a variety of other taps, but here it takes far too long for the process to feel smooth.

Any clue how we might optimize this?

@anden-akkio
Author

anden-akkio commented Jan 18, 2024

I've traced this to an inefficiency in the singer_sdk package: discover_catalog_entries does most of its work serially, so discovery has to wait on a huge number of sequential round trips to the warehouse. Switching that implementation to something like this results in a drastic speedup (I didn't benchmark exhaustively, but somewhere around 20-50x on BigQuery instances with a large number of tables seems like the right ballpark).

    from concurrent.futures import ThreadPoolExecutor

    import sqlalchemy


    # Meant as an override of SQLConnector.discover_catalog_entries in
    # singer_sdk (define it on your connector subclass).
    def discover_catalog_entries(self) -> list[dict]:
        """Return a list of catalog entries from discovery.

        Returns:
            The discovered catalog entries as a list.
        """
        engine = self._engine
        inspected = sqlalchemy.inspect(engine)

        def handle_table(schema_name: str, table_name: str, is_view: bool) -> dict:
            # Reflecting a table may take one or more round trips; running
            # many of these at once hides most of that latency.
            catalog_entry = self.discover_catalog_entry(
                engine,
                inspected,
                schema_name,
                table_name,
                is_view,
            )
            return catalog_entry.to_dict()

        def handle_schema(schema_name: str) -> list[dict]:
            object_names = self.get_object_names(engine, inspected, schema_name)
            # Inner pool: tables within one schema are discovered in parallel.
            with ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(handle_table, schema_name, table_name, is_view)
                    for table_name, is_view in object_names
                ]
                return [future.result() for future in futures]

        # Outer pool: schemas (BigQuery datasets) are discovered in parallel.
        # Each schema gets its own inner pool, so the total thread count can
        # multiply; pass max_workers to either pool to bound it.
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(handle_schema, schema_name)
                for schema_name in self.get_schema_names(engine, inspected)
            ]
            per_schema = [future.result() for future in futures]

        # Flatten the per-schema lists, preserving discovery order.
        return [entry for entries in per_schema for entry in entries]
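To see why concurrency helps here without a BigQuery instance, here is a minimal, self-contained sketch in which time.sleep stands in for one network round trip per table. FAKE_SCHEMAS, ROUND_TRIP_SECONDS, discover_entry and the schema/table names are hypothetical stand-ins, not singer_sdk APIs. It also uses a single bounded pool over flattened (schema, table) pairs instead of one pool per schema; that is an assumed alternative for capping the total thread count, not what the SDK does.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the warehouse: schema -> list of (table_name, is_view).
FAKE_SCHEMAS = {
    "dataset_a": [("orders", False), ("customers", False)],
    "dataset_b": [("events", False), ("daily_summary", True)],
}
ROUND_TRIP_SECONDS = 0.05  # assumed latency of reflecting one table


def discover_entry(schema_name: str, table_name: str, is_view: bool) -> dict:
    """Hypothetical per-table discovery; sleeps to mimic a network round trip."""
    time.sleep(ROUND_TRIP_SECONDS)
    return {"tap_stream_id": f"{schema_name}-{table_name}", "is_view": is_view}


def all_objects() -> list[tuple[str, str, bool]]:
    # Flatten to (schema, table, is_view) so one pool can cover every table.
    return [
        (schema, table, is_view)
        for schema, objects in FAKE_SCHEMAS.items()
        for table, is_view in objects
    ]


def discover_serial() -> list[dict]:
    # One round trip after another: total time grows with the table count.
    return [discover_entry(*obj) for obj in all_objects()]


def discover_concurrent(max_workers: int = 8) -> list[dict]:
    # A single pool with an explicit max_workers caps the number of threads.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order, matching the serial output.
        return list(executor.map(lambda obj: discover_entry(*obj), all_objects()))
```

Because the work is I/O-bound, threads spend nearly all their time waiting, so the concurrent version takes roughly one round trip per batch of max_workers tables rather than one per table.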

I'm working on forking the SDK and may open an issue/PR there. Since this isn't a problem with this tap specifically but with the SDK, I'll close this issue.

@edgarrmondragon
Member

@anden-akkio Thanks for reporting and digging into the root cause!

I've created an issue in the SDK repo: meltano/sdk#2166

There's a suggestion in the linked tap-postgres issue to use newer SQLAlchemy APIs that might be interesting to explore.
