Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions etl/src/birdxplorer_etl/extract_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
import zipfile
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta

import boto3
import requests
Expand All @@ -15,7 +15,6 @@
from birdxplorer_common.storage import (
RowNoteRecord,
RowNoteStatusRecord,
RowPostRecord,
)


Expand All @@ -24,14 +23,11 @@ def extract_data(postgresql: Session):

# Noteデータを取得してPostgreSQLに保存
date = datetime.now()
latest_note = postgresql.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first()
two_days_ago = datetime.now() - timedelta(days=2)
two_days_ago_timestamp = int(two_days_ago.timestamp())

while True:
if (
latest_note
and int(latest_note.created_at_millis) / 1000
> datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO
):
if datetime.timestamp(date) < two_days_ago_timestamp:
break

dateString = date.strftime("%Y/%m/%d")
Expand Down Expand Up @@ -63,9 +59,9 @@ def extract_data(postgresql: Session):
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
rows_to_update = []
for index, row in enumerate(reader):
if postgresql.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first():
continue
existing_note = postgresql.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first()

# BinaryBoolフィールドの値を正規化
binary_bool_fields = [
Expand Down Expand Up @@ -154,24 +150,31 @@ def extract_data(postgresql: Session):
if value == "" and key not in ["harmful", "validation_difficulty"]:
row[key] = None

note_record = RowNoteRecord(**row)
rows_to_add.append(note_record)
if existing_note:
for key, value in row.items():
if hasattr(existing_note, key):
setattr(existing_note, key, value)
rows_to_update.append(existing_note)
else:
note_record = RowNoteRecord(**row)
rows_to_add.append(note_record)

if index % 1000 == 0:
postgresql.bulk_save_objects(rows_to_add)
postgresql.commit()

# バッチ処理後にSQSキューイング
# バッチ処理後にSQSキューイング(新規追加のみ)
for note in rows_to_add:
enqueue_notes(note.note_id)

rows_to_add = []
rows_to_update = []

# 最後のバッチを処理
postgresql.bulk_save_objects(rows_to_add)
postgresql.commit()

# 最後のバッチのSQSキューイング
# 最後のバッチのSQSキューイング(新規追加のみ)
for note in rows_to_add:
enqueue_notes(note.note_id)

Expand Down