Skip to content

Commit

Permalink
separated scraped and scraped_airlines
Browse files Browse the repository at this point in the history
  • Loading branch information
esalonico committed Aug 4, 2023
1 parent fa0ad2a commit 6a4db3d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 68 deletions.
113 changes: 76 additions & 37 deletions flight_analysis.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -339,55 +339,94 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'https://www.google.com/travel/flights?q=Flights%20to%20JFK%20from%20%28MUC%2CFCO%29%20on%202023-10-28%20oneway&curr=EUR&gl=IT'"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"base = \"https://www.google.com/travel/flights?q=Flights%20to%20JFK%20from%20MUC%20on%202023-10-28%20oneway&curr=EUR&gl=IT\"\n",
"\n",
"a = \"https://www.google.com/travel/flights?q=Flights%20to%20JFK%20from%20%28MUC%2CFCO%29%20on%202023-10-28%20oneway&curr=EUR&gl=IT\"\n",
"a"
]
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": 49,
"metadata": {},
"outputs": [
{
"ename": "AssertionError",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m/Users/emanuelesalonico/Library/CloudStorage/[email protected]/My Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb Cell 6\u001b[0m in \u001b[0;36m<cell line: 4>\u001b[0;34m()\u001b[0m\n\u001b[1;32m <a href='vscode-notebook-cell:/Users/emanuelesalonico/Library/CloudStorage/GoogleDrive-esalonico%40gmail.com/My%20Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb#X10sZmlsZQ%3D%3D?line=2'>3</a>\u001b[0m parser \u001b[39m=\u001b[39m StandardParser()\n\u001b[1;32m <a href='vscode-notebook-cell:/Users/emanuelesalonico/Library/CloudStorage/GoogleDrive-esalonico%40gmail.com/My%20Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb#X10sZmlsZQ%3D%3D?line=3'>4</a>\u001b[0m \u001b[39mwith\u001b[39;00m \u001b[39mopen\u001b[39m(\u001b[39m'\u001b[39m\u001b[39mtest\u001b[39m\u001b[39m'\u001b[39m, \u001b[39m'\u001b[39m\u001b[39mrb\u001b[39m\u001b[39m'\u001b[39m) \u001b[39mas\u001b[39;00m fh:\n\u001b[1;32m <a href='vscode-notebook-cell:/Users/emanuelesalonico/Library/CloudStorage/GoogleDrive-esalonico%40gmail.com/My%20Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb#X10sZmlsZQ%3D%3D?line=4'>5</a>\u001b[0m \u001b[39m# print(fh.read())\u001b[39;00m\n\u001b[0;32m----> <a href='vscode-notebook-cell:/Users/emanuelesalonico/Library/CloudStorage/GoogleDrive-esalonico%40gmail.com/My%20Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb#X10sZmlsZQ%3D%3D?line=5'>6</a>\u001b[0m output \u001b[39m=\u001b[39m parser\u001b[39m.\u001b[39;49mparse_message(fh, \u001b[39m\"\u001b[39;49m\u001b[39mmessage\u001b[39;49m\u001b[39m\"\u001b[39;49m)\n\u001b[1;32m <a href='vscode-notebook-cell:/Users/emanuelesalonico/Library/CloudStorage/GoogleDrive-esalonico%40gmail.com/My%20Drive/SYNC/Dev/flight-analysis/flight_analysis.ipynb#X10sZmlsZQ%3D%3D?line=6'>7</a>\u001b[0m \u001b[39mprint\u001b[39m(output)\n",
"File \u001b[0;32m~/miniforge3/envs/flight-analysis/lib/python3.10/site-packages/protobuf_inspector/types.py:75\u001b[0m, in \u001b[0;36mStandardParser.parse_message\u001b[0;34m(self, file, gtype, endgroup)\u001b[0m\n\u001b[1;32m 73\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mtype\u001b[39m \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m: \u001b[39mtype\u001b[39m \u001b[39m=\u001b[39m \u001b[39m\"\u001b[39m\u001b[39mmessage\u001b[39m\u001b[39m\"\u001b[39m\n\u001b[1;32m 74\u001b[0m end \u001b[39m=\u001b[39m [\u001b[39mNone\u001b[39;00m]\n\u001b[0;32m---> 75\u001b[0m x \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mparse_message(file, \u001b[39mtype\u001b[39;49m, end)\n\u001b[1;32m 76\u001b[0m x \u001b[39m=\u001b[39m \u001b[39m\"\u001b[39m\u001b[39mgroup (end \u001b[39m\u001b[39m%s\u001b[39;00m\u001b[39m) \u001b[39m\u001b[39m\"\u001b[39m \u001b[39m%\u001b[39m fg4(\u001b[39mstr\u001b[39m(end[\u001b[39m0\u001b[39m])) \u001b[39m+\u001b[39m x\n\u001b[1;32m 77\u001b[0m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mgroups_observed \u001b[39m=\u001b[39m \u001b[39mTrue\u001b[39;00m\n",
"File \u001b[0;32m~/miniforge3/envs/flight-analysis/lib/python3.10/site-packages/protobuf_inspector/types.py:59\u001b[0m, in \u001b[0;36mStandardParser.parse_message\u001b[0;34m(self, file, gtype, endgroup)\u001b[0m\n\u001b[1;32m 56\u001b[0m key, wire_type \u001b[39m=\u001b[39m read_identifier(file)\n\u001b[1;32m 57\u001b[0m \u001b[39mif\u001b[39;00m key \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m: \u001b[39mbreak\u001b[39;00m\n\u001b[0;32m---> 59\u001b[0m x \u001b[39m=\u001b[39m read_value(file, wire_type)\n\u001b[1;32m 60\u001b[0m \u001b[39massert\u001b[39;00m(\u001b[39mnot\u001b[39;00m (x \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m))\n\u001b[1;32m 62\u001b[0m \u001b[39mif\u001b[39;00m wire_type \u001b[39m==\u001b[39m \u001b[39m4\u001b[39m:\n",
"File \u001b[0;32m~/miniforge3/envs/flight-analysis/lib/python3.10/site-packages/protobuf_inspector/core.py:38\u001b[0m, in \u001b[0;36mread_value\u001b[0;34m(file, wire_type)\u001b[0m\n\u001b[1;32m 36\u001b[0m \u001b[39mif\u001b[39;00m length \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m: \u001b[39mreturn\u001b[39;00m \u001b[39mNone\u001b[39;00m\n\u001b[1;32m 37\u001b[0m c \u001b[39m=\u001b[39m file\u001b[39m.\u001b[39mread(length)\n\u001b[0;32m---> 38\u001b[0m \u001b[39massert\u001b[39;00m(\u001b[39mlen\u001b[39m(c) \u001b[39m==\u001b[39m length)\n\u001b[1;32m 39\u001b[0m \u001b[39mreturn\u001b[39;00m io\u001b[39m.\u001b[39mBytesIO(c)\n\u001b[1;32m 40\u001b[0m \u001b[39mif\u001b[39;00m wire_type \u001b[39m==\u001b[39m \u001b[39m3\u001b[39m \u001b[39mor\u001b[39;00m wire_type \u001b[39m==\u001b[39m \u001b[39m4\u001b[39m:\n",
"\u001b[0;31mAssertionError\u001b[0m: "
"name": "stdout",
"output_type": "stream",
"text": [
"1 / 26\n",
"2 / 26\n",
"3 / 26\n",
"4 / 26\n",
"5 / 26\n",
"6 / 26\n",
"7 / 26\n",
"8 / 26\n",
"9 / 26\n",
"10 / 26\n",
"11 / 26\n",
"12 / 26\n",
"13 / 26\n",
"14 / 26\n",
"15 / 26\n",
"16 / 26\n",
"17 / 26\n",
"18 / 26\n",
"19 / 26\n",
"20 / 26\n",
"21 / 26\n",
"22 / 26\n",
"23 / 26\n",
"24 / 26\n",
"25 / 26\n",
"26 / 26\n"
]
}
],
"source": [
"from protobuf_inspector.types import StandardParser\n",
"# this process is to retroactively split the flight airlines into a separate \"scraped_airlines\" table,\n",
"# in order to comply with the 1NF and 2NF rules of database normalization.\n",
"\n",
"raise NotImplementedError(\"This script is not meant to be run again!!!!!\")\n",
"\n",
"import pandas as pd\n",
"from src.flight_analysis.database import Database\n",
"import private.private as private\n",
"\n",
"# db = Database(\n",
"# db_host=private.DB_HOST,\n",
"# db_name=private.DB_NAME,\n",
"# db_user=private.DB_USER,\n",
"# db_pw=private.DB_PW,\n",
"# )\n",
"\n",
"query = \"SELECT id, airlines FROM scraped\"\n",
"\n",
"cur = db.conn.cursor()\n",
"cur.execute(query)\n",
"\n",
"res = cur.fetchall()\n",
"\n",
"df = pd.DataFrame(res, columns=[\"flight_uuid\", \"airline\"])\n",
"\n",
"df2 = df.explode(\"airline\")\n",
"df2[\"airline\"] = df2[\"airline\"].map(lambda x: x.lstrip(\"'\").rstrip(\"'\"))\n",
"df2[\"airline\"] = df2[\"airline\"].replace({\"Separate tickets booked together\": \"multiple\"})\n",
"df2 = df2.reset_index(drop=True)\n",
"\n",
"def split_dataframe(df, chunk_size = 5000): \n",
" chunks = list()\n",
" num_chunks = len(df) // chunk_size + 1\n",
" for i in range(num_chunks):\n",
" chunks.append(df[i*chunk_size:(i+1)*chunk_size])\n",
" return chunks\n",
"\n",
"chunks = split_dataframe(df2, chunk_size = 5000)\n",
"\n",
"parser = StandardParser()\n",
"with open('test', 'rb') as fh:\n",
" # print(fh.read())\n",
" output = parser.parse_message(fh, \"message\")\n",
"print(output)"
"i = 1\n",
"for c in chunks:\n",
" print(i, \"/\", len(chunks))\n",
" db.add_pandas_df_to_db(c, table_name=db.table_scraped_airlines)\n",
" i += 1"
]
}
],
Expand Down
22 changes: 18 additions & 4 deletions flight_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,18 @@ def get_routes_df(routes: list):

n_iter += 1

# concatenate all results into a single dataframe
final_df = pd.concat(all_results).reset_index(drop=True)

# clean and transform the dataframe
final_df["layover_time"] = final_df["layover_time"].fillna(-1)
final_df["layover_location"] = (
final_df["layover_location"].fillna(np.nan).replace([np.nan], [None])
)
final_df["price_value"] = (
final_df["price_value"].fillna(np.nan).replace([np.nan], [None])
)

final_df["uuid"] = [uuid.uuid4() for _ in range(final_df.shape[0])]
final_df = final_df.set_index("uuid")

Expand All @@ -121,10 +132,12 @@ def generate_airline_df_from_flights(flights_df):
flights_df = flights_df.reset_index(drop=True)

# create a dataframe with all the airlines, referencing the index
airlines_df = flights_df.explode("airlines")[["airlines"]]
airlines_df = flights_df.explode("airlines")[["airlines"]].reset_index()

# rename column to "airline"
airlines_df = airlines_df.rename(columns={"airlines": "airline"})
airlines_df = airlines_df.rename(
columns={"uuid": "flight_uuid", "airlines": "airline"}
)

return airlines_df

Expand All @@ -142,7 +155,7 @@ def generate_airline_df_from_flights(flights_df):
scraped_airlines = generate_airline_df_from_flights(scraped_flights)

# drop airlines from flights dataframe
# scraped_flights = scraped_flights.drop(columns=["airlines"])
scraped_flights = scraped_flights.drop(columns=["airlines"])

# connect to database
db = Database(
Expand All @@ -158,8 +171,9 @@ def generate_airline_df_from_flights(flights_df):
# add results to database
if not SKIP_SAVE_TO_DB:
db.add_pandas_df_to_db(scraped_flights, table_name=db.table_scraped)

print(scraped_airlines)
db.add_pandas_df_to_db(scraped_airlines, table_name=db.table_scraped_airlines)


# if it's a monday, backup the database
if datetime.today().weekday() == 0:
Expand Down
30 changes: 4 additions & 26 deletions src/flight_analysis/database.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# author: Emanuele Salonico, 2023

import psycopg2
import pandas as pd
import numpy as np
import ast
import psycopg2.extras as extras
import os
import logging
Expand Down Expand Up @@ -93,7 +90,6 @@ def create_scraped_table(self):
id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
departure_datetime timestamp with time zone,
arrival_datetime timestamp with time zone,
airlines text[] COLLATE pg_catalog."default",
travel_time smallint NOT NULL,
origin character(3) COLLATE pg_catalog."default" NOT NULL,
destination character(3) COLLATE pg_catalog."default" NOT NULL,
Expand Down Expand Up @@ -123,7 +119,8 @@ def create_scraped_airlines_table(self):
query += f"""
CREATE TABLE IF NOT EXISTS public.{self.table_scraped_airlines}
(
id uuid PRIMARY KEY NOT NULL,
id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
flight_uuid uuid NOT NULL,
airline text COLLATE pg_catalog."default"
)
Expand Down Expand Up @@ -153,29 +150,10 @@ def prepare_db_and_tables(self):
if self.table_scraped_airlines not in self.list_all_tables():
self.create_scraped_airlines_table()

def transform_and_clean_df(self, df):
    """
    Return a cleaned copy of ``df`` ready to be sent to the database.

    The input dataframe is left untouched: all transformations are applied
    to a copy, so callers can keep using their original data (and slices
    of a larger dataframe can be passed in safely).

    Transformations applied:
      * ``airlines``: each value is stringified and its square brackets are
        swapped for curly braces (e.g. ``['A', 'B']`` -> ``{'A', 'B'}``).
      * ``layover_time``: missing values become ``-1``.
      * ``layover_location`` / ``price_value``: missing values are first
        unified to NaN and then replaced with ``None``.

    Args:
        df: dataframe containing the columns ``airlines``, ``layover_time``,
            ``layover_location`` and ``price_value``.

    Returns:
        A new dataframe with the cleaned columns.

    Raises:
        KeyError: if one of the expected columns is missing.
    """
    # work on a copy so the caller's dataframe is never modified
    df = df.copy()

    def _brackets_to_braces(value):
        # str(value) looks like "['A', 'B']"; after the replacements it is
        # the quoted text "\"{'A', 'B'}\"", which literal_eval turns back
        # into the plain string "{'A', 'B'}".
        # NOTE(review): presumably this mimics a Postgres array literal for
        # the text[] column -- confirm against the insert statement.
        quoted = str(value).replace("[", '"{').replace("]", '}"')
        return np.array(ast.literal_eval(quoted))

    df["airlines"] = df["airlines"].apply(_brackets_to_braces)
    df["layover_time"] = df["layover_time"].fillna(-1)

    # fillna(np.nan) first maps None to NaN so that a single replace()
    # call catches every kind of missing value.
    for column in ("layover_location", "price_value"):
        df[column] = df[column].fillna(np.nan).replace([np.nan], [None])

    return df

def add_pandas_df_to_db(self, df, table_name):
# clean df
df = self.transform_and_clean_df(df)

extras.register_uuid()

# Create a list of tuples from the dataframe values
tuples = [tuple(x) for x in df.to_numpy()]

Expand Down
1 change: 0 additions & 1 deletion test

This file was deleted.

0 comments on commit 6a4db3d

Please sign in to comment.