Chris Hager
Programming, Technology & More

Python Thread Pool

A thread pool is a group of pre-instantiated, idle threads which stand ready to be given work. These are often preferred over instantiating new threads for each task when there is a large number of (short) tasks to be done rather than a small number of long ones.

Suppose you want do download 1000s of documents from the internet, but only have resources for downloading 50 at a time. The solution is to utilize is a thread pool, spawning a fixed number of threads to download all the URLs from a queue, 50 at a time.

In order to use thread pools, Python 3.x includes the ThreadPoolExecutor class, and both Python 2.x and 3.x have multiprocessing.dummy.ThreadPool. multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

The downside of multiprocessing.dummy.ThreadPool is that in Python 2.x, it is not possible to exit the program with eg. a KeyboardInterrupt before all tasks from the queue have been finished by the threads.

In order to achieve an interruptable thread queue in Python 2.x and 3.x (for use in PDFx), I’ve build this code, inspired by stackoverflow.com/a/7257510. It implements a thread pool which works with Python 2.x and 3.x:

import sys
IS_PY2 = sys.version_info < (3, 0)

if IS_PY2:
    from Queue import Queue
else:
    from queue import Queue

from threading import Thread


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue """
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # An exception happened in this thread
                print(e)
            finally:
                # Mark this task as done, whether an exception happened or not
                self.tasks.task_done()


class ThreadPool:
    """ Pool of threads consuming tasks from a queue """
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """ Add a task to the queue """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """ Add a list of tasks to the queue """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """ Wait for completion of all the tasks in the queue """
        self.tasks.join()


if __name__ == "__main__":
    from random import randrange
    from time import sleep

    # Function to be executed in a thread
    def wait_delay(d):
        print("sleeping for (%d)sec" % d)
        sleep(d)

    # Generate random delays
    delays = [randrange(3, 7) for i in range(50)]

    # Instantiate a thread pool with 5 worker threads
    pool = ThreadPool(5)

    # Add the jobs in bulk to the thread pool. Alternatively you could use
    # `pool.add_task` to add single jobs. The code will block here, which
    # makes it possible to cancel the thread pool with an exception when
    # the currently running batch of workers is finished.
    pool.map(wait_delay, delays)
    pool.wait_completion()

The queue size is similar to the number of threads (see self.tasks = Queue(num_threads)), therefore adding tasks with pool.map(..) and pool.add_task(..) blocks until a new slot in the Queue is available.

When you issue a KeyboardInterrupt by pressing Ctrl+C, the current batch of workers will finish and the program quits with the exception at the pool.map(..) step.


If you have suggestions or feedback, let me know via @metachris



Share this post if you like it <3
swirl

Posted by Chris Hager in #python  ·  11 Apr 2016


swirl