@@ -41,6 +41,7 @@ type Reader struct {
4141
4242 refCount atomic.Int64
4343 rec arrow.RecordBatch
44+ meta * arrow.Metadata
4445 err error
4546
4647 // types dictTypeMap
@@ -50,6 +51,7 @@ type Reader struct {
5051 swapEndianness bool
5152 ensureNativeEndian bool
5253 expectedSchema * arrow.Schema
54+ readCustomMetadata bool
5355
5456 mem memory.Allocator
5557}
@@ -76,6 +78,7 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader
7678 mem : cfg .alloc ,
7779 ensureNativeEndian : cfg .ensureNativeEndian ,
7880 expectedSchema : cfg .schema ,
81+ readCustomMetadata : cfg .readCustomMetadata ,
7982 }
8083 rr .refCount .Add (1 )
8184
@@ -170,6 +173,9 @@ func (r *Reader) Next() bool {
170173 r .rec .Release ()
171174 r .rec = nil
172175 }
176+ if r .meta != nil {
177+ r .meta = nil
178+ }
173179
174180 if r .err != nil || r .done {
175181 return false
@@ -251,7 +257,14 @@ func (r *Reader) next() bool {
251257 r .err = fmt .Errorf ("arrow/ipc: invalid message type (got=%v, want=%v" , got , want )
252258 return false
253259 }
254-
260+ if r .readCustomMetadata {
261+ rootMsg := flatbuf .GetRootAsMessage (msg .meta .Bytes (), 0 )
262+ meta , err := metadataFromFB (rootMsg )
263+ if err != nil {
264+ panic (err )
265+ }
266+ r .meta = & meta
267+ }
255268 r .rec = newRecordBatch (r .schema , & r .memo , msg .meta , msg .body , r .swapEndianness , r .mem )
256269 return true
257270}
@@ -263,6 +276,12 @@ func (r *Reader) RecordBatch() arrow.RecordBatch {
263276 return r .rec
264277}
265278
279+ // RecordBatchCustomMetadata returns the current record batch custom metadata from the
280+ // underlying stream.
281+ func (r * Reader ) RecordBatchCustomMetadata () (* arrow.Metadata , error ) {
282+ return r .meta , nil
283+ }
284+
266285// Record returns the current record that has been extracted from the
267286// underlying stream.
268287// It is valid until the next call to Next.
@@ -279,7 +298,9 @@ func (r *Reader) Read() (arrow.RecordBatch, error) {
279298 r .rec .Release ()
280299 r .rec = nil
281300 }
282-
301+ if r .meta != nil {
302+ r .meta = nil
303+ }
283304 if ! r .next () {
284305 if r .done && r .err == nil {
285306 return nil , io .EOF
0 commit comments