A thread pool is a group of pre-instantiated, idle threads which stand ready to be given work. These are often preferred over instantiating new threads for each task when there is a large number of (short) tasks to be done rather than a small number of long ones.
Suppose you want do download 1000s of documents from the internet, but only have resources for downloading 50 at a time. The solution is to utilize is a thread pool, spawning a fixed number of threads to download all the URLs from a queue, 50 at a time.
In order to use thread pools, Python 3.x includes the ThreadPoolExecutor class, and both Python 2.x and 3.x have
multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.
The downside of
multiprocessing.dummy.ThreadPool is that in Python 2.x, it is not possible to exit the program with eg. a KeyboardInterrupt before all tasks from the queue have been finished by the threads.
In order to achieve an interruptable thread queue in Python 2.x and 3.x (for use in PDFx), I’ve build this code, inspired by stackoverflow.com/a/7257510. It implements a thread pool which works with Python 2.x and 3.x:
import sys IS_PY2 = sys.version_info < (3, 0) if IS_PY2: from Queue import Queue else: from queue import Queue from threading import Thread class Worker(Thread): """ Thread executing tasks from a given tasks queue """ def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception as e: # An exception happened in this thread print(e) finally: # Mark this task as done, whether an exception happened or not self.tasks.task_done() class ThreadPool: """ Pool of threads consuming tasks from a queue """ def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """ Add a task to the queue """ self.tasks.put((func, args, kargs)) def map(self, func, args_list): """ Add a list of tasks to the queue """ for args in args_list: self.add_task(func, args) def wait_completion(self): """ Wait for completion of all the tasks in the queue """ self.tasks.join() if __name__ == "__main__": from random import randrange from time import sleep # Function to be executed in a thread def wait_delay(d): print("sleeping for (%d)sec" % d) sleep(d) # Generate random delays delays = [randrange(3, 7) for i in range(50)] # Instantiate a thread pool with 5 worker threads pool = ThreadPool(5) # Add the jobs in bulk to the thread pool. Alternatively you could use # `pool.add_task` to add single jobs. The code will block here, which # makes it possible to cancel the thread pool with an exception when # the currently running batch of workers is finished. pool.map(wait_delay, delays) pool.wait_completion()
The queue size is similar to the number of threads (see
self.tasks = Queue(num_threads)), therefore adding tasks with
pool.add_task(..) blocks until a new slot in the Queue is available.
When you issue a KeyboardInterrupt by pressing Ctrl+C, the current batch of workers
will finish and the program quits with the exception at the
If you have suggestions or feedback, let me know via @metachris