Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit c6ed418

Browse files
Fixes Postgres jsonb in CLI sync (#2954)
1 parent 7c27eed commit c6ed418

File tree

3 files changed

+38
-6
lines changed

3 files changed

+38
-6
lines changed

cli/internal/cmds/neosync/sync/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func newCliSyncFromCmd(
238238
return nil, err
239239
}
240240
cmd.SilenceUsage = true
241-
logger = logger.With("accountId", cmdCfg.AccountId)
241+
logger = logger.With("accountId", *cmdCfg.AccountId)
242242

243243
logger.Info("Starting sync")
244244

internal/postgres/utils.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ func IsPgArrayColumnDataType(colDataType string) bool {
135135
}
136136

137137
func pgArrayToGoSlice(array *PgxArray[any]) any {
138+
if array.Elements == nil {
139+
return nil
140+
}
138141
goSlice := convertArrayToGoType(array)
139142

140143
dim := array.Dimensions()

worker/pkg/benthos/sql/json_processor.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ func (p *jsonToSqlProcessor) transform(path string, root any) any {
8484
return v
8585
}
8686
if isPgArray(datatype) {
87-
var pgarray []any
88-
err := json.Unmarshal(v, &pgarray)
87+
pgarray, err := processPgArray(v, datatype)
8988
if err != nil {
90-
p.logger.Errorf("unable to unmarshal pg array: %w", err)
89+
p.logger.Errorf("unable to process PG Array: %w", err)
9190
return v
9291
}
93-
return pq.Array(pgarray)
92+
return pgarray
9493
}
9594
switch datatype {
9695
case "bit":
@@ -100,7 +99,7 @@ func (p *jsonToSqlProcessor) transform(path string, root any) any {
10099
return v
101100
}
102101
return bit
103-
case "json":
102+
case "json", "jsonb":
104103
validJson, err := getValidJson(v)
105104
if err != nil {
106105
p.logger.Errorf("unable to get valid json: %w", err)
@@ -117,6 +116,24 @@ func (p *jsonToSqlProcessor) transform(path string, root any) any {
117116
}
118117
}
119118

119+
func processPgArray(bits []byte, datatype string) (any, error) {
120+
var pgarray []any
121+
err := json.Unmarshal(bits, &pgarray)
122+
if err != nil {
123+
return nil, err
124+
}
125+
switch datatype {
126+
case "json[]", "jsonb[]":
127+
jsonArray, err := stringifyJsonArray(pgarray)
128+
if err != nil {
129+
return nil, err
130+
}
131+
return pq.Array(jsonArray), nil
132+
default:
133+
return pq.Array(pgarray), nil
134+
}
135+
}
136+
120137
func isPgArray(datatype string) bool {
121138
return strings.HasSuffix(datatype, "[]")
122139
}
@@ -135,6 +152,18 @@ func getValidJson(jsonData []byte) ([]byte, error) {
135152
return quotedData, nil
136153
}
137154

155+
func stringifyJsonArray(pgarray []any) ([]string, error) {
156+
jsonArray := make([]string, len(pgarray))
157+
for i, item := range pgarray {
158+
bytes, err := json.Marshal(item)
159+
if err != nil {
160+
return nil, err
161+
}
162+
jsonArray[i] = string(bytes)
163+
}
164+
return jsonArray, nil
165+
}
166+
138167
func convertStringToBit(bitString string) ([]byte, error) {
139168
val, err := strconv.ParseUint(bitString, 2, len(bitString))
140169
if err != nil {

0 commit comments

Comments
 (0)