Skip to content

Commit

Permalink
fix: spec change for Xignite Feeder plugin (#271)
Browse files Browse the repository at this point in the history
- change the time (06:00JST -> 07:00JST, etc.) when the quotes data are backfilled every day
- add "DateTime" and "LastSize" column to the stored data
  • Loading branch information
dakimura authored and Notargets committed Dec 3, 2019
1 parent d9e61c0 commit 4e2fd27
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 29 deletions.
10 changes: 5 additions & 5 deletions contrib/xignitefeeder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ bgworkers:
- IND_NIKKEI # NIKKEI INDICES
# time when target symbols in the exchanges are updated everyday.
# this time is also used for the historical data backfill (UTC)
updatingHour: 21 #:00:00
updatingHour: 22 # (UTC). = every day at 07:00:00 (JST)
# XigniteFeeder writes data to "{identifier}/{timeframe}/TICK" TimeBucketKey
timeframe: "1Sec"
# Auth token for Xignite API
Expand All @@ -39,7 +39,7 @@ bgworkers:
# Interval [sec] to call Xignite API
interval: 10
# XigniteFeeder runs from openTime ~ closeTime (UTC)
openTime: "23:55:00" # 08:55 (JST)
openTime: "23:00:00" # 08:00 (JST)
closeTime: "06:10:00" # 15:10 (JST)
# XigniteFeeder doesn't run on the following days
closedDaysOfTheWeek:
Expand Down Expand Up @@ -92,13 +92,13 @@ bgworkers:
# if backfill is enabled, historical daily chart data for all symbols in the target exchanges
# are aggregated using Xignite API (=GetQuotesRange endpoint) and stored to "{symbol}/{timeframe}/OHLCV" bucket.
backfill:
enabled: false
since: "2008-04-01"
enabled: true
since: "2008-01-01"
timeframe: "1D"
# In addition to the daily-chart backfill above,
# Xignite Feeder can feed 5-minute chart data of the target symbols for the past X business days. The data is stored to {symbol}/{timeframe}/OHLCV bucket (e.g. "1400/5Min/OHLCV" )
recentBackfill:
enabled: true
enabled: false
days: 7 # Xignite Feeder feeds the data for {days} business days
timeframe: "5Min"
```
Expand Down
3 changes: 3 additions & 0 deletions contrib/xignitefeeder/api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ type Security struct {
type Quote struct {
DateTime XigniteDateTime `json:"DateTime,omitempty"`
Ask float32 `json:"Ask"`
AskSize float32 `json:"AskSize"`
AskDateTime XigniteDateTime `json:"AskDateTime,omitempty"`
Bid float32 `json:"Bid"`
BidSize float32 `json:"BidSize"`
BidDateTime XigniteDateTime `json:"BidDateTime,omitempty"`
// price of the most recent deal
Last float32 `json:"Last"`
LastSize float32 `json:"LastSize"`
Open float32 `json:"Open"`
High float32 `json:"High"`
Low float32 `json:"Low"`
Expand Down
37 changes: 32 additions & 5 deletions contrib/xignitefeeder/feed/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ import (
type Backfill struct {
symbolManager symbols.Manager
apiClient api.Client
writer writer.QuotesRangeWriter
writer writer.QuotesWriter
rangeWriter writer.QuotesRangeWriter
since time.Time
}

// NewBackfill initializes the module to backfill the historical daily chart data to marketstore
func NewBackfill(symbolManager symbols.Manager, apiClient api.Client, writer writer.QuotesRangeWriter, Since time.Time) *Backfill {
return &Backfill{symbolManager: symbolManager, apiClient: apiClient, writer: writer, since: Since}
func NewBackfill(symbolManager symbols.Manager, apiClient api.Client, writer writer.QuotesWriter,
rangeWriter writer.QuotesRangeWriter, Since time.Time,
) *Backfill {
return &Backfill{symbolManager: symbolManager, apiClient: apiClient,
writer: writer, rangeWriter: rangeWriter, since: Since,
}
}

// Update calls UpdateSymbols and UpdateIndexSymbols functions sequentially
func (b *Backfill) Update() {
b.UpdateSymbols()
b.UpdateIndexSymbols()
// In order to get and store adjusted closing prices
b.UpdateClosingPrice()
}

// UpdateSymbols aggregates daily chart data since the specified date and store it to "{symbol}/{timeframe}/OHLCV" bucket in marketstore
Expand All @@ -48,7 +55,7 @@ func (b *Backfill) UpdateSymbols() {
}

// write the data to marketstore
err = b.writer.Write(resp.Security.Symbol, resp.ArrayOfEndOfDayQuote, false)
err = b.rangeWriter.Write(resp.Security.Symbol, resp.ArrayOfEndOfDayQuote, false)
if err != nil {
log.Error(fmt.Sprintf("failed to backfill the daily chart data to marketstore. identifier=%v", identifier))
}
Expand Down Expand Up @@ -79,7 +86,7 @@ func (b *Backfill) UpdateIndexSymbols() {
}

// write the data to marketstore
err = b.writer.Write(resp.IndexAndGroup.Symbol, resp.ArrayOfEndOfDayQuote, true)
err = b.rangeWriter.Write(resp.IndexAndGroup.Symbol, resp.ArrayOfEndOfDayQuote, true)
if err != nil {
log.Error(fmt.Sprintf("failed to backfill the daily chart data to marketstore. identifier=%v", identifier))
}
Expand All @@ -89,3 +96,23 @@ func (b *Backfill) UpdateIndexSymbols() {

log.Info("Data backfill is successfully done.")
}

// UpdateClosingPrice get real-time quotes data for the target symbols and store them into the local marketstore server.
func (b *Backfill) UpdateClosingPrice() {
// call Xignite API to get Quotes data
identifiers := b.symbolManager.GetAllIdentifiers()
response, err := b.apiClient.GetRealTimeQuotes(identifiers)
if err != nil {
log.Error(fmt.Sprintf("failed to get data from Xignite API. %v, err=", identifiers) + err.Error())
return
}

// write Quotes data
err = b.writer.Write(response)
if err != nil {
log.Error("failed to write quotes data. err=" + err.Error())
return
}

log.Info("Storing adjusted closing prices has been successfully done.")
}
24 changes: 18 additions & 6 deletions contrib/xignitefeeder/feed/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,32 @@ func (mqrw *MockQuotesRangeWriter) WriteIndex(quotesRange api.GetIndexQuotesRang
// 3 writes should be successfully done with the 3 identifiers
func TestBackfill_Update(t *testing.T) {
// --- given ---
var w writer.QuotesRangeWriter = &MockQuotesRangeWriter{WriteCount: 0}
var rw writer.QuotesRangeWriter = &MockQuotesRangeWriter{WriteCount: 0}
var w writer.QuotesWriter = &MockQuotesWriter{WriteCount: 0}

SUT := &Backfill{
symbolManager: internal.MockSymbolsManager{Identifiers: TestIdentifiers},
apiClient: &internal.MockAPIClient{},
writer: w,
rangeWriter: rw,
since: time.Now().UTC(),
}

// --- when ---
SUT.Update()

// --- then ---
if mw, ok := w.(*MockQuotesRangeWriter); ok {
if mw.WriteCount != 3 {
t.Errorf("3 writes should be performed. got: WriteCount=%v", mw.WriteCount)
if mrw, ok := rw.(*MockQuotesRangeWriter); ok {
if mrw.WriteCount != 3 {
t.Errorf("3 writes should be performed (1 write for 1 symbol). got: WriteCount=%v", mrw.WriteCount)
}
} else {
t.Fatalf("type error")
}

if mw, ok := w.(*MockQuotesWriter); ok {
if mw.WriteCount != 1 {
t.Errorf("1 writes should be performed (1 write for 3 symbols). got: WriteCount=%v", mw.WriteCount)
}
} else {
t.Fatalf("type error")
Expand All @@ -80,12 +90,14 @@ func TestBackfill_Update(t *testing.T) {
// Even if Xignite returns Outcome:"RequestError" to an identifier, Backfill writes data for the other identifiers
func TestBackfill_Update_RequestErrorIdentifier(t *testing.T) {
// --- given ---
var w writer.QuotesRangeWriter = &MockQuotesRangeWriter{WriteCount: 0}
var rw writer.QuotesRangeWriter = &MockQuotesRangeWriter{WriteCount: 0}
var w writer.QuotesWriter = &MockQuotesWriter{WriteCount: 0}

SUT := &Backfill{
symbolManager: internal.MockSymbolsManager{Identifiers: []string{"XTKS.1301", "XTKS.1305", "XJAS.1376"}},
apiClient: &MockErrorAPIClient{},
writer: w,
rangeWriter: rw,
since: time.Now().UTC(),
}

Expand All @@ -94,7 +106,7 @@ func TestBackfill_Update_RequestErrorIdentifier(t *testing.T) {

// --- then ---
// write fails for 1 out of 3 identifiers
if mw, ok := w.(*MockQuotesRangeWriter); ok {
if mw, ok := rw.(*MockQuotesRangeWriter); ok {
if mw.WriteCount != 2 {
t.Errorf("2 writes should be performed. got: WriteCount=%v", mw.WriteCount)
}
Expand Down
4 changes: 4 additions & 0 deletions contrib/xignitefeeder/writer/quotes_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ func (q QuotesWriterImpl) newColumnSeries(epoch int64, eq api.EquityQuote) *io.C
cs := io.NewColumnSeries()
cs.AddColumn("Epoch", []int64{epoch})
cs.AddColumn("Ask", []float32{eq.Quote.Ask})
cs.AddColumn("AskSize", []float32{eq.Quote.AskSize})
cs.AddColumn("Bid", []float32{eq.Quote.Bid})
cs.AddColumn("BidSize", []float32{eq.Quote.BidSize})
cs.AddColumn("Last", []float32{eq.Quote.Last})
cs.AddColumn("LastSize", []float32{eq.Quote.LastSize})
cs.AddColumn("DateTime", []int64{time.Time(eq.Quote.DateTime).Unix()})
cs.AddColumn("Open", []float32{eq.Quote.Open})
cs.AddColumn("High", []float32{eq.Quote.High})
cs.AddColumn("Low", []float32{eq.Quote.Low})
Expand Down
25 changes: 12 additions & 13 deletions contrib/xignitefeeder/xignitefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
sm.Update()
timer.RunEveryDayAt(ctx, config.UpdatingHour, sm.Update)

// init Quotes Writer & QuotesRange Writer
var msqw writer.QuotesWriter = writer.QuotesWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Timeframe,
Timezone: utils.InstanceConfig.Timezone,
}
var msqrw writer.QuotesRangeWriter = &writer.QuotesRangeWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Backfill.Timeframe,
}

// init QuotesRangeWriter to backfill daily chart data every day
if config.Backfill.Enabled {
msqrw := &writer.QuotesRangeWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Backfill.Timeframe,
}

bf := feed.NewBackfill(sm, apiClient, msqrw, time.Time(config.Backfill.Since))
bf := feed.NewBackfill(sm, apiClient, msqw, msqrw, time.Time(config.Backfill.Since))
bf.Update()
timer.RunEveryDayAt(ctx, config.UpdatingHour, bf.Update)
}
Expand All @@ -70,13 +76,6 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timer.RunEveryDayAt(ctx, config.UpdatingHour, rbf.Update)
}

// init Quotes Writer
var msqw writer.QuotesWriter = writer.QuotesWriterImpl{
MarketStoreWriter: &writer.MarketStoreWriterImpl{},
Timeframe: config.Timeframe,
Timezone: utils.InstanceConfig.Timezone,
}

return &feed.Worker{
MarketTimeChecker: timeChecker,
APIClient: apiClient,
Expand Down

0 comments on commit 4e2fd27

Please sign in to comment.