Skip to content

Commit

Permalink
extract the invalidation bits
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhrisca committed Dec 20, 2024
1 parent 6415e71 commit 1704326
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 49 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ find_package(
${SKBUILD_SABI_COMPONENT}
NumPy
)

# Add submodule libdeflate
add_subdirectory(ext/libdeflate)

python_add_library(cutils MODULE WITH_SOABI USE_SABI 3.9 src/asammdf/blocks/cutils.c)

target_link_libraries(cutils PRIVATE Python::NumPy)
target_link_libraries(cutils PRIVATE Python::NumPy libdeflate::libdeflate_static)

install(TARGETS cutils DESTINATION "asammdf/blocks")
189 changes: 141 additions & 48 deletions src/asammdf/blocks/cutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,34 @@ static PyObject *get_invalidation_bits_array(PyObject *self, PyObject *args)
}


static PyObject *get_invalidation_bits_array_C(uint8_t * data, int64_t cycles, int64_t invalidation_pos, int64_t invalidation_size)
{
if (invalidation_pos<0) {
return Py_None;
}
else {

PyObject *out;
uint8_t mask, *inptr, *outptr;

mask = (uint8_t ) (1 << (invalidation_pos % 8));
inptr = data + invalidation_pos / 8;

npy_intp dims[1];
dims[0] = cycles;
out = (PyArrayObject *)PyArray_EMPTY(1, dims, NPY_BOOL, 0);
outptr = (uint8_t *)PyArray_GETPTR1(out, 0);

for (int i=0; i<cycles; i++) {
*outptr++ = (*inptr) & mask ? 1 : 0;
inptr += invalidation_size;
}

return out;
}
}


