diff --git a/knime_extension/geospatial_env.yml b/knime_extension/geospatial_env.yml
index a549339f..844dada7 100644
--- a/knime_extension/geospatial_env.yml
+++ b/knime_extension/geospatial_env.yml
@@ -36,3 +36,4 @@ dependencies:
   - pip:
     - ipinfo==4.4.3
     - pulp==2.7.0
+    - sodapy==2.2.0
diff --git a/knime_extension/src/nodes/opendata.py b/knime_extension/src/nodes/opendata.py
index d7032034..eaf5b26d 100644
--- a/knime_extension/src/nodes/opendata.py
+++ b/knime_extension/src/nodes/opendata.py
@@ -878,3 +878,417 @@ def execute(self, exec_context: knext.ExecutionContext):
             crs="EPSG:4326",
         )
         return knext.Table.from_pandas(gdf)
+
+
+############################################
+# Socrata Search
+############################################
+@knext.node(
+    name="Socrata Search",
+    node_type=knext.NodeType.SOURCE,
+    icon_path=__NODE_ICON_PATH + "Socrata Search.png",
+    category=__category,
+    after="",
+)
+@knext.output_table(
+    name="Socrata dataset list",
+    description="List of Socrata datasets matching the query term, drawn from open data resources published by governments, non-profits, and NGOs around the world.",
+)
+class SocrataSearchNode:
+    """Search open datasets from various well-known data resources and organizations using the SODA interface.
+
+    US Centers for Disease Control and Prevention (CDC): CDC data includes information on infectious diseases, chronic conditions, environmental health hazards,
+    injury prevention, maternal and child health, immunization coverage, and much more. These datasets are collected through surveillance systems, population surveys,
+    epidemiological studies, and collaborative research efforts conducted by the CDC and its partners.
+
+    Data.gov: The official open data platform of the United States government, offering datasets from various U.S. government agencies covering fields such as education,
+    healthcare, transportation, and the environment.
+
+    Chicago Data Portal: The open data platform provided by the City of Chicago, offering datasets related to the city, including crime data, transportation data, demographic statistics, and more.
+
+    NYC Open Data: The open data platform provided by the City of New York, offering datasets covering urban planning, public transportation, health, and various other aspects of the city.
+
+    UK Government Data Service: The open data platform provided by the UK government, offering datasets from various governmental bodies covering economics, social issues, the environment, and more.
+
+    World Bank Data: The open data platform provided by the World Bank, offering a wide range of economic, social, and environmental datasets from around the world for research and analysis of global development trends.
+
+    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
+
+    Note: This node only retrieves publicly available datasets, as no authentication is provided.
+    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to get the dataset list.
+    """
+
+    queryitem = knext.StringParameter(
+        label="Search term",
+        description="""Enter search keywords or dataset names to find relevant datasets in the Socrata database.
+        This search is not case-sensitive and can include multiple words separated by spaces.
+        """,
+        default_value="Massachusetts",
+    )
+
+    timeout = knext.IntParameter(
+        label="Request timeout in seconds",
+        description="The timeout in seconds for the API request.",
+        default_value=120,
+        min_value=1,
+        is_advanced=True,
+    )
+
+    def configure(self, configure_context):
+        # TODO Create combined schema
+        return None
+
+    def execute(self, exec_context: knext.ExecutionContext):
+        from urllib.request import Request, urlopen
+        import pandas as pd
+        import json
+        from pandas import json_normalize
+        from urllib.parse import quote
+
+        query_item = self.queryitem
+        encoded_query_item = quote(query_item)
+        request = Request(
+            f"http://api.us.socrata.com/api/catalog/v1?q={encoded_query_item}&only=datasets&limit=10000"
+        )
+
+        response = urlopen(request, timeout=self.timeout)
+        response_body = response.read()
+
+        # Load the JSON response into a Python dictionary
+        data = json.loads(response_body)
+
+        # Extract the "results" key, which contains the dataset information
+        dataset_info = data["results"]
+
+        # Create a DataFrame from the dataset information, and flatten the nested dictionaries
+        df = json_normalize(dataset_info)
+        # Check if columns exist before dropping them
+        columns_to_drop = [
+            "classification.domain_tags",
+            "classification.domain_metadata",
+        ]
+        columns_to_drop = [col for col in columns_to_drop if col in df.columns]
+        df = df.drop(columns=columns_to_drop)
+
+        # Find columns that contain list values
+        list_columns = [
+            col for col in df.columns if any(isinstance(item, list) for item in df[col])
+        ]
+
+        # Join list values into comma-separated strings; drop columns where this fails
+        for col in list_columns:
+            try:
+                df[col] = df[col].apply(
+                    lambda x: ", ".join(x) if isinstance(x, list) else x
+                )
+            except Exception as e:
+                df.drop(columns=[col], inplace=True)
+
+        # Drop columns that cannot be saved in KNIME
+        drop_columns = []
+        for col in df.columns:
+            try:
+                # Attempt to convert the column to a KNIME-compatible data type
+                knime_table = knext.Table.from_pandas(df[[col]])
+            except Exception as e:
+                # If an exception is raised, add the column to the list of columns to drop
+                drop_columns.append(col)
+
+        # Drop the columns that cannot be saved in KNIME
+        df.drop(columns=drop_columns, inplace=True)
+        df.replace("?", pd.NA, inplace=True)
+        df.replace("", pd.NA, inplace=True)
+        df.dropna(axis=1, how="all", inplace=True)
+        df = df.reset_index(drop=True)
+
+        # Reorder the columns to have "metadata.domain" and "resource.id" at the beginning
+        important_cols = []
+        if "metadata.domain" in df.columns:
+            important_cols.append("metadata.domain")
+        if "resource.id" in df.columns:
+            important_cols.append("resource.id")
+
+        # Create the reordered column list
+        remaining_cols = [col for col in df.columns if col not in important_cols]
+        reordered_cols = important_cols + remaining_cols
+
+        # Reorder the DataFrame columns
+        if important_cols:
+            df = df[reordered_cols]
+
+        return knext.Table.from_pandas(df)
+
+
+############################################
+# Socrata Data Query
+############################################
+@knext.node(
+    name="Socrata Data Query",
+    node_type=knext.NodeType.SOURCE,
+    icon_path=__NODE_ICON_PATH + "Socrata Data Query.png",
+    category=__category,
+    after="",
+)
+@knext.output_table(
+    name="Socrata dataset",
+    description="Socrata dataset retrieved for the given metadata domain and resource ID.",
+)
+class SocrataDataNode:
+    """Retrieve an open dataset via the Socrata Open Data API (SODA).
+
+    US Centers for Disease Control and Prevention (CDC): CDC data includes information on infectious diseases, chronic conditions, environmental health hazards,
+    injury prevention, maternal and child health, immunization coverage, and much more. These datasets are collected through surveillance systems, population surveys,
+    epidemiological studies, and collaborative research efforts conducted by the CDC and its partners.
+
+    Data.gov: The official open data platform of the United States government, offering datasets from various U.S. government agencies covering fields such as education,
+    healthcare, transportation, and the environment.
+
+    Chicago Data Portal: The open data platform provided by the City of Chicago, offering datasets related to the city, including crime data, transportation data, demographic statistics, and more.
+
+    NYC Open Data: The open data platform provided by the City of New York, offering datasets covering urban planning, public transportation, health, and various other aspects of the city.
+
+    UK Government Data Service: The open data platform provided by the UK government, offering datasets from various governmental bodies covering economics, social issues, the environment, and more.
+
+    World Bank Data: The open data platform provided by the World Bank, offering a wide range of economic, social, and environmental datasets from around the world for research and analysis of global development trends.
+
+    The Socrata Open Data API (SODA) is a powerful tool designed for programmatically accessing a vast array of open data resources from various organizations around the world, including governments, non-profits, and NGOs.
+    This node uses the [SODA Consumer API](https://dev.socrata.com/consumers/getting-started.html) to retrieve a dataset selected from the dataset list generated by the Socrata Search node.
+
+    For instance, the dataset [Incidence Rate Of Breast Cancer](https://opendata.utah.gov/Health/Incidence-Rate-Of-Breast-Cancer-Per-100-000-All-St/q22t-rbk9) has a resource ID of "q22t-rbk9" and a metadata domain of "opendata.utah.gov".
+    Both can be found in the API endpoint link, "https://opendata.utah.gov/resource/q22t-rbk9.json", and both are required for data retrieval.
+    """
+
+    metadata_domain = knext.StringParameter(
+        label="Metadata domain",
+        description="""The value in the column metadata.domain of a table generated by a Socrata Search node.""",
+        default_value="",
+    )
+
+    resource_id = knext.StringParameter(
+        label="Resource ID",
+        description="""The value in the column resource.id of a table generated by a Socrata Search node.""",
+        default_value="",
+    )
+
+    app_token = knext.StringParameter(
+        label="Application Token",
+        description="""Optional: Provide an application token to increase API request limits.
+        You can register for a token at [Application Tokens](https://dev.socrata.com/docs/app-tokens.html).""",
+        default_value="",
+        is_advanced=True,
+    )
+
+    query_filter = knext.StringParameter(
+        label="Query Filter",
+        description="""Provide filtering conditions to narrow down the results. The Socrata API supports two main filtering mechanisms:
+
+        1. Simple Filters: Use column names directly as parameters. Examples:
+           - source=nn (query for records where the source field equals 'nn')
+           - source=pr&region=Virgin Islands region (multiple conditions, combined with AND)
+
+        2. SoQL Query Language: Use the $where parameter for more complex queries:
+           - $where=magnitude > 3.0 (numeric comparison)
+           - $where=datetime > '2020-01-01' (date comparison)
+           - $where=state='NY' AND age > 30 (combined conditions)
+           - $where=annual_salary between '40000' and '60000' (range query)
+
+        For more information, see [Simple Filters](https://dev.socrata.com/docs/filtering.html) and [SoQL Queries](https://dev.socrata.com/docs/queries/).
+        """,
+        default_value="",
+        is_advanced=True,
+    )
+
+    timeout = knext.IntParameter(
+        label="Request timeout in seconds",
+        description="The timeout in seconds for the API request.",
+        default_value=120,
+        min_value=1,
+        is_advanced=True,
+    )
+
+    def configure(self, configure_context):
+        # TODO Create combined schema
+        return None
+
+    def execute(self, exec_context: knext.ExecutionContext):
+        import pandas as pd
+        import json
+        from sodapy import Socrata
+        import requests
+
+        # Start with validation phase
+        exec_context.set_progress(0.05, "Validating connection and parameters...")
+
+        # Validate the required parameters
+        if not self.metadata_domain or self.metadata_domain.strip() == "":
+            raise knext.InvalidParametersError("Metadata domain cannot be empty")
+
+        if not self.resource_id or self.resource_id.strip() == "":
+            raise knext.InvalidParametersError("Resource ID cannot be empty")
+
+        # Initialize app token if provided
+        app_token = (
+            None
+            if not self.app_token or self.app_token.strip() == ""
+            else self.app_token
+        )
+
+        # Create Socrata client and validate it works
+        try:
+            client = Socrata(self.metadata_domain, app_token)
+            client.timeout = self.timeout
+
+            # Validate that we can connect to the API with a minimal test query
+            validation_params = {"$limit": 1}
+
+            # First validate the connection and resource ID by attempting a minimal query
+            test_results = client.get(self.resource_id, **validation_params)
+
+            if test_results is None:
+                raise knext.InvalidParametersError(
+                    f"Resource ID '{self.resource_id}' is invalid or not accessible"
+                )
+
+        except Exception as e:
+            # Connection or resource ID validation failed
+            error_message = str(e)
+            raise knext.InvalidParametersError(
+                f"Failed to connect to Socrata API: {error_message}"
+            )
+
+        # Now validate query filter if provided
+        query_params = {}
+        if self.query_filter and self.query_filter.strip() != "":
+            filter_text = self.query_filter.strip()
+
+            try:
+                # Parse the filter based on its format
+                if filter_text.lower().startswith("$where="):
+                    query_params["$where"] = filter_text[7:]
+                elif "=" in filter_text and not filter_text.startswith("$"):
+                    filter_pairs = filter_text.split("&")
+                    for pair in filter_pairs:
+                        if "=" in pair:
+                            key, value = pair.split("=", 1)
+                            query_params[key.strip()] = value.strip()
+                else:
+                    query_params["$where"] = filter_text
+
+                # Validate the filter by making a test query
+                test_filter_params = {**query_params, "$limit": 1}
+                test_filter_results = client.get(self.resource_id, **test_filter_params)
+
+                # If we get here, the filter is valid
+
+            except Exception as e:
+                # Filter validation failed
+                error_message = str(e)
+                raise knext.InvalidParametersError(
+                    f"Invalid query filter: {error_message}"
+                )
+
+        # If all validations pass, proceed to data retrieval phase
+        exec_context.set_progress(
+            0.1, "Connection and parameters validated. Determining dataset size..."
+        )
+
+        # Try to get the total count to calculate progress
+        total_records = 0
+        try:
+            # Add COUNT(*) to our validated query parameters
+            count_params = {**query_params, "$select": "COUNT(*)"}
+            count_results = client.get(self.resource_id, **count_params)
+            total_records = int(count_results[0]["COUNT"]) if count_results else 0
+
+            exec_context.set_progress(
+                0.15, f"Dataset size: {total_records:,} records. Starting download..."
+            )
+        except Exception as e:
+            # Non-critical error: proceed without exact count
+            exec_context.set_progress(
+                0.15,
+                f"Could not determine dataset size: {str(e)}. Proceeding with download...",
+            )
+            total_records = 0
+
+        # Check if there's data to retrieve
+        if total_records == 0:
+            # Try a test query to see if there's any data at all
+            test_data_params = {**query_params, "$limit": 1}
+            test_data_results = client.get(self.resource_id, **test_data_params)
+
+            if not test_data_results:
+                # No data found for this query
+                raise knext.InvalidParametersError("No data found for the given query")
+
+        # Main data retrieval phase
+        limit = 100000
+        offset = 0
+        all_results = []
+        rows_retrieved = 0
+        iteration_count = 0
+
+        while True:
+            # Update progress
+            if total_records > 0:
+                progress_percent = min(
+                    0.15 + 0.75 * (rows_retrieved / total_records), 0.9
+                )
+                progress_message = f"Downloaded {rows_retrieved:,} of {total_records:,} records ({(rows_retrieved / total_records * 100):.1f}%)"
+            else:
+                progress_percent = min(0.15 + (iteration_count * 0.05), 0.9)
+                progress_message = (
+                    f"Downloaded {rows_retrieved:,} records so far (offset: {offset:,})"
+                )
+
+            exec_context.set_progress(progress_percent, progress_message)
+            iteration_count += 1
+
+            # Prepare pagination parameters
+            current_params = {**query_params, "$limit": limit, "$offset": offset}
+
+            try:
+                # Get results
+                results = client.get(self.resource_id, **current_params)
+
+                if not results:
+                    # End of pagination
+                    break
+
+                all_results.extend(results)
+                rows_retrieved += len(results)
+                offset += limit
+
+            except requests.exceptions.HTTPError as e:
+                # Handle HTTP errors that might occur during pagination
+                error_message = str(e)
+                if 400 <= e.response.status_code < 500:
+                    raise knext.InvalidParametersError(
+                        f"Query error during pagination: {error_message}"
+                    )
+                else:
+                    raise knext.InvalidParametersError(
+                        f"Server error during data retrieval: {error_message}"
+                    )
+            except Exception as e:
+                # Handle other exceptions
+                error_message = str(e)
+                raise knext.InvalidParametersError(
+                    f"Error during data retrieval: {error_message}"
+                )
+
+        # Final processing phase
+        if not all_results:
+            raise knext.InvalidParametersError(
+                "No data was retrieved. The query may be too restrictive."
+            )
+
+        exec_context.set_progress(0.95, f"Processing {rows_retrieved:,} records...")
+
+        # Convert to pandas DataFrame
+        results_df = pd.DataFrame.from_records(all_results)
+
+        # Complete
+        exec_context.set_progress(
+            1.0, f"Complete: Retrieved {rows_retrieved:,} records"
+        )
+        return knext.Table.from_pandas(results_df)
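
For reviewers, a minimal sketch of the catalog lookup the Socrata Search node performs, using only the standard library. The endpoint and parameters are taken from the node code above; the search term and the result limit of 10 are arbitrary example values.

```python
# Query the Socrata catalog the same way the Socrata Search node does.
import json
from urllib.parse import quote
from urllib.request import Request, urlopen

query = quote("Massachusetts")  # example search term
request = Request(
    f"http://api.us.socrata.com/api/catalog/v1?q={query}&only=datasets&limit=10"
)
with urlopen(request, timeout=60) as response:
    results = json.loads(response.read())["results"]

# Each entry carries the two fields the Socrata Data Query node needs:
# the hosting domain and the dataset's resource ID.
for entry in results:
    print(entry["metadata"]["domain"], entry["resource"]["id"])
```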
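And a minimal sketch of the paginated retrieval pattern the Socrata Data Query node builds on, assuming sodapy==2.2.0 from the updated environment file. The domain and resource ID are the example values from the node's docstring; the page size here is deliberately smaller than the node's 100,000-row pages.

```python
# Page through a Socrata dataset with sodapy, as the Socrata Data Query node does.
import pandas as pd
from sodapy import Socrata

client = Socrata("opendata.utah.gov", None)  # metadata.domain, no app token
client.timeout = 60

rows, limit, offset = [], 1000, 0
while True:
    page = client.get("q22t-rbk9", **{"$limit": limit, "$offset": offset})
    if not page:  # an empty page marks the end of the dataset
        break
    rows.extend(page)
    offset += limit

df = pd.DataFrame.from_records(rows)
print(df.shape)
```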