diff --git a/topic/timeseries/dask-weather-data-import.ipynb b/topic/timeseries/dask-weather-data-import.ipynb new file mode 100644 index 00000000..3e5487f4 --- /dev/null +++ b/topic/timeseries/dask-weather-data-import.ipynb @@ -0,0 +1,680 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ec2a7c84", + "metadata": {}, + "source": [ + "# How to Build Time Series Applications in CrateDB\n", + "\n", + "## Batch Import Sample Weather Data with Pandas/Dask Data Frames\n", + "\n", + "We will work with a daily weather dataset provided on kaggle.com. This dataset offers a collection of **daily weather readings from major cities around the world, covering up to ~1250 cities**. Some locations provide historical data dating back to 1833, giving users a deep dive into **long-term weather patterns and their evolution**.\n", + "\n", + "The measurements include:\n", + "- Station ID\n", + "- City Name\n", + "- Timestamp (granularity: day)\n", + "- Season\n", + "- Average temperature in °C\n", + "- Minimum temperature in °C\n", + "- Maximum temperature in °C\n", + "- Precipitation in mm\n", + "- Snow depth in mm\n", + "- Average wind direction in degrees\n", + "- Average wind speed in km/h\n", + "- Peak wind gust in km/h\n", + "- Average sea level pressure in hPa\n", + "- Total sunshine in min\n", + "\n", + "The dataset is available on Kaggle: [The Weather Dataset](https://www.kaggle.com/datasets/guillemservera/global-daily-climate-data)" + ] + }, + { + "cell_type": "markdown", + "id": "b59cf879", + "metadata": {}, + "source": [ + "## Step 1: Install dependencies\n", + "\n", + "If they are not available already, we need pandas, Dask (a framework that parallelizes operations on pandas data frames), and the CrateDB client library with its SQLAlchemy dialect." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e0649e64", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "#!pip install dask pandas==2.0.0 'crate[sqlalchemy]'" + ] + }, + { + "cell_type": "markdown", + "id": "4aa9b1d0", + "metadata": {}, + "source": [ + "## Step 2: Read and prepare the data\n", + "\n", + "We will download and prepare the data for import into CrateDB.\n", + "\n", + "The following datasets need to be processed:\n", + "- Daily weather data (daily_weather.parquet)\n", + "- Cities (cities.csv)\n", + "- Countries (countries.csv)" + ] + }, + { + "cell_type": "code", + "execution_count": 88, + "id": "fa24e753", + "metadata": {}, + "outputs": [], + "source": [ + "from dask import dataframe as dd\n", + "from dask.diagnostics import ProgressBar\n", + "\n", + "# Show a progress bar for Dask activities\n", + "pbar = ProgressBar()\n", + "pbar.register()" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "id": "a506f7c9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[########################################] | 100% Completed | 6.26 ss\n", + "[########################################] | 100% Completed | 6.37 s\n", + "[########################################] | 100% Completed | 6.47 s\n", + "[########################################] | 100% Completed | 6.47 s\n", + "\n", + "Index: 27635763 entries, 0 to 24220\n", + "Data columns (total 14 columns):\n", + " # Column Non-Null Count Dtype\n", + "--- ------ -------------- -----\n", + " 0 station_id 27635763 non-null category\n", + " 1 city_name 27621770 non-null category\n", + " 2 date 27635763 non-null datetime64[ns]\n", + " 3 season 27635763 non-null category\n", + " 4 avg_temp_c 21404856 non-null 
float64\n", + " 5 min_temp_c 21917534 non-null float64\n", + " 6 max_temp_c 22096417 non-null float64\n", + " 7 precipitation_mm 20993263 non-null float64\n", + " 8 snow_depth_mm 3427148 non-null float64\n", + " 9 avg_wind_dir_deg 3452568 non-null float64\n", + "10 avg_wind_speed_kmh 5285468 non-null float64\n", + "11 peak_wind_gust_kmh 1121486 non-null float64\n", + "12 avg_sea_level_pres_hpa 4017157 non-null float64\n", + "13 sunshine_total_min 1021461 non-null float64\n", + "dtypes: category(3), datetime64[ns](1), float64(10)\n", + "memory usage: 2.6 GB\n", + "[########################################] | 100% Completed | 5.37 ss\n", + "[########################################] | 100% Completed | 5.48 s\n", + "[########################################] | 100% Completed | 5.58 s\n", + "[########################################] | 100% Completed | 5.68 s\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
station_idcity_namedateseasonavg_temp_cmin_temp_cmax_temp_cprecipitation_mmsnow_depth_mmavg_wind_dir_degavg_wind_speed_kmhpeak_wind_gust_kmhavg_sea_level_pres_hpasunshine_total_min
041515Asadabad1957-07-01Summer27.021.135.60.0NaNNaNNaNNaNNaNNaN
141515Asadabad1957-07-02Summer22.818.932.20.0NaNNaNNaNNaNNaNNaN
241515Asadabad1957-07-03Summer24.316.735.61.0NaNNaNNaNNaNNaNNaN
341515Asadabad1957-07-04Summer26.616.137.84.1NaNNaNNaNNaNNaNNaN
441515Asadabad1957-07-05Summer30.820.041.70.0NaNNaNNaNNaNNaNNaN
\n", + "
" + ], + "text/plain": [ + " station_id city_name date season avg_temp_c min_temp_c max_temp_c \n", + "0 41515 Asadabad 1957-07-01 Summer 27.0 21.1 35.6 \\\n", + "1 41515 Asadabad 1957-07-02 Summer 22.8 18.9 32.2 \n", + "2 41515 Asadabad 1957-07-03 Summer 24.3 16.7 35.6 \n", + "3 41515 Asadabad 1957-07-04 Summer 26.6 16.1 37.8 \n", + "4 41515 Asadabad 1957-07-05 Summer 30.8 20.0 41.7 \n", + "\n", + " precipitation_mm snow_depth_mm avg_wind_dir_deg avg_wind_speed_kmh \n", + "0 0.0 NaN NaN NaN \\\n", + "1 0.0 NaN NaN NaN \n", + "2 1.0 NaN NaN NaN \n", + "3 4.1 NaN NaN NaN \n", + "4 0.0 NaN NaN NaN \n", + "\n", + " peak_wind_gust_kmh avg_sea_level_pres_hpa sunshine_total_min \n", + "0 NaN NaN NaN \n", + "1 NaN NaN NaN \n", + "2 NaN NaN NaN \n", + "3 NaN NaN NaN \n", + "4 NaN NaN NaN " + ] + }, + "execution_count": 56, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Load the parquet file (adapt the file path as needed)\n", + "df_kaggle = dd.read_parquet('DOWNLOAD_PATH/daily_weather.parquet')\n", + "\n", + "# Show info about the data\n", + "df_kaggle.info(verbose=True, memory_usage=True)\n", + "\n", + "# Show the first rows\n", + "df_kaggle.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "id": "4c083721", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[########################################] | 100% Completed | 107.04 ms\n", + "[########################################] | 100% Completed | 211.77 ms\n", + "[########################################] | 100% Completed | 316.85 ms\n", + "[########################################] | 100% Completed | 421.17 ms\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
station_idcity_namecountrystateiso2iso3loc
041515AsadabadAfghanistanKunarAFAFG[71.1500045859, 34.8660000397]
138954FayzabadAfghanistanBadakhshanAFAFG[70.5792471913, 37.1297607616]
241560JalalabadAfghanistanNangarharAFAFG[70.4361034738, 34.4415269155]
338947KunduzAfghanistanKunduzAFAFG[68.8725296619, 36.7279506623]
438987Qala i NawAfghanistanBadghisAFAFG[63.1332996367, 34.983000131]
\n", + "
" + ], + "text/plain": [ + " station_id city_name country state iso2 iso3 \n", + "0 41515 Asadabad Afghanistan Kunar AF AFG \\\n", + "1 38954 Fayzabad Afghanistan Badakhshan AF AFG \n", + "2 41560 Jalalabad Afghanistan Nangarhar AF AFG \n", + "3 38947 Kunduz Afghanistan Kunduz AF AFG \n", + "4 38987 Qala i Naw Afghanistan Badghis AF AFG \n", + "\n", + " loc \n", + "0 [71.1500045859, 34.8660000397] \n", + "1 [70.5792471913, 37.1297607616] \n", + "2 [70.4361034738, 34.4415269155] \n", + "3 [68.8725296619, 36.7279506623] \n", + "4 [63.1332996367, 34.983000131] " + ] + }, + "execution_count": 68, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Read cities, adapt the path to the files accordingly\n", + "cities = dd.read_csv(\"DOWNLOAD_PATH/cities.csv\",dtype={'station_id': 'object'})\n", + "\n", + "# Modify lon and lat of cities into an array that can be interpreted directly by CrateDB\n", + "def create_location_column(df):\n", + " df['loc'] = df[['longitude', 'latitude']].values.tolist()\n", + " return df\n", + "\n", + "cities = cities.map_partitions(create_location_column)\n", + "cities = cities.drop(['longitude', 'latitude'], axis=1)\n", + "\n", + "cities.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 119, + "id": "903e0fed", + "metadata": {}, + "outputs": [], + "source": [ + "# Read countries, adapt the path to the files accordingly\n", + "countries = dd.read_csv(\"DOWNLOAD_PATH/countries.csv\")" + ] + }, + { + "cell_type": "markdown", + "id": "ee6cd33c", + "metadata": {}, + "source": [ + "## Step 3: Import the data into CrateDB\n", + "\n", + "Now that the data is prepared, we can import it into CrateDB. In order to provide the correct datatypes and use, for example, fulltext indexes, we create the tables manually. When writing a dataframe to CrateDB the schema could also be derived automatically." + ] + }, + { + "cell_type": "code", + "execution_count": 102, + "id": "9eaf4af1", + "metadata": {}, + "outputs": [], + "source": [ + "import sqlalchemy as sa\n", + "from crate.client.sqlalchemy.support import insert_bulk\n", + "\n", + "# Connect to CrateDB\n", + "# For a database running in the cloud, please use a connection string like this:\n", + "dburi = 'crate://USER:PASSWORD@HOST:4200?ssl=true'\n", + "\n", + "# For a database running locally, please use the following connection string:\n", + "# dburi = 'crate://localhost:4200?ssl=false'\n", + "\n", + "engine = sa.create_engine(dburi, echo=False)\n", + "connection = engine.connect()" + ] + }, + { + "cell_type": "markdown", + "id": "efee5dab", + "metadata": {}, + "source": [ + "#### Create tables\n", + "\n", + "Let us create the weather data table. We want to use a fulltext search on the city name and therefore define a fulltext index." 
+ { + "cell_type": "markdown", + "id": "efee5dab", + "metadata": {}, + "source": [ + "#### Create tables\n", + "\n", + "Let us create the weather data table. As we want to run fulltext searches on the city name, we define a fulltext index on that column." + ] + }, + { + "cell_type": "code", + "execution_count": 121, + "id": "5f972876", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 121, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "connection.execute(sa.text(\"\"\"\n", + "CREATE TABLE IF NOT EXISTS \"doc\".\"weather_data\" (\n", + " \"station_id\" TEXT,\n", + " \"city_name\" TEXT,\n", + " \"date\" TIMESTAMP WITHOUT TIME ZONE,\n", + " \"season\" TEXT,\n", + " \"avg_temp_c\" REAL,\n", + " \"min_temp_c\" REAL,\n", + " \"max_temp_c\" REAL,\n", + " \"precipitation_mm\" REAL,\n", + " \"snow_depth_mm\" REAL,\n", + " \"avg_wind_dir_deg\" REAL,\n", + " \"avg_wind_speed_kmh\" REAL,\n", + " \"peak_wind_gust_kmh\" REAL,\n", + " \"avg_sea_level_pres_hpa\" REAL,\n", + " \"sunshine_total_min\" REAL,\n", + " INDEX city_name_ft USING FULLTEXT (city_name)\n", + ")\n", + "\"\"\"))" + ] + }, + { + "cell_type": "markdown", + "id": "d9021224", + "metadata": {}, + "source": [ + "Create the table for cities, using GEO_POINT as the data type for the location:" + ] + }, + { + "cell_type": "code", + "execution_count": 105, + "id": "9d8b46de", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 105, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "connection.execute(sa.text(\"\"\"\n", + "CREATE TABLE \"doc\".\"cities\" (\n", + " \"station_id\" TEXT,\n", + " \"city_name\" TEXT,\n", + " \"country\" TEXT,\n", + " \"state\" TEXT,\n", + " \"iso2\" TEXT,\n", + " \"iso3\" TEXT,\n", + " \"loc\" GEO_POINT\n", + ")\n", + "\"\"\"))" + ] + },
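+ { + "cell_type": "markdown", + "id": "2d3e4f5a", + "metadata": {}, + "source": [ + "As a brief sketch of why the fulltext index matters: once the weather data has been imported (next step), the index `city_name_ft` can be queried with CrateDB's `MATCH` predicate. The search term `'vienna'` is only an illustrative example." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e4f5a6b", + "metadata": {}, + "outputs": [], + "source": [ + "# Example fulltext query using the MATCH predicate on the index defined above.\n", + "# It only returns rows after the weather data has been imported.\n", + "result = connection.execute(sa.text(\"\"\"\n", + "SELECT city_name, \"date\", avg_temp_c\n", + "FROM \"doc\".\"weather_data\"\n", + "WHERE MATCH(city_name_ft, 'vienna')\n", + "LIMIT 5\n", + "\"\"\"))\n", + "print(result.fetchall())" + ] + },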
+ { + "cell_type": "markdown", + "id": "d78e31e1", + "metadata": {}, + "source": [ + "#### Import Weather Data\n", + "\n", + "In case you are using a **CrateDB Cloud cluster**, the easiest and fastest way to import the weather data is the **import mechanism of CrateDB Cloud**. It avoids transferring a lot of data across the network, as the Parquet file is uploaded directly into a staging area and imported into CrateDB from there.\n", + "\n", + "If you are running **CrateDB locally** or do not want to use the GUI, we recommend a parallelized import via Dask data frames, based on the following ideas (a plain pandas data frame would use only one CPU to prepare the data and would not utilize the database enough):\n", + "- We create additional partitions, which Dask then processes and imports into CrateDB in parallel.\n", + "- Tuning the parameters is important in order not to overload the database. A chunk size of 10,000 has shown good results on a single-node CrateDB with 4 GB of assigned heap memory. Watch the logs: if the garbage collector consumes a lot of time, it is an indicator that CrateDB's heap is too small.\n", + "- We use the bulk insert method instead of individual INSERT statements.\n", + "- The import is parallelized across all partitions.\n", + "- Running CrateDB in a local Docker container with 5 assigned CPUs, 8 GB total memory, and 4 GB heap space led to about 80,000 inserts/second, including all the indexing.\n", + "\n", + "You can find additional hints about importing large datasets into CrateDB via Dask data frames here: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html" + ] + }, + { + "cell_type": "code", + "execution_count": 84, + "id": "311e588c", + "metadata": {}, + "outputs": [], + "source": [ + "# Uncomment the following lines to import the actual weather data;\n", + "# they are commented out to avoid a long-running operation\n", + "# df_kaggle = df_kaggle.repartition(26)\n", + "# df_kaggle.to_sql(name='weather_data', uri=dburi, schema='doc', if_exists='append', \n", + "#                  index=False, chunksize=10000, parallel=True, method=insert_bulk)" + ] + }, + { + "cell_type": "markdown", + "id": "acd226fa", + "metadata": {}, + "source": [ + "#### Import Countries\n", + "\n", + "Countries are imported as-is; the schema is derived automatically by SQLAlchemy." + ] + }, + { + "cell_type": "code", + "execution_count": 120, + "id": "53e02715", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[########################################] | 100% Completed | 964.80 ms\n", + "[########################################] | 100% Completed | 1.06 s\n", + "[########################################] | 100% Completed | 1.16 s\n", + "[########################################] | 100% Completed | 1.17 s\n", + "[########################################] | 100% Completed | 1.27 s\n" + ] + } + ], + "source": [ + "countries.to_sql('countries', dburi, schema='doc', if_exists='append', \n", + "                 index=False, chunksize=1000, parallel=True, method=insert_bulk)" + ] + }, + { + "cell_type": "markdown", + "id": "9dcab082", + "metadata": {}, + "source": [ + "#### Import Cities\n", + "\n", + "Cities are imported with the geolocation column prepared above." + ] + }, + { + "cell_type": "code", + "execution_count": 106, + "id": "c1f87112", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[########################################] | 100% Completed | 1.17 sms\n", + "[########################################] | 100% Completed | 1.17 s\n", + "[########################################] | 100% Completed | 1.27 s\n", + "[########################################] | 100% Completed | 1.27 s\n", + "[########################################] | 100% Completed | 1.37 s\n" + ] + } + ], + "source": [ + "cities.to_sql('cities', dburi, schema='doc', if_exists='append', \n", + "              index=False, chunksize=1000, parallel=True, method=insert_bulk)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}