Skip to content

Commit 7eb0c14

Browse files
committed
ADD: Add DBN encoding to C++
1 parent 77671fd commit 7eb0c14

28 files changed

+891
-160
lines changed

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,28 @@
22

33
## 0.20.0 - TBD
44

5-
This release improves historical symbology support with the new `TsSymbolMap` class that
5+
This release adds support for encoding DBN within the C++ client.
6+
It also improves historical symbology support with the new `TsSymbolMap` class that
67
handles mapping historical records to a text symbol. To support this class, several types
78
for date fields were changed from strings or ints to `date::year_month_day`.
89

910
### Enhancements
1011
- Added `TsSymbolMap` to support historical symbology where mappings change between days
12+
- Added `DbnEncoder` class for encoding DBN data
13+
- Added blocking API similar to `LiveBlocking` to `DbnFileStore` with new `GetMetadata`
14+
and `NextRecord` methods
1115
- Added `PitSymbol` map constructor from `Metadata` and a `date::year_month_day`
1216
- Added `Metadata::CreateSymbolMap` and `Metadata::CreateSymbolMapForDate` methods for
1317
creating symbology maps from historical metadata
18+
- Added blocking API similar to `LiveBlocking` to `DbnFileStore`
1419
- Added `SymbologyResolution::CreateSymbolMap` method for creating a symbology map from
1520
a symbology resolution response
21+
- Added `InFileStream` and `OutFileStream` helper classes for reading and writing binary
22+
output respectively
1623

