Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

window_aggregate sdk example #4

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions sdk_example/WindowAggregate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
#include <string>
#include <vector>
#include <deque>
#include <graphlab/flexible_type/flexible_type.hpp>
#include <graphlab/sdk/toolkit_function_macros.hpp>
#include <graphlab/sdk/toolkit_class_macros.hpp>
#include <graphlab/sdk/gl_sarray.hpp>
#include <graphlab/sdk/gl_sframe.hpp>

using namespace graphlab;

class BASE_AGGREGATE {

protected:
flexible_type state;
bool _is_incremental;
size_t num_elements;
public:

BASE_AGGREGATE() {
initiate();
}
// init phase
virtual void initiate() {
state = 0;
_is_incremental = false;
num_elements = 0;
}
// process phase
virtual void add_element(const flexible_type & value) { }

// delete phase
virtual void remove_element(const flexible_type & value) { }

// emit phase
virtual flexible_type emit() { return state; }

virtual bool is_incremental() {
return _is_incremental;
}

virtual flex_type_enum output_type(const flex_type_enum & input_type) {
return input_type;
}
};

class AVG : public BASE_AGGREGATE {
public:
AVG(): BASE_AGGREGATE() {
initiate();
}
// init phase
void initiate() {
state = 0;
_is_incremental = true;
num_elements = 0;
}

// process phase
void add_element(const flexible_type & value) {
state += value;
num_elements++;
}

// delete phase
void remove_element(const flexible_type & value) {
state -= value;
num_elements--;
}

// emit phase
flexible_type emit() {
flex_float state_float = state;
return state_float / (num_elements) ;
}

flex_type_enum output_type(const flex_type_enum & input_type) {
return flex_type_enum::FLOAT;
}
};

class MAX : public BASE_AGGREGATE {
public:
MAX(): BASE_AGGREGATE() {
initiate();
}
// init phase
void initiate() {
num_elements = 0;
state = 0;
_is_incremental = false;
}

// process phase
void add_element(const flexible_type & value) {
if(num_elements == 0)
state = value;
else if(value > state)
state = value;
num_elements++;
}

// emit phase
flexible_type emit() {
return state;
}

};

class MIN : public BASE_AGGREGATE {
public:
MIN(): BASE_AGGREGATE() {
initiate();
}
// init phase
void initiate() {
num_elements = 0;
state = 0;
_is_incremental = false;
}

// process phase
void add_element(const flexible_type & value) {
if(num_elements == 0)
state = value;
else if(value < state)
state = value;
num_elements++;
}

// emit phase
flexible_type emit() {
return state;
}

};



class COUNT : public BASE_AGGREGATE {
public:
COUNT(): BASE_AGGREGATE() {
initiate();
}
// init phase
void initiate() {
state = 0;
_is_incremental = true;
}

// process phase
void add_element(const flexible_type & value) {
state += 1;
}

// delete phase
void remove_element(const flexible_type & value) {
state -= 1;
}

// emit phase
flexible_type emit() {
return state;
}

};

class SUM: public BASE_AGGREGATE {
public:
SUM(): BASE_AGGREGATE() {
initiate();
}
// init phase
void initiate() {
state = 0;
_is_incremental = true;
}

// process phase
void add_element(const flexible_type & value) {
state += value;
}

// delete phase
void remove_element(const flexible_type & value) {
state -= value;
}

// emit phase
flexible_type emit() {
return state;
}
};

size_t COUNT_AGG() {
return (size_t)new COUNT();
}

size_t SUM_AGG() {
return (size_t)new SUM();
}

size_t MIN_AGG() {
return (size_t)new MIN();
}

size_t MAX_AGG() {
return (size_t)new MAX();
}

size_t AVG_AGG() {
return (size_t)new AVG();
}

gl_sarray window_aggregate(std::function<size_t()> fn,gl_sarray & sa,size_t window_size) {
BASE_AGGREGATE * agg = reinterpret_cast<BASE_AGGREGATE*>(fn());
gl_sarray_writer writer(agg->output_type(sa.dtype()), 1);
if(window_size <= 0)
return writer.close();

std::deque<flexible_type> * mydeque = new std::deque<flexible_type>();

for (const auto &elem: sa.range_iterator()) {
mydeque->push_back(elem);
agg->add_element(elem);

if(mydeque->size() == window_size) {
flexible_type agg_value = agg->emit();
writer.write(agg_value,0);
if(agg->is_incremental()) {
flexible_type & oldest_value = mydeque->front();
agg->remove_element(oldest_value);
mydeque->pop_front();
} else {
mydeque->pop_front();
agg->initiate();
std::deque<flexible_type>::iterator it = mydeque->begin();
while (it != mydeque->end()){
agg->add_element(*it++);
}
}
}

}
delete agg;
delete mydeque;
return writer.close();
}

BEGIN_FUNCTION_REGISTRATION
REGISTER_FUNCTION(SUM_AGG);
REGISTER_FUNCTION(COUNT_AGG);
REGISTER_FUNCTION(MAX_AGG);
REGISTER_FUNCTION(MIN_AGG);
REGISTER_FUNCTION(AVG_AGG);
REGISTER_FUNCTION(window_aggregate,"aggregate_fn","sarray","window_size");
END_FUNCTION_REGISTRATION
Binary file removed sdk_example/__init__.pyc
Binary file not shown.