diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index af18ba5661..9e2dd2051a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -450,7 +450,8 @@ func (a *FlowableActivity) startNormalize( return fmt.Errorf("failed to get table name schema mapping: %w", err) } - logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID)) + logger.Info("Normalizing batch", + slog.Int64("SyncBatchID", batchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: config.FlowJobName, Env: config.Env, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 26e0cf9162..7d09ed3a74 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -274,7 +274,9 @@ func (c *ClickHouseConnector) NormalizeRecords( } parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames)) c.logger.Info("[clickhouse] normalizing batch", - slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize)) + slog.Int64("StartBatchID", normBatchID), + slog.Int64("EndBatchID", req.SyncBatchID), + slog.Int("connections", parallelNormalize)) queries := make(chan string) rawTbl := c.getRawTableName(req.FlowJobName) @@ -295,7 +297,10 @@ func (c *ClickHouseConnector) NormalizeRecords( } for query := range queries { - c.logger.Info("normalizing batch", slog.String("query", query)) + c.logger.Info("executing normalize query", + slog.Int64("syncBatchId", req.SyncBatchID), + slog.Int64("normalizeBatchId", normBatchID), + slog.String("query", query)) if err := chConn.Exec(errCtx, query); err != nil { return fmt.Errorf("error while inserting into normalized table: %w", err) } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index a87f1af481..96e52d8fcd 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -172,7 +172,7 @@ func (p *PostgresMetadata) FinishBatch(ctx context.Context, jobName string, sync } func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName string, batchID int64) error { - p.logger.Info("updating normalize batch id for job", slog.Int64("batchID", batchID)) + p.logger.Info("updating normalize batch id for job", slog.Int64("normalizeBatchID", batchID)) if _, err := p.pool.Exec(ctx, `UPDATE `+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID, ); err != nil {