@@ -124,8 +124,57 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*objectSDK.Objec
124124
125125// readHeaderAndPayload reads an object header from the file and returns reader for payload.
126126// This function takes ownership of the io.ReadCloser and will close it if it does not return it.
127- func (t * FSTree ) readHeaderAndPayload (f io.ReadCloser , initial []byte ) (* objectSDK.Object , io.ReadSeekCloser , error ) {
127+ func (t * FSTree ) readHeaderAndPayload (f io.ReadSeekCloser , initial []byte ) (* objectSDK.Object , io.ReadSeekCloser , error ) {
128128 var err error
129+ var hLen , pLen uint32
130+ if len (initial ) >= streamDataOff {
131+ hLen , pLen = parseStreamPrefix (initial )
132+ } else {
133+ var p []byte
134+ copy (p [:], initial )
135+ _ , err := io .ReadFull (f , p [len (initial ):])
136+ if err != nil && ! errors .Is (err , io .EOF ) && ! errors .Is (err , io .ErrUnexpectedEOF ) {
137+ return nil , f , fmt .Errorf ("read stream prefix: %w" , err )
138+ }
139+ hLen , pLen = parseStreamPrefix (p )
140+ if hLen == 0 {
141+ initial = p [:]
142+ }
143+ }
144+ if hLen > 0 {
145+ initial = initial [streamDataOff :]
146+ var header []byte
147+ if len (initial ) < int (hLen ) {
148+ header = make ([]byte , hLen )
149+ copy (header , initial )
150+ _ , err = io .ReadFull (f , header [len (initial ):])
151+ if err != nil {
152+ return nil , nil , fmt .Errorf ("read stream header: %w" , err )
153+ }
154+ initial = header
155+ }
156+ header = initial [:hLen ]
157+ var obj objectSDK.Object
158+ err = obj .Unmarshal (header )
159+ if err != nil {
160+ return nil , nil , fmt .Errorf ("unmarshal object: %w" , err )
161+ }
162+
163+ data := initial [hLen :]
164+ reader := io .LimitReader (io .MultiReader (bytes .NewReader (data ), f ), int64 (pLen ))
165+ if t .IsCompressed (data ) {
166+ decoder , err := zstd .NewReader (reader )
167+ if err != nil {
168+ return nil , nil , fmt .Errorf ("zstd decoder: %w" , err )
169+ }
170+ reader = decoder .IOReadCloser ()
171+ }
172+ return & obj , & payloadReader {
173+ Reader : reader ,
174+ close : f .Close ,
175+ }, nil
176+ }
177+
129178 if len (initial ) < objectSDK .MaxHeaderLen {
130179 _ = f .Close ()
131180 initial , err = t .Decompress (initial )
@@ -168,80 +217,98 @@ func (t *FSTree) readUntilPayload(f io.ReadCloser, initial []byte) (*objectSDK.O
168217 initial = buf [:n ]
169218 }
170219
171- obj , rest , err := extractHeaderAndPayload (initial )
220+ var (
221+ obj object.Object
222+ res objectSDK.Object
223+ )
224+
225+ _ , offset , err := extractHeaderAndPayload (initial , func (num int , val []byte ) error {
226+ switch num {
227+ case fieldObjectID :
228+ obj .ObjectId = new (refs.ObjectID )
229+ err := proto .Unmarshal (val , obj .ObjectId )
230+ if err != nil {
231+ return fmt .Errorf ("unmarshal object ID: %w" , err )
232+ }
233+ case fieldObjectSignature :
234+ obj .Signature = new (refs.Signature )
235+ err := proto .Unmarshal (val , obj .Signature )
236+ if err != nil {
237+ return fmt .Errorf ("unmarshal object signature: %w" , err )
238+ }
239+ case fieldObjectHeader :
240+ obj .Header = new (object.Header )
241+ err := proto .Unmarshal (val , obj .Header )
242+ if err != nil {
243+ return fmt .Errorf ("unmarshal object header: %w" , err )
244+ }
245+ default :
246+ return fmt .Errorf ("unknown field number: %d" , num )
247+ }
248+ return nil
249+ })
172250 if err != nil {
173251 _ = reader .Close ()
174252 return nil , nil , fmt .Errorf ("extract header and payload: %w" , err )
175253 }
176254
177- return obj , & payloadReader {
178- Reader : io .MultiReader (bytes .NewReader (rest ), reader ),
255+ err = res .FromProtoMessage (& obj )
256+ if err != nil {
257+ _ = reader .Close ()
258+ return nil , nil , fmt .Errorf ("convert to objectSDK.Object: %w" , err )
259+ }
260+
261+ return & res , & payloadReader {
262+ Reader : io .MultiReader (bytes .NewReader (initial [offset :]), reader ),
179263 close : reader .Close ,
180264 }, nil
181265}
182266
183- // extractHeaderAndPayload extracts the header of an object from the given byte slice and returns rest of the data.
184- func extractHeaderAndPayload (data []byte ) (* objectSDK.Object , []byte , error ) {
185- var (
186- offset int
187- res objectSDK.Object
188- obj object.Object
189- )
267+ // extractHeaderAndPayload processes the initial data to extract the header and payload
268+ // fields of an object. It calls the provided dataHandler for each field found in the data.
269+ // It returns the start offset of the header, the end offset of the payload, and an error if any.
270+ func extractHeaderAndPayload (data []byte , dataHandler func (int , []byte ) error ) (int , int , error ) {
271+ var offset , headerEnd int
190272
191273 if len (data ) == 0 {
192- return nil , nil , fmt .Errorf ("empty data" )
274+ return 0 , 0 , fmt .Errorf ("empty data" )
193275 }
194276
195277 for offset < len (data ) {
196278 num , typ , n := protowire .ConsumeTag (data [offset :])
197279 if err := protowire .ParseError (n ); err != nil {
198- return nil , nil , fmt .Errorf ("invalid tag at offset %d: %w" , offset , err )
280+ return 0 , 0 , fmt .Errorf ("invalid tag at offset %d: %w" , offset , err )
199281 }
200282 offset += n
201283
202284 if typ != protowire .BytesType {
203- return nil , nil , fmt .Errorf ("unexpected wire type: %v" , typ )
285+ return 0 , 0 , fmt .Errorf ("unexpected wire type: %v" , typ )
204286 }
205287
206288 if num == fieldObjectPayload {
289+ headerEnd = offset - n
207290 _ , n = binary .Varint (data [offset :])
208291 if err := protowire .ParseError (n ); err != nil {
209- return nil , nil , fmt .Errorf ("invalid varint at offset %d: %w" , offset , err )
292+ return 0 , 0 , fmt .Errorf ("invalid varint at offset %d: %w" , offset , err )
210293 }
211294 offset += n
212295 break
213296 }
214297 val , n := protowire .ConsumeBytes (data [offset :])
215298 if err := protowire .ParseError (n ); err != nil {
216- return nil , nil , fmt .Errorf ("invalid bytes field at offset %d: %w" , offset , err )
299+ return 0 , 0 , fmt .Errorf ("invalid bytes field at offset %d: %w" , offset , err )
217300 }
218301 offset += n
219302
220- switch num {
221- case fieldObjectID :
222- obj .ObjectId = new (refs.ObjectID )
223- err := proto .Unmarshal (val , obj .ObjectId )
224- if err != nil {
225- return nil , nil , fmt .Errorf ("unmarshal object ID: %w" , err )
226- }
227- case fieldObjectSignature :
228- obj .Signature = new (refs.Signature )
229- err := proto .Unmarshal (val , obj .Signature )
303+ if dataHandler != nil {
304+ err := dataHandler (int (num ), val )
230305 if err != nil {
231- return nil , nil , fmt .Errorf ("unmarshal object signature : %w" , err )
306+ return 0 , 0 , fmt .Errorf ("data handler error at offset %d : %w" , offset , err )
232307 }
233- case fieldObjectHeader :
234- obj .Header = new (object.Header )
235- err := proto .Unmarshal (val , obj .Header )
236- if err != nil {
237- return nil , nil , fmt .Errorf ("unmarshal object header: %w" , err )
238- }
239- default :
240- return nil , nil , fmt .Errorf ("unknown field number: %d" , num )
241308 }
242309 }
243310
244- return & res , data [ offset :], res . FromProtoMessage ( & obj )
311+ return headerEnd , offset , nil
245312}
246313
247314type payloadReader struct {
0 commit comments