Skip to content

Commit da55ada

Browse files
authored
Merge pull request #80 from planetlabs/num-row-groups
Configurable row group length for writing
2 parents b1b08b5 + 79298c5 commit da55ada

File tree

8 files changed

+341
-31
lines changed

8 files changed

+341
-31
lines changed

cmd/gpq/command/convert.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type ConvertCmd struct {
3535
Max int `help:"Maximum number of features to consider when building a schema." default:"100"`
3636
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet without metadata." default:"geometry"`
3737
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd" default:"zstd"`
38+
RowGroupLength int `help:"Maximum number of rows per group when writing Parquet."`
3839
}
3940

4041
type FormatType string
@@ -149,7 +150,12 @@ func (c *ConvertCmd) Run() error {
149150
if outputFormat != ParquetType && outputFormat != GeoParquetType {
150151
return errors.New("GeoJSON input can only be converted to GeoParquet")
151152
}
152-
convertOptions := &geojson.ConvertOptions{MinFeatures: c.Min, MaxFeatures: c.Max, Compression: c.Compression}
153+
convertOptions := &geojson.ConvertOptions{
154+
MinFeatures: c.Min,
155+
MaxFeatures: c.Max,
156+
Compression: c.Compression,
157+
RowGroupLength: c.RowGroupLength,
158+
}
153159
return geojson.ToParquet(input, output, convertOptions)
154160
}
155161

