Skip to content

Commit ec8a677

Browse files
authored
mysql: map 00 day/month to 01 (#3146)
requires go-mysql-org/go-mysql#1047 to fix CDC
1 parent 4d6dc24 commit ec8a677

File tree

5 files changed

+14
-18
lines changed

5 files changed

+14
-18
lines changed

flow/connectors/mysql/qvalue_convert.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
316316
if strings.HasPrefix(unsafeString, "0000-00-00") {
317317
return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil
318318
}
319-
val, err := time.Parse("2006-01-02 15:04:05.999999", unsafeString)
319+
val, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(unsafeString, "-00", "-01"))
320320
if err != nil {
321321
return nil, err
322322
}
@@ -334,7 +334,7 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
334334
if unsafeString == "0000-00-00" {
335335
return types.QValueDate{Val: time.Unix(0, 0)}, nil
336336
}
337-
val, err := time.Parse(time.DateOnly, unsafeString)
337+
val, err := time.Parse(time.DateOnly, strings.ReplaceAll(unsafeString, "-00", "-01"))
338338
if err != nil {
339339
return nil, err
340340
}
@@ -469,13 +469,13 @@ func QValueFromMysqlRowEvent(
469469
return types.QValueTime{Val: tm}, nil
470470
case types.QValueKindDate:
471471
if val == "0000-00-00" {
472-
return types.QValueDate{Val: time.Unix(0, 0)}, nil
472+
return types.QValueDate{Val: time.Unix(0, 0).UTC()}, nil
473473
}
474-
val, err := time.Parse(time.DateOnly, val)
474+
val, err := time.Parse(time.DateOnly, strings.ReplaceAll(val, "-00", "-01"))
475475
if err != nil {
476476
return nil, err
477477
}
478-
return types.QValueDate{Val: val}, nil
478+
return types.QValueDate{Val: val.UTC()}, nil
479479
case types.QValueKindTimestamp: // 0000-00-00 ends up here
480480
if mytype == mysql.MYSQL_TYPE_TIME || mytype == mysql.MYSQL_TYPE_TIME2 {
481481
tm, err := processTime(val)
@@ -485,13 +485,13 @@ func QValueFromMysqlRowEvent(
485485
return types.QValueTimestamp{Val: time.Unix(0, 0).UTC().Add(tm)}, nil
486486
}
487487
if strings.HasPrefix(val, "0000-00-00") {
488-
return types.QValueTimestamp{Val: time.Unix(0, 0)}, nil
488+
return types.QValueTimestamp{Val: time.Unix(0, 0).UTC()}, nil
489489
}
490-
tm, err := time.Parse("2006-01-02 15:04:05.999999", val)
490+
tm, err := time.Parse("2006-01-02 15:04:05.999999", strings.ReplaceAll(val, "-00", "-01"))
491491
if err != nil {
492492
return nil, err
493493
}
494-
return types.QValueTimestamp{Val: tm}, nil
494+
return types.QValueTimestamp{Val: tm.UTC()}, nil
495495
}
496496
}
497497
return nil, fmt.Errorf("unexpected type %T for mysql type %d, qkind %s", val, mytype, qkind)

flow/connectors/postgres/qrep.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -689,8 +689,7 @@ func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,
689689

690690
// prepare and execute the query
691691
var result bool
692-
err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result)
693-
if err != nil {
692+
if err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result); err != nil {
694693
return false, fmt.Errorf("failed to execute query: %w", err)
695694
}
696695

flow/e2e/clickhouse/mysql_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
8888
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES
8989
('init','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'),
9090
('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'),
91-
('init','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`,
91+
('init','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`,
9292
quotedSrcFullName)))
9393

9494
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -108,7 +108,7 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
108108
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",d,dt,tm,t) VALUES
109109
('cdc','1935-01-01','1953-02-02 12:01:02','1973-02-02 13:01:02.123','14:21.654321'),
110110
('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','00:00'),
111-
('cdc','0000-00-00','0000-00-00 00:00:00','0000-00-00 00:00:00.000','-800:0:1')`,
111+
('cdc','2000-01-00','2000-00-01 00:00:00','2000-01-01 00:00:00.000','-800:0:1')`,
112112
quotedSrcFullName)))
113113

114114
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t")

flow/go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ require (
3030
github.com/aws/smithy-go v1.22.4
3131
github.com/cockroachdb/pebble v1.1.5
3232
github.com/elastic/go-elasticsearch/v8 v8.18.1
33-
github.com/go-mysql-org/go-mysql v1.12.0
33+
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a
3434
github.com/google/uuid v1.6.0
3535
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.0
3636
github.com/hamba/avro/v2 v2.29.0
@@ -89,7 +89,6 @@ require (
8989
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 // indirect
9090
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.53.0 // indirect
9191
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect
92-
github.com/Masterminds/semver v1.5.0 // indirect
9392
github.com/VividCortex/ewma v1.2.0 // indirect
9493
github.com/apache/arrow-go/v18 v18.3.1 // indirect
9594
github.com/apache/arrow/go/v15 v15.0.2 // indirect

flow/go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
8282
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.53.0/go.mod h1:jUZ5LYlw40WMd07qxcQJD5M40aUxrfwqQX1g7zxYnrQ=
8383
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 h1:Ron4zCA/yk6U7WOBXhTJcDpsUBG9npumK6xw2auFltQ=
8484
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0/go.mod h1:cSgYe11MCNYunTnRXrKiR/tHc0eoKjICUuWpNZoVCOo=
85-
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
86-
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
8785
github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I=
8886
github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg=
8987
github.com/PeerDB-io/gluabit32 v1.0.2 h1:AGI1Z7dwDVotakpuOOuyTX4/QGi5HUYsipL/VfodmO4=
@@ -278,8 +276,8 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
278276
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
279277
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
280278
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
281-
github.com/go-mysql-org/go-mysql v1.12.0 h1:tyToNggfCfl11OY7GbWa2Fq3ofyScO9GY8b5f5wAmE4=
282-
github.com/go-mysql-org/go-mysql v1.12.0/go.mod h1:/XVjs1GlT6NPSf13UgXLv/V5zMNricTCqeNaehSBghs=
279+
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a h1:xmHPT1ElX3AzFn8uSWlsJArgcjPjTt/KFFsaJe28qBY=
280+
github.com/go-mysql-org/go-mysql v1.12.1-0.20250706035254-4a082cf9bd9a/go.mod h1:FQxw17uRbFvMZFK+dPtIPufbU46nBdrGaxOw0ac9MFs=
283281
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
284282
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
285283
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=

0 commit comments

Comments
 (0)