typedef struct MyData {
uint8_t * inptr;
uint8_t * outptr;
Expand Down Expand Up @@ -1936,6 +1964,7 @@ typedef struct InfoBlock {
int64_t address;
int64_t original_size;
int64_t compressed_size;
int64_t block_limit;
Py_ssize_t param;
Py_ssize_t block_type;
Py_ssize_t idx;
Expand All @@ -1944,8 +1973,8 @@ typedef struct InfoBlock {
} InfoBlock, *PtrInfoBlock;


typedef struct SignalInfo{
int64_t byte_offset;
typedef struct SignalInfo {
int64_t byte_offset;
int64_t byte_count;
int32_t invalidation_bit_position;
uint8_t *data;
Expand All @@ -1958,10 +1987,9 @@ typedef struct SignalInfo{

typedef struct ProcessesingBlock {
uint8_t stop;
Py_ssize_t out_size;
uint8_t ** outptr;
int64_t cycles;
uint8_t * inptr;
int64_t cycles;
PtrInfoBlock block_info;
struct SignalInfo *signals;
Py_ssize_t signal_count;
Expand All @@ -1978,19 +2006,18 @@ typedef struct ProcessesingBlock {
void * get_channel_raw_bytes_complete_C_windows(void *lpParam )
{
Py_ssize_t count, byte_count, byte_offset, delta, thread_count, param, block_type, use_miniz;
int64_t original_size, compressed_size;
int64_t original_size, compressed_size, block_limit, cycles;
PtrProcessesingBlock thread_info;
thread_info = (PtrProcessesingBlock) lpParam;
PtrInfoBlock block_info;
char * deflate_lib_path;

Py_ssize_t signal_count, thread_idx, record_size, in_size, cols, lines;

byte_offset = thread_info->byte_offset;
byte_count = thread_info->byte_count;
record_size = thread_info->record_size;
use_miniz = thread_info->use_miniz;
deflate_lib_path = thread_info->deflate_lib_path;
deflate_lib_path = thread_info->deflate_lib_path;

int result;

Expand Down Expand Up @@ -2025,6 +2052,9 @@ void * get_channel_raw_bytes_complete_C_windows(void *lpParam )
param = thread_info->block_info->param;
block_type = thread_info->block_info->block_type;

//printf("Thread %d original_size=%d compressed_size=%d param=%d\n",
//thread_info->idx, original_size, compressed_size, param);

cols = param;
lines = original_size / cols;

Expand Down Expand Up @@ -2064,23 +2094,42 @@ void * get_channel_raw_bytes_complete_C_windows(void *lpParam )
pUncomp = outptr;
}

outptr = (uint8_t *) malloc(count * byte_count);

read = pUncomp + byte_offset;
write = outptr;
if (thread_info->block_info->block_limit >= 0) {
cycles = thread_info->block_info->block_limit / record_size ;
thread_info->cycles = cycles;
}
else {
cycles = count;
thread_info->cycles = count;
}

for (Py_ssize_t i = 0; i < count; i++)
{
memcpy(write, read, byte_count);
write += byte_count;
read += record_size;
//printf("Thread %d cycles=%d block_limit=%d\n",
//thread_info->idx, cycles, thread_info->block_info->block_limit);

for (int i =0; i<thread_info->signal_count; i++) {
byte_offset = thread_info->signals[i].byte_offset;
byte_count = thread_info->signals[i].byte_count;

//printf("Thread %d ch=%d byte_offset=%d byte_count=%d\n",
//thread_info->idx, i, byte_offset, byte_count);

outptr = (uint8_t *) malloc(cycles * byte_count);

read = pUncomp + byte_offset;
write = outptr;

for (Py_ssize_t j = 0; j < cycles; j++)
{
memcpy(write, read, byte_count);
write += byte_count;
read += record_size;
}
thread_info->outptr[i] = outptr;
}

free(pUncomp);

thread_info->outptr = outptr;
thread_info->out_size = count * byte_count;

SetEvent(thread_info->bytes_ready);
}

Expand All @@ -2092,16 +2141,16 @@ void * get_channel_raw_bytes_complete_C_windows(void *lpParam )

static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject *args)
{
Py_ssize_t info_count, signal_count, thread_count=11, use_miniz=0;
PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref;
Py_ssize_t info_count, signal_count, signal_and_invalidation_count, thread_count=11, use_miniz=0;
PyObject *data_blocks_info, *signals, *out = NULL, *item, *ref, *obj;

char *outptr, *file_name, *deflate_lib_path=NULL;
char *read_pos = NULL, *write_pos = NULL;
Py_ssize_t position = 0, record_size = 0,
cycles, step = 0;
Py_ssize_t isize = 0, offset = 0,byte_count, byte_offset;
cycles, step = 0, invalidation_bytes;
Py_ssize_t isize = 0, offset = 0;
int is_list;
int64_t byte_offset, byte_count;
int64_t byte_offset, byte_count, new_cycles;
int32_t invalidation_bit_position;

PtrInfoBlock block_info;
Expand All @@ -2112,10 +2161,10 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
FILE *fptr;
uint8_t *buffer;
int result;
int is_list;

if (!PyArg_ParseTuple(args, "OOsnnns|nn",
&data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes, &deflate_lib_path, &thread_count, &use_miniz))
if (!PyArg_ParseTuple(args, "OOsnnns|nn",
&data_blocks_info, &signals, &file_name, &cycles, &record_size, &invalidation_bytes,
&deflate_lib_path, &thread_count, &use_miniz))
{
return NULL;
}
Expand All @@ -2129,23 +2178,25 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
dwThreadIdArray = (DWORD *) malloc(sizeof(DWORD) * thread_count);
block_ready = (HANDLE *) malloc(sizeof(HANDLE) * thread_count);
bytes_ready = (HANDLE *) malloc(sizeof(HANDLE) * thread_count);

PtrSignalInfo signal_info;

is_list = PyList_Check(signals);
if (is_list) {
signal_count = PyList_Size(signals);
}
else {
signal_count = PyTuple_Size(signals);
}

if (invalidation_bytes) {
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * (signal_count + 1));
}
else {
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * signal_count);
}
signal_and_invalidation_count = signal_count +1;
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * (signal_count + 1));
}
else {
signal_and_invalidation_count = signal_count;
signal_info = (PtrSignalInfo) malloc(sizeof(SignalInfo) * signal_count);
}
for (int i=0; i<signal_count; i++) {
if (is_list) {
obj = PyList_GetItem(signals, i);
Expand All @@ -2164,7 +2215,7 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
byte_count = PyLong_AsLongLong(PyTuple_GetItem(obj, 1));
invalidation_bit_position = PyLong_AsLong(PyTuple_GetItem(obj, 2));
}

obj = PyByteArray_FromStringAndSize(NULL, byte_count * cycles);

signal_info[i].byte_offset = byte_offset;
Expand All @@ -2175,11 +2226,11 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
signal_info[i].obj = obj;

}

if (invalidation_bytes) {
obj = PyByteArray_FromStringAndSize(NULL, invalidation_bytes * cycles);
signal_info[signal_count].byte_offset = record_size - invalidation_bytes;
signal_info[signal_count].byte_count = invalidation_bytes;
obj = PyByteArray_FromStringAndSize(NULL, invalidation_bytes * cycles);
signal_info[signal_count].byte_offset = record_size - invalidation_bytes;
signal_info[signal_count].byte_count = invalidation_bytes;
signal_info[signal_count].invalidation_bit_position = -1;
signal_info[signal_count].data = (uint8_t *) PyByteArray_AsString(obj);
signal_info[signal_count].data_position = signal_info[signal_count].data;
Expand Down Expand Up @@ -2218,15 +2269,16 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
);

thread_info[i].block_info = NULL;
thread_info[i].byte_count = byte_count;
thread_info[i].byte_offset = byte_offset;
thread_info[i].signals = signal_info;
thread_info[i].signal_count = signal_and_invalidation_count;
thread_info[i].record_size = record_size;
thread_info[i].stop = 0;
thread_info[i].idx = i;
thread_info[i].use_miniz = use_miniz;
thread_info[i].deflate_lib_path = deflate_lib_path;
thread_info[i].block_ready = block_ready[i];
thread_info[i].bytes_ready = bytes_ready[i];
thread_info[i].outptr = (uint8_t **) malloc(sizeof(uint8_t *) * signal_and_invalidation_count);
}

for (int i=0; i<info_count; i++) {
Expand Down Expand Up @@ -2274,6 +2326,17 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject

block_info[i].param = PyLong_AsSsize_t(ref);
Py_XDECREF(ref);
ref = PyObject_GetAttrString(
item,
"block_limit");
if (ref == Py_None) {
block_info[i].block_limit = -1;
}
else {
block_info[i].block_limit = PyLong_AsLongLong(ref);
}
Py_XDECREF(ref);

}

out = PyByteArray_FromStringAndSize(NULL, cycles * byte_count);
Expand Down Expand Up @@ -2305,9 +2368,13 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
if (i >= thread_count) {
WaitForSingleObject(bytes_ready[position], INFINITE);
ResetEvent(bytes_ready[position]);
memcpy(outptr, thread->outptr, thread->out_size);
outptr += thread->out_size;
free(thread->outptr);
new_cycles = thread->cycles;
thread->cycles = 0;
for (int j=0; j<signal_and_invalidation_count; j++) {
memcpy(signal_info[j].data_position, thread->outptr[j], signal_info[j].byte_count * new_cycles);
signal_info[j].data_position += signal_info[j].byte_count * new_cycles;
free(thread->outptr[j]);
}
free(thread->inptr);
}

Expand All @@ -2329,10 +2396,14 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject

WaitForSingleObject(bytes_ready[position], INFINITE);
ResetEvent(bytes_ready[position]);
memcpy(outptr, thread->outptr, thread->out_size);
outptr += thread->out_size;
free(thread->outptr);
free(thread->inptr);
new_cycles = thread->cycles;
thread->cycles = 0;
for (int j=0; j<signal_and_invalidation_count; j++) {
memcpy(signal_info[j].data_position, thread->outptr[j], signal_info[j].byte_count * new_cycles);
signal_info[j].data_position += signal_info[j].byte_count * new_cycles;
free(thread->outptr[j]);
}

thread->stop = 1;

SetEvent(block_ready[position]);
Expand All @@ -2348,12 +2419,34 @@ static PyObject *get_channel_raw_bytes_complete_windows(PyObject *self, PyObject
CloseHandle(bytes_ready[i]);
}

for (int i=0; i<thread_count; i++) free(thread_info[i].outptr);

free(block_info);
free(thread_info);
}

fclose(fptr);

out = PyTuple_New(signal_count);

uint8_t * invalidation_data = NULL;
if (invalidation_bytes) {
invalidation_data = signal_info[signal_count].data;
}

for (int i=0; i<signal_count; i++) {
ref = PyTuple_New(2);
PyTuple_SetItem(ref, 0, signal_info[i].obj);
if (invalidation_data) {
PyTuple_SetItem(ref, 1, get_invalidation_bits_array_C(invalidation_data, cycles, signal_info[i].invalidation_bit_position, invalidation_bytes));
}
else {
PyTuple_SetItem(ref, 1, Py_None);
}
PyTuple_SetItem(out, i, ref);
}

free(signal_info);
free(hThreads);
free(block_ready);
free(bytes_ready);
Expand Down

0 comments on commit 1704326

Please sign in to comment.