@@ -160,6 +166,7 @@ func (c *ConvertCmd) Run() error {
160166
convertOptions := &geoparquet.ConvertOptions{
161167
InputPrimaryColumn: c.InputPrimaryColumn,
162168
Compression: c.Compression,
169+
RowGroupLength: c.RowGroupLength,
163170
}
164171

165172
return geoparquet.FromParquet(input, output, convertOptions)

internal/geojson/geojson.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ func FromParquet(reader parquet.ReaderAtSeeker, writer io.Writer) error {
5858
}
5959

6060
type ConvertOptions struct {
61-
MinFeatures int
62-
MaxFeatures int
63-
Compression string
64-
Metadata string
61+
MinFeatures int
62+
MaxFeatures int
63+
Compression string
64+
RowGroupLength int
65+
Metadata string
6566
}
6667

6768
var defaultOptions = &ConvertOptions{
@@ -80,12 +81,19 @@ func ToParquet(input io.Reader, output io.Writer, convertOptions *ConvertOptions
8081
featuresRead := 0
8182

8283
var pqWriterProps *parquet.WriterProperties
84+
var writerOptions []parquet.WriterProperty
8385
if convertOptions.Compression != "" {
8486
compression, err := pqutil.GetCompression(convertOptions.Compression)
8587
if err != nil {
8688
return err
8789
}
88-
pqWriterProps = parquet.NewWriterProperties(parquet.WithCompression(compression))
90+
writerOptions = append(writerOptions, parquet.WithCompression(compression))
91+
}
92+
if convertOptions.RowGroupLength > 0 {
93+
writerOptions = append(writerOptions, parquet.WithMaxRowGroupLength(int64(convertOptions.RowGroupLength)))
94+
}
95+
if len(writerOptions) > 0 {
96+
pqWriterProps = parquet.NewWriterProperties(writerOptions...)
8997
}
9098

9199
var featureWriter *geoparquet.FeatureWriter

internal/geojson/geojson_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,42 @@ func TestToParquet(t *testing.T) {
105105
assert.JSONEq(t, string(expected), geojsonBuffer.String())
106106
}
107107

108+
func TestToParquetRowGroupLength3(t *testing.T) {
109+
geojsonFile, openErr := os.Open("testdata/ten-points.geojson")
110+
require.NoError(t, openErr)
111+
112+
parquetBuffer := &bytes.Buffer{}
113+
toParquetErr := geojson.ToParquet(geojsonFile, parquetBuffer, &geojson.ConvertOptions{
114+
RowGroupLength: 3,
115+
})
116+
assert.NoError(t, toParquetErr)
117+
118+
parquetInput := bytes.NewReader(parquetBuffer.Bytes())
119+
fileReader, fileErr := file.NewParquetReader(parquetInput)
120+
require.NoError(t, fileErr)
121+
defer fileReader.Close()
122+
123+
assert.Equal(t, 4, fileReader.NumRowGroups())
124+
}
125+
126+
func TestToParquetRowGroupLength5(t *testing.T) {
127+
geojsonFile, openErr := os.Open("testdata/ten-points.geojson")
128+
require.NoError(t, openErr)
129+
130+
parquetBuffer := &bytes.Buffer{}
131+
toParquetErr := geojson.ToParquet(geojsonFile, parquetBuffer, &geojson.ConvertOptions{
132+
RowGroupLength: 5,
133+
})
134+
assert.NoError(t, toParquetErr)
135+
136+
parquetInput := bytes.NewReader(parquetBuffer.Bytes())
137+
fileReader, fileErr := file.NewParquetReader(parquetInput)
138+
require.NoError(t, fileErr)
139+
defer fileReader.Close()
140+
141+
assert.Equal(t, 2, fileReader.NumRowGroups())
142+
}
143+
108144
func TestToParquetMismatchedTypes(t *testing.T) {
109145
geojsonFile, openErr := os.Open("testdata/mismatched-types.geojson")
110146
require.NoError(t, openErr)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
{
2+
"type": "FeatureCollection",
3+
"features": [
4+
{
5+
"type": "Feature",
6+
"properties": {
7+
"num": 0
8+
},
9+
"geometry": {
10+
"type": "Point",
11+
"coordinates": [0, 0]
12+
}
13+
},
14+
{
15+
"type": "Feature",
16+
"properties": {
17+
"num": 1
18+
},
19+
"geometry": {
20+
"type": "Point",
21+
"coordinates": [1, 1]
22+
}
23+
},
24+
{
25+
"type": "Feature",
26+
"properties": {
27+
"num": 2
28+
},
29+
"geometry": {
30+
"type": "Point",
31+
"coordinates": [2, 2]
32+
}
33+
},
34+
{
35+
"type": "Feature",
36+
"properties": {
37+
"num": 3
38+
},
39+
"geometry": {
40+
"type": "Point",
41+
"coordinates": [3, 3]
42+
}
43+
},
44+
{
45+
"type": "Feature",
46+
"properties": {
47+
"num": 4
48+
},
49+
"geometry": {
50+
"type": "Point",
51+
"coordinates": [4, 4]
52+
}
53+
},
54+
{
55+
"type": "Feature",
56+
"properties": {
57+
"num": 5
58+
},
59+
"geometry": {
60+
"type": "Point",
61+
"coordinates": [5, 5]
62+
}
63+
},
64+
{
65+
"type": "Feature",
66+
"properties": {
67+
"num": 6
68+
},
69+
"geometry": {
70+
"type": "Point",
71+
"coordinates": [6, 6]
72+
}
73+
},
74+
{
75+
"type": "Feature",
76+
"properties": {
77+
"num": 7
78+
},
79+
"geometry": {
80+
"type": "Point",
81+
"coordinates": [7, 7]
82+
}
83+
},
84+
{
85+
"type": "Feature",
86+
"properties": {
87+
"num": 8
88+
},
89+
"geometry": {
90+
"type": "Point",
91+
"coordinates": [8, 8]
92+
}
93+
},
94+
{
95+
"type": "Feature",
96+
"properties": {
97+
"num": 9
98+
},
99+
"geometry": {
100+
"type": "Point",
101+
"coordinates": [9, 9]
102+
}
103+
}
104+
]
105+
}

internal/geoparquet/geoparquet.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
type ConvertOptions struct {
2222
InputPrimaryColumn string
2323
Compression string
24+
RowGroupLength int
2425
}
2526

2627
func getMetadata(fileReader *file.Reader, convertOptions *ConvertOptions) *Metadata {
@@ -171,6 +172,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions
171172
TransformColumn: transformColumn,
172173
BeforeClose: beforeClose,
173174
Compression: compression,
175+
RowGroupLength: convertOptions.RowGroupLength,
174176
}
175177

176178
return pqutil.TransformByColumn(config)

internal/pqutil/transform.go

Lines changed: 77 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type TransformConfig struct {
2323
Reader parquet.ReaderAtSeeker
2424
Writer io.Writer
2525
Compression *compress.Compression
26+
RowGroupLength int
2627
TransformSchema SchemaTransformer
2728
TransformColumn ColumnTransformer
2829
BeforeClose func(*file.Reader, *file.Writer) error
@@ -50,6 +51,10 @@ func getWriterProperties(config *TransformConfig, fileReader *file.Reader) (*par
5051
}
5152
}
5253

54+
if config.RowGroupLength > 0 {
55+
writerProperties = append(writerProperties, parquet.WithMaxRowGroupLength(int64(config.RowGroupLength)))
56+
}
57+
5358
return parquet.NewWriterProperties(writerProperties...), nil
5459
}
5560

@@ -104,34 +109,85 @@ func TransformByColumn(config *TransformConfig) error {
104109

105110
ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)
106111

107-
numRowGroups := fileReader.NumRowGroups()
108-
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
109-
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
110-
rowGroupWriter := fileWriter.AppendRowGroup()
112+
if config.RowGroupLength > 0 {
113+
columnReaders := make([]*pqarrow.ColumnReader, numFields)
111114
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
112-
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
113-
if readErr != nil {
114-
return readErr
115+
colReader, err := arrowReader.GetColumn(ctx, fieldNum)
116+
if err != nil {
117+
return err
115118
}
116-
if config.TransformColumn != nil {
117-
inputField := inputManifest.Fields[fieldNum].Field
118-
outputField := outputManifest.Fields[fieldNum].Field
119-
transformed, err := config.TransformColumn(inputField, outputField, arr)
120-
if err != nil {
121-
return err
119+
columnReaders[fieldNum] = colReader
120+
}
121+
122+
numRows := fileReader.NumRows()
123+
numRowsWritten := int64(0)
124+
for {
125+
rowGroupWriter := fileWriter.AppendRowGroup()
126+
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
127+
colReader := columnReaders[fieldNum]
128+
arr, readErr := colReader.NextBatch(int64(config.RowGroupLength))
129+
if readErr != nil {
130+
return readErr
122131
}
123-
if transformed.DataType() != outputField.Type {
124-
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
132+
if config.TransformColumn != nil {
133+
inputField := inputManifest.Fields[fieldNum].Field
134+
outputField := outputManifest.Fields[fieldNum].Field
135+
transformed, err := config.TransformColumn(inputField, outputField, arr)
136+
if err != nil {
137+
return err
138+
}
139+
if transformed.DataType() != outputField.Type {
140+
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
141+
}
142+
arr = transformed
143+
}
144+
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
145+
if colWriterErr != nil {
146+
return colWriterErr
147+
}
148+
if err := colWriter.Write(ctx); err != nil {
149+
return err
125150
}
126-
arr = transformed
127-
}
128-
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
129-
if colWriterErr != nil {
130-
return colWriterErr
131151
}
132-
if err := colWriter.Write(ctx); err != nil {
152+
numRowsInGroup, err := rowGroupWriter.NumRows()
153+
if err != nil {
133154
return err
134155
}
156+
numRowsWritten += int64(numRowsInGroup)
157+
if numRowsWritten >= numRows {
158+
break
159+
}
160+
}
161+
} else {
162+
numRowGroups := fileReader.NumRowGroups()
163+
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
164+
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
165+
rowGroupWriter := fileWriter.AppendRowGroup()
166+
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
167+
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
168+
if readErr != nil {
169+
return readErr
170+
}
171+
if config.TransformColumn != nil {
172+
inputField := inputManifest.Fields[fieldNum].Field
173+
outputField := outputManifest.Fields[fieldNum].Field
174+
transformed, err := config.TransformColumn(inputField, outputField, arr)
175+
if err != nil {
176+
return err
177+
}
178+
if transformed.DataType() != outputField.Type {
179+
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
180+
}
181+
arr = transformed
182+
}
183+
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
184+
if colWriterErr != nil {
185+
return colWriterErr
186+
}
187+
if err := colWriter.Write(ctx); err != nil {
188+
return err
189+
}
190+
}
135191
}
136192
}
137193

0 commit comments

Comments
 (0)