diff --git a/internal/testsuite/store_suite.go b/internal/testsuite/store_suite.go index 49b76b05a..da7e62b91 100644 --- a/internal/testsuite/store_suite.go +++ b/internal/testsuite/store_suite.go @@ -16,6 +16,7 @@ package testsuite import ( + "sort" "time" "github.com/quickfixgo/quickfix" @@ -106,8 +107,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() { 2: "they were forced to eat Robin's minstrels", 3: "and there was much rejoicing", } - for seqNum, msg := range expectedMsgsBySeqNum { - s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(msg))) + var seqNums []int + for seqNum := range expectedMsgsBySeqNum { + seqNums = append(seqNums, seqNum) + } + sort.Ints(seqNums) + for _, seqNum := range seqNums { + s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(expectedMsgsBySeqNum[seqNum]))) } // When the messages are retrieved from the MessageStore @@ -141,8 +147,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() { 2: "they were forced to eat Robin's minstrels", 3: "and there was much rejoicing", } - for seqNum, msg := range expectedMsgsBySeqNum { - s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(msg))) + var seqNums []int + for seqNum := range expectedMsgsBySeqNum { + seqNums = append(seqNums, seqNum) + } + sort.Ints(seqNums) + for _, seqNum := range seqNums { + s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(expectedMsgsBySeqNum[seqNum]))) } s.Equal(423, s.MsgStore.NextSenderMsgSeqNum()) diff --git a/store/file/file_store.go b/store/file/file_store.go index ae44540b1..085612d4e 100644 --- a/store/file/file_store.go +++ b/store/file/file_store.go @@ -22,20 +22,13 @@ import ( "path" "strconv" "strings" - "sync" "time" "github.com/pkg/errors" - "github.com/quickfixgo/quickfix" "github.com/quickfixgo/quickfix/config" ) -type msgDef struct { - offset int64 - size int -} - type fileStoreFactory struct { settings *quickfix.Settings } @@ -43,7 +36,6 @@ type fileStoreFactory struct { type fileStore struct { sessionID quickfix.SessionID cache quickfix.MessageStore - offsets sync.Map bodyFname string headerFname string sessionFname string @@ -107,7 +99,6 @@ func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) ( store := &fileStore{ sessionID: sessionID, cache: memStore, - offsets: sync.Map{}, bodyFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "body")), headerFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "header")), sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")), @@ -199,18 +190,6 @@ func (store *fileStore) Refresh() (err error) { } func (store *fileStore) populateCache() (creationTimePopulated bool, err error) { - if tmpHeaderFile, err := os.Open(store.headerFname); err == nil { - defer tmpHeaderFile.Close() - for { - var seqNum, size int - var offset int64 - if cnt, err := fmt.Fscanf(tmpHeaderFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil || cnt != 3 { - break - } - store.offsets.Store(seqNum, msgDef{offset: offset, size: size}) - } - } - if timeBytes, err := os.ReadFile(store.sessionFname); err == nil { var ctime time.Time if err := ctime.UnmarshalText(timeBytes); err == nil { @@ -348,7 +327,6 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { } } - store.offsets.Store(seqNum, msgDef{offset: offset, size: len(msg)}) return nil } @@ -360,34 +338,38 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg [] return store.IncrNextSenderMsgSeqNum() } -func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) { - msgInfoTemp, found := store.offsets.Load(seqNum) - if !found { - return - } - msgInfo, ok := msgInfoTemp.(msgDef) - if !ok { - return nil, true, fmt.Errorf("incorrect msgInfo type while reading file: %s", store.bodyFname) - } - - msg = make([]byte, msgInfo.size) - if _, err = store.bodyFile.ReadAt(msg, msgInfo.offset); err != nil { - return nil, true, fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error()) - } - - return msg, true, nil -} - func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { - for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ { - m, found, err := store.getMessage(seqNum) - if err != nil { - return err - } - if found { - if err = cb(m); err != nil { - return err + // Sync files and seek to start of header file + if err := store.bodyFile.Sync(); err != nil { + return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error()) + } else if err = store.headerFile.Sync(); err != nil { + return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error()) + } else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error()) + } + + // Iterate over the header file + for { + var seqNum, size int + var offset int64 + if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil { + if errors.Is(err, io.EOF) { + break } + return fmt.Errorf("unable to read from file: %s: %s", store.headerFname, err.Error()) + } else if cnt < 3 || seqNum > endSeqNum { + // If we have reached the end of possible iteration then break + break + } else if seqNum < beginSeqNum { + // If we have not yet reached the starting sequence number then continue + continue + } + // Otherwise process the file + msg := make([]byte, size) + if _, err := store.bodyFile.ReadAt(msg, offset); err != nil { + return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error()) + } else if err = cb(msg); err != nil { + return err } } return nil