Skip to content

Commit

Permalink
edit some logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 6, 2025
1 parent bb88bd1 commit bcca451
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ func (a *FlowableActivity) startNormalize(
return fmt.Errorf("failed to get table name schema mapping: %w", err)
}

logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID))
logger.Info("Normalizing batch",
slog.Int64("SyncBatchID", batchID))
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: config.FlowJobName,
Env: config.Env,
Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ func (c *ClickHouseConnector) NormalizeRecords(
}
parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames))
c.logger.Info("[clickhouse] normalizing batch",
slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize))
slog.Int64("StartBatchID", normBatchID),
slog.Int64("EndBatchID", req.SyncBatchID),
slog.Int("connections", parallelNormalize))

queries := make(chan string)
rawTbl := c.getRawTableName(req.FlowJobName)
Expand All @@ -295,7 +297,10 @@ func (c *ClickHouseConnector) NormalizeRecords(
}

for query := range queries {
c.logger.Info("normalizing batch", slog.String("query", query))
c.logger.Info("executing normalize query",
slog.Int64("syncBatchId", req.SyncBatchID),
slog.Int64("normalizeBatchId", normBatchID),
slog.String("query", query))
if err := chConn.Exec(errCtx, query); err != nil {
return fmt.Errorf("error while inserting into normalized table: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *PostgresMetadata) FinishBatch(ctx context.Context, jobName string, sync
}

func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName string, batchID int64) error {
p.logger.Info("updating normalize batch id for job", slog.Int64("batchID", batchID))
p.logger.Info("updating normalize batch id for job", slog.Int64("normalizeBatchID", batchID))
if _, err := p.pool.Exec(ctx,
`UPDATE `+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID,
); err != nil {
Expand Down

0 comments on commit bcca451

Please sign in to comment.