diff --git a/catalog/catalog.go b/catalog/catalog.go index 8d36677d..f8c8622c 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -85,7 +85,7 @@ func load(rootDmap *sync.Map, d *Directory, subPath, rootPath string) error { } // Mark this as a pending Fileinfo reference d.datafile[leafPath] = new(io.TimeBucketInfo) - d.datafile[leafPath].IsRead = false + d.datafile[leafPath].IsInitialized = io.TbiNotInitialized d.datafile[leafPath].Path = leafPath yearFileBase := filepath.Base(leafPath) yearString := yearFileBase[:len(yearFileBase)-4] diff --git a/utils/io/metadata.go b/utils/io/metadata.go index cb62c0dc..b800fabf 100644 --- a/utils/io/metadata.go +++ b/utils/io/metadata.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "sync" + "sync/atomic" "unsafe" "fmt" @@ -18,6 +19,13 @@ import ( const Headersize = 37024 const FileinfoVersion = int64(2.0) +type IsInitialized = uint32 + +var ( + TbiInitialized IsInitialized = 1 + TbiNotInitialized IsInitialized = 0 +) + func nanosecondsInYear(year int) int64 { start := time.Date(year, time.January, 1, 0, 0, 0, 0, time.Local) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.Local) @@ -30,12 +38,12 @@ func FileSize(tf time.Duration, year int, recordSize int) int64 { } type TimeBucketInfo struct { - // Year, Path and IsRead are all set on catalog startup + // Year, Path and IsInitialized are all set on catalog startup Year int16 // Path is the absolute path to the data binary file. // (e.g. "/project/marketstore/data/TEST/1Sec/Tick/2021.bin") - Path string - IsRead bool + Path string + IsInitialized IsInitialized version int64 description string @@ -52,7 +60,7 @@ type TimeBucketInfo struct { elementNames []string elementTypes []EnumElementType - once sync.Once + mu sync.Mutex } func AlignedSize(unalignedSize int) (alignedSize int) { @@ -67,19 +75,22 @@ func AlignedSize(unalignedSize int) (alignedSize int) { func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, dsv []DataShape, recordType EnumRecordType) (f *TimeBucketInfo) { elementTypes, elementNames := CreateShapesForTimeBucketInfo(dsv) f = &TimeBucketInfo{ - version: FileinfoVersion, - Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), - IsRead: true, - timeframe: tf.Duration, - description: description, - Year: year, - nElements: int32(len(elementTypes)), + Year: year, + Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), + IsInitialized: 1, + version: FileinfoVersion, + description: description, + timeframe: tf.Duration, + + nElements: int32(len(elementTypes)), + recordType: recordType, + elementTypes: elementTypes, elementNames: elementNames, - recordType: recordType, } if f.recordType == FIXED { f.recordLength = int32(AlignedSize(f.getFieldRecordLength())) + 8 // add an 8-byte epoch field + f.variableRecordLength = 0 } else if f.recordType == VARIABLE { f.recordLength = 24 // Length of the indirect data pointer {index, offset, len} f.variableRecordLength = 0 @@ -87,6 +98,39 @@ func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, return f } +func NewTimeBucketInfoFromFile(path string) (*TimeBucketInfo, error) { + header, err := readHeader(path) + if err != nil { + return nil, fmt.Errorf("read header of TimeBucketInfo path=%s:%w", path, err) + } + + return NewTimeBucketInfoFromHeader(header, path), nil +} + +// NewTimeBucketInfoFromHeader creates a TimeBucketInfo from a given Header +func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo { + f := &TimeBucketInfo{ + Year: int16(hp.Year), + Path: filepath.Clean(path), + IsInitialized: TbiInitialized, + version: hp.Version, + description: string(bytes.Trim(hp.Description[:], "\x00")), + timeframe: time.Duration(hp.Timeframe), + nElements: int32(hp.NElements), + recordType: EnumRecordType(hp.RecordType), + recordLength: int32(hp.RecordLength), + variableRecordLength: 0, + elementNames: nil, + elementTypes: nil, + } + for i := 0; i < int(f.nElements); i++ { + baseName := string(bytes.Trim(hp.ElementNames[i][:], "\x00")) + f.elementNames = append(f.elementNames, baseName) + f.elementTypes = append(f.elementTypes, EnumElementType(hp.ElementTypes[i])) + } + return f +} + func CreateShapesForTimeBucketInfo(dsv []DataShape) (elementTypes []EnumElementType, elementNames []string) { /* Takes a datashape array and returns elementTypes and elementNames @@ -125,11 +169,14 @@ func (f *TimeBucketInfo) getFieldRecordLength() (fieldRecordLength int) { // GetDeepCopy returns a copy of this TimeBucketInfo. func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { - f.once.Do(f.initFromFile) + err := f.initFromFile() + if err != nil { + + } fcopy := TimeBucketInfo{ Year: f.Year, Path: f.Path, - IsRead: f.IsRead, + IsInitialized: f.IsInitialized, version: f.version, description: f.description, timeframe: f.timeframe, @@ -145,62 +192,88 @@ func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { return &fcopy } -func (f *TimeBucketInfo) initFromFile() { - if f.IsRead { +// initFromFile retrieves all TimeBucketInfo parameters from f.filepath +// if the tbi is not initialized. +// Context: When catalog.Directory is loaded, new TimeBucketInfo struct is lazily constructed +// with only f.filepath and f.isInitialized=io.TbiNotInitialized parameters +// so that it doesn't need to actually open files to get the params. +// This function is called when the TimeBucketInfo is actually used. +func (f *TimeBucketInfo) initFromFile() error { + if atomic.LoadUint32(&f.IsInitialized) == TbiInitialized { // do nothing if we found it already done - return + return nil } - if err := f.readHeader(f.Path); err != nil { - log.Fatal(err.Error()) + + f.mu.Lock() + defer f.mu.Unlock() + + // if not initialized yet, read the information from the file + tbi, err := NewTimeBucketInfoFromFile(f.Path) + if err != nil { + return fmt.Errorf("failed to read TimeBucketInfo from file %s:%w", f.Path, err) } - f.IsRead = true + f.Year = tbi.Year + f.Path = tbi.Path + f.version = tbi.version + f.description = tbi.description + f.timeframe = tbi.timeframe + f.nElements = tbi.nElements + f.recordType = tbi.recordType + f.recordLength = tbi.recordLength + f.variableRecordLength = tbi.variableRecordLength + f.elementNames = make([]string, len(tbi.elementNames)) + f.elementTypes = make([]EnumElementType, len(tbi.elementTypes)) + copy(f.elementNames, tbi.elementNames) + copy(f.elementTypes, tbi.elementTypes) + + atomic.StoreUint32(&f.IsInitialized, TbiInitialized) + return nil } // GetVersion returns the version number for the given TimeBucketInfo. func (f *TimeBucketInfo) GetVersion() int64 { - f.once.Do(f.initFromFile) + f.initFromFile() return f.version } // GetDescription returns the description string contained in the // given TimeBucketInfo. func (f *TimeBucketInfo) GetDescription() string { - f.once.Do(f.initFromFile) + f.initFromFile() return f.description } // GetTimeframe returns the duration for which each record's data is valid. // This means for 1Min resolution data, GetTimeframe will return time.Minute. func (f *TimeBucketInfo) GetTimeframe() time.Duration { - f.once.Do(f.initFromFile) + f.initFromFile() return f.timeframe } // GetIntervals returns the number of records that can fit in a 24 hour day. func (f *TimeBucketInfo) GetIntervals() int64 { - f.once.Do(f.initFromFile) - return int64(utils.Day.Nanoseconds()) / int64(f.timeframe.Nanoseconds()) + f.initFromFile() + return utils.Day.Nanoseconds() / f.timeframe.Nanoseconds() } // GetNelements returns the number of elements (data fields) for a given // TimeBucketInfo. func (f *TimeBucketInfo) GetNelements() int32 { - f.once.Do(f.initFromFile) + f.initFromFile() return f.nElements } // GetRecordLength returns the length of a single record in the file described // by the given TimeBucketInfo func (f *TimeBucketInfo) GetRecordLength() int32 { - f.once.Do(f.initFromFile) + f.initFromFile() return f.recordLength } // GetVariableRecordLength returns the length of a single record for a variable // length TimeBucketInfo file func (f *TimeBucketInfo) GetVariableRecordLength() int32 { - f.once.Do(f.initFromFile) - + f.initFromFile() if f.recordType == VARIABLE && f.variableRecordLength == 0 { // Variable records use the raw element sizes plus a 4-byte trailer for interval ticks f.variableRecordLength = int32(f.getFieldRecordLength()) + 4 // Variable records have a 4-byte trailer @@ -211,21 +284,21 @@ func (f *TimeBucketInfo) GetVariableRecordLength() int32 { // GetRecordType returns the type of the file described by the TimeBucketInfo // as an EnumRecordType func (f *TimeBucketInfo) GetRecordType() EnumRecordType { - f.once.Do(f.initFromFile) + f.initFromFile() return f.recordType } // GetElementNames returns the field names contained by the file described by // the given TimeBucketInfo func (f *TimeBucketInfo) GetElementNames() []string { - f.once.Do(f.initFromFile) + f.initFromFile() return f.elementNames } // GetElementTypes returns the field types contained by the file described by // the given TimeBucketInfo func (f *TimeBucketInfo) GetElementTypes() []EnumElementType { - f.once.Do(f.initFromFile) + f.initFromFile() return f.elementTypes } @@ -241,20 +314,20 @@ func (f *TimeBucketInfo) SetElementTypes(newTypes []EnumElementType) error { return nil } -func (f *TimeBucketInfo) readHeader(path string) (err error) { +func readHeader(path string) (header *Header, err error) { file, err := os.Open(path) if err != nil { log.Error("Failed to open file: %v - Error: %v", path, err) - return err + return nil, err } defer file.Close() var buffer [Headersize]byte - header := (*Header)(unsafe.Pointer(&buffer)) + header = (*Header)(unsafe.Pointer(&buffer)) // Read the top part of the header, which is not dependent on the number of elements n, err := file.Read(buffer[:312]) if err != nil || n != 312 { log.Error("Failed to read header part1 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part1 from file: %s - Error: %w", path, err) } // Second part of read element names @@ -262,17 +335,20 @@ func (f *TimeBucketInfo) readHeader(path string) (err error) { n, err = file.Read(buffer[312 : 312+secondReadSize]) if err != nil || n != int(secondReadSize) { log.Error("Failed to read header part2 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part2 from file: %s - Error: %w", path, err) } // Read past empty element name space - file.Seek(1024*32-secondReadSize, os.SEEK_CUR) + _, err = file.Seek(1024*32-secondReadSize, os.SEEK_CUR) + if err != nil { + return nil, fmt.Errorf("failed to seek file %v to read past empty element name space: %w", file, err) + } // Read element types start := 312 + 1024*32 n, err = file.Read(buffer[start : start+int(header.NElements)]) if err != nil || n != int(header.NElements) { log.Error("Failed to read header part3 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part3 from file: %s - Error: %w", path, err) } if EnumRecordType(header.RecordType) == VARIABLE { // Read to end of header @@ -280,37 +356,10 @@ func (f *TimeBucketInfo) readHeader(path string) (err error) { n, err = file.Read(buffer[start:Headersize]) if err != nil || n != (Headersize-start) { log.Error("Failed to read header part4 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part4 from file: %s - Error: %w", path, err) } } - f.load(header, path) - return nil -} - -func (f *TimeBucketInfo) load(hp *Header, path string) { - f.version = hp.Version - f.description = string(bytes.Trim(hp.Description[:], "\x00")) - f.Year = int16(hp.Year) - f.Path = filepath.Clean(path) - f.IsRead = true - f.timeframe = time.Duration(hp.Timeframe) - f.nElements = int32(hp.NElements) - f.recordLength = int32(hp.RecordLength) - f.recordType = EnumRecordType(hp.RecordType) - f.elementNames = nil - f.elementTypes = nil - for i := 0; i < int(f.nElements); i++ { - baseName := string(bytes.Trim(hp.ElementNames[i][:], "\x00")) - f.elementNames = append(f.elementNames, baseName) - f.elementTypes = append(f.elementTypes, EnumElementType(hp.ElementTypes[i])) - } -} - -// NewTimeBucketInfoFromHeader creates a TimeBucketInfo from a given Header -func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo { - tbi := new(TimeBucketInfo) - tbi.load(hp, path) - return tbi + return header, nil } // Header is the on-disk byte representation of the file header @@ -354,7 +403,8 @@ func (hp *Header) Load(f *TimeBucketInfo) { hp.RecordLength = int64(f.GetRecordLength()) hp.RecordType = int64(f.GetRecordType()) for i := 0; i < int(hp.NElements); i++ { - copy(hp.ElementNames[i][:], f.GetElementNames()[i]) + copy(hp.ElementNames[i][:], + f.GetElementNames()[i]) hp.ElementTypes[i] = byte(f.GetElementTypes()[i]) } hp.RecordType = int64(f.GetRecordType())