From 672e49a18405d809d0615179f2231c24b85d2fac Mon Sep 17 00:00:00 2001 From: Daito AKIMURA Date: Sat, 11 Sep 2021 17:00:54 +0900 Subject: [PATCH 1/2] refactor: avoid using sync.Once --- utils/io/metadata.go | 173 +++++++++++++++++++++++++++---------------- 1 file changed, 110 insertions(+), 63 deletions(-) diff --git a/utils/io/metadata.go b/utils/io/metadata.go index e6c36a7d..1784970b 100644 --- a/utils/io/metadata.go +++ b/utils/io/metadata.go @@ -52,7 +52,7 @@ type TimeBucketInfo struct { elementNames []string elementTypes []EnumElementType - once sync.Once + mu sync.Mutex } func AlignedSize(unalignedSize int) (alignedSize int) { @@ -67,19 +67,22 @@ func AlignedSize(unalignedSize int) (alignedSize int) { func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, dsv []DataShape, recordType EnumRecordType) (f *TimeBucketInfo) { elementTypes, elementNames := CreateShapesForTimeBucketInfo(dsv) f = &TimeBucketInfo{ - version: FileinfoVersion, - Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), - IsRead: true, - timeframe: tf.Duration, - description: description, - Year: year, - nElements: int32(len(elementTypes)), + Year: year, + Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), + IsRead: true, + version: FileinfoVersion, + description: description, + timeframe: tf.Duration, + + nElements: int32(len(elementTypes)), + recordType: recordType, + elementTypes: elementTypes, elementNames: elementNames, - recordType: recordType, } if f.recordType == FIXED { f.recordLength = int32(AlignedSize(f.getFieldRecordLength())) + 8 // add an 8-byte epoch field + f.variableRecordLength = 0 } else if f.recordType == VARIABLE { f.recordLength = 24 // Length of the indirect data pointer {index, offset, len} f.variableRecordLength = 0 @@ -87,6 +90,39 @@ func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, return f } +func NewTimeBucketInfoFromFile(path string) (*TimeBucketInfo, error) { + header, err := 
readHeader(path) + if err != nil { + return nil, fmt.Errorf("read header of TimeBucketInfo path=%s:%w", path, err) + } + + return NewTimeBucketInfoFromHeader(header, path), nil +} + +// NewTimeBucketInfoFromHeader creates a TimeBucketInfo from a given Header +func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo { + f := &TimeBucketInfo{ + Year: int16(hp.Year), + Path: filepath.Clean(path), + IsRead: true, + version: hp.Version, + description: string(bytes.Trim(hp.Description[:], "\x00")), + timeframe: time.Duration(hp.Timeframe), + nElements: int32(hp.NElements), + recordType: EnumRecordType(hp.RecordType), + recordLength: int32(hp.RecordLength), + variableRecordLength: 0, + elementNames: nil, + elementTypes: nil, + } + for i := 0; i < int(f.nElements); i++ { + baseName := string(bytes.Trim(hp.ElementNames[i][:], "\x00")) + f.elementNames = append(f.elementNames, baseName) + f.elementTypes = append(f.elementTypes, EnumElementType(hp.ElementTypes[i])) + } + return f +} + func CreateShapesForTimeBucketInfo(dsv []DataShape) (elementTypes []EnumElementType, elementNames []string) { /* Takes a datashape array and returns elementTypes and elementNames @@ -112,9 +148,7 @@ func (f *TimeBucketInfo) GetDataShapesWithEpoch() (out []DataShape) { ep := DataShape{Name: "Epoch", Type: INT64} dsv := f.GetDataShapes() out = append(out, ep) - for _, shape := range dsv { - out = append(out, shape) - } + out = append(out, dsv...) return out } @@ -127,7 +161,7 @@ func (f *TimeBucketInfo) getFieldRecordLength() (fieldRecordLength int) { // GetDeepCopy returns a copy of this TimeBucketInfo. func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { - f.once.Do(f.initFromFile) + f.initFromFilePath() fcopy := TimeBucketInfo{ Year: f.Year, Path: f.Path, @@ -147,62 +181,98 @@ func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { return &fcopy } +// initFromFilePath retrieves all TimeBucketInfo parameters from f.filepath. 
+// When catalog.Directory is loaded, new TimeBucketInfo struct is lazily constructed +// with only f.filepath and f.isRead=False parameters so that it doesn't need to actually open files to get the params. +// This function is called when the TimeBucketInfo is actually used to get the parameters. +func (f *TimeBucketInfo) initFromFilePath() error { + if f.IsRead { + // do nothing if we found it already done + return nil + } + + f.mu.Lock() + defer f.mu.Unlock() + + tbi, err := NewTimeBucketInfoFromFile(f.Path) + if err != nil { + log.Fatal(err.Error()) + } + f.Year = tbi.Year + f.Path = tbi.Path + f.IsRead = true + f.version = tbi.version + f.description = tbi.description + f.timeframe = tbi.timeframe + f.nElements = tbi.nElements + f.recordType = tbi.recordType + f.recordLength = tbi.recordLength + f.variableRecordLength = tbi.variableRecordLength + f.elementNames = make([]string, len(tbi.elementNames)) + f.elementTypes = make([]EnumElementType, len(tbi.elementTypes)) + copy(f.elementNames, tbi.elementNames) + copy(f.elementTypes, tbi.elementTypes) + + return nil +} + func (f *TimeBucketInfo) initFromFile() { if f.IsRead { // do nothing if we found it already done return } - if err := f.readHeader(f.Path); err != nil { + tbi, err := NewTimeBucketInfoFromFile(f.Path) + if err != nil { log.Fatal(err.Error()) } + f = tbi f.IsRead = true } // GetVersion returns the version number for the given TimeBucketInfo. func (f *TimeBucketInfo) GetVersion() int64 { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.version } // GetDescription returns the description string contained in the // given TimeBucketInfo. func (f *TimeBucketInfo) GetDescription() string { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.description } // GetTimeframe returns the duration for which each record's data is valid. // This means for 1Min resolution data, GetTimeframe will return time.Minute. 
func (f *TimeBucketInfo) GetTimeframe() time.Duration { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.timeframe } // GetIntervals returns the number of records that can fit in a 24 hour day. func (f *TimeBucketInfo) GetIntervals() int64 { - f.once.Do(f.initFromFile) - return int64(utils.Day.Nanoseconds()) / int64(f.timeframe.Nanoseconds()) + f.initFromFilePath() + return utils.Day.Nanoseconds() / f.timeframe.Nanoseconds() } // GetNelements returns the number of elements (data fields) for a given // TimeBucketInfo. func (f *TimeBucketInfo) GetNelements() int32 { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.nElements } // GetRecordLength returns the length of a single record in the file described // by the given TimeBucketInfo func (f *TimeBucketInfo) GetRecordLength() int32 { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.recordLength } // GetVariableRecordLength returns the length of a single record for a variable // length TimeBucketInfo file func (f *TimeBucketInfo) GetVariableRecordLength() int32 { - f.once.Do(f.initFromFile) - + f.initFromFilePath() if f.recordType == VARIABLE && f.variableRecordLength == 0 { // Variable records use the raw element sizes plus a 4-byte trailer for interval ticks f.variableRecordLength = int32(f.getFieldRecordLength()) + 4 // Variable records have a 4-byte trailer @@ -213,21 +283,21 @@ func (f *TimeBucketInfo) GetVariableRecordLength() int32 { // GetRecordType returns the type of the file described by the TimeBucketInfo // as an EnumRecordType func (f *TimeBucketInfo) GetRecordType() EnumRecordType { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.recordType } // GetElementNames returns the field names contained by the file described by // the given TimeBucketInfo func (f *TimeBucketInfo) GetElementNames() []string { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.elementNames } // GetElementTypes returns the field types contained by the file described by // the 
given TimeBucketInfo func (f *TimeBucketInfo) GetElementTypes() []EnumElementType { - f.once.Do(f.initFromFile) + f.initFromFilePath() return f.elementTypes } @@ -243,20 +313,20 @@ func (f *TimeBucketInfo) SetElementTypes(newTypes []EnumElementType) error { return nil } -func (f *TimeBucketInfo) readHeader(path string) (err error) { +func readHeader(path string) (header *Header, err error) { file, err := os.Open(path) if err != nil { log.Error("Failed to open file: %v - Error: %v", path, err) - return err + return nil, err } defer file.Close() var buffer [Headersize]byte - header := (*Header)(unsafe.Pointer(&buffer)) + header = (*Header)(unsafe.Pointer(&buffer)) // Read the top part of the header, which is not dependent on the number of elements n, err := file.Read(buffer[:312]) if err != nil || n != 312 { log.Error("Failed to read header part1 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part1 from file: %s - Error: %w", path, err) } // Second part of read element names @@ -264,17 +334,20 @@ func (f *TimeBucketInfo) readHeader(path string) (err error) { n, err = file.Read(buffer[312 : 312+secondReadSize]) if err != nil || n != int(secondReadSize) { log.Error("Failed to read header part2 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part2 from file: %s - Error: %w", path, err) } // Read past empty element name space - file.Seek(1024*32-secondReadSize, os.SEEK_CUR) + _, err = file.Seek(1024*32-secondReadSize, os.SEEK_CUR) + if err != nil { + return nil, fmt.Errorf("failed to seek file %v to read past empty element name space: %w", file, err) + } // Read element types start := 312 + 1024*32 n, err = file.Read(buffer[start : start+int(header.NElements)]) if err != nil || n != int(header.NElements) { log.Error("Failed to read header part3 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part3 from file: %s - 
Error: %w", path, err) } if EnumRecordType(header.RecordType) == VARIABLE { // Read to end of header @@ -282,37 +355,10 @@ func (f *TimeBucketInfo) readHeader(path string) (err error) { n, err = file.Read(buffer[start:Headersize]) if err != nil || n != (Headersize-start) { log.Error("Failed to read header part4 from file: %v - Error: %v", path, err) - return err + return nil, fmt.Errorf("failed to read header part4 from file: %s - Error: %w", path, err) } } - f.load(header, path) - return nil -} - -func (f *TimeBucketInfo) load(hp *Header, path string) { - f.version = hp.Version - f.description = string(bytes.Trim(hp.Description[:], "\x00")) - f.Year = int16(hp.Year) - f.Path = filepath.Clean(path) - f.IsRead = true - f.timeframe = time.Duration(hp.Timeframe) - f.nElements = int32(hp.NElements) - f.recordLength = int32(hp.RecordLength) - f.recordType = EnumRecordType(hp.RecordType) - f.elementNames = nil - f.elementTypes = nil - for i := 0; i < int(f.nElements); i++ { - baseName := string(bytes.Trim(hp.ElementNames[i][:], "\x00")) - f.elementNames = append(f.elementNames, baseName) - f.elementTypes = append(f.elementTypes, EnumElementType(hp.ElementTypes[i])) - } -} - -// NewTimeBucketInfoFromHeader creates a TimeBucketInfo from a given Header -func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo { - tbi := new(TimeBucketInfo) - tbi.load(hp, path) - return tbi + return header, nil } // Header is the on-disk byte representation of the file header @@ -356,7 +402,8 @@ func (hp *Header) Load(f *TimeBucketInfo) { hp.RecordLength = int64(f.GetRecordLength()) hp.RecordType = int64(f.GetRecordType()) for i := 0; i < int(hp.NElements); i++ { - copy(hp.ElementNames[i][:], f.GetElementNames()[i]) + copy(hp.ElementNames[i][:], + f.GetElementNames()[i]) hp.ElementTypes[i] = byte(f.GetElementTypes()[i]) } hp.RecordType = int64(f.GetRecordType()) From 3e15ab1bbf66f3b56de57b2755e8376bed97acd8 Mon Sep 17 00:00:00 2001 From: Daito AKIMURA Date: Tue, 14 Sep 2021 
10:40:21 +0900 Subject: [PATCH 2/2] chore: use atomic.load/store for IsRead flag --- catalog/catalog.go | 2 +- utils/io/metadata.go | 87 ++++++++++++++++++++++---------------------- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 8d36677d..f8c8622c 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -85,7 +85,7 @@ func load(rootDmap *sync.Map, d *Directory, subPath, rootPath string) error { } // Mark this as a pending Fileinfo reference d.datafile[leafPath] = new(io.TimeBucketInfo) - d.datafile[leafPath].IsRead = false + d.datafile[leafPath].IsInitialized = io.TbiNotInitialized d.datafile[leafPath].Path = leafPath yearFileBase := filepath.Base(leafPath) yearString := yearFileBase[:len(yearFileBase)-4] diff --git a/utils/io/metadata.go b/utils/io/metadata.go index 1784970b..b800fabf 100644 --- a/utils/io/metadata.go +++ b/utils/io/metadata.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "sync" + "sync/atomic" "unsafe" "fmt" @@ -18,6 +19,13 @@ import ( const Headersize = 37024 const FileinfoVersion = int64(2.0) +type IsInitialized = uint32 + +var ( + TbiInitialized IsInitialized = 1 + TbiNotInitialized IsInitialized = 0 +) + func nanosecondsInYear(year int) int64 { start := time.Date(year, time.January, 1, 0, 0, 0, 0, time.Local) end := time.Date(year+1, time.January, 1, 0, 0, 0, 0, time.Local) @@ -30,12 +38,12 @@ func FileSize(tf time.Duration, year int, recordSize int) int64 { } type TimeBucketInfo struct { - // Year, Path and IsRead are all set on catalog startup + // Year, Path and IsInitialized are all set on catalog startup Year int16 // Path is the absolute path to the data binary file. // (e.g. 
"/project/marketstore/data/TEST/1Sec/Tick/2021.bin") - Path string - IsRead bool + Path string + IsInitialized IsInitialized version int64 description string @@ -67,12 +75,12 @@ func AlignedSize(unalignedSize int) (alignedSize int) { func NewTimeBucketInfo(tf utils.Timeframe, path, description string, year int16, dsv []DataShape, recordType EnumRecordType) (f *TimeBucketInfo) { elementTypes, elementNames := CreateShapesForTimeBucketInfo(dsv) f = &TimeBucketInfo{ - Year: year, - Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), - IsRead: true, - version: FileinfoVersion, - description: description, - timeframe: tf.Duration, + Year: year, + Path: filepath.Join(path, strconv.Itoa(int(year))+".bin"), + IsInitialized: 1, + version: FileinfoVersion, + description: description, + timeframe: tf.Duration, nElements: int32(len(elementTypes)), recordType: recordType, @@ -104,7 +112,7 @@ func NewTimeBucketInfoFromHeader(hp *Header, path string) *TimeBucketInfo { f := &TimeBucketInfo{ Year: int16(hp.Year), Path: filepath.Clean(path), - IsRead: true, + IsInitialized: TbiInitialized, version: hp.Version, description: string(bytes.Trim(hp.Description[:], "\x00")), timeframe: time.Duration(hp.Timeframe), @@ -161,11 +169,14 @@ func (f *TimeBucketInfo) getFieldRecordLength() (fieldRecordLength int) { // GetDeepCopy returns a copy of this TimeBucketInfo. func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { - f.initFromFilePath() + err := f.initFromFile() + if err != nil { + + } fcopy := TimeBucketInfo{ Year: f.Year, Path: f.Path, - IsRead: f.IsRead, + IsInitialized: f.IsInitialized, version: f.version, description: f.description, timeframe: f.timeframe, @@ -181,12 +192,14 @@ func (f *TimeBucketInfo) GetDeepCopy() *TimeBucketInfo { return &fcopy } -// initFromFilePath retrieves all TimeBucketInfo parameters from f.filepath. 
-// When catalog.Directory is loaded, new TimeBucketInfo struct is lazily constructed -// with only f.filepath and f.isRead=False parameters so that it doesn't need to actually open files to get the params. -// This function is called when the TimeBucketInfo is actually used to get the parameters. -func (f *TimeBucketInfo) initFromFilePath() error { - if f.IsRead { +// initFromFile retrieves all TimeBucketInfo parameters from the file at f.Path +// if the tbi is not initialized. +// Context: When catalog.Directory is loaded, a new TimeBucketInfo struct is lazily constructed +// with only the f.Path and f.IsInitialized=io.TbiNotInitialized parameters +// so that it doesn't need to actually open files to get the params. +// This function is called when the TimeBucketInfo is actually used. +func (f *TimeBucketInfo) initFromFile() error { + if atomic.LoadUint32(&f.IsInitialized) == TbiInitialized { // do nothing if we found it already done return nil } @@ -194,13 +207,13 @@ func (f *TimeBucketInfo) initFromFilePath() error { f.mu.Lock() defer f.mu.Unlock() + // if not initialized yet, read the information from the file tbi, err := NewTimeBucketInfoFromFile(f.Path) if err != nil { - log.Fatal(err.Error()) + return fmt.Errorf("failed to read TimeBucketInfo from file %s:%w", f.Path, err) } f.Year = tbi.Year f.Path = tbi.Path - f.IsRead = true f.version = tbi.version f.description = tbi.description f.timeframe = tbi.timeframe @@ -213,66 +226,54 @@ func (f *TimeBucketInfo) initFromFilePath() error { copy(f.elementNames, tbi.elementNames) copy(f.elementTypes, tbi.elementTypes) + atomic.StoreUint32(&f.IsInitialized, TbiInitialized) return nil } -func (f *TimeBucketInfo) initFromFile() { - if f.IsRead { - // do nothing if we found it already done - return - } - tbi, err := NewTimeBucketInfoFromFile(f.Path) - if err != nil { - log.Fatal(err.Error()) - } - f = tbi - f.IsRead = true -} - // GetVersion returns the version number for the given TimeBucketInfo.
func (f *TimeBucketInfo) GetVersion() int64 { - f.initFromFilePath() + f.initFromFile() return f.version } // GetDescription returns the description string contained in the // given TimeBucketInfo. func (f *TimeBucketInfo) GetDescription() string { - f.initFromFilePath() + f.initFromFile() return f.description } // GetTimeframe returns the duration for which each record's data is valid. // This means for 1Min resolution data, GetTimeframe will return time.Minute. func (f *TimeBucketInfo) GetTimeframe() time.Duration { - f.initFromFilePath() + f.initFromFile() return f.timeframe } // GetIntervals returns the number of records that can fit in a 24 hour day. func (f *TimeBucketInfo) GetIntervals() int64 { - f.initFromFilePath() + f.initFromFile() return utils.Day.Nanoseconds() / f.timeframe.Nanoseconds() } // GetNelements returns the number of elements (data fields) for a given // TimeBucketInfo. func (f *TimeBucketInfo) GetNelements() int32 { - f.initFromFilePath() + f.initFromFile() return f.nElements } // GetRecordLength returns the length of a single record in the file described // by the given TimeBucketInfo func (f *TimeBucketInfo) GetRecordLength() int32 { - f.initFromFilePath() + f.initFromFile() return f.recordLength } // GetVariableRecordLength returns the length of a single record for a variable // length TimeBucketInfo file func (f *TimeBucketInfo) GetVariableRecordLength() int32 { - f.initFromFilePath() + f.initFromFile() if f.recordType == VARIABLE && f.variableRecordLength == 0 { // Variable records use the raw element sizes plus a 4-byte trailer for interval ticks f.variableRecordLength = int32(f.getFieldRecordLength()) + 4 // Variable records have a 4-byte trailer @@ -283,21 +284,21 @@ func (f *TimeBucketInfo) GetVariableRecordLength() int32 { // GetRecordType returns the type of the file described by the TimeBucketInfo // as an EnumRecordType func (f *TimeBucketInfo) GetRecordType() EnumRecordType { - f.initFromFilePath() + f.initFromFile() return 
f.recordType } // GetElementNames returns the field names contained by the file described by // the given TimeBucketInfo func (f *TimeBucketInfo) GetElementNames() []string { - f.initFromFilePath() + f.initFromFile() return f.elementNames } // GetElementTypes returns the field types contained by the file described by // the given TimeBucketInfo func (f *TimeBucketInfo) GetElementTypes() []EnumElementType { - f.initFromFilePath() + f.initFromFile() return f.elementTypes }