Simple Concurrency in Python

29/04/2018

Whenever I attempt concurrency in Python it seems that something like this happens:

KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "", line 1, in 
F  File "", line 1, in 
  File "", line 1, in 
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
a  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
  File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in 
i    from . import context
    from . import context
l  File "C:\Python35\lib\multiprocessing\context.py", line 5, in 
    from . import context
  File "C:\Python35\lib\multiprocessing\context.py", line 3, in 
e    from . import process
    import threading
d  File "C:\Python35\lib\multiprocessing\context.py", line 3, in 
  File "", line 969, in _find_and_load
  File "C:\Python35\lib\threading.py", line 7, in 
     import threading
  File "", line 958, in _find_and_load_unlocked
t    from traceback import format_exc as _format_exc
  File "C:\Python35\lib\threading.py", line 7, in 
  File "", line 673, in _load_unlocked
o  File "C:\Python35\lib\traceback.py", line 5, in 
    from traceback import format_exc as _format_exc
  File "", line 669, in exec_module
     import linecache
  File "C:\Python35\lib\traceback.py", line 5, in 
  File "", line 773, in get_code
i  File "C:\Python35\lib\linecache.py", line 11, in 
    import linecache
    import tokenize
m  File "", line 484, in _compile_bytecode
  File "C:\Python35\lib\linecache.py", line 11, in 
  File "C:\Python35\lib\tokenize.py", line 32, in 
pKeyboardInterrupt
    import tokenize
o    import re
  File "C:\Python35\lib\tokenize.py", line 32, in 
r  File "", line 969, in _find_and_load
t    import re
  File "", line 954, in _find_and_load_unlocked
   File "C:\Python35\lib\re.py", line 335, in 
t  File "", line 896, in _find_spec
h  File "", line 1147, in find_spec
e  File "", line 1121, in _get_spec
   File "", line 1229, in find_spec
s  File "", line 82, in _path_stat
iKeyboardInterrupt
te module
Traceback (most recent call last):
  File "C:\Python35\lib\site.py", line 563, in 
    main()
  File "C:\Python35\lib\site.py", line 550, in main
    known_paths = addsitepackages(known_paths)
  File "C:\Python35\lib\site.py", line 327, in addsitepackages
    addsitedir(sitedir, known_paths)
  File "C:\Python35\lib\site.py", line 206, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "C:\Python35\lib\site.py", line 167, in addpackage
    import copyreg
  File "", line 969, in _find_and_load
    exec(line)
  File "", line 958, in _find_and_load_unlocked
  File "", line 1, in 
  File "", line 673, in _load_unlocked

And I'm sure it isn't just me - the simple fact is that doing concurrency in Python sucks - and even the libraries which try to make it nicer often don't do that good a job or only make your life easy if you structure your program in a very particular way.

This is all made worse by the fact that threads in Python are not really executed concurrently, but simply have their operations interleaved by the Python interpreter (thanks to the Global Interpreter Lock). This means most forms of concurrency in Python don't actually make use of multiple CPU cores and are therefore fairly useless from a performance perspective. To get true parallelism in Python you have to use the multiprocessing module, which at first looks like a drop-in replacement for threads, but in reality works in a completely different way - by launching multiple Python interpreters in separate processes and allowing some communication between them.
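
If you want to see this difference for yourself, here is a rough little experiment you can try: the same CPU-bound function run on two threads takes about as long as running it twice in a row, while two processes can actually use two cores (the exact numbers will of course depend on your machine, and the names burn and time_pair are just for illustration):

import time
from threading import Thread
from multiprocessing import Process

def burn():
    # A CPU-bound busy loop - no I/O, so threads gain nothing here.
    total = 0
    for i in range(10000000):
        total += i
    return total

def time_pair(worker_type):
    # Start two workers of the given type side by side and time them.
    workers = [worker_type(target=burn) for _ in range(2)]
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    print('threads:   %0.2fs' % time_pair(Thread))
    print('processes: %0.2fs' % time_pair(Process))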


The saving grace of all of this is that the Python multiprocessing module actually provides some pretty nice basic tools, and if you understand how it works at a high level then making use of these tools to achieve concurrency is a fairly painless experience.

But first a little revision on how Python multiprocessing actually works. Let's take a look at the following basic Python script.

from multiprocessing import Process, Pipe
    
def greet(name):
    print('Hello %s!' % (name))
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

What actually happens when we run this script? Well, everything runs basically as expected until we reach the line p.start().

When this line is executed a few important things happen. First, the arguments we supplied to the args parameter are pickled, which basically means they are converted into a raw stream of bytes and saved for later. Next, the script currently being run is run again by a new instance of Python (at least on Windows, where new processes are started using the 'spawn' method). This second run of the script is given a different value for the __name__ variable, and has all of its output redirected to the original process. The new run of the script executes everything as usual until it reaches the end of the script. At this point it unpickles the arguments that were saved and executes the function that was specified by the target parameter. Once it finishes executing this function it ends.
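
That first pickling step is really just what the standard pickle module does, so we can get a rough feel for it directly (the args and data variables here are only for illustration):

import pickle

args = ('Daniel',)

# Pickling converts the arguments into a raw stream of bytes...
data = pickle.dumps(args)
print(type(data))           # <class 'bytes'>

# ...which can be unpickled back into the original objects later on.
print(pickle.loads(data))   # ('Daniel',)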

We can see this re-running behavior if we run the same script as before, but include another print statement in the middle:

from multiprocessing import Process, Pipe

def greet(name):
    print('Hello %s!' % (name))
    
print(__name__)
    
if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()

When we run this version it outputs something like this:

__main__
__mp_main__
Hello Daniel!

We can see the different values given to the __name__ variable for the different runs, and if we were to look in the Task Manager while it was executing we would see two instances of python.exe too (on Windows).

This little experiment highlights the behavior and also the two main limitations of Python's multiprocessing module - namely that any values given as the args parameter must be pickleable, which usually means simple, easily serializable data - and secondly that the target parameter must always be a top level function in the script, independent of anything that happens inside of the if __name__ == '__main__': part of the script.
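
To make these limitations a little more concrete, here is a rough sketch of the kind of thing that does not work - at least on Windows, where both the arguments and the target get pickled when the process is started (the exact exception you see can vary):

from multiprocessing import Process

def greet(name):
    print('Hello %s!' % (name))

if __name__ == '__main__':
    # Arguments must be pickleable - a file handle, for example, is not,
    # so un-commenting these lines raises an error when the process starts.
    #
    # f = open('log.txt', 'w')
    # p = Process(target=greet, args=(f,))
    # p.start()

    # The target must be a top level function - a lambda defined inside
    # this block cannot be pickled and sent to the new process.
    #
    # p = Process(target=lambda: greet('Daniel'))
    # p.start()

    # A top level function with simple, pickleable arguments is fine.
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()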

(It is worth noting that while the multiprocessing module was designed to have the exact same interface as the Python threading module, and as such appears as though it can be used as a drop-in replacement, I find it better to think about the two as completely separate entities, since the above limitations mean this is almost never the case.)


So here is my little recipe for multiprocessing in Python. It starts with a function which calls another function with a given set of arguments and keyword arguments asynchronously - that is, it calls the provided function without waiting for it to finish. Instead, what it returns is a handle: something we can use later to wait for the called function to finish and get its return value. Also, we are going to communicate with this process by constructing a Pipe, which is a little object split into two parts (one for the parent process and one for the child) that we can use to send and receive things between the two processes. This asynchronous call function looks something like this:

def call_async(f, *args, **kwargs):
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    pipe_parent.send((f.__name__, args, kwargs))
    return (process, pipe_parent)

What this function does is create a new Process, and run that process on a function called call_dispatch (which we will define next). Once this function is running we use the Pipe to send the new process the name of the function we wish to call, as well as the arguments we want to use.

The call_dispatch function is itself very simple. It just waits for the function name and arguments to arrive from the pipe, looks up the top level function with that name (via globals()), calls it, and then sends the return value back into the pipe once it is done.

def call_dispatch(pipe):
    name, args, kwargs = pipe.recv()
    output = globals()[name](*args, **kwargs)
    pipe.send(output)

Later on, when we want to wait for this function to finish and get its return value, we can use another function called call_await, which waits for the called function to finish and then returns its result.

def call_await(h):
    process, pipe_parent = h
    output = pipe_parent.recv()
    process.join()
    return output

Using these little functions makes some things easy, such as asynchronously calling a function for each element of an array and then gathering the return values once they are all ready.

import time

def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = []
    for i, name in enumerate(names):
        h = call_async(greet, i, name)
        handles.append(h)
        
    for h in handles:
        print(call_await(h))

Notice how evaluating this script takes only about one second instead of four, since each call to `greet` is performed in a separate process. Also notice how, although the evaluation order can vary, the return order is always the same, since we wait for each of the processes to finish in sequence.

1 Hello Chess!
2 Hello Tom!
0 Hello Dan!
3 Hello Mike!
0
1
2
3
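
If you want to check that timing claim, you can wrap the two loops in a quick measurement, using greet, call_async and call_await exactly as defined above (start and results here are just throwaway variable names):

import time

if __name__ == '__main__':

    names = ['Dan', 'Chess', 'Tom', 'Mike']

    start = time.time()

    handles = []
    for i, name in enumerate(names):
        handles.append(call_async(greet, i, name))

    results = [call_await(h) for h in handles]
    print(results)

    # Roughly one second of sleeping, plus whatever it costs to start
    # four new Python processes.
    print('%0.2f seconds' % (time.time() - start))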

One little limitation of this approach is that each time we want to call a function asynchronously we have to spin up a whole new process. Unless the function you are calling takes at least a few seconds to execute, most of the time is going to be spent starting up new processes. To avoid this issue we can start each of our processes ahead of time, before using them to call functions asynchronously. Let's make a new function which does this, called fork:

def fork():
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    return (process, pipe_parent)

And we'll change what happens in the call_dispatch function. Now the dispatch function will run in an infinite loop, each time waiting for a new top level function to call with a given set of arguments.

def call_dispatch(pipe):
    while True:
        name, args, kwargs = pipe.recv()
        if name == 'exit':
            break
        else:
            output = globals()[name](*args, **kwargs)
            pipe.send(output)

You'll notice that if this process gets asked to call a function named 'exit' it will just break out of the loop and finish gracefully. We can make another function called join which does this for any process allocated with fork.

def join(h):
    process, pipe_parent = h
    pipe_parent.send(('exit', (), {}))
    process.join()

Now we just need to redefine call_async and call_await. These are similar to before, but assume the process has already been started:

def call_async(h, f, *args, **kwargs):
    process, pipe_parent = h
    pipe_parent.send((f.__name__, args, kwargs))

def call_await(h):
    process, pipe_parent = h
    return pipe_parent.recv()

Now we can start processes ahead of time, and re-use them to call multiple different functions in parallel.

import time
    
def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i
    
def square(i):
    return i*i
    
if __name__ == '__main__':
    
    names = ['Dan', 'Chess', 'Tom', 'Mike']
    
    handles = [fork() for _ in range(len(names))]
    
    # Greet in Parallel
    
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, greet, i, name)
    
    for h in handles:
        print(call_await(h))

    # Square in Parallel
        
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, square, i)
    
    for h in handles:
        print(call_await(h))
        
    # Finish
        
    for h in handles:
        join(h)

Of course it isn't perfect, and there are times when this abstraction will fail, but I hope I've shown how it isn't too difficult to build simple, useful things with the basic tools provided by the multiprocessing module, as long as you have a high level understanding of how it works. With this sort of setup you can quickly start to think about how you might build worker pools and other sorts of useful structures for parallel programming.
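
For example, here is a very rough sketch of a pool-style map built on top of the fork, call_async, call_await and join functions from above - the name pool_map and the batching strategy are just one possible way of doing it:

def pool_map(f, items, num_processes=4):
    # Start a fixed set of worker processes ahead of time...
    handles = [fork() for _ in range(num_processes)]

    results = []

    # ...then hand out the items in batches, one item per worker,
    # gathering the return values after each batch.
    for start in range(0, len(items), num_processes):
        batch = items[start:start+num_processes]
        for h, item in zip(handles, batch):
            call_async(h, f, item)
        for h, _ in zip(handles, batch):
            results.append(call_await(h))

    # Finally shut all of the workers down gracefully.
    for h in handles:
        join(h)

    return results

With square defined as before, something like pool_map(square, [0, 1, 2, 3]) should give back [0, 1, 4, 9] while only ever starting four processes.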

With that all said, good luck and happy concurrent programming!