Multi-threading in Python

Programming with threads is one of the harder tasks in software development. Python's threading and Queue modules make it significantly easier, but using threads efficiently still takes some deliberation.

Python's thread and threading libraries are built on the operating system's native threads (POSIX threads on Unix-like systems). The threading library is the higher level of the two and is therefore the one to use in typical programming tasks. The Queue module provides a thread-safe mechanism for communicating between threads, like a combination of a list and a semaphore.
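
As a rough illustration of the Queue module acting as a thread-safe channel, here is a minimal sketch. The worker function, the two queue names, and the None "stop" sentinel are assumptions made up for this example, and the try/except import is only there so the snippet also runs on Python 3, where the module is named queue.

```python
import threading
try:
    from Queue import Queue   # Python 2 name, as used in this article
except ImportError:
    from queue import Queue   # Python 3 renamed the module

def worker(inbox, outbox):
    # Take numbers off the inbox until the None sentinel arrives.
    while True:
        item = inbox.get()    # blocks until an item is available
        if item is None:
            break
        outbox.put(item * item)

inbox, outbox = Queue(), Queue()
t = threading.Thread(target=worker, args=(inbox, outbox))
t.start()
for n in [1, 2, 3]:
    inbox.put(n)
inbox.put(None)               # hypothetical "stop" signal for the worker
t.join()

# Collect the three results the worker produced, in FIFO order.
squares = [outbox.get() for _ in range(3)]
```

Neither thread touches a shared list directly; all hand-offs go through put() and get(), which do their own locking.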

POSIX threads are expensive, so it takes a little planning to know when to use them. Generally, the best uses of threads are for multiple tasks that cause side effects, such as output, but do not depend on the state of other threads. An example is a program that writes out a large number of files with data from a database, or a large data migration. The example that follows builds up a general template for a thread class that encapsulates the actions to be taken.

Assume we have around 500 XML documents to download from a remote server (via HTTP). Each file is large enough that it warrants downloading several at a time. The server and the network can take a fair load, but we don't want to simulate a botnet attack or overload our local network connection, which would slow each download to a crawl and drastically increase collisions and errors while downloading. Let's limit the number of files we download at once to 4. We start off with a function to download the files:

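import urllib

def get_file(url):
    # Fetch the document at url and return its contents as a string.
    # Returns None if the document could not be opened.
    try:
        f = urllib.urlopen(url)
        contents = f.read()
        f.close()
        return contents
    except IOError:
        print "Could not open document: %s" % url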


So much for error handling, but you get the idea. Assuming our URL is stored in a variable named url, we can execute this function in another thread with:

import threading

# Create the thread, then start it; the constructor alone does not run get_file.
thread = threading.Thread(target=get_file, args=(url,))
thread.start()
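
To try the target/args pattern without touching the network, here is a small sketch that swaps get_file for a hypothetical stand-in, record_length, which only records the length of a string. The names record_length and results are assumptions for this example.

```python
import threading

results = []

def record_length(text):
    # Hypothetical stand-in for get_file: no network access needed.
    results.append(len(text))

# args must be a tuple, hence the trailing comma for a single argument.
thread = threading.Thread(target=record_length, args=("hello",))
alive_before_start = thread.is_alive()   # constructing a Thread does not run it
thread.start()
thread.join()                            # wait for the thread to finish
```

After join() returns, results holds whatever the function appended while it ran in the other thread.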


We can make that a little simpler in two ways. The object-oriented way is to subclass threading.Thread and put the call to get_file(url) in its run() method. This is useful when the result of the function is required for later processing. If the results are not needed, we can simplify using a functional programming technique, partial application:

import threading
from functools import partial

thread = threading.Thread(target=partial(get_file, url))
thread.start()
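
The two spellings should behave the same way. Here is a minimal sketch comparing them, using a hypothetical tag function in place of get_file so that it runs without a network connection:

```python
import threading
from functools import partial

seen = []

def tag(prefix, value):
    # Hypothetical stand-in for get_file that takes two arguments.
    seen.append(prefix + value)

# Arguments passed through args ...
t1 = threading.Thread(target=tag, args=("a-", "one"))
# ... or bound ahead of time with partial, leaving a zero-argument callable.
t2 = threading.Thread(target=partial(tag, "b-", "two"))

for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()
```

Both threads end up calling tag with two arguments; the order in which they append to seen is not guaranteed.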


While that method is more fun, let's use the OO method (no pun intended) since we want to do something with this data. Remember, we are downloading the file and storing it as a string, rather than simply downloading the file to the local file system. That implies we have more work to do after the download.

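import urllib, threading

class FileGetter(threading.Thread):
    def __init__(self, url):
        # The base class must be initialized before the thread can be started.
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            # run() cannot return a value to the caller, so store the result.
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url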
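
The same subclassing pattern can be exercised without a network connection. The sketch below uses a hypothetical Squarer class in place of FileGetter; the point it illustrates is that the base class initializer has to be called and that the result has to be stored on the instance, since the return value of run() is discarded.

```python
import threading

class Squarer(threading.Thread):
    def __init__(self, n):
        threading.Thread.__init__(self)   # required before start() can be called
        self.n = n
        self.result = None

    def run(self):
        # Executed in the new thread once start() is called.
        self.result = self.n * self.n

t = Squarer(7)
before = t.result    # still None: nothing has run yet
t.start()
t.join()
after = t.result     # set by run() in the other thread
```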


Now we have our Thread subclass. Note that creating an instance of FileGetter does not cause the thread to start. That is done with the start() method. However, we don't want all of the threads running at the same time, so we need to use the Queue module and a couple of helper functions to manage our list of files.

import threading
from Queue import Queue

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            # Blocks while the queue is full, which throttles new downloads.
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            # Wait for this download to complete before taking the next one.
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished


Let's take a look at what we did here. The first function, producer, accepts the queue and the list of files. For each file, it starts a new FileGetter thread. The last line is significant: we add the thread to the queue. The second parameter, boolean True, tells the put() method to block until a slot is available. Note that the thread starts before the put() call blocks. This means that even if the queue is full, the thread will already have started. Because of this, we reduce our queue size to 3.
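
To see the blocking behaviour of a bounded queue in isolation, here is a small sketch. It uses a queue of size 2 and a timeout so that the example cannot hang; the timeout value and the variable names are assumptions for illustration, and the try/except import only exists so it also runs on Python 3.

```python
try:
    from Queue import Queue, Full   # Python 2 name, as used in this article
except ImportError:
    from queue import Queue, Full   # Python 3 renamed the module

q = Queue(2)            # at most two items may wait in the queue
q.put("a", True)
q.put("b", True)
is_full = q.full()

try:
    # With block=True and a timeout, put() waits up to 0.1 seconds for a
    # free slot, then raises Full instead of waiting forever.
    q.put("c", True, 0.1)
    timed_out = False
except Full:
    timed_out = True

q.get()                 # taking an item out frees a slot ...
q.put("c", True)        # ... so this put() returns immediately
size = q.qsize()
```

Without the timeout, the third put() would simply wait until another thread called get(), which is exactly what the producer relies on.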

The second function, consumer, reads items out of the queue, blocking until an item is available. Then comes the important part, thread.join(). This causes the consumer to block until the thread completes its execution. This line is what keeps the queue from emptying (and the producer from starting more threads) before the thread at the head of the queue has completed. The consumer stores the result of each thread's execution in finished, a list defined in the enclosing get_files function. Note that get() frees a queue slot while the thread it returned may still be running, so with a queue size of 3 as many as five downloads can be in flight at once; a queue size of 2 makes four a hard cap.
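
A quick way to convince yourself that join() really waits is the following sketch, which uses a hypothetical slow_task that sleeps briefly in place of a download:

```python
import threading
import time

events = []

def slow_task():
    # Stand-in for a download that takes a little while.
    time.sleep(0.2)
    events.append("task finished")

t = threading.Thread(target=slow_task)
t.start()
t.join()                          # blocks here until slow_task has returned
events.append("join returned")
```

Because join() does not return until the thread is done, the task's entry always lands in events first.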

Last, we create a thread each for the producer and the consumer, start them, and then block until they have completed. Here is the complete code:

import urllib, threading
from Queue import Queue

class FileGetter(threading.Thread):
    def __init__(self, url):
        # The base class must be initialized before the thread can be started.
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            # run() cannot return a value to the caller, so store the result.
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            # Blocks while the queue is full, which throttles new downloads.
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            # Wait for this download to complete before taking the next one.
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished


Of course, this approach is not perfect. A queue is FIFO: first in, first out. If one of the executing threads finishes before the thread ahead of it in the queue, we lose efficiency, because the consumer is still waiting on the earlier thread and fewer files are downloading than our limit allows. However, the solution to that is a complex one and outside the scope of this article.
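
For readers who want a starting point anyway, one common alternative (not the method used in this article) is a fixed pool of worker threads that each pull the next URL off a queue as soon as they are free. The sketch below is only that, a sketch: fetch_all, its fetch parameter, the num_workers default, and the None sentinel are all assumptions made up for this example, and the try/except import only exists so it also runs on Python 3.

```python
import threading
try:
    from Queue import Queue   # Python 2 name, as used in this article
except ImportError:
    from queue import Queue   # Python 3 renamed the module

def fetch_all(urls, fetch, num_workers=4):
    """Call fetch(url) for every url using a fixed pool of worker threads."""
    tasks = Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            url = tasks.get()
            if url is None:            # sentinel: no more work for this worker
                break
            try:
                data = fetch(url)
            except IOError:
                data = None            # record the failure and keep going
            with lock:
                results[url] = data

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()
    for url in urls:
        tasks.put(url)
    for _ in workers:
        tasks.put(None)                # one sentinel per worker
    for w in workers:
        w.join()
    return results

# Usage with a stand-in fetch function instead of a real download.
fake_pages = fetch_all(["a", "b", "c", "d", "e"], lambda u: u.upper())
```

Since exactly num_workers threads exist, no more than that many fetches run at once, and a worker that finishes early picks up the next URL right away instead of waiting behind a slower one. Passing the get_file function from earlier as fetch would turn this into a downloader.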