Mohegan SkunkWorks

Sat, 12 Sep 2009 17:03:23 EST

Threading the Sieve in Python

This is the first of two posts on threading and multiprocessing in Python. In this post I'll explore the thread module and in the second post I'll look at Python's multiprocessing module. My starting point is the multi-threaded implementation of the Sieve of Eratosthenes found in this tutorial on multi-threading in Python (pdf).

Threading a compute-bound algorithm like the Sieve consists of subdividing the main task into autonomous sub-tasks that share as little state as possible. Having no shared state eliminates the overhead that inevitably comes with locking. It turns out that Python is not very good at multi-threading compute-bound processes. This is not a surprise: CPython has a global interpreter lock (GIL) which prevents threads from executing Python bytecode in parallel.

Regardless, there are other lessons I learned when multi-threading the Sieve algorithm. One is that sharing state between threads may be unavoidable if you want reasonable performance. In fact, if you don't share state, performance can become predictably worse as the number of threads of execution increases.

The other is that locking can have a surprising impact on performance. It's not just the cost of locking per se, but the effect locking has on the distribution of work between the various threads.

Sieve of Eratosthenes

The Sieve of Eratosthenes is a way to find all the prime numbers smaller than N. You start with an array of size N, with all slots initialized to 1 (i.e. to 'true').
You then move down the array, and if the value of the slot at your current position is 'true' (i.e. 1), you set the slots at multiples of your current position index to false (i.e. 0). Each slot makes at most one transition, from true (1) to false (0), and never the other way. The largest multiplier for a position with index i is N/i. You're done when you reach the slot whose index equals the square root of N. Note that as you make your way through the array, you zero out fewer and fewer positions.

For example, if you want to find all the primes up to 10, you start at position index 2, and zero out 4, 6, 8 and 10. You move to 3, and zero out 6 and 9. Since 3 is the largest index not exceeding the square root of 10, you stop here. (The next non-zero position would be 5, as 4 was already set to false previously.) All slots still flagged as true (1) are primes : 2, 3, 5 and 7.
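
For reference, here's a minimal single-threaded sketch of the algorithm in Python; the multi-threaded versions discussed below all build on this same loop structure.

import math

def sieve(n) :
    prime = (n + 1) * [1]                    # slot i is 1 while i may still be prime
    for k in range(2, int(math.sqrt(n)) + 1) :
        if prime[k] :
            for i in range(2, n / k + 1) :   # zero out all multiples of k
                prime[i * k] = 0
    return [i for i in range(2, n + 1) if prime[i]]

# sieve(10) -> [2, 3, 5, 7]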

Testing Platform

All tests are performed on a Toshiba A215-S4747 with an AMD Turion 64 X2 (dual-core) processor. The operating system is Ubuntu 8.04.3. I replaced the Python version shipped with Ubuntu with version 2.6.2.

For the Jython test I installed Jython 2.5 from the Jython web site, in stand-alone mode. IronPython 1.1.1 was installed as well.

Each implementation I'm going to discuss below is part of prime_share.py in the blog-code package on github.

prime_share.py is designed to perform side-by-side comparisons of the various implementations. It allows you to specify a range of threads used in the distribution of the calculation. Timing is done using Python's timeit module, with garbage collection turned off.
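
As a rough sketch, timing a single implementation might look like the snippet below. The exact driver in prime_share.py may differ; the parameters (10000 primes, 2 threads, 40 repetitions) are taken from the test runs described later, and timeit skips garbage collection during the timed runs by default.

import timeit

# time 40 back-to-back runs of one implementation
t = timeit.Timer("prime_share.main_orig(10000, 2)", "import prime_share")
print t.timeit(number=40)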

Various Implementations

This section discusses four different multi-threaded implementations of the Sieve.

  • main_orig: From the tutorial (pdf).

  • main_nolocks: Drastically reduces locking.

  • main_nolocks_alt: Splits the sieve across threads.

  • main_nolocks_alt2: Distributes the work more equitably amongst the threads.

The main differences between the implementations are the amount of locking and the way work is distributed across the threads.

I'll start out by showing the results of a performance test of each of these algorithms. The x-axis shows the number of threads used in the run. Each run calculates the number of primes up to 10000. The time it took to run this calculation 40 times in succession is shown on the y-axis. Each data set is labelled 'main_xyz', where 'xyz' identifies the implementation.

You would expect the run time to decrease as the number of threads increases. That's clearly not the case. The performance of the main_orig implementation is extremely erratic, and the performance of the main_nolocks_alt implementation decreases steadily as the number of threads increases. The performance of the remaining two implementations is unaffected by the number of threads.

What's the reason for this? The four sections below discuss each implementation in turn.

main_orig: the original implementation

As I mentioned before, the implementation of the Sieve shown in the tutorial is the starting point for the subsequent implementations. The full code I used in my test is shown below :

import math
import thread

def count_primes(prime) :
    # slots 0 and 1 are still set to 1, hence the '- 2'
    p = reduce(lambda x, y: x + y, prime) - 2
    return p

def dowork_orig(tn): # thread number tn
    global n,prime_global,nexti_global,nextilock,nstarted,nstartedlock,donelock
    donelock[tn].acquire()              # held until this thread is done
    nstartedlock.acquire()
    nstarted += 1                       # tell main this thread has started
    nstartedlock.release()
    lim = math.sqrt(n)

    while 1:
        nextilock.acquire()
        k = nexti_global                # grab the next index to process
        nexti_global += 1
        nextilock.release()
        if k > lim: break
        if prime_global[k]:
            r = n / k
            for i in range(2,r+1):
                prime_global[i*k] = 0

    donelock[tn].release()

def main_orig(top, nthreads):
    global n,prime_global,nexti_global,nextilock,nstarted,nstartedlock,donelock

    n            = int(top)
    nthreads     = int(nthreads)
    prime_global = (n+1) * [1]

    nstarted     = 0
    nexti_global = 2

    nextilock    = thread.allocate_lock()
    nstartedlock = thread.allocate_lock()
    donelock     = []
    for i in range(nthreads):
        d = thread.allocate_lock()
        donelock.append(d)
        thread.start_new_thread(dowork_orig,(i,))
    while nstarted < nthreads: pass      # busy-wait until every thread holds its donelock
    for i in range(nthreads):
        donelock[i].acquire()            # blocks until thread i releases its lock

    return count_primes(prime_global)

The variable n is the upper limit of the search and nthreads is the number of threads. The global variable prime_global is the sieve, and nexti_global is its index.

The function dowork_orig implements the sieve algorithm along the lines mentioned earlier.

All the global variables are shared amongst the threads. There are three locking variables.
The first one is nstartedlock, which protects a counter that main_orig uses to wait for all threads to start.

The second one is the donelock array. This is used to implement a mechanism to wait for all the threads to finish, similar to 'join' in other threading packages. The number of entries is equal to the number of threads.
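
As a standalone illustration of that pattern (not the exact arrangement used in main_orig, where each worker acquires its own lock at startup), a 'done' lock can emulate a join like this:

import thread, time

done = thread.allocate_lock()
done.acquire()                    # hold the lock before the worker starts

def worker() :
    time.sleep(0.1)               # stand-in for real work
    done.release()                # signal completion

thread.start_new_thread(worker, ())
done.acquire()                    # blocks until the worker releases it ('join')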

The last lock is nextilock. Its purpose is to protect access to the global index variable so that each thread processes a unique index.

Access to the 'sieve' variable prime_global is not protected by a lock. There is no need to, since the value of each slot can only go from 1 (true) to 0 (false). If two threads access the same slot simultaneously they can't reach conflicting conclusions about its final state. Obviously, the GIL effectively prevents any of this from happening, but it's good to know that if it didn't, the algorithm wouldn't break. Even out-of-order execution, where a higher index is processed first, does not lead to a different final state (provided all indexes are processed).

As you can see in the side-by-side comparison, the performance of this implementation is very erratic, and shows no obvious dependence on the number of threads.

Here I show five data sets generated by running the main_orig implementation with the same input parameters as in the side-by-side comparison above. Note how the run time varies dramatically within each data set, and between data sets.

For a single thread the performance of all the data sets is roughly the same. The small differences are probably due to other activity on the box taking some time away from the test run. In general the calculation time increases as the number of threads increases, except for the first ('red') data set.

I believe the reason for the wide variation in performance is the combination of nstartedlock and nextilock.

Each thread tries to acquire nstartedlock at startup. My suspicion is that this lock acquisition staggers the start of the threads enough to change the way work is balanced between the threads from run to run, leading to the dramatic variation in speed. That's because the amount of work a thread performs depends on the index it's processing. For example, say we have a run with three threads. The first thread is active, and the other two threads are dormant. The first thread will process indices two and three, and therefore do most of the work. When the other threads wake up, they'll do less work. On a second run, a context switch occurs sooner and the second thread works on index three, leading to a more equitable distribution of the work.

In addition, there is contention on nextilock. As I pointed out, for higher index values less work is done. So when the prime check returns false (as it does most of the time for these values), the thread will try to acquire the lock again, which puts it in contention with other threads. The way the lock is hit by a particular thread probably varies a bit from run to run, and this results in the erratic performance profile.

The main difference between this implementation and the three others discussed below is the removal of this lock, and of the lock around the index variable. As you can see, this has a dramatic impact on performance.

main_nolocks: remove locking; load balance naively

In this implementation I've removed nstartedlock as well as nextilock. The donelock array is kept in place, since the thread package has no 'join' mechanism.

Here's the complete code for this variation. The other two main_nolocks* implementations, which I'm discussing below, are very similar.

def dowork2(n, nexti_ns, prime_nl) :

    k   = nexti_ns[0]               # first index of this thread's range
    lim = nexti_ns[1]               # one past the last index
    if nexti_ns[0] > nexti_ns[1] :
        raise ValueError("boundaries out-of-order")

    while 1 :

        if not (k < lim) : break

        if prime_nl[k] == 1 :
            r = n / k
            for i in range(2, r+1) :
                prime_nl[i*k] = 0

        k = k + 1

    return prime_nl

def dowork_th(tn, donelock, n, nexti_ns) :
    global prime_nl
    prime_nl = dowork2(n, nexti_ns, prime_nl)
    donelock.release()              # signal main_nolocks that this thread is done


def load_balance(s, th) :
    # split the index array s into th (start, end) pairs of roughly equal length
    len_s = len(s)
    if len_s == 0 :
        return [ s ]

    base = len_s / th
    rem  = len_s - base * th
    K = map(lambda i : i * base, range(1, th+1))
    t = range(1, rem + 1) + (th - rem) * [rem]
    K = map(lambda p : p[0] + p[1], zip(K, t))
    K = zip([0] + K, K)
    last = s[len_s - 1]
    s.append(last + 1)
    K = map(lambda p : (s[p[0]], s[p[1]]), K)
    return K

def start_th(fn, args) :
    return thread.start_new_thread(fn, args)

def main_nolocks(top, nthreads) :
    global prime_nl
    n        = int(top)
    nthreads = int(nthreads)

    prime_nl = (n + 1) * [1]
    # allocate one 'done' lock per thread and acquire it up front
    donelock = map(lambda l : l.acquire() and l,
                   map(lambda i : thread.allocate_lock(), range(nthreads)))

    lim      = int(math.sqrt(n)) + 1
    nexti_ns = range(2, lim, 1)

    B = load_balance(nexti_ns, nthreads)

    map(lambda i : start_th(dowork_th, (i, donelock[i], n, B[i])),
        range(nthreads))

    # block until every thread has released its lock ('join')
    map(lambda i : donelock[i].acquire(), range(nthreads))

    return count_primes(prime_nl)

In the original implementation the nexti_global variable was shared between the threads and used to balance the load between them. In this version that's replaced by the load_balance routine.

load_balance takes as its input an array and the number of threads, and returns a list of pairs representing the start and end point of the sub-array each thread will work on.

For example, if we want to know the number of primes smaller than 101, the indexes we consider in the sieve algorithm run from 2 to 10. If the number of threads is 3, load_balance returns the following array : [ (2, 5), (5, 8), (8, 11) ]
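
To reproduce that with the load_balance routine from the listing above :

print load_balance(range(2, 11), 3)   # -> [(2, 5), (5, 8), (8, 11)]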

This is naive, because the thread working on the first pair will do a lot more work than the one assigned the last pair.

The difference in performance with the original implementation is quite dramatic.

The big difference in performance between this and the previous version, even for a single thread, must be due to the difference in locking.

The original implementation acquires a lock at the startup of each thread and uses locks to control access to the global index counter. In this implementation these locks have been removed. The benefit of this removal is quite obvious from the graph.

The effect of the GIL is also quite obvious: The performance does not depend on the number of threads at all.

main_nolocks_alt: remove locking, split the sieve

This version is a minor variation of the previous one. Previously the range of indices from 2 to sqrt(N) was split up between the threads.

In this version each thread processes the full index range, but the sieve array is split between the threads: One thread works on the first part of the sieve, the second on a second part and so on.

For example, if the size of the sieve is 100, and we have three threads, the first thread works on the sieve up to index 33, the second thread takes indexes 34 to 66 and thread 3 takes the rest.
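
The main routine for this variant isn't shown here, but a hypothetical helper that computes such contiguous chunks could look like this (split_sieve is an illustrative name, not a function in prime_share.py):

def split_sieve(n, nthreads) :
    size   = (n + 1) / nthreads            # Python 2 integer division
    bounds = [i * size for i in range(nthreads)] + [n + 1]
    return zip(bounds, bounds[1:])         # one (start, end) slice per thread

# split_sieve(100, 3) -> [(0, 33), (33, 66), (66, 101)]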

The dowork routine changes a little bit :

def dowork3(n, irange, prime_nla) :
    k   = 2
    lim = int(math.sqrt(n)) + 1
    istart, iend = irange           # this thread's slice of the sieve array
    while 1 :

        if not (k < lim)  : break
        if not (k < iend) : break

        if k < istart :
            # k lies below our slice: just zero out its multiples inside the slice
            s = (istart / k) + 1
            r = (iend / k) + 1
            for i in range(s, r) :
                prime_nla[i*k] = 0
        elif prime_nla[k] == 1 :
            assert k >= istart and k <= iend
            s = 2
            r = (iend / k) + 1
            for i in range(s, r) :
                prime_nla[i*k] = 0

        k = k + 1

    return prime_nla

If the index is smaller than the starting index of the part of the sieve array under consideration, we can just zero out multiples of the index. If that's not the case, we proceed as before, but stop at the upper index of the sub-array if it's smaller than sqrt(N).

If you look at the results, notice that the single-threaded performance of this and the previous version is the same. That's to be expected, as both versions perform the same amount of work. Less expected is that the performance of this version decreases steadily as the number of threads increases.

The reason is that the amount of work performed across all threads (i.e. on the process level) increases as the number of threads increases. That's because threads working on the 'higher' part of the sieve can't take advantage of the primes 'discovered' by the thread working on the first, 'lower' part of the sieve.

Suppose we start with N = 100, and two threads. The first thread works on the sieve range from 0 to 50. It performs the basic sieve operations for the case N = 50. The second thread basically zeros out multiples of values in the range 2 to 11. This thread can't probe the lower part of the sieve to see if it's working with a prime, so it needs to calculate multiples of every element in the range. The more disconnected pieces, the more extra work is done.

In the absence of the GIL, the additional work per thread could be offset by the performance gain from distributing the load across multiple threads. I'll revisit this when the multiprocessing module is discussed, as it isn't constrained by the GIL.

main_nolocks_alt2: remove locking, load balance more accurately

In this last version I try to distribute the work more evenly amongst the threads. This is in contrast to just splitting the range into pieces and assigning each thread a piece, which leads to the thread assigned the lowest index range doing most of the work.

To accomplish a more equitable distribution, I estimate the number of operations per index i. I then assign each thread a set of indices so that the total number of operations per thread varies very little.

An upper limit for the number of operations for index i for a sieve of length N is :

1 + N/i - i 

This assumes that the sieve array is not shared amongst the threads. If it is, then we're over-counting the number of operations, since the formula also counts operations on indices that are multiples of each other.

For example, when N = 200, for i = 2 the number of operations is 99. For i = 4 that number is in fact 0, as all multiples of four are also multiples of two.
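
As a quick check of the estimate, using the formula above :

def operations(i, n = 200) :
    return n / i + 1 - i      # upper limit on zeroing operations for index i

print operations(2)           # -> 99
print operations(13)          # -> 3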

Here's the code fragment where the load balancing takes place :

def smp_load_balance(th, n) :

    def operations(t) :
        # estimated number of zeroing operations for index t : N/t + 1 - t
        return int((n / t) + 1 - t)

    def find_min(thr_alloc) :
        # index of the thread with the smallest estimated work load so far
        min, lst = thr_alloc[0]
        if min == 0 :
            return 0
        midx = 0
        for index in range(1, len(thr_alloc)) :
            count, lst = thr_alloc[index]
            if count < min :
                min  = count
                midx = index
        return midx

    lim      = int(math.sqrt(n)) + 1
    nexti_lb = range(2, lim, 1)

    if th < 2 :
        return [nexti_lb]

    thr_allocs = map(lambda i : (0, []), range(th))
    L = zip(map(operations, nexti_lb), nexti_lb)

    # greedily assign each index to the least-loaded thread
    for i in L :
        ops, index = i
        mindex = find_min(thr_allocs)
        cnt, lst = thr_allocs[mindex]
        cnt += ops
        lst.append(index)
        thr_allocs[mindex] = (cnt, lst)

    return map(lambda p: p[1], thr_allocs)

The distribution between the threads is done as follows: For each index, estimate the number of operations using the equation above. Then, assign the index to the thread with the least amount of work already assigned to it.

For example, for N = 200 the estimated number of operations per index, assuming no sharing, is shown in this graph.

For three threads of execution the distribution of the sieve indices given by smp_load_balance is :

  • thread 1 : [2, 9, 12, 14], with a total number of operations of 120.

  • thread 2 : [3, 6, 8, 11], with a total number of operations of 119.

  • thread 3 : [4, 5, 7, 10, 13], with a total number of operations of 123.
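
For reference, the index assignment above comes straight out of the routine shown earlier :

print smp_load_balance(3, 200)
# -> [[2, 9, 12, 14], [3, 6, 8, 11], [4, 5, 7, 10, 13]]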

When the sieve is shared, the amount of work remains unbalanced. Consider the example above. Thread 3 is assigned index 4, but - when the sieve is shared - there's no work to be done.

The timing test shows no difference compared to the naive load balancing used in main_nolocks; the performance of both is comparable.

That's because the total number of operations across all threads of execution remains the same, and the GIL effectively prevents real parallel execution of the threads. So it really doesn't matter how unbalanced the work is.

What about Jython and IronPython ?

I repeated the runs shown earlier on Jython 2.5. I ran Jython in stand-alone mode.

Note that all run times are about a factor of three higher than under CPython. The Jython run times for the various implementations are pretty much the same, although you can still see that the locking used in the original implementation creates a performance hit. In addition, the performance doesn't depend on the number of threads either.

I attempted to repeat this experiment with IronPython. However, IronPython will not run the timing tests with garbage collection disabled.

Conclusions

The thread module in Python is not well suited to multi-threading compute-bound algorithms. That's nothing new.

There are other lessons I learned from the various ways the Sieve can be distributed across threads: avoid locking, consider sharing state, and be aware of how much work each thread does.

Let's start with locking. It's obvious from the results that locking comes at a huge cost. Removing the locks proved to be the single most effective performance booster.

The Sieve algorithm is interesting in that shared state is quite beneficial to performance. Look at the main_nolocks_alt implementation. That was an attempt to eliminate shared state amongst the threads. Each thread proceeds based on its local data only. But because the prime numbers discovered by one thread couldn't be shared with the others, they ended up doing more work, and performance decreased. It could be that when real parallel processing is possible, this is balanced out by the gain from better parallelism. That, however, remains to be seen.

An alternative approach is to remove the global sieve variable in main_nolocks_alt. Each thread would work independently on a local sieve array, resulting in a partially processed sieve. The 'partial' sieves then need to be combined to get the final sieve array.

One way to think of this is as a 'map' operation over the inputs, where each 'map' operation works independently. The results of the mapping phase are then combined (i.e. 'reduced') to get the final result, hence map-reduce.
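
A minimal sketch of that 'reduce' step, assuming each thread returned its own local copy of the full sieve array: a slot remains prime only if no thread zeroed it, so the combination is an element-wise AND (or minimum).

def combine_sieves(partials) :
    # partials is a list of sieve arrays of equal length, one per thread
    return reduce(lambda a, b : [x & y for x, y in zip(a, b)], partials)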

The amount of work for each index in the Sieve algorithm varies quite a bit. So care should be taken to make sure that the work load is properly balanced amongst the threads. The effect of the (lack of) balance wasn't so obvious here, since the GIL limits parallelism.

These last two points are going to play a part in my next post, where I'll be using the Sieve to explore the multiprocessing module.