feat: add multiple table handling
johnonline35 committed Nov 11, 2024
1 parent e6efe07 commit 02292ce
Showing 1 changed file with 159 additions and 125 deletions.
main.go: 284 changes (159 additions & 125 deletions)
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"os"
"strings"
"time"
"runtime/debug"

@@ -21,13 +22,15 @@ type IoTRecord struct {
Timestamp time.Time
DeviceID string
Value float64
+ TableName string // Added to track source table
}

type ParquetFile struct {
ID int64 `parquet:"name=id, type=INT64"`
Timestamp int64 `parquet:"name=timestamp,type=INT64"`
DeviceID string `parquet:"name=device_id, type=BYTE_ARRAY, convertedtype=UTF8"`
Value float64 `parquet:"name=value, type=DOUBLE"`
+ TableName string `parquet:"name=table_name, type=BYTE_ARRAY, convertedtype=UTF8"`
}

func (r IoTRecord) ToParquet() ParquetFile {
@@ -36,43 +39,26 @@ func (r IoTRecord) ToParquet() ParquetFile {
Timestamp: r.Timestamp.UnixNano(),
DeviceID: r.DeviceID,
Value: r.Value,
+ TableName: r.TableName,
}
}

- func main() {
- defer func() {
- if r := recover(); r != nil {
- fmt.Fprintf(os.Stderr, "Panic recovered: %v\nStack trace:\n%s\n", r, debug.Stack())
- os.Exit(1)
- }
- }()
-
- if err := run(); err != nil {
- fmt.Fprintf(os.Stderr, "error: %v\n", err)
- os.Exit(1)
- }
- }

func writeParquetFile(records []IoTRecord, filename string) error {
fmt.Printf("Starting to write %d records to parquet file: %s\n", len(records), filename)

// Create new ParquetFile
fw, err := local.NewLocalFileWriter(filename)
if err != nil {
return fmt.Errorf("creating file writer: %w", err)
}

// Configure writer
pw, err := writer.NewParquetWriter(fw, new(ParquetFile), 1)
if err != nil {
fw.Close()
return fmt.Errorf("creating parquet writer: %w", err)
}

// Set basic configuration
pw.CompressionType = goparquet.CompressionCodec_SNAPPY

// Write records
for i, record := range records {
pRecord := record.ToParquet()
if err := pw.Write(pRecord); err != nil {
@@ -86,158 +72,206 @@ func writeParquetFile(records []IoTRecord, filename string) error {
}
}

// Close writer
if err := pw.WriteStop(); err != nil {
fw.Close()
return fmt.Errorf("stopping writer: %w", err)
}

// Close file
if err := fw.Close(); err != nil {
return fmt.Errorf("closing file: %w", err)
}

return nil
}

- func run() error {
- // Get config from env vars with defaults
- pgConnStr := getEnv("PG_CONN_STRING", "postgresql://localhost:5432/iot?sslmode=disable")
- s3Bucket := getEnv("S3_BUCKET", "my-iot-archive")
- tableName := getEnv("TABLE_NAME", "iot_data")
- batchSize := 100
- retentionDays := 90
-
- fmt.Printf("Starting archival process with batch size: %d\n", batchSize)
-
- // Connect to Postgres
- db, err := sql.Open("postgres", pgConnStr)
- if err != nil {
- return fmt.Errorf("connecting to postgres: %w", err)
- }
- defer db.Close()
-
- if err := db.Ping(); err != nil {
- return fmt.Errorf("testing database connection: %w", err)
- }
-
- fmt.Println("Successfully connected to database")
-
- // Get data older than retention period
- cutoff := time.Now().AddDate(0, 0, -retentionDays)
- query := fmt.Sprintf(`
- SELECT id, timestamp, device_id, value
- FROM %s
- WHERE timestamp < $1
- ORDER BY timestamp DESC
- LIMIT $2`, tableName)
-
- fmt.Printf("Executing query with cutoff date: %v\n", cutoff)
-
- rows, err := db.Query(query, cutoff, batchSize)
- if err != nil {
- return fmt.Errorf("querying old records: %w", err)
- }
- defer rows.Close()
+ func processTable(db *sql.DB, tableName string, cutoff time.Time, batchSize int) ([]IoTRecord, error) {
+ query := fmt.Sprintf(`
+ SELECT id, timestamp, device_id, value
+ FROM %s
+ WHERE timestamp < $1
+ ORDER BY timestamp DESC
+ LIMIT $2`, tableName)
+
+ fmt.Printf("Executing query for table %s with cutoff date: %v\n", tableName, cutoff)
+
+ rows, err := db.Query(query, cutoff, batchSize)
+ if err != nil {
+ return nil, fmt.Errorf("querying old records from %s: %w", tableName, err)
+ }
+ defer rows.Close()

var records []IoTRecord
- var lastTimestamp time.Time
recordCount := 0

fmt.Println("Starting to read records from database")
fmt.Printf("Starting to read records from table: %s\n", tableName)

for rows.Next() {
var record IoTRecord
if err := rows.Scan(&record.ID, &record.Timestamp, &record.DeviceID, &record.Value); err != nil {
return fmt.Errorf("scanning row: %w", err)
}

if recordCount == 0 {
lastTimestamp = record.Timestamp
return nil, fmt.Errorf("scanning row from %s: %w", tableName, err)
}

record.TableName = tableName
records = append(records, record)
recordCount++

if recordCount%100 == 0 {
fmt.Printf("Read %d records from database\n", recordCount)
fmt.Printf("Read %d records from %s\n", recordCount, tableName)
}
}

if err := rows.Err(); err != nil {
return fmt.Errorf("reading rows: %w", err)
return nil, fmt.Errorf("reading rows from %s: %w", tableName, err)
}

fmt.Printf("Finished reading %d records from %s\n", recordCount, tableName)
return records, nil
}

func deleteArchivedRecords(db *sql.DB, tableName string, cutoff time.Time) (int64, error) {
result, err := db.Exec(fmt.Sprintf(`
DELETE FROM %s
WHERE timestamp < $1`, tableName), cutoff)
if err != nil {
return 0, fmt.Errorf("deleting archived records from %s: %w", tableName, err)
}

fmt.Printf("Finished reading %d records from database\n", recordCount)
deleted, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("getting affected rows count from %s: %w", tableName, err)
}

return deleted, nil
}

+ func run() error {
+ // Get config from env vars with defaults
+ pgConnStr := getEnv("PG_CONN_STRING", "postgresql://localhost:5432/iot?sslmode=disable")
+ s3Bucket := getEnv("S3_BUCKET", "my-iot-archive")
+ tableNames := getEnv("TABLE_NAMES", "iot_data") // Comma-separated list of tables
+ batchSize := 100
+ retentionDays := 90
+
+ // Split table names
+ tables := strings.Split(tableNames, ",")
+ for i := range tables {
+ tables[i] = strings.TrimSpace(tables[i])
+ }
+
+ fmt.Printf("Starting archival process for tables: %v with batch size: %d\n", tables, batchSize)
+
+ // Connect to Postgres
+ db, err := sql.Open("postgres", pgConnStr)
+ if err != nil {
+ return fmt.Errorf("connecting to postgres: %w", err)
+ }
+ defer db.Close()
+
+ if err := db.Ping(); err != nil {
+ return fmt.Errorf("testing database connection: %w", err)
+ }
+
+ fmt.Println("Successfully connected to database")
+
+ // Get data older than retention period
+ cutoff := time.Now().AddDate(0, 0, -retentionDays)
+
+ // Process each table
+ var allRecords []IoTRecord
+ for _, tableName := range tables {
+ records, err := processTable(db, tableName, cutoff, batchSize)
+ if err != nil {
+ return fmt.Errorf("processing table %s: %w", tableName, err)
+ }
+ allRecords = append(allRecords, records...)
+ }

- if recordCount == 0 {
+ if len(allRecords) == 0 {
fmt.Println("No records to archive")
return nil
}

+ // Get latest timestamp for S3 path
+ lastTimestamp := allRecords[0].Timestamp
+ for _, record := range allRecords[1:] {
+ if record.Timestamp.After(lastTimestamp) {
+ lastTimestamp = record.Timestamp
+ }
+ }

// Write to parquet file
parquetFile := "/tmp/archive.parquet"
- if err := writeParquetFile(records, parquetFile); err != nil {
+ if err := writeParquetFile(allRecords, parquetFile); err != nil {
return fmt.Errorf("writing parquet file: %w", err)
}

fmt.Println("Successfully wrote parquet file")

- // Upload to S3
- fmt.Println("Configuring AWS client")
-
- cfg, err := config.LoadDefaultConfig(context.TODO())
- if err != nil {
- return fmt.Errorf("loading AWS config: %w", err)
- }
-
- s3Client := s3.NewFromConfig(cfg)
-
- s3Key := fmt.Sprintf("year=%d/month=%02d/%s_%s.parquet",
- lastTimestamp.Year(),
- lastTimestamp.Month(),
- tableName,
- lastTimestamp.Format("20060102_150405"))
-
- fmt.Printf("Uploading to S3: %s/%s\n", s3Bucket, s3Key)
-
- file, err := os.Open(parquetFile)
- if err != nil {
- return fmt.Errorf("opening parquet file: %w", err)
- }
- defer file.Close()
-
- _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: &s3Bucket,
- Key: &s3Key,
- Body: file,
- })
- if err != nil {
- return fmt.Errorf("uploading to S3: %w", err)
- }
-
- fmt.Println("Successfully uploaded to S3")
-
- // Delete archived records
- fmt.Println("Deleting archived records from database")
-
- result, err := db.Exec(fmt.Sprintf(`
- DELETE FROM %s
- WHERE timestamp < $1`, tableName), cutoff)
- if err != nil {
- return fmt.Errorf("deleting archived records: %w", err)
- }
-
- deleted, _ := result.RowsAffected()
- fmt.Printf("Successfully archived %d records to s3://%s/%s\n", recordCount, s3Bucket, s3Key)
- fmt.Printf("Deleted %d records from postgres\n", deleted)
-
- return nil
+ // Upload to S3
+ fmt.Println("Configuring AWS client")
+
+ cfg, err := config.LoadDefaultConfig(context.TODO())
+ if err != nil {
+ return fmt.Errorf("loading AWS config: %w", err)
+ }
+
+ s3Client := s3.NewFromConfig(cfg)
+
+ s3Key := fmt.Sprintf("year=%d/month=%02d/multi_table_%s.parquet",
+ lastTimestamp.Year(),
+ lastTimestamp.Month(),
+ lastTimestamp.Format("20060102_150405"))
+
+ fmt.Printf("Uploading to S3: %s/%s\n", s3Bucket, s3Key)
+
+ file, err := os.Open(parquetFile)
+ if err != nil {
+ return fmt.Errorf("opening parquet file: %w", err)
+ }
+ defer file.Close()
+
+ _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
+ Bucket: &s3Bucket,
+ Key: &s3Key,
+ Body: file,
+ })
+ if err != nil {
+ return fmt.Errorf("uploading to S3: %w", err)
+ }
+
+ fmt.Println("Successfully uploaded to S3")
+
+ // Delete archived records from each table
+ for _, tableName := range tables {
+ deleted, err := deleteArchivedRecords(db, tableName, cutoff)
+ if err != nil {
+ return fmt.Errorf("deleting archived records: %w", err)
+ }
+ fmt.Printf("Deleted %d records from table %s\n", deleted, tableName)
+ }
+
+ fmt.Printf("Successfully archived %d total records to s3://%s/%s\n", len(allRecords), s3Bucket, s3Key)
+
+ return nil
+ }

+ func main() {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Fprintf(os.Stderr, "Panic recovered: %v\nStack trace:\n%s\n", r, debug.Stack())
+ os.Exit(1)
+ }
+ }()
+
+ if err := run(); err != nil {
+ fmt.Fprintf(os.Stderr, "error: %v\n", err)
+ os.Exit(1)
+ }
+ }

func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
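
For reference, below is a minimal sketch of how the combined archive could be read back and grouped by the new table_name column, assuming the same github.com/xitongsys/parquet-go library that main.go writes with. It is a separate illustrative program, not part of this commit; the file path matches the /tmp/archive.parquet used in run(), and the per-table count is only an example of what the new column enables.

package main

import (
	"fmt"
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

// ParquetFile mirrors the schema written by main.go,
// including the table_name column added in this commit.
type ParquetFile struct {
	ID        int64   `parquet:"name=id, type=INT64"`
	Timestamp int64   `parquet:"name=timestamp, type=INT64"`
	DeviceID  string  `parquet:"name=device_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	Value     float64 `parquet:"name=value, type=DOUBLE"`
	TableName string  `parquet:"name=table_name, type=BYTE_ARRAY, convertedtype=UTF8"`
}

func main() {
	// Open the archive written by the archiver (path is illustrative).
	fr, err := local.NewLocalFileReader("/tmp/archive.parquet")
	if err != nil {
		log.Fatalf("opening parquet file: %v", err)
	}
	defer fr.Close()

	pr, err := reader.NewParquetReader(fr, new(ParquetFile), 1)
	if err != nil {
		log.Fatalf("creating parquet reader: %v", err)
	}
	defer pr.ReadStop()

	// Read all rows, then count rows per source table
	// using the table_name column.
	rows := make([]ParquetFile, pr.GetNumRows())
	if err := pr.Read(&rows); err != nil {
		log.Fatalf("reading rows: %v", err)
	}

	perTable := map[string]int{}
	for _, r := range rows {
		perTable[r.TableName]++
	}
	fmt.Println(perTable)
}

Because every record carries its source table, a single mixed-table parquet file stays queryable per table after the originals are deleted from Postgres.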
