Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 102 additions & 2 deletions parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pqarrow_test
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"math"
Expand Down Expand Up @@ -1532,9 +1533,9 @@ func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))

curOffset := 0
for i := 0; i < size; i++ {
for i := range size {
offsetsArr[i] = int32(curOffset)
if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
if i%2 != 0 || i/2 >= nullcount {
// non-null list (list with index 1 is always empty)
bitutil.SetBit(nullBitmap, i)
if i != 1 {
Expand Down Expand Up @@ -2108,6 +2109,105 @@ func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
ps.roundTripTable(mem, expected, false)
}

// TestListOfStructWithEmptyListStoreSchema tests that ARROW:schema metadata stored
// in a Parquet file uses "element" (not "item") as the list element field name, to
// match the actual Parquet column paths. This is required for compatibility with
// readers like Snowflake that resolve columns by matching ARROW:schema field names
// to Parquet column path segments. See https://github.com/apache/arrow-go/issues/744.
func TestListOfStructWithEmptyListStoreSchema(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	// Fail the test if any allocation made through this allocator leaks.
	defer mem.AssertSize(t, 0)

	opsStruct := arrow.StructOf(
		arrow.Field{Name: "id", Type: arrow.BinaryTypes.String, Nullable: false},
		arrow.Field{Name: "token", Type: arrow.BinaryTypes.String, Nullable: true},
		arrow.Field{Name: "amount", Type: arrow.BinaryTypes.String, Nullable: true},
	)
	// arrow.ListOf uses "item" as the element field name, which would mismatch
	// the Parquet column path that uses "element". The fix ensures the stored
	// ARROW:schema uses "element" to stay consistent with the Parquet columns.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "block_num", Type: arrow.PrimitiveTypes.Uint64, Nullable: false},
		{Name: "tx_id", Type: arrow.BinaryTypes.String, Nullable: false},
		{Name: "ops", Type: arrow.ListOf(opsStruct), Nullable: true},
	}, nil)

	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()

	// Three rows: row 0 has two ops, row 1 has an empty ops list, row 2 has one op.
	b.Field(0).(*array.Uint64Builder).AppendValues([]uint64{100, 101, 102}, nil)
	b.Field(1).(*array.StringBuilder).AppendValues([]string{"tx-a", "tx-b", "tx-c"}, nil)

	lb := b.Field(2).(*array.ListBuilder)
	sb := lb.ValueBuilder().(*array.StructBuilder)
	idb := sb.FieldBuilder(0).(*array.StringBuilder)
	tokb := sb.FieldBuilder(1).(*array.StringBuilder)
	amtb := sb.FieldBuilder(2).(*array.StringBuilder)

	// Row 0: list with two struct elements. The interleaving order of the
	// list/struct/child-builder Append calls is significant.
	lb.Append(true)
	sb.Append(true)
	idb.Append("op-1")
	tokb.Append("USDC")
	amtb.Append("10")
	sb.Append(true)
	idb.Append("op-2")
	tokb.Append("ETH")
	amtb.Append("1.5")
	// Row 1: non-null but empty list (no struct elements appended).
	lb.Append(true) // empty list
	// Row 2: one struct element whose "token" child is null.
	lb.Append(true)
	sb.Append(true)
	idb.Append("op-3")
	tokb.AppendNull()
	amtb.Append("42")

	rec := b.NewRecordBatch()
	defer rec.Release()

	var buf bytes.Buffer
	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true))
	// WithStoreSchema is what makes the writer embed the serialized Arrow schema
	// under the "ARROW:schema" metadata key — the behavior under test.
	arrowProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())

	pw, err := pqarrow.NewFileWriter(schema, &buf, props, arrowProps)
	require.NoError(t, err)
	require.NoError(t, pw.Write(rec))
	require.NoError(t, pw.Close())

	// Verify round-trip data is correct.
	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer pf.Close()

	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
	require.NoError(t, err)

	tbl, err := fr.ReadTable(context.Background())
	require.NoError(t, err)
	defer tbl.Release()

	require.EqualValues(t, 3, tbl.NumRows())

	// Verify the stored ARROW:schema uses "element" as the list element field name
	// (consistent with the Parquet column path "ops.list.element.*"), not "item"
	// (the default Arrow field name from arrow.ListOf()).
	arrowSchemaEncoded := pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
	require.NotNil(t, arrowSchemaEncoded, "ARROW:schema metadata key must be present")
	decoded, err := base64.StdEncoding.DecodeString(*arrowSchemaEncoded)
	require.NoError(t, err)
	// DeserializeSchema wraps bytes in an IPC stream; use ipc.NewReader to decode.
	ipcRdr, err := ipc.NewReader(bytes.NewReader(decoded), ipc.WithAllocator(mem))
	require.NoError(t, err)
	defer ipcRdr.Release()
	storedSchema := ipcRdr.Schema()

	opsField, ok := storedSchema.FieldsByName("ops")
	require.True(t, ok)
	opsListType, ok := opsField[0].Type.(*arrow.ListType)
	require.True(t, ok)
	// Must be "element" (matching Parquet column path) not "item" (Arrow default).
	assert.Equal(t, "element", opsListType.ElemField().Name,
		"ARROW:schema element name must match the Parquet column path segment")
}

