#199 Create the archiving job
njouanin committed Nov 9, 2024
1 parent 468cdaa commit da237b6
Showing 2 changed files with 132 additions and 1 deletion.
77 changes: 76 additions & 1 deletion backend/bloom/infra/repositories/repository_spire_ais_data.py
@@ -3,7 +3,7 @@
from bloom.infra.database import sql_model
from dependency_injector.providers import Callable
from sqlalchemy.orm import Session
-from sqlalchemy import select, and_
+from sqlalchemy import select, and_, delete, Result
from datetime import datetime
from bloom.logger import logger

@@ -159,6 +159,81 @@ def get_all_data_by_mmsi(
        result = session.execute(stmt).scalars()
        return [SpireAisDataRepository.map_to_domain(e) for e in result]

    def get_all_data_before_as_df(self, session: Session, date_before: datetime) -> pd.DataFrame:
        """Return every spire_ais_data row created before date_before as a pandas DataFrame."""
        stmt = select(
            sql_model.SpireAisData.id,
            sql_model.SpireAisData.spire_update_statement,
            sql_model.SpireAisData.vessel_ais_class,
            sql_model.SpireAisData.vessel_flag,
            sql_model.SpireAisData.vessel_name,
            sql_model.SpireAisData.vessel_callsign,
            sql_model.SpireAisData.vessel_timestamp,
            sql_model.SpireAisData.vessel_update_timestamp,
            sql_model.SpireAisData.vessel_ship_type,
            sql_model.SpireAisData.vessel_sub_ship_type,
            sql_model.SpireAisData.vessel_mmsi,
            sql_model.SpireAisData.vessel_imo,
            sql_model.SpireAisData.vessel_width,
            sql_model.SpireAisData.vessel_length,
            sql_model.SpireAisData.position_accuracy,
            sql_model.SpireAisData.position_collection_type,
            sql_model.SpireAisData.position_course,
            sql_model.SpireAisData.position_heading,
            sql_model.SpireAisData.position_latitude,
            sql_model.SpireAisData.position_longitude,
            sql_model.SpireAisData.position_maneuver,
            sql_model.SpireAisData.position_navigational_status,
            sql_model.SpireAisData.position_rot,
            sql_model.SpireAisData.position_speed,
            sql_model.SpireAisData.position_timestamp,
            sql_model.SpireAisData.position_update_timestamp,
            sql_model.SpireAisData.voyage_destination,
            sql_model.SpireAisData.voyage_draught,
            sql_model.SpireAisData.voyage_eta,
            sql_model.SpireAisData.voyage_timestamp,
            sql_model.SpireAisData.voyage_update_timestamp,
            sql_model.SpireAisData.created_at,
        ).where(sql_model.SpireAisData.created_at < date_before)
        result = session.execute(stmt)
        return pd.DataFrame(result, columns=[
            "id",
            "spire_update_statement",
            "vessel_ais_class",
            "vessel_flag",
            "vessel_name",
            "vessel_callsign",
            "vessel_timestamp",
            "vessel_update_timestamp",
            "vessel_ship_type",
            "vessel_sub_ship_type",
            "vessel_mmsi",
            "vessel_imo",
            "vessel_width",
            "vessel_length",
            "position_accuracy",
            "position_collection_type",
            "position_course",
            "position_heading",
            "position_latitude",
            "position_longitude",
            "position_maneuver",
            "position_navigational_status",
            "position_rot",
            "position_speed",
            "position_timestamp",
            "position_update_timestamp",
            "voyage_destination",
            "voyage_draught",
            "voyage_eta",
            "voyage_timestamp",
            "voyage_update_timestamp",
            "created_at",
        ])

    def delete_rows(self, session: Session, row_ids: list[int]) -> int:
        """Delete rows by primary key; return the number of rows actually deleted."""
        stmt = delete(sql_model.SpireAisData).where(sql_model.SpireAisData.id.in_(row_ids))
        result = session.execute(stmt)
        return result.rowcount

    @staticmethod
    def map_to_orm(data: SpireAisData) -> sql_model.SpireAisData:
        return sql_model.SpireAisData(**data.__dict__)
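Taken together, the two new repository methods are designed to run against the same Session, so the Parquet export and the row deletion land in a single transaction. A minimal usage sketch follows; repo, db, and the 180-day cutoff are hypothetical stand-ins for the injected dependencies, not part of this commit.

    from datetime import datetime, timedelta, timezone

    with db.session() as session:
        cutoff = datetime.now(timezone.utc) - timedelta(days=180)
        df = repo.get_all_data_before_as_df(session, cutoff)
        df.to_parquet("archive.parquet")             # write the archive file first
        repo.delete_rows(session, list(df["id"]))    # then delete the exported rows
        session.commit()                             # commit only once the file exists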
56 changes: 56 additions & 0 deletions backend/bloom/tasks/archive_spire_ais_data.py
@@ -0,0 +1,56 @@
import argparse
from time import perf_counter
from datetime import timedelta, datetime, timezone
from pathlib import Path

from bloom.container import UseCases
from bloom.logger import logger


def run(window: int, output_path: str):
    """Archive spire_ais_data rows older than `window` days to a Parquet file, then delete them."""
    use_cases = UseCases()
    spire_ais_data_repository = use_cases.spire_ais_data_repository()

    db = use_cases.db()
    with db.session() as session:
        now = datetime.now(timezone.utc)
        date_limit = now - timedelta(days=window)
        logger.info(f"Suppression des données antérieures au {date_limit}")
        df = spire_ais_data_repository.get_all_data_before_as_df(session, date_limit)
        if len(df) > 0:
            min_date = df["created_at"].min().strftime("%Y-%m-%dT%H:%M:%S")
            max_date = df["created_at"].max().strftime("%Y-%m-%dT%H:%M:%S")
            file_name = Path(output_path).joinpath(f"spire_ais_data_{min_date}_{max_date}.parquet")
            df.to_parquet(file_name)
            logger.info(f"{len(df)} enregistrements archivés dans le fichier {file_name}")
            count = spire_ais_data_repository.delete_rows(session, list(df["id"]))
            logger.info(f"{count} enregistrements supprimés en base de données")
        else:
            logger.info("Aucune donnée à archiver")
        session.commit()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Archivage de la table spire_ais_data")
    parser.add_argument(
        "-w",
        "--history-window",
        type=int,
        help="history window in days",
        required=False,
        default=365 // 2,  # half a year; argparse does not apply type= to defaults, so keep this an int
    )
    parser.add_argument(
        "-o",
        "--output-path",
        type=str,
        help="output path",
        required=False,
        default="./",
    )
    args = parser.parse_args()
    time_start = perf_counter()
    logger.info("DEBUT - Archivage des données de la table spire_ais_data")
    run(args.history_window, args.output_path)
    time_end = perf_counter()
    duration = time_end - time_start
    logger.info(f"FIN - Archivage des données de la table spire_ais_data en {duration:.2f}s")