1724
### Breaking changes
1825
- Added new dependency on [Howard Hinnant's date library](https://howardhinnant.github.io/date/date.html)
26+
- Added `ILogReceiver*` parameter to all `DbnDecoder` constructors and one `DbnFileStore` constructor
1927
- Removed type `StrMappingInterval`. `MappingInterval` is now also used in `SymbologyResolution`.
2028
- Changed type of `start_date` and `end_date` in `MappingInterval` to `date::year_month_day`
2129
- Added `stype_in` and `stype_out` fields to `SymbologyResolution` to support creating

cmake/SourcesAndHeaders.cmake

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ set(headers
55
include/databento/datetime.hpp
66
include/databento/dbn.hpp
77
include/databento/dbn_decoder.hpp
8+
include/databento/dbn_encoder.hpp
89
include/databento/dbn_file_store.hpp
9-
include/databento/detail/file_stream.hpp
1010
include/databento/detail/http_client.hpp
1111
include/databento/detail/json_helpers.hpp
1212
include/databento/detail/scoped_fd.hpp
@@ -16,6 +16,7 @@ set(headers
1616
include/databento/detail/zstd_stream.hpp
1717
include/databento/enums.hpp
1818
include/databento/exceptions.hpp
19+
include/databento/file_stream.hpp
1920
include/databento/fixed_price.hpp
2021
include/databento/flag_set.hpp
2122
include/databento/historical.hpp
@@ -39,9 +40,10 @@ set(sources
3940
src/compat.cpp
4041
src/datetime.cpp
4142
src/dbn.cpp
43+
src/dbn_constants.hpp
4244
src/dbn_decoder.cpp
45+
src/dbn_encoder.cpp
4346
src/dbn_file_store.cpp
44-
src/detail/file_stream.cpp
4547
src/detail/http_client.cpp
4648
src/detail/json_helpers.cpp
4749
src/detail/scoped_fd.cpp
@@ -50,6 +52,7 @@ set(sources
5052
src/detail/zstd_stream.cpp
5153
src/enums.cpp
5254
src/exceptions.cpp
55+
src/file_stream.cpp
5356
src/fixed_price.cpp
5457
src/flag_set.cpp
5558
src/historical.cpp

include/databento/dbn_decoder.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,24 @@
66
#include <string>
77

88
#include "databento/dbn.hpp"
9-
#include "databento/detail/file_stream.hpp"
109
#include "databento/detail/shared_channel.hpp"
1110
#include "databento/enums.hpp" // Upgrade Policy
11+
#include "databento/file_stream.hpp"
1212
#include "databento/ireadable.hpp"
13+
#include "databento/log.hpp"
1314
#include "databento/record.hpp" // Record, RecordHeader
1415

1516
namespace databento {
1617
// DBN decoder. Set upgrade_policy to control how DBN version 1 data should be
1718
// handled. Defaults to upgrading DBNv1 data to version 2 (the current version).
1819
class DbnDecoder {
1920
public:
20-
explicit DbnDecoder(detail::SharedChannel channel);
21-
explicit DbnDecoder(detail::FileStream file_stream);
22-
explicit DbnDecoder(std::unique_ptr<IReadable> input);
23-
DbnDecoder(std::unique_ptr<IReadable> input,
21+
DbnDecoder(ILogReceiver* log_receiver, detail::SharedChannel channel);
22+
DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream);
23+
DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input);
24+
DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input,
2425
VersionUpgradePolicy upgrade_policy);
2526

26-
// Decode metadata from the given buffer.
27-
static Metadata DecodeMetadata(const std::vector<std::uint8_t>& buffer);
2827
static std::pair<std::uint8_t, std::size_t> DecodeMetadataVersionAndSize(
2928
const std::uint8_t* buffer, std::size_t size);
3029
static Metadata DecodeMetadataFields(std::uint8_t version,
@@ -60,8 +59,10 @@ class DbnDecoder {
6059
std::vector<std::uint8_t>::const_iterator buffer_end_it);
6160
bool DetectCompression();
6261
std::size_t FillBuffer();
62+
std::size_t GetReadBufferSize() const;
6363
RecordHeader* BufferRecordHeader();
6464

65+
ILogReceiver* log_receiver_;
6566
std::uint8_t version_{};
6667
VersionUpgradePolicy upgrade_policy_;
6768
bool ts_out_{};

include/databento/dbn_encoder.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#pragma once
2+
3+
#include <cstdint> // uint32_t
4+
5+
#include "databento/dbn.hpp" // Metadata
6+
#include "databento/iwritable.hpp"
7+
#include "databento/record.hpp"
8+
#include "databento/with_ts_out.hpp"
9+
10+
namespace databento {
11+
class DbnEncoder {
12+
public:
13+
explicit DbnEncoder(const Metadata& metadata, IWritable* output);
14+
15+
static void EncodeMetadata(const Metadata& metadata, IWritable* output);
16+
static void EncodeRecord(const Record& record, IWritable* output);
17+
18+
template <typename R>
19+
void EncodeRecord(const R& record) {
20+
static_assert(
21+
has_header_v<R>,
22+
"must be a DBN record struct with an `hd` RecordHeader field");
23+
EncodeRecord(Record{&record.hd});
24+
}
25+
template <typename R>
26+
void EncodeRecord(const WithTsOut<R> record) {
27+
static_assert(
28+
has_header_v<R>,
29+
"must be a DBN record struct with an `hd` RecordHeader field");
30+
EncodeRecord(Record{&record.rec.hd});
31+
}
32+
void EncodeRecord(const Record& record);
33+
34+
private:
35+
static std::uint32_t CalcLength(const Metadata& metadata);
36+
37+
IWritable* output_;
38+
};
39+
} // namespace databento

include/databento/dbn_file_store.hpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,39 @@
22

33
#include <string>
44

5+
#include "databento/dbn.hpp" // DecodeMetadata
56
#include "databento/dbn_decoder.hpp" // DbnDecoder
67
#include "databento/enums.hpp" // VersionUpgradePolicy
7-
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
8+
#include "databento/log.hpp"
9+
#include "databento/record.hpp"
10+
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
811

912
namespace databento {
10-
// A reader for DBN files.
13+
// A reader for DBN files. This class provides both a callback API similar to
14+
// TimeseriesGetRange in historical data and LiveThreaded for live data as well
15+
// as a blocking API similar to that of LiveBlocking. Only one API should be
16+
// used on a given instance.
1117
class DbnFileStore {
1218
public:
1319
explicit DbnFileStore(const std::string& file_path);
14-
DbnFileStore(const std::string& file_path,
20+
DbnFileStore(ILogReceiver* log_receiver, const std::string& file_path,
1521
VersionUpgradePolicy upgrade_policy);
1622

23+
// Callback API: calling Replay consumes the input.
1724
void Replay(const MetadataCallback& metadata_callback,
1825
const RecordCallback& record_callback);
1926
void Replay(const RecordCallback& record_callback);
2027

28+
// Blocking API
29+
const Metadata& GetMetadata();
30+
// Returns the next record or `nullptr` if there are no remaining records.
31+
const Record* NextRecord();
32+
2133
private:
22-
DbnDecoder parser_;
34+
void MaybeDecodeMetadata();
35+
36+
DbnDecoder decoder_;
37+
Metadata metadata_{};
38+
bool has_decoded_metadata_{false};
2339
};
2440
} // namespace databento

include/databento/detail/zstd_stream.hpp

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,22 @@
88
#include <vector>
99

1010
#include "databento/ireadable.hpp"
11+
#include "databento/iwritable.hpp"
12+
#include "databento/log.hpp"
1113

1214
namespace databento {
1315
namespace detail {
14-
class ZstdStream : public IReadable {
16+
class ZstdDecodeStream : public IReadable {
1517
public:
16-
explicit ZstdStream(std::unique_ptr<IReadable> input);
17-
ZstdStream(std::unique_ptr<IReadable> input,
18-
std::vector<std::uint8_t>&& in_buffer);
18+
explicit ZstdDecodeStream(std::unique_ptr<IReadable> input);
19+
ZstdDecodeStream(std::unique_ptr<IReadable> input,
20+
std::vector<std::uint8_t>&& in_buffer);
1921

2022
// Read exactly `length` bytes into `buffer`.
2123
void ReadExact(std::uint8_t* buffer, std::size_t length) override;
2224
// Read at most `length` bytes. Returns the number of bytes read. Will only
2325
// return 0 if the end of the stream is reached.
24-
size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override;
26+
std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override;
2527

2628
private:
2729
std::unique_ptr<IReadable> input_;
@@ -30,5 +32,27 @@ class ZstdStream : public IReadable {
3032
std::vector<std::uint8_t> in_buffer_;
3133
ZSTD_inBuffer z_in_buffer_;
3234
};
35+
36+
class ZstdCompressStream : public IWritable {
37+
public:
38+
explicit ZstdCompressStream(IWritable* output);
39+
ZstdCompressStream(ILogReceiver* log_receiver, IWritable* output);
40+
ZstdCompressStream(const ZstdCompressStream&) = delete;
41+
ZstdCompressStream& operator=(const ZstdCompressStream&) = delete;
42+
ZstdCompressStream(ZstdCompressStream&&) = delete;
43+
ZstdCompressStream& operator=(ZstdCompressStream&&) = delete;
44+
~ZstdCompressStream() override;
45+
46+
void WriteAll(const std::uint8_t* buffer, std::size_t length) override;
47+
48+
private:
49+
ILogReceiver* log_receiver_;
50+
IWritable* output_;
51+
std::unique_ptr<ZSTD_CStream, std::size_t (*)(ZSTD_CStream*)> z_cstream_;
52+
std::vector<std::uint8_t> in_buffer_;
53+
ZSTD_inBuffer z_in_buffer_;
54+
std::size_t in_size_;
55+
std::vector<std::uint8_t> out_buffer_;
56+
};
3357
} // namespace detail
3458
} // namespace databento

include/databento/detail/file_stream.hpp renamed to include/databento/file_stream.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
#include <cstddef> // size_t
44
#include <cstdint> // uint8_t
5-
#include <fstream> // ifstream
5+
#include <fstream> // ifstream, ofstream
66
#include <string>
77

88
#include "databento/ireadable.hpp"
9+
#include "databento/iwritable.hpp"
910

1011
namespace databento {
11-
namespace detail {
12-
class FileStream : public IReadable {
12+
class InFileStream : public IReadable {
1313
public:
14-
explicit FileStream(const std::string& file_path);
14+
explicit InFileStream(const std::string& file_path);
1515

1616
// Read exactly `length` bytes into `buffer`.
1717
void ReadExact(std::uint8_t* buffer, std::size_t length) override;
@@ -22,5 +22,14 @@ class FileStream : public IReadable {
2222
private:
2323
std::ifstream stream_;
2424
};
25-
} // namespace detail
25+
26+
class OutFileStream : public IWritable {
27+
public:
28+
explicit OutFileStream(const std::string& file_path);
29+
30+
void WriteAll(const std::uint8_t* buffer, std::size_t length) override;
31+
32+
private:
33+
std::ofstream stream_;
34+
};
2635
} // namespace databento

include/databento/historical.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ class Historical {
241241
DbnFileStore TimeseriesGetRangeToFile(const HttplibParams& params,
242242
const std::string& file_path);
243243

244+
ILogReceiver* log_receiver_;
244245
const std::string key_;
245246
const std::string gateway_;
246247
detail::HttpClient client_;

include/databento/iwritable.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#pragma once
2+
3+
#include <cstddef> // size_t
4+
#include <cstdint> // uint8_t
5+
6+
namespace databento {
7+
// An abstract class for writable objects to allow for runtime polymorphism
8+
// around DBN encoding.
9+
class IWritable {
10+
public:
11+
virtual ~IWritable() = default;
12+
13+
virtual void WriteAll(const std::uint8_t* buffer, std::size_t length) = 0;
14+
};
15+
} // namespace databento

src/dbn_constants.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include <cstddef>
4+
#include <cstdint>
5+
#include <limits>
6+
7+
namespace databento {
8+
constexpr std::size_t kMagicSize = 4;
9+
constexpr std::size_t kMetadataPreludeSize = 8;
10+
constexpr std::uint32_t kZstdMagicNumber = 0xFD2FB528;
11+
constexpr auto kDbnPrefix = "DBN";
12+
constexpr std::size_t kFixedMetadataLen = 100;
13+
constexpr std::size_t kDatasetCstrLen = 16;
14+
constexpr std::size_t kMetadataReservedLen = 53;
15+
constexpr std::size_t kMetadataReservedLenV1 = 47;
16+
constexpr std::size_t kBufferCapacity = 8UL * 1024;
17+
constexpr std::uint16_t kNullSchema = std::numeric_limits<std::uint16_t>::max();
18+
constexpr std::uint8_t kNullSType = std::numeric_limits<std::uint8_t>::max();
19+
constexpr std::uint64_t kNullRecordCount =
20+
std::numeric_limits<std::uint64_t>::max();
21+
} // namespace databento

0 commit comments

Comments
 (0)