From 3e36392b979c8be6f388b26cc1d07b9a2f4ad2ad Mon Sep 17 00:00:00 2001 From: soroush Date: Thu, 12 Mar 2015 00:30:47 -0700 Subject: [PATCH 1/4] window_aggregate added --- sdk_example/__init__.pyc | Bin 110 -> 110 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/sdk_example/__init__.pyc b/sdk_example/__init__.pyc index ed72b9032b325cd3f3961c783f795635a025f051..f6a0d38ca01ae1a6c69242e9080244574ac0caf5 100644 GIT binary patch delta 14 Vcmd1H<6!>G%eCK*F?1qF8UP@h1VjJ; delta 14 Vcmd1H<6!>G%Vo5uA!H&)8UP|I1hW7D From 2ad7ea00f367c1ae5c5dfc3716094a886cd9b639 Mon Sep 17 00:00:00 2001 From: soroush Date: Thu, 12 Mar 2015 00:35:17 -0700 Subject: [PATCH 2/4] window_aggregate --- sdk_example/WindowAggregate.cpp | 262 ++++++++++++++++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 sdk_example/WindowAggregate.cpp diff --git a/sdk_example/WindowAggregate.cpp b/sdk_example/WindowAggregate.cpp new file mode 100644 index 0000000..33097a9 --- /dev/null +++ b/sdk_example/WindowAggregate.cpp @@ -0,0 +1,262 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace graphlab; + +class BASE_AGGREGATE { + + protected: + flexible_type state; + bool _is_incremental; + size_t num_elements; + public: + + BASE_AGGREGATE() { + initiate(); + } + // init phase + virtual void initiate() { + state = 0; + _is_incremental = false; + num_elements = 0; + } + // process phase + virtual void add_element(const flexible_type & value) { } + + // delete phase + virtual void remove_element(const flexible_type & value) { } + + // emit phase + virtual flexible_type emit() { return state; } + + virtual bool is_incremental() { + return _is_incremental; + } + + virtual flex_type_enum output_type(const flex_type_enum & input_type) { + return input_type; + } +}; + +class AVG : public BASE_AGGREGATE { + public: + AVG(): BASE_AGGREGATE() { + initiate(); + } + // init phase + void initiate() { + state = 0; + _is_incremental = true; + num_elements = 0; + } + + // process phase + void add_element(const flexible_type & value) { + state += value; + num_elements++; + } + + // delete phase + void remove_element(const flexible_type & value) { + state -= value; + num_elements--; + } + + // emit phase + flexible_type emit() { + flex_float state_float = state; + return state_float / (num_elements) ; + } + + flex_type_enum output_type(const flex_type_enum & input_type) { + return flex_type_enum::FLOAT; + } +}; + +class MAX : public BASE_AGGREGATE { + public: + MAX(): BASE_AGGREGATE() { + initiate(); + } + // init phase + void initiate() { + num_elements = 0; + state = 0; + _is_incremental = false; + } + + // process phase + void add_element(const flexible_type & value) { + if(num_elements == 0) + state = value; + else if(value > state) + state = value; + num_elements++; + } + + // emit phase + flexible_type emit() { + return state; + } + +}; + +class MIN : public BASE_AGGREGATE { + public: + MIN(): BASE_AGGREGATE() { + initiate(); + } + // init phase + void initiate() { + num_elements = 0; + state = 0; + _is_incremental = false; + } + + // process phase + void add_element(const flexible_type & value) { + if(num_elements == 0) + state = value; + else if(value < state) + state = value; + num_elements++; + } + + // emit phase + flexible_type emit() { + return state; + } + +}; + + + +class COUNT : public BASE_AGGREGATE { + public: + COUNT(): BASE_AGGREGATE() { + initiate(); + } + // init phase + void initiate() { + state = 0; + _is_incremental = true; + } + + // process phase + void add_element(const flexible_type & value) { + state += 1; + } + + // delete phase + void remove_element(const flexible_type & value) { + state -= 1; + } + + // emit phase + flexible_type emit() { + return state; + } + +}; + +class SUM: public BASE_AGGREGATE { + public: + SUM(): BASE_AGGREGATE() { + initiate(); + } + // init phase + void initiate() { + state = 0; + _is_incremental = true; + } + + // process phase + void add_element(const flexible_type & value) { + state += value; + } + + // delete phase + void remove_element(const flexible_type & value) { + state -= value; + } + + // emit phase + flexible_type emit() { + return state; + } +}; + +size_t COUNT_AGG() { + return (size_t)new COUNT(); +} + +size_t SUM_AGG() { + return (size_t)new SUM(); +} + +size_t MIN_AGG() { + return (size_t)new MIN(); +} + +size_t MAX_AGG() { + return (size_t)new MAX(); +} + +size_t AVG_AGG() { + return (size_t)new AVG(); +} + +gl_sarray window_aggregate(std::function fn,gl_sarray & sa,size_t window_size) { + BASE_AGGREGATE * agg = reinterpret_cast(fn()); + gl_sarray_writer writer(agg->output_type(sa.dtype()), 1); + if(window_size <= 0) + return writer.close(); + + std::queue * queue = new std::queue(); + + for (const auto &elem: sa.range_iterator()) { + queue->push(elem); + agg->add_element(elem); + + if(queue->size() == window_size) { + flexible_type agg_value = agg->emit(); + writer.write(agg_value,0); + if(agg->is_incremental()) { + flexible_type & oldest_value = queue->front(); + agg->remove_element(oldest_value); + queue->pop(); + } else { + queue->pop(); + std::queue * queue2 = new std::queue(); + agg->initiate(); + while (!queue->empty()){ + flexible_type & oldest_value = queue->front(); + queue2->push(oldest_value); + agg->add_element(oldest_value); + queue->pop(); + } + delete queue; + queue = queue2; + } + } + + } + delete agg; + delete queue; + return writer.close(); +} + +BEGIN_FUNCTION_REGISTRATION +REGISTER_FUNCTION(SUM_AGG); +REGISTER_FUNCTION(COUNT_AGG); +REGISTER_FUNCTION(MAX_AGG); +REGISTER_FUNCTION(MIN_AGG); +REGISTER_FUNCTION(AVG_AGG); +REGISTER_FUNCTION(window_aggregate, "aggregate_fn","sarray","window_size"); +END_FUNCTION_REGISTRATION From ae4091e8c34ecbd5792b3295ac7ee01a93718e9e Mon Sep 17 00:00:00 2001 From: soroush Date: Thu, 12 Mar 2015 00:38:50 -0700 Subject: [PATCH 3/4] sdk_example/__init__.pyc deleted --- sdk_example/__init__.pyc | Bin 110 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 sdk_example/__init__.pyc diff --git a/sdk_example/__init__.pyc b/sdk_example/__init__.pyc deleted file mode 100644 index f6a0d38ca01ae1a6c69242e9080244574ac0caf5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 110 zcmZSn%*(akjxjWu0SXv_v;z Date: Mon, 23 Mar 2015 13:04:51 -0700 Subject: [PATCH 4/4] addressing the comment: using deque --- sdk_example/WindowAggregate.cpp | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/sdk_example/WindowAggregate.cpp b/sdk_example/WindowAggregate.cpp index 33097a9..7de572a 100644 --- a/sdk_example/WindowAggregate.cpp +++ b/sdk_example/WindowAggregate.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -218,37 +218,32 @@ gl_sarray window_aggregate(std::function fn,gl_sarray & sa,size_t wind if(window_size <= 0) return writer.close(); - std::queue * queue = new std::queue(); + std::deque * mydeque = new std::deque(); for (const auto &elem: sa.range_iterator()) { - queue->push(elem); + mydeque->push_back(elem); agg->add_element(elem); - if(queue->size() == window_size) { + if(mydeque->size() == window_size) { flexible_type agg_value = agg->emit(); writer.write(agg_value,0); if(agg->is_incremental()) { - flexible_type & oldest_value = queue->front(); + flexible_type & oldest_value = mydeque->front(); agg->remove_element(oldest_value); - queue->pop(); + mydeque->pop_front(); } else { - queue->pop(); - std::queue * queue2 = new std::queue(); + mydeque->pop_front(); agg->initiate(); - while (!queue->empty()){ - flexible_type & oldest_value = queue->front(); - queue2->push(oldest_value); - agg->add_element(oldest_value); - queue->pop(); + std::deque::iterator it = mydeque->begin(); + while (it != mydeque->end()){ + agg->add_element(*it++); } - delete queue; - queue = queue2; } } } delete agg; - delete queue; + delete mydeque; return writer.close(); } @@ -258,5 +253,5 @@ REGISTER_FUNCTION(COUNT_AGG); REGISTER_FUNCTION(MAX_AGG); REGISTER_FUNCTION(MIN_AGG); REGISTER_FUNCTION(AVG_AGG); -REGISTER_FUNCTION(window_aggregate, "aggregate_fn","sarray","window_size"); +REGISTER_FUNCTION(window_aggregate,"aggregate_fn","sarray","window_size"); END_FUNCTION_REGISTRATION