Skip to content

Commit

Permalink
Added new file for collector example
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 19, 2024
1 parent 823377f commit 044d0dd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
50 changes: 50 additions & 0 deletions examples/example_collector_magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import sys
import json
import time

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402


box = Meesee()


@box.produce()
def produce_to_process_data(items):
return items


@box.worker_producer(output_queue="foobar")
def process_data(item, worker_id):
item = json.loads(item)
wait = item["wait"]
print(f"{worker_id} processing: {item} for {wait} seconds and send it too foobar")
item["name"] = f"{item['name']}_processed"
time.sleep(wait)
return [item,]


@box.collector(wait=1, until=5)
def foobar(items):
return items


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
wait = int(sys.argv[sys.argv.index('--wait') + 1]) if '--wait' in sys.argv else 5
items = [{"name": f"name{i}", "wait": wait} for i in range(10)]
print(f"sending {len(items)} tasks to {workers} workers")
print(f"simulate processing with with a wait of {wait}")
start = time.time()

produce_to_process_data(items)
box.push_button(workers, wait=0.1)

result = foobar()
print(result)
print("-----")
result = foobar()
print(result)
print(f"done with running took: {round(time.time()- start, 2)}")
58 changes: 28 additions & 30 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import sys
import json
import time

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

Expand All @@ -11,40 +9,40 @@
box = Meesee()


@box.produce()
def produce_to_process_data(items):
return items
@box.worker()
def foobar(item, worker_id):
print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker_producer(output_queue="foobar")
def process_data(item, worker_id):
item = json.loads(item)
wait = item["wait"]
print(f"{worker_id} processing: {item} for {wait} seconds and send it too foobar")
item["name"] = f"{item['name']}_processed"
time.sleep(wait)
return [item,]
@box.worker()
def name_of_the_function(item, worker_id):
print('func: name_of_the_function, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker(queue="passed_name")
def passed_name_not_this_one(item, worker_id):
print('func: passed_name_not_this_one, worker_id: {}, item: {}'.format(worker_id, item))


@box.produce(queue="passed_name")
def produce_some_items(amount):
yield from range(amount)

@box.collector(wait=1, until=5)
def foobar(items):

@box.produce()
def produce_to_foo(items):
return items


@box.worker_producer(input_queue="foo", output_queue="foobar")
def foo(item, worker_id):
print(f"{worker_id} {item} foo pass it too foobar")
return [item,]


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
wait = int(sys.argv[sys.argv.index('--wait') + 1]) if '--wait' in sys.argv else 5
items = [{"name": f"name{i}", "wait": wait} for i in range(10)]
print(f"sending {len(items)} tasks to {workers} workers")
print(f"simulate processing with with a wait of {wait}")
start = time.time()

produce_to_process_data(items)
box.push_button(workers, wait=0.1)

result = foobar()
print(result)
print("-----")
result = foobar()
print(result)
print(f"done with running took: {round(time.time()- start, 2)}")
produce_some_items(10)
items = [{"name": f"name{i}"} for i in range(10)]
produce_to_foo(items)
box.push_button(workers, wait=1)

0 comments on commit 044d0dd

Please sign in to comment.