Simple Concurrency in Python
29/04/2018
Whenever I attempt concurrency in Python it seems that something like this happens:
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
File "", line 1, in
F File "", line 1, in
File "", line 1, in
File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in
a File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in
File "C:\Python35\lib\multiprocessing\__init__.py", line 16, in
i from . import context
from . import context
l File "C:\Python35\lib\multiprocessing\context.py", line 5, in
from . import context
File "C:\Python35\lib\multiprocessing\context.py", line 3, in
e from . import process
import threading
d File "C:\Python35\lib\multiprocessing\context.py", line 3, in
File "", line 969, in _find_and_load
File "C:\Python35\lib\threading.py", line 7, in
import threading
File "", line 958, in _find_and_load_unlocked
t from traceback import format_exc as _format_exc
File "C:\Python35\lib\threading.py", line 7, in
File "", line 673, in _load_unlocked
o File "C:\Python35\lib\traceback.py", line 5, in
from traceback import format_exc as _format_exc
File "", line 669, in exec_module
import linecache
File "C:\Python35\lib\traceback.py", line 5, in
File "", line 773, in get_code
i File "C:\Python35\lib\linecache.py", line 11, in
import linecache
import tokenize
m File "", line 484, in _compile_bytecode
File "C:\Python35\lib\linecache.py", line 11, in
File "C:\Python35\lib\tokenize.py", line 32, in
pKeyboardInterrupt
import tokenize
o import re
File "C:\Python35\lib\tokenize.py", line 32, in
r File "", line 969, in _find_and_load
t import re
File "", line 954, in _find_and_load_unlocked
File "C:\Python35\lib\re.py", line 335, in
t File "", line 896, in _find_spec
h File "", line 1147, in find_spec
e File "", line 1121, in _get_spec
File "", line 1229, in find_spec
s File "", line 82, in _path_stat
iKeyboardInterrupt
te module
Traceback (most recent call last):
File "C:\Python35\lib\site.py", line 563, in
main()
File "C:\Python35\lib\site.py", line 550, in main
known_paths = addsitepackages(known_paths)
File "C:\Python35\lib\site.py", line 327, in addsitepackages
addsitedir(sitedir, known_paths)
File "C:\Python35\lib\site.py", line 206, in addsitedir
addpackage(sitedir, name, known_paths)
File "C:\Python35\lib\site.py", line 167, in addpackage
import copyreg
File "", line 969, in _find_and_load
exec(line)
File "", line 958, in _find_and_load_unlocked
File "", line 1, in
File "", line 673, in _load_unlocked
And I'm sure it isn't just me - the simple fact is that doing concurrency in Python sucks - and even the libraries which try to make it nicer often don't do that good a job or only make your life easy if you structure your program in a very particular way.
This is all made worse by the fact that threads in Python are not really executed concurrently but just have their operations interleaved by the Python interpreter. This means most forms of concurrency in Python don't actually make use of multiple CPU cores and are therefore fairly useless from a performance perspective. To get true concurrency in Python you have to use the multiprocessing module, which at first looks like a drop-in replacement for threads, but in reality works in a completely different way - by launching multiple Python interpreters in separate processes and allowing some communication between them.
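To make that concrete, here is a quick sketch of my own (the function name count_down and the loop count are arbitrary choices) showing that running a CPU-bound function in two threads takes roughly as long as running it twice in a row, because only one thread executes Python bytecode at any given moment:
import threading
import time

def count_down(n):
    # A pure-Python, CPU-bound busy loop
    while n > 0:
        n -= 1

if __name__ == '__main__':

    N = 10000000

    # Run the work twice, one call after the other
    start = time.perf_counter()
    count_down(N)
    count_down(N)
    print('sequential: %.2fs' % (time.perf_counter() - start))

    # Run the same work in two threads - this takes about the same time,
    # since the two threads just get interleaved by the interpreter
    start = time.perf_counter()
    threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('threaded:   %.2fs' % (time.perf_counter() - start))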
The saving grace of all of this is that the Python multiprocessing
module
actually provides some pretty nice basic tools, and if you understand how it
works at a high level then making use of these tools to achieve concurrency
is actually a fairly painless experience.
But first a little revision on how Python multiprocessing actually works. Let's take a look at the following basic Python script.
from multiprocessing import Process, Pipe

def greet(name):
    print('Hello %s!' % (name))

if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()
What actually happens when we run this script? Well, everything runs basically as expected until we reach the line p.start().
When this line is executed a few important things happen. First, the arguments
we supplied to the args
parameter are pickled, which basically means they
are converted into a raw stream of bytes and saved for later. Next, the script
currently being run is run again by a new instance of Python. This second run
of the script is given a different value for the __name__
variable, and has
all of its output re-directed to the original process. The new run of the script
executes everything as usual until it reaches the end of the script. At this
point it unpickles the arguments that were saved and executes the function
that was specified by the target
parameter. Once it finishes executing this
function it ends.
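If pickling is new to you, here is a tiny sketch of roughly what happens to those arguments, using the pickle module directly:
import pickle

data = pickle.dumps(('Daniel',))  # the args tuple as a raw stream of bytes
print(data)                       # prints a bytes object
print(pickle.loads(data))         # prints ('Daniel',)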
We can see this behavior if we run the same script as before but include another print statement in the middle:
from multiprocessing import Process, Pipe

def greet(name):
    print('Hello %s!' % (name))

print(__name__)

if __name__ == '__main__':
    p = Process(target=greet, args=('Daniel',))
    p.start()
    p.join()
When we run this version it outputs something like this:
__main__
__mp_main__
Hello Daniel!
We can see the different values given to the __name__ variable for the different runs, and if we were to look in the task manager while it was executing we would see two instances of python.exe too (on Windows).
This little experiment highlights the behavior and also the two main limitations of Python's multiprocessing module - namely that any values given as the args parameter must be picklable, which usually means data that is simple to serialize - and secondly that the target parameter must always be a top level function in the script, independent from anything that happens inside the if __name__ == '__main__': part of the script.
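As a quick illustration of these limitations (a little sketch of my own, not something you'd actually write), trying to use a lambda as the target blows up as soon as we call p.start() on platforms that use the spawn start method, such as Windows, because the Process object and everything attached to it gets pickled at that point:
from multiprocessing import Process

if __name__ == '__main__':
    try:
        # A lambda is not a top level function and cannot be pickled,
        # so under the spawn start method this fails at p.start()
        p = Process(target=lambda: print('Hello!'))
        p.start()
        p.join()
    except Exception as e:
        print('Could not start process:', e)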
(It is worth noting that while the multiprocessing module was designed to have the exact same interface as the Python threading module, and as such appears like it can be used as a drop-in replacement - I find it better to think about the two as completely separate entities, as the above limitations mean this is almost always not the case.)
So here is my little recipe for multiprocessing in Python. It starts with a function which calls another function with a given set of arguments and keyword arguments asynchronously - that is, it calls the provided function without waiting for it to finish. Instead, what it returns is a handle - something we can use later to wait for the called function to finish and get the return value. We are also going to communicate with this process by constructing a Pipe, a little object split into two parts (one for the parent process and one for the child) which we can use to send and receive things between the two processes. This asynchronous call function looks something like this:
def call_async(f, *args, **kwargs):
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    pipe_parent.send((f.__name__, args, kwargs))
    return (process, pipe_parent)
What this function does is create a new Process and run the new process on a function called call_dispatch (which we will define later). Once this process is running we use the Pipe to send it the name of the desired function we wish to call, as well as the arguments we want to use.
The call_dispatch
function is itself very simple. It just waits for the
function name and arguments to come from the pipe, calls the function at the
top level with the given name, and then sends the return value back into the
pipe once it is done.
def call_dispatch(pipe):
    name, args, kwargs = pipe.recv()
    output = globals()[name](*args, **kwargs)
    pipe.send(output)
Later on, when we want to wait for this function to finish and get its return value, we can use another function called call_await, which waits for the called function to finish and then returns its return value.
def call_await(h):
    process, pipe_parent = h
    output = pipe_parent.recv()
    process.join()
    return output
Using these little functions makes some things easy, such as asynchronously calling a function for each element of an array and then gathering the return values once they are all ready.
import time

def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i

if __name__ == '__main__':

    names = ['Dan', 'Chess', 'Tom', 'Mike']

    handles = []
    for i, name in enumerate(names):
        h = call_async(greet, i, name)
        handles.append(h)

    for h in handles:
        print(call_await(h))
Notice how evaluating this script takes only one second instead of four since each call to `greet` is performed in a separate process. Also notice how although the evaluation order can vary, the return order is always the same since we await each of the processes in sequence.
1 Hello Chess!
2 Hello Tom!
0 Hello Dan!
3 Hello Mike!
0
1
2
3
One little limitation of this approach is that each time we want to call a function asynchronously we have to spin up a whole new process. Unless the function you are calling takes at least a few seconds to execute, most of the time is going to be spent starting up new processes. To avoid this issue we can start each of our processes ahead of time, before using them to call functions asynchronously. Let's make a new function called fork which does this:
def fork():
    pipe_parent, pipe_child = Pipe()
    process = Process(target=call_dispatch, args=(pipe_child,))
    process.start()
    return (process, pipe_parent)
And we'll change what happens in the call_dispatch
function. Now the dispatch
function will run in an infinite loop, each time waiting for a new top level
function to call with a given set of arguments.
def call_dispatch(pipe):
    while True:
        name, args, kwargs = pipe.recv()
        if name == 'exit':
            break
        else:
            output = globals()[name](*args, **kwargs)
            pipe.send(output)
You'll notice that if this process gets asked to call a function named 'exit' it will just break out of this loop and finish gracefully. We can make another function called join which does this for any new process allocated with fork.
def join(h):
    process, pipe_parent = h
    pipe_parent.send(('exit', (), {}))
    process.join()
Now we just need to define call_async
and call_await
. These are similar to before but just assume the process has already been started:
def call_async(h, f, *args, **kwargs):
    process, pipe_parent = h
    pipe_parent.send((f.__name__, args, kwargs))

def call_await(h):
    process, pipe_parent = h
    return pipe_parent.recv()
Now we can start processes ahead of time, and re-use them to call multiple different functions in parallel.
import time

def greet(i, name):
    print('%i Hello %s!' % (i, name))
    time.sleep(1)
    return i

def square(i):
    return i*i

if __name__ == '__main__':

    names = ['Dan', 'Chess', 'Tom', 'Mike']

    handles = [fork() for _ in range(len(names))]

    # Greet in Parallel
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, greet, i, name)

    for h in handles:
        print(call_await(h))

    # Square in Parallel
    for h, (i, name) in zip(handles, enumerate(names)):
        call_async(h, square, i)

    for h in handles:
        print(call_await(h))

    # Finish
    for h in handles:
        join(h)
Of course it isn't perfect, and there are times when this abstraction will fail,
but I hope I've shown how it isn't too difficult to build simple useful things
with the basic tools provided by the multiprocessing
module as long as you
have a high level understanding of how it works. With this sort of setup you
can quickly start to think how you might build worker pools and other sorts of
useful structures for parallel programming.
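For instance, here is a rough sketch of a tiny map-style pool built out of the fork, call_async, call_await and join functions above (the name parallel_map and the chunking strategy are my own choices, and it assumes everything is defined in the same script as before):
def parallel_map(f, items, workers=4):
    handles = [fork() for _ in range(workers)]
    results = []
    # Hand out the work in chunks of `workers` calls at a time
    for start in range(0, len(items), workers):
        chunk = items[start:start+workers]
        for h, item in zip(handles, chunk):
            call_async(h, f, item)
        for h in handles[:len(chunk)]:
            results.append(call_await(h))
    # Shut the worker processes down gracefully
    for h in handles:
        join(h)
    return results

if __name__ == '__main__':
    print(parallel_map(square, [1, 2, 3, 4, 5, 6, 7, 8]))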
With that all said, good luck and happy concurrent programming!