// TestParquetArrowIO runs the full ParquetIOTestSuite round-trip test suite.
func TestParquetArrowIO(t *testing.T) {
	suite.Run(t, &ParquetIOTestSuite{})
}
Expand Down
51 changes: 50 additions & 1 deletion parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,48 @@ import (
"golang.org/x/xerrors"
)

// normalizeFieldForParquet recursively normalizes an Arrow field so that its
// type matches the Parquet column structure that fieldToNode would produce.
// Specifically, list element field names are set to "element" because
// ListOfWithName (used by fieldToNode) always names the Parquet element group
// "element", regardless of the original Arrow element field name.
func normalizeFieldForParquet(f arrow.Field) arrow.Field {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to recurse down into run-end-encoded arrays and listviews and such too? (Should we use NestedType so we don't have to hardcode all the nested types every time we want to do structural recursion like this?)

switch dt := f.Type.(type) {
case *arrow.ListType:
elem := normalizeFieldForParquet(dt.ElemField())
elem.Name = "element"
return arrow.Field{Name: f.Name, Type: arrow.ListOfField(elem), Nullable: f.Nullable, Metadata: f.Metadata}
case *arrow.FixedSizeListType:
elem := normalizeFieldForParquet(dt.ElemField())
elem.Name = "element"
return arrow.Field{Name: f.Name, Type: arrow.FixedSizeListOfField(dt.Len(), elem), Nullable: f.Nullable, Metadata: f.Metadata}
case *arrow.StructType:
fields := make([]arrow.Field, dt.NumFields())
for i := 0; i < dt.NumFields(); i++ {
fields[i] = normalizeFieldForParquet(dt.Field(i))
}
return arrow.Field{Name: f.Name, Type: arrow.StructOf(fields...), Nullable: f.Nullable, Metadata: f.Metadata}
case *arrow.MapType:
key := normalizeFieldForParquet(dt.KeyField())
item := normalizeFieldForParquet(dt.ItemField())
return arrow.Field{Name: f.Name, Type: arrow.MapOfFields(key, item), Nullable: f.Nullable, Metadata: f.Metadata}
}
return f
}

// normalizeSchemaForParquet returns a copy of the Arrow schema in which list
// element field names have been rewritten to "element", matching the Parquet
// column paths produced by fieldToNode. It is used when storing the
// ARROW:schema metadata so the stored schema stays consistent with the actual
// Parquet column structure.
func normalizeSchemaForParquet(sc *arrow.Schema) *arrow.Schema {
	normalized := make([]arrow.Field, sc.NumFields())
	for i := range normalized {
		normalized[i] = normalizeFieldForParquet(sc.Field(i))
	}
	meta := sc.Metadata()
	return arrow.NewSchema(normalized, &meta)
}

// WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema
// and columns will be determined by the schema of the table, writing the file out to the provided writer.
// The chunksize will be utilized in order to determine the size of the row groups.
Expand Down Expand Up @@ -80,7 +122,14 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
}

if arrprops.storeSchema {
serializedSchema := flight.SerializeSchema(arrschema, props.Allocator())
// Normalize the Arrow schema so that list element field names match the
// Parquet column group names. fieldToNode always uses "element" as the
// Parquet group name for list element fields (via ListOfWithName), but
// arrow.ListOf() uses "item" as the Arrow element field name. This
// inconsistency causes readers (e.g. Snowflake) that map ARROW:schema field
// names to Parquet column paths to fail to locate the correct columns.
schemaToStore := normalizeSchemaForParquet(arrschema)
serializedSchema := flight.SerializeSchema(schemaToStore, props.Allocator())
meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema))
}

Expand Down
2 changes: 1 addition & 1 deletion parquet/pqarrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
listGroup := listNode.(*schema.GroupNode)
if listGroup.NumFields() == 1 && !(listGroup.Name() == "array" || listGroup.Name() == (n.Name()+"_tuple")) {
if listGroup.NumFields() == 1 && (listGroup.Name() != "array" && listGroup.Name() != n.Name()+"_tuple") {
// list of primitive type
if err := nodeToSchemaField(listGroup.Field(0), currentLevels, ctx, out, &out.Children[0]); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion parquet/schema/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logical
n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType()
}

if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
if n.logicalType == nil || n.logicalType.IsNested() || !n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData) {
return nil, fmt.Errorf("invalid logical type %s", n.logicalType)
}

Expand Down