longling.lib.concurrency 源代码

# coding: utf-8
# 2020/4/17 @ tongshiwei

import asyncio
from concurrent.futures import ThreadPoolExecutor as StdTPE
# from multiprocessing.pool import Pool
from multiprocess.pool import Pool

__all__ = ["concurrent_pool", "ThreadPool", "ProcessPool", "CoroutinePool"]

SERIAL_LEVEL = 0
PROCESS_LEVEL = 1
THREAD_LEVEL = 2
COROUTINE = 3

_LEVEL = dict(
    d=SERIAL_LEVEL,
    debug=SERIAL_LEVEL,
    n=SERIAL_LEVEL,
    s=SERIAL_LEVEL,
    serial=SERIAL_LEVEL,
    t=THREAD_LEVEL,
    thread=THREAD_LEVEL,
    threading=THREAD_LEVEL,
    p=PROCESS_LEVEL,
    process=PROCESS_LEVEL,
    processing=PROCESS_LEVEL,
    c=COROUTINE,
    coro=COROUTINE,
    coroutine=COROUTINE,
)


[文档]class ThreadPool(StdTPE): def __init__(self, max_workers=None, thread_name_prefix='', ret: list = None): """ Initializes a new ThreadPoolExecutor instance. Parameters ---------- max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. """ super(ThreadPool, self).__init__(max_workers=max_workers, thread_name_prefix=thread_name_prefix) self._result = [] self.ret = ret
[文档] def submit(self, fn, *args, **kwargs): _result = super(ThreadPool, self).submit(fn, *args, **kwargs) self._result.append(_result) return _result
def __exit__(self, exc_type, exc_val, exc_tb): for r in self._result: if r.exception() is not None: raise r.exception() elif self.ret is not None: self.ret.append(r.result()) super(ThreadPool, self).__exit__(exc_type, exc_val, exc_tb)
[文档]class ProcessPool(Pool): def __init__(self, processes=None, *args, ret: list = None, **kwargs): super(ProcessPool, self).__init__(processes, *args, **kwargs) self._result = [] self.ret = ret def submit(self, func, *args, **kwargs): _result = self.apply_async(func, args, kwargs) self._result.append(_result) return _result def __exit__(self, exc_type, exc_val, exc_tb): for r in self._result: _r = r.get() if self.ret is not None: self.ret.append(_r) super(ProcessPool, self).__exit__(exc_type, exc_val, exc_tb)
class CoroutinePool(object): def __init__(self, *args, ret: list = None, **kwargs): self.pools = [] self.ret = ret self._e = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): loop = asyncio.get_event_loop() self._e = asyncio.gather(*[func(*arg, **kwd) for func, arg, kwd in self.pools]) loop.run_until_complete(self._e) loop.close() if self.ret is not None: for r in self._e.result(): self.ret.append(r) def submit(self, func, *args, **kwargs): return self.pools.append([func, args, kwargs]) class SerialPool(object): def __init__(self, *args, ret=None, **kwargs): self.ret = ret def __enter__(self): return self def submit(self, fn, *args, **kwargs): r = fn(*args, **kwargs) if self.ret is not None: self.ret.append(r) return r def __exit__(self, exc_type, exc_val, exc_tb): pass LEVEL_CLASS = { SERIAL_LEVEL: SerialPool, PROCESS_LEVEL: ProcessPool, THREAD_LEVEL: ThreadPool, COROUTINE: CoroutinePool, }
[文档]def concurrent_pool(level: str, pool_size: int = None, ret: list = None): """ Simple api for start completely independent concurrent programs: * thread * process * coroutine Examples -------- .. code-block :: python def pseudo(idx): return idx .. code-block :: python ret = [] with concurrent_pool("p", ret=ret) as e: # or concurrent_pool("t", ret=ret) for i in range(4): e.submit(pseudo, i) print(ret) ``[0, 1, 2, 3]`` """ _level = _LEVEL[level] return LEVEL_CLASS[_level](pool_size, ret=ret)