Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File store uses files exclusively #680

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions internal/testsuite/store_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package testsuite

import (
"sort"
"time"

"github.com/quickfixgo/quickfix"
Expand Down Expand Up @@ -106,8 +107,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageGetMessage() {
2: "they were forced to eat Robin's minstrels",
3: "and there was much rejoicing",
}
for seqNum, msg := range expectedMsgsBySeqNum {
s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(msg)))
var seqNums []int
for seqNum := range expectedMsgsBySeqNum {
seqNums = append(seqNums, seqNum)
}
sort.Ints(seqNums)
for _, seqNum := range seqNums {
s.Require().Nil(s.MsgStore.SaveMessage(seqNum, []byte(expectedMsgsBySeqNum[seqNum])))
}

// When the messages are retrieved from the MessageStore
Expand Down Expand Up @@ -141,8 +147,13 @@ func (s *StoreTestSuite) TestMessageStoreSaveMessageAndIncrementGetMessage() {
2: "they were forced to eat Robin's minstrels",
3: "and there was much rejoicing",
}
for seqNum, msg := range expectedMsgsBySeqNum {
s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(msg)))
var seqNums []int
for seqNum := range expectedMsgsBySeqNum {
seqNums = append(seqNums, seqNum)
}
sort.Ints(seqNums)
for _, seqNum := range seqNums {
s.Require().Nil(s.MsgStore.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, []byte(expectedMsgsBySeqNum[seqNum])))
}
s.Equal(423, s.MsgStore.NextSenderMsgSeqNum())

Expand Down
78 changes: 30 additions & 48 deletions store/file/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,20 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"

"github.com/quickfixgo/quickfix"
"github.com/quickfixgo/quickfix/config"
)

type msgDef struct {
offset int64
size int
}

type fileStoreFactory struct {
settings *quickfix.Settings
}

type fileStore struct {
sessionID quickfix.SessionID
cache quickfix.MessageStore
offsets sync.Map
bodyFname string
headerFname string
sessionFname string
Expand Down Expand Up @@ -107,7 +99,6 @@ func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) (
store := &fileStore{
sessionID: sessionID,
cache: memStore,
offsets: sync.Map{},
bodyFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "body")),
headerFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "header")),
sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")),
Expand Down Expand Up @@ -199,18 +190,6 @@ func (store *fileStore) Refresh() (err error) {
}

func (store *fileStore) populateCache() (creationTimePopulated bool, err error) {
if tmpHeaderFile, err := os.Open(store.headerFname); err == nil {
defer tmpHeaderFile.Close()
for {
var seqNum, size int
var offset int64
if cnt, err := fmt.Fscanf(tmpHeaderFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil || cnt != 3 {
break
}
store.offsets.Store(seqNum, msgDef{offset: offset, size: size})
}
}

if timeBytes, err := os.ReadFile(store.sessionFname); err == nil {
var ctime time.Time
if err := ctime.UnmarshalText(timeBytes); err == nil {
Expand Down Expand Up @@ -348,7 +327,6 @@ func (store *fileStore) SaveMessage(seqNum int, msg []byte) error {
}
}

store.offsets.Store(seqNum, msgDef{offset: offset, size: len(msg)})
return nil
}

Expand All @@ -360,34 +338,38 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
return store.IncrNextSenderMsgSeqNum()
}

func (store *fileStore) getMessage(seqNum int) (msg []byte, found bool, err error) {
msgInfoTemp, found := store.offsets.Load(seqNum)
if !found {
return
}
msgInfo, ok := msgInfoTemp.(msgDef)
if !ok {
return nil, true, fmt.Errorf("incorrect msgInfo type while reading file: %s", store.bodyFname)
}

msg = make([]byte, msgInfo.size)
if _, err = store.bodyFile.ReadAt(msg, msgInfo.offset); err != nil {
return nil, true, fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
}

return msg, true, nil
}

func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
m, found, err := store.getMessage(seqNum)
if err != nil {
return err
}
if found {
if err = cb(m); err != nil {
return err
// Sync files and seek to start of header file
if err := store.bodyFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error())
} else if err = store.headerFile.Sync(); err != nil {
return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error())
} else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error())
}

// Iterate over the header file
for {
var seqNum, size int
var offset int64
if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("unable to read from file: %s: %s", store.headerFname, err.Error())
} else if cnt < 3 || seqNum > endSeqNum {
// If we have reached the end of possible iteration then break
break
} else if seqNum < beginSeqNum {
// If we have not yet reached the starting sequence number then continue
continue
}
// Otherwise process the file
msg := make([]byte, size)
if _, err := store.bodyFile.ReadAt(msg, offset); err != nil {
return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error())
} else if err = cb(msg); err != nil {
return err
}
}
return nil
Expand Down
Loading