From 905c072b882e50809cd930a23362a0999ae2e98b Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Fri, 10 Jan 2025 01:59:39 +0530 Subject: [PATCH] Edit some logs (#2410) Just some log nits for monitoring syncs --- flow/activities/flowable.go | 3 ++- flow/connectors/clickhouse/normalize.go | 9 +++++++-- flow/connectors/external_metadata/store.go | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7febb04d9..fd042db0b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -450,7 +450,8 @@ func (a *FlowableActivity) startNormalize( return fmt.Errorf("failed to get table name schema mapping: %w", err) } - logger.Info("Normalizing batch", slog.Int64("SyncBatchID", batchID)) + logger.Info("Normalizing batch", + slog.Int64("SyncBatchID", batchID)) res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ FlowJobName: config.FlowJobName, Env: config.Env, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 3483c4be8..da417d096 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -274,7 +274,9 @@ func (c *ClickHouseConnector) NormalizeRecords( } parallelNormalize = min(max(parallelNormalize, 1), len(destinationTableNames)) c.logger.Info("[clickhouse] normalizing batch", - slog.Int64("StartBatchID", normBatchID), slog.Int64("EndBatchID", req.SyncBatchID), slog.Int("connections", parallelNormalize)) + slog.Int64("StartBatchID", normBatchID), + slog.Int64("EndBatchID", req.SyncBatchID), + slog.Int("connections", parallelNormalize)) queries := make(chan string) rawTbl := c.getRawTableName(req.FlowJobName) @@ -295,7 +297,10 @@ func (c *ClickHouseConnector) NormalizeRecords( } for query := range queries { - c.logger.Info("normalizing batch", slog.String("query", query)) + c.logger.Info("executing normalize query", + slog.Int64("syncBatchId", req.SyncBatchID), + slog.Int64("normalizeBatchId", normBatchID), + slog.String("query", query)) if err := chConn.Exec(errCtx, query); err != nil { return fmt.Errorf("error while inserting into normalized table: %w", err) } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index d6ddcec4a..eabe2509f 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -172,7 +172,7 @@ func (p *PostgresMetadata) FinishBatch(ctx context.Context, jobName string, sync } func (p *PostgresMetadata) UpdateNormalizeBatchID(ctx context.Context, jobName string, batchID int64) error { - p.logger.Info("updating normalize batch id for job", slog.Int64("batchID", batchID)) + p.logger.Info("updating normalize batch id for job", slog.Int64("normalizeBatchID", batchID)) if _, err := p.pool.Exec(ctx, `UPDATE `+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1`, jobName, batchID, ); err != nil {