Skip to content

Commit

Permalink
allow kwargs to tqdm
Browse files Browse the repository at this point in the history
  • Loading branch information
MDM988 committed Aug 25, 2023
1 parent 637f00f commit a423431
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 52 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ You can `pip install tqdm-pathos`.

A pool with an automatically detected number of cores is set up by default. To choose the number of cores, use the `n_cpus` kwarg.
Alternatively, an existing pool can be used by passing it to the `pool` kwarg.
Extra `kwargs` can be passed to the `tqdm` progress bar using the `tqdm_kwargs` dictionary argument, e.g., `tqdm_kwargs = {'desc': 'pbar description'}`.

Function of a single iterable:
```
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages

name = 'tqdm_pathos'
version = '0.2'
version = '0.3'

with open('README.md' ,'r') as f:
long_description = f.read().strip()
Expand Down
121 changes: 70 additions & 51 deletions tqdm_pathos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def func_starmap(func_iterables_args_kwargs):
return func(*list(iterables) + args, **kwargs)


def async_pbar(result, n_tasks, chunksize):
def async_pbar(result, n_tasks, chunksize, tqdm_kwargs):

remaining = n_tasks

with tqdm(total=n_tasks) as pbar:
with tqdm(total=n_tasks, **tqdm_kwargs) as pbar:
while True:
if result.ready():
pbar.update(remaining)
Expand All @@ -50,7 +50,7 @@ def async_pbar(result, n_tasks, chunksize):
result.wait(1)


def map_or_starmap(which, func, iterable, args, kwargs):
def map_or_starmap(which, func, iterable, args, kwargs, tqdm_kwargs):

n_cpus = kwargs.pop('n_cpus', CPUs)
pool = kwargs.pop('pool', None)
Expand All @@ -65,92 +65,111 @@ def map_or_starmap(which, func, iterable, args, kwargs):
repeat(kwargs),
)

if which == 'map':
_func = func_map
elif which == 'starmap':
_func = func_starmap

if pool is None:
pool = pathos.pools._ProcessPool(n_cpus)
close_pool = True
else:
close_pool = False

chunksize = get_chunksize(n_tasks, len(pool._pool))
result = pool.map_async(_func, func_iterable_args_kwargs, chunksize)
result = pool.map_async(which, func_iterable_args_kwargs, chunksize)
if close_pool:
pool.close()
async_pbar(result, n_tasks, chunksize)
async_pbar(result, n_tasks, chunksize, tqdm_kwargs)
output = result.get()
if close_pool:
pool.join()

return output


def map(func, iterable, *args, n_cpus=CPUs, pool=None, **kwargs):
def map(
func,
iterable,
*args,
n_cpus=CPUs,
pool=None,
tqdm_kwargs={},
**kwargs,
):

return map_or_starmap(
'map', func, iterable, args,
func_map,
func,
iterable,
args,
{'n_cpus': n_cpus, 'pool': pool, **kwargs},
tqdm_kwargs,
)


def starmap(func, iterables, *args, n_cpus=CPUs, pool=None, **kwargs):
def starmap(
func,
iterables,
*args,
n_cpus=CPUs,
pool=None,
tqdm_kwargs={},
**kwargs,
):

return map_or_starmap(
'starmap', func, iterables, args,
func_starmap,
func,
iterables,
args,
{'n_cpus': n_cpus, 'pool': pool, **kwargs},
tqdm_kwargs,
)


def _map_or_starmap(which, func, iterable, args, kwargs):
# def _map_or_starmap(which, func, iterable, args, kwargs):

n_cpus = kwargs.pop('n_cpus', CPUs)
pool = kwargs.pop('pool', None)
# n_cpus = kwargs.pop('n_cpus', CPUs)
# pool = kwargs.pop('pool', None)

iterable = list(iterable)
n_tasks = len(iterable)
# iterable = list(iterable)
# n_tasks = len(iterable)

func_iterable_args_kwargs = zip(
repeat(func),
iterable,
repeat(list(args)),
repeat(kwargs),
)
# func_iterable_args_kwargs = zip(
# repeat(func),
# iterable,
# repeat(list(args)),
# repeat(kwargs),
# )

if which == 'map':
_func = func_map
elif which == 'starmap':
_func = func_starmap
# if which == 'map':
# _func = func_map
# elif which == 'starmap':
# _func = func_starmap

if pool is not None:
chunksize = get_chunksize(n_tasks, len(pool._pool))
return list(tqdm(
pool.imap(_func, func_iterable_args_kwargs, chunksize),
total=n_tasks,
))
# if pool is not None:
# chunksize = get_chunksize(n_tasks, len(pool._pool))
# return list(tqdm(
# pool.imap(_func, func_iterable_args_kwargs, chunksize),
# total=n_tasks,
# ))

with pathos.pools._ProcessPool(n_cpus) as pool:
chunksize = get_chunksize(n_tasks, n_cpus)
return list(tqdm(
pool.imap(_func, func_iterable_args_kwargs, chunksize),
total=n_tasks,
))
# with pathos.pools._ProcessPool(n_cpus) as pool:
# chunksize = get_chunksize(n_tasks, n_cpus)
# return list(tqdm(
# pool.imap(_func, func_iterable_args_kwargs, chunksize),
# total=n_tasks,
# ))


def _map(func, iterable, *args, n_cpus=CPUs, pool=None, **kwargs):
# def _map(func, iterable, *args, n_cpus=CPUs, pool=None, **kwargs):

return _map_or_starmap(
'map', func, iterable, args,
{'n_cpus': n_cpus, 'pool': pool, **kwargs},
)
# return _map_or_starmap(
# 'map', func, iterable, args,
# {'n_cpus': n_cpus, 'pool': pool, **kwargs},
# )


def _starmap(func, iterables, *args, n_cpus=CPUs, pool=None, **kwargs):
# def _starmap(func, iterables, *args, n_cpus=CPUs, pool=None, **kwargs):

return _map_or_starmap(
'starmap', func, iterables, args,
{'n_cpus': n_cpus, 'pool': pool, **kwargs},
)
# return _map_or_starmap(
# 'starmap', func, iterables, args,
# {'n_cpus': n_cpus, 'pool': pool, **kwargs},
# )

0 comments on commit a423431

Please sign in to comment.