Skip to content

Commit

Permalink
external C API baseline - compat w/ liberasurecode
Browse files Browse the repository at this point in the history
  • Loading branch information
vrancurel committed Dec 7, 2018
1 parent 259414a commit 04fa15b
Show file tree
Hide file tree
Showing 9 changed files with 905 additions and 16 deletions.
8 changes: 4 additions & 4 deletions benchmark/benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,9 @@ bool Benchmark<T>::encode()
reset_c_streams();

if (operation_on_packet)
fec->encode_packet(*d_streams, *c_streams, c_props);
fec->encode_streams_vertical(*d_streams, *c_streams, c_props);
else
fec->encode_bufs(*d_streams, *c_streams, c_props);
fec->encode_streams_horizontal(*d_streams, *c_streams, c_props);

// update stats
enc_stats->add(fec->total_enc_usec);
Expand All @@ -506,14 +506,14 @@ bool Benchmark<T>::decode()
reset_r_streams();

if (operation_on_packet) {
if (!fec->decode_packet(
if (!fec->decode_streams_vertical(
d_streams_shuffled,
c_streams_shuffled,
c_props_shuffled,
*r_streams))
return false;
} else {
if (!fec->decode_bufs(
if (!fec->decode_streams_horizontal(
d_streams_shuffled,
c_streams_shuffled,
c_props_shuffled,
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ set(LIB_SRC
${SOURCE_DIR}/gf_nf4.cpp
${SOURCE_DIR}/gf_ring.cpp
${SOURCE_DIR}/property.cpp
${SOURCE_DIR}/quadiron-c.cpp

CACHE
INTERNAL
Expand Down
287 changes: 279 additions & 8 deletions src/fec_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@ class FecCode {
bool read_pkt(char* pkt, std::istream& stream);
bool write_pkt(char* pkt, std::ostream& stream);

void encode_bufs(
void encode_streams_horizontal(
std::vector<std::istream*> input_data_bufs,
std::vector<std::ostream*> output_parities_bufs,
std::vector<Properties>& output_parities_props);

void encode_packet(
void encode_streams_vertical(
std::vector<std::istream*> input_data_bufs,
std::vector<std::ostream*> output_parities_bufs,
std::vector<Properties>& output_parities_props);

bool decode_bufs(
bool decode_streams_horizontal(
std::vector<std::istream*> input_data_bufs,
std::vector<std::istream*> input_parities_bufs,
const std::vector<Properties>& input_parities_props,
Expand All @@ -199,12 +199,27 @@ class FecCode {
size_t size = 0,
vec::Buffers<T>* output = nullptr);

bool decode_packet(
bool decode_streams_vertical(
std::vector<std::istream*> input_data_bufs,
std::vector<std::istream*> input_parities_bufs,
const std::vector<Properties>& input_parities_props,
std::vector<std::ostream*> output_data_bufs);

void encode_blocks_vertical(
std::vector<uint8_t*> data_bufs,
std::vector<uint8_t*> parities_bufs,
std::vector<Properties>& parities_props,
std::vector<int> wanted_idxs,
int block_size_bytes);

bool decode_blocks_vertical(
std::vector<uint8_t*> data_bufs,
std::vector<uint8_t*> parities_bufs,
const std::vector<Properties>& parities_props,
std::vector<int> missing_idxs,
std::vector<int> wanted_idxs,
int block_size_bytes);

const gf::Field<T>& get_gf()
{
return *gf;
Expand Down Expand Up @@ -393,7 +408,7 @@ inline bool FecCode<T>::write_pkt(char* pkt, std::ostream& stream)
* @note all streams must be of equal size
*/
template <typename T>
void FecCode<T>::encode_bufs(
void FecCode<T>::encode_streams_horizontal(
std::vector<std::istream*> input_data_bufs,
std::vector<std::ostream*> output_parities_bufs,
std::vector<Properties>& output_parities_props)
Expand Down Expand Up @@ -451,7 +466,7 @@ void FecCode<T>::encode_bufs(
}

template <typename T>
void FecCode<T>::encode_packet(
void FecCode<T>::encode_streams_vertical(
std::vector<std::istream*> input_data_bufs,
std::vector<std::ostream*> output_parities_bufs,
std::vector<Properties>& output_parities_props)
Expand Down Expand Up @@ -542,7 +557,7 @@ void FecCode<T>::encode_packet(
* @return true if decode succeded, else false
*/
template <typename T>
bool FecCode<T>::decode_bufs(
bool FecCode<T>::decode_streams_horizontal(
std::vector<std::istream*> input_data_bufs,
std::vector<std::istream*> input_parities_bufs,
const std::vector<Properties>& input_parities_props,
Expand Down Expand Up @@ -868,7 +883,7 @@ void FecCode<T>::decode_apply(
* @return true if decode succeeded, else false
*/
template <typename T>
bool FecCode<T>::decode_packet(
bool FecCode<T>::decode_streams_vertical(
std::vector<std::istream*> input_data_bufs,
std::vector<std::istream*> input_parities_bufs,
const std::vector<Properties>& input_parities_props,
Expand Down Expand Up @@ -1007,6 +1022,262 @@ bool FecCode<T>::decode_packet(
return true;
}

/**
* Encode blocks
*
* @param data_bufs if SYSTEMATIC must be exactly n_data otherwise it is
* unused (use nullptr when missing)
* @param parities_bufs if SYSTEMATIC must be exactly n_parities otherwise
* get_n_outputs() (use nullptr when missing)
* @param parities_props if SYSTEMATIC must be exactly n_parities
* otherwise get_n_outputs() caller is supposed to provide specific information
* bound to parities
* @param wanted_idxs array of missing_idxs of len n_parities indicating wanted
* (value 1) or not wanted fragments (value 0) - wanted blocks MUST BE allocated
* by caller
* @param block_size_bytes the block size
*
* @pre All streams must be of equal size
*/
template <typename T>
void FecCode<T>::encode_blocks_vertical(
std::vector<uint8_t*> data_bufs,
std::vector<uint8_t*> parities_bufs,
std::vector<Properties>& parities_props,
std::vector<int> wanted_idxs,
int block_size_bytes)
{
assert(data_bufs.size() == n_data);
assert(parities_bufs.size() == n_outputs);
assert(parities_props.size() == n_outputs);

// clear property vectors
for (auto& props : parities_props) {
props.clear();
}

off_t offset = 0;
int block_size = block_size_bytes / word_size;

// vector of buffers storing data read from chunk
vec::Buffers<uint8_t> words_char(n_data, buf_size);
const std::vector<uint8_t*> words_mem_char = words_char.get_mem();
// vector of buffers storing data that are performed in encoding, i.e. FFT
vec::Buffers<T> words(n_data, pkt_size);
const std::vector<T*> words_mem_T = words.get_mem();

int output_len = get_n_outputs();

// vector of buffers storing data that are performed in encoding, i.e. FFT
vec::Buffers<T> output(output_len, pkt_size);
const std::vector<T*> output_mem_T = output.get_mem();
// vector of buffers storing data in output chunk
vec::Buffers<uint8_t> output_char(output_len, buf_size);
const std::vector<uint8_t*> output_mem_char = output_char.get_mem();

reset_stats_enc();

while (offset < block_size) {
size_t remain_size = block_size - offset;
size_t copy_size = std::min(pkt_size, remain_size);
// TODO: get number of read bytes -> true buf size
for (unsigned i = 0; i < n_data; i++) {
memcpy(
reinterpret_cast<char*>(words_mem_char.at(i)),
data_bufs[i] + offset * word_size,
copy_size * word_size);
}

vec::pack<uint8_t, T>(
words_mem_char, words_mem_T, n_data, copy_size, word_size);

timeval t1 = tick();
uint64_t start = hw_timer();
encode(output, parities_props, offset, words);
uint64_t end = hw_timer();
uint64_t t2 = hrtime_usec(t1);

total_enc_usec += t2;
total_encode_cycles += (end - start) / (copy_size * word_size);
n_encode_ops++;

vec::unpack<T, uint8_t>(
output_mem_T, output_mem_char, output_len, pkt_size, word_size);

for (unsigned i = 0; i < n_outputs; i++) {
if (wanted_idxs[i]) {
memcpy(
parities_bufs[i] + offset * word_size,
reinterpret_cast<char*>(output_mem_char.at(i)),
copy_size * word_size);
}
}
offset += pkt_size;
}
}

/**
* Decode blocks
*
* @param data_bufs if SYSTEMATIC must be exactly n_data otherwise it is
* unused (use nullptr when missing)
* @param parities_bufs if SYSTEMATIC must be exactly n_parities otherwise
* get_n_outputs() (use nullptr when missing)
* @param parities_props if SYSTEMATIC must be exactly n_parities
* otherwise get_n_outputs() caller is supposed to provide specific information
* bound to parities
* @param missing_idxs array of missing_idxs of len code_len indicating presence
* (value 1) or absence of fragments (value 0) - non missing blocks MUST BE
* allocated by caller
* @param wanted_idxs array of missing_idxs of len n_data indicating wanted
* (value 1) or not wanted fragments (value 0) - wanted blocks MUST BE allocated
* by caller
* @param block_size_bytes the block size
*
* @pre All streams must be of equal size
*
* @return true if decode succeeded, else false
*/
template <typename T>
bool FecCode<T>::decode_blocks_vertical(
std::vector<uint8_t*> data_bufs,
std::vector<uint8_t*> parities_bufs,
const std::vector<Properties>& parities_props,
std::vector<int> missing_idxs,
std::vector<int> wanted_idxs,
int block_size_bytes)
{
bool cont = true;
off_t offset = 0;
int block_size = block_size_bytes / word_size;

unsigned fragment_index = 0;
unsigned parity_index = 0;
unsigned avail_data_nb = 0;

if (type == FecType::SYSTEMATIC) {
assert(data_bufs.size() == n_data);
}
assert(parities_bufs.size() == n_outputs);
assert(parities_props.size() == n_outputs);

// ids of received fragments, from 0 to codelen-1
vec::Vector<T> fragments_ids(*(this->gf), n_data);

if (type == FecType::SYSTEMATIC) {
for (unsigned i = 0; i < n_data; i++) {
if (!missing_idxs[i]) {
decode_add_data(fragment_index, i);
fragments_ids.set(fragment_index, i);
fragment_index++;
}
avail_data_nb = fragment_index;
// data is in clear so nothing to do
if (fragment_index == n_data)
return true;
}
}

vec::Vector<T> avail_parity_ids(*(this->gf), n_data - avail_data_nb);

if (fragment_index < n_data) {
// finish with parities available
for (unsigned i = 0; i < n_outputs; i++) {
unsigned j = (type == FecType::SYSTEMATIC) ? n_data + i : i;
if (!missing_idxs[j]) {
decode_add_parities(fragment_index, i);
fragments_ids.set(fragment_index, j);
avail_parity_ids.set(parity_index, i);
fragment_index++;
parity_index++;
// stop when we have enough parities
if (fragment_index == n_data)
break;
}
}
// unable to decode
if (fragment_index < n_data)
return false;
}
fragments_ids.sort();

decode_build();

// vector of buffers storing data read from chunk
vec::Buffers<uint8_t> words_char(n_data, buf_size);
const std::vector<uint8_t*> words_mem_char = words_char.get_mem();
// vector of buffers storing data that are performed in encoding, i.e. FFT
vec::Buffers<T> words(n_data, pkt_size);
const std::vector<T*> words_mem_T = words.get_mem();

int output_len = n_data;

// vector of buffers storing data that are performed in decoding, i.e. FFT
vec::Buffers<T> output(output_len, pkt_size);
const std::vector<T*> output_mem_T = output.get_mem();
// vector of buffers storing data in output chunk
vec::Buffers<uint8_t> output_char(output_len, buf_size);
const std::vector<uint8_t*> output_mem_char = output_char.get_mem();

std::unique_ptr<DecodeContext<T>> context =
init_context_dec(fragments_ids, pkt_size, &output);

reset_stats_dec();

while (offset < block_size) {
size_t remain_size = block_size - offset;
size_t copy_size = std::min(pkt_size, remain_size);
// TODO: get number of read bytes -> true buf size
if (type == FecType::SYSTEMATIC) {
for (unsigned i = 0; i < avail_data_nb; i++) {
unsigned data_idx = fragments_ids.get(i);
memcpy(
reinterpret_cast<char*>(words_mem_char.at(i)),
data_bufs[data_idx] + offset * word_size,
copy_size * word_size);
}
}
for (unsigned i = 0; i < n_data - avail_data_nb; ++i) {
unsigned parity_idx = avail_parity_ids.get(i);
memcpy(
reinterpret_cast<char*>(words_mem_char.at(avail_data_nb + i)),
parities_bufs[parity_idx] + offset * word_size,
copy_size * word_size);
}

if (!cont)
break;

vec::pack<uint8_t, T>(
words_mem_char, words_mem_T, n_data, pkt_size, word_size);

timeval t1 = tick();
uint64_t start = hw_timer();
decode(*context, output, parities_props, offset, words);
uint64_t end = hw_timer();
uint64_t t2 = hrtime_usec(t1);

total_dec_usec += t2;
total_decode_cycles += (end - start) / word_size;
n_decode_ops++;

vec::unpack<T, uint8_t>(
output_mem_T, output_mem_char, output_len, pkt_size, word_size);

for (unsigned i = 0; i < n_data; i++) {
if (wanted_idxs[i]) {
memcpy(
data_bufs[i] + offset * word_size,
reinterpret_cast<char*>(output_mem_char.at(i)),
copy_size * word_size);
}
}
offset += pkt_size;
}

return true;
}

/**
* Perform a Lagrange interpolation to find the coefficients of the
* polynomial
Expand Down
Loading

0 comments on commit 04fa15b

Please sign in to comment.