So you have some serial task that takes forever, and you’re thinking it should be parallelizable, but you find the documentation on this to be obtuse? Yea.
Usually I’m interested in either creating lots of data in parallel or reading lots of data in parallel, and it’s often something I first implemented as a loop but got tired of how slowly it runs. These are embarrassingly parallel tasks, in the sense that the individual pieces of work don’t depend on one another.
There’s a simple prescription for parallelizing most of these kinds of tasks in Python. It goes as follows:
1. Have some kind of task performed in a for loop.
2. Write a function that does what you want for one “instance.” For example, take what’s inside one of your for loops and put all of that in a separate function.
3. As a check, keep your loop but have it do nothing except call the function. Make sure it produces the same results as the original version of your code.
4. Use functools.partial to create a wrapper for your function.
5. Replace the loop with a call to Pool.map().
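To make the recipe concrete before diving into the examples, here’s a minimal sketch of the pattern with made-up names (do_one_item and its arguments are purely illustrative; the real examples below follow the same shape):

from functools import partial
import multiprocessing as mp

def do_one_item(scale, i):
    # step 2: whatever used to be inside the body of your for loop
    return scale * i

if __name__ == '__main__':
    wrapper = partial(do_one_item, 10)        # step 4: bind the fixed arguments
    pool = mp.Pool()                          # defaults to using all CPUs
    results = pool.map(wrapper, range(100))   # step 5: replaces the for loop
    pool.close()
    pool.join()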
In the following, we’ll cover 3 examples for parallel tasks:
1. Generate a bunch of files
2. Read a bunch of files into a list
3. Filling a numpy array
Example 1: Generate a bunch of files
Let’s say you have some important synthetic data that you want to generate lots of instances of. For now, for simplicity, we’re just going to generate images of, let’s say, random noise. And to make it interesting we’ll generate 2000 of them.
Here’s the serial for-loop version:
import numpy as np
import cv2

n_images = 2000
size_x, size_y = 100, 100

for i in range(n_images):
    arr = 255*np.random.rand(size_x,size_y)
    filename = 'image_'+str(i)+'.png'
    print("writing file ",filename)
    cv2.imwrite(filename,arr)
Now we write a dedicated function, put it in a partial wrapper, and call it as follows:
import numpy as np
import cv2
from functools import partial

def write_one_file(size_x, size_y, name_prefix, i):
    arr = 255*np.random.rand(size_x,size_y)
    filename = name_prefix + str(i) + '.png'
    print("writing file ",filename)
    cv2.imwrite(filename,arr)

n_images = 2000
size_x, size_y = 100, 100

wrapper = partial(write_one_file, size_x, size_y, 'image_')

for i in range(n_images):
    wrapper(i)
Finally we replace the loop with a multiprocessing pool. We can either use all the cpus on the machine (which is the default) or specify how many to use, by giving an argument to Pool():
import numpy as np
import cv2
from functools import partial
import multiprocessing as mp

def write_one_file(size_x, size_y, name_prefix, i):
    arr = 255*np.random.rand(size_x,size_y)
    filename = name_prefix + str(i) + '.png'
    print("writing file ",filename)
    cv2.imwrite(filename,arr)

n_images = 2000
size_x, size_y = 100, 100

wrapper = partial(write_one_file, size_x, size_y, 'image_')

num_procs = mp.cpu_count()   # or can replace with some number of processes to use
pool = mp.Pool(num_procs)
indices = range(n_images)
results = pool.map(wrapper, indices)

pool.close()
pool.join()
There are other ways you can do this to get more control, e.g. to have each process in the pool receive a particular range of indices, but this basic setup will get the job done. And if you turn off the printing to screen and time the execution, you’ll see the speedup.
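For instance, one way to hand each worker a whole block of indices at once (just a sketch, not part of the recipe above; it reuses write_one_file, the imports, and the variables from the previous example, and leans on numpy’s array_split to do the splitting) might look like this:

def write_chunk(size_x, size_y, name_prefix, index_chunk):
    # each worker handles a whole block of indices rather than a single one
    for i in index_chunk:
        write_one_file(size_x, size_y, name_prefix, i)

chunks = np.array_split(range(n_images), num_procs)   # one block of indices per process
chunk_wrapper = partial(write_chunk, size_x, size_y, 'image_')
pool = mp.Pool(num_procs)
pool.map(chunk_wrapper, chunks)
pool.close()
pool.join()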
Example 2: Read a bunch of files into a list
This example is actually of limited utility and you may want to just skip down to “Example 3: Filling a numpy array,” but it’s still an illustrative example that motivates Example 3, and it offers a bit of variety in how one might do things. In this case we’re not going to use Pool.map; instead we’re going to use multiprocessing’s Manager (via a context manager) to get a list that can be shared across processes.
Let’s try to load in all the image files we just generated, into a list. Here’s the serial version:
import glob
import cv2

name_prefix = 'image_'

# we'll use glob to get the list of available files
# note that glob order isn't...easily discernible, so we'll sort.
img_file_list = sorted(glob.glob(name_prefix+'*.png'))
n_files = len(img_file_list)
print(n_files,"files available.")

img_data_list = []
for i in range(n_files):
    filename = name_prefix + str(i) + '.png'
    print("Reading file",filename)
    img = cv2.imread(filename)
    img_data_list.append(img)

print(len(img_data_list),"images in list.")
(If we wanted to, we could easily convert this list of images to a numpy array. But let’s hold off on that.)
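(If you did want that conversion right away, it’s a one-liner, assuming all the images share the same shape:)

img_data_arr = np.asarray(img_data_list)   # stacks the list of images into one numpy array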
This time, we’ll split up the tasks manually into equal numbers for each process. Parallelizing this can take the following form:
from multiprocessing import Process, Manager, cpu_count
import glob
import cv2

def load_one_proc(img_data_list, img_file_list, iproc, per_proc):
    istart, iend = iproc * per_proc, (iproc+1) * per_proc
    for i in range(istart, iend):   # each process will read a range of files
        filename = img_file_list[i]
        print("Reading file",filename)
        img = cv2.imread(filename)
        img_data_list.append(img)
    return

name_prefix = 'image_'

# we'll use glob to get the list of available files
# note that glob order isn't...easily discernible, so we'll sort.
img_file_list = sorted(glob.glob(name_prefix+'*.png'))
n_files = len(img_file_list)
print(n_files,"files available.")

# We'll split up the list manually
num_procs = cpu_count()
print("Parallelizing across",num_procs,"processes.")

per_proc = n_files // num_procs   # Number of files per processor to load
assert n_files == per_proc * num_procs   # Make sure tasks divide evenly. Obviously one can do something more sophisticated than this!

with Manager() as manager:
    img_data_list = manager.list()
    processes = []
    for iproc in range(num_procs):
        p = Process(target=load_one_proc, args=(img_data_list, img_file_list, iproc, per_proc))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    outside_list = list(img_data_list)   # Copy out of the Manager context (there may be a better way to do this)

print(len(outside_list),"images in list.")
Okay, great. The thing is, that set of processes operates asynchronously, so there’s no telling what order the final list is going to be in. Maybe you don’t care, but sometimes I care. One way of dealing with this is to add an index item within the list for each item, and then sort on that index.
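Here’s a sketch of that index trick (not in the code above): have each process append (index, image) pairs, then sort on the index once everything has joined. It reuses the names from the previous example.

def load_one_proc(img_data_list, img_file_list, iproc, per_proc):
    istart, iend = iproc * per_proc, (iproc+1) * per_proc
    for i in range(istart, iend):
        img_data_list.append((i, cv2.imread(img_file_list[i])))   # keep the index alongside the image
    return

# ...then, after all the joins but still inside the Manager context:
outside_list = [img for i, img in sorted(img_data_list, key=lambda pair: pair[0])]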
But most of the time what I really want in the end is a numpy array. So let’s just look at how to fill one of those, directly.
Example 3: Filling a NumPy array
Data scientist Jonas Teuwen made a great post which got me started on how to do this, but then it seems I uncovered a bug in numpy’s garbage collection for which there’s now a patch. Even without the patch there are a couple of workarounds, and I’ll use the simpler of the two here.
Let’s load all those images into a numpy array instead of a list. First the serial version:
import numpy as np
import glob
import cv2

name_prefix = 'image_'
img_file_list = sorted(glob.glob(name_prefix+'*.png'))
n_files = len(img_file_list)

first_image = cv2.imread(img_file_list[0])
print(n_files,"files available. Shape of first image is",first_image.shape)
print("Assuming all images are that size.")

img_data_arr = np.zeros([n_files]+list(first_image.shape))   # allocate storage

for i in range(n_files):
    filename = img_file_list[i]
    print("Reading file",filename)
    img_data_arr[i] = cv2.imread(filename)

print("Finished.")
For the parallel version, we’re going to have to use a global variable. Sorry, there’s no way around it: because of Python’s Global Interpreter Lock (GIL) we’re stuck with separate processes rather than threads, and the shared-memory array can only be handed to those worker processes through inheritance (i.e. via a global), not passed in as an ordinary (pickled) argument.
Without further ado, here’s the parallel, numpy version of the ‘loading a list of images’ shown earlier in Example 2. (One other change: rather than specifying ranges of images for each processor – which I did just for the sake of variety – this time we’ll let Pool.map decide how to – evenly – distribute the tasks.)
import numpy as np
import glob
import cv2
from multiprocessing import Pool, sharedctypes, cpu_count
from functools import partial
import gc

mp_shared_array = None   # global variable for array

def load_one_proc(img_file_list, i):
    global mp_shared_array
    # tmp will end up pointing to the memory address of the shared array we want to populate
    tmp = np.ctypeslib.as_array(mp_shared_array)

    filename = img_file_list[i]
    print("Reading file",filename)
    tmp[i] = cv2.imread(filename)   # assign the values into the memory of the shared array
    return

name_prefix = 'image_'
img_file_list = sorted(glob.glob(name_prefix+'*.png'))
n_files = len(img_file_list)

first_image = cv2.imread(img_file_list[0])
print(n_files,"files available. Shape of first image is",first_image.shape)
print("Assuming all images are that size.")

img_data_arr = np.zeros([n_files]+list(first_image.shape))   # allocate storage
tmp = np.ctypeslib.as_ctypes(img_data_arr)   # tmp variable avoids numpy garbage-collection bug

print("Allocating shared storage for multiprocessing (this can take a while)")
mp_shared_array = sharedctypes.RawArray(tmp._type_, tmp)

num_procs = cpu_count()
print("Parallelizing across",num_procs,"processes.")
p = Pool(num_procs)

wrapper = partial(load_one_proc, img_file_list)
indices = range(n_files)
result = p.map(wrapper, indices)   # here's where we farm out the op
img_data_arr = np.ctypeslib.as_array(mp_shared_array, shape=img_data_arr.shape)   # this actually happens pretty fast

p.close()
p.join()

# Next couple lines are here just in case you want to move on to other things
# and force garbage collection
mp_shared_array = None
gc.collect()

print("Finished.")
So that’s the basic implementation. Let me know in the comments if you have suggestions for improvements, or other ideas!
P.S.- Final Remarks
Adding in a couple of thoughts, post facto:
1. Performance. Notice that what gets passed to p.map() is an iterable, which typically you’ll use as either the indices of the members of an array (as we did just now) or as a range over the number of processors (kinda like we did in Example 2). In the former case, the pool keeps a fixed set of worker processes alive, but every item in the iterable becomes its own little task that has to be handed to a worker and have its result sent back, and each of those hand-offs carries a bit of overhead, i.e. latency. It’s not much though, so you probably won’t notice it if your goal is merely, “I’m doing this so that I only have to wait 5 minutes instead of an hour to get something done.” If instead you make the iterable range over the number of processors on your machine and manually break the array indices into chunks (sort of like we did in Example 2), you won’t be dispatching nearly as many tasks and the overhead will be considerably lower. But the gain may be small enough (depending on your system) that you may not notice the difference in performance. Still, if you want to go all-out for performance, make the pool.map go over the number of procs you want, or use the chunksize argument sketched below. Otherwise, feel free to trust the system to do its thing for you and just use the array (or list) indices as the iterable.
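As a quick sketch of that trade-off (reusing the names from Example 1; chunksize is a standard argument of Pool.map), you can get much the same effect without manually splitting anything:

# Same setup as Example 1, but each task handed to a worker now covers a
# batch of indices instead of a single one.
results = pool.map(wrapper, range(n_images), chunksize=n_images // num_procs)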
2. When Processes Die. Debugging multiprocessing runs is a pain. If one process dies (crashes, seg faults, generates any kind of “Error”), it can hang the entire pool and you won’t know why, because the error messages won’t make it to stdout or stderr. Look elsewhere for tutorials on tools for debugging multiprocessing runs. The good news is that regular print statements still come through to stdout for all processes, so one way of debugging is the age-old method of just loading your code with print statements.
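One related trick (a sketch, not from the post itself): wrap the body of your worker function in try/except so any exception gets printed before the worker dies, e.g. for the write_one_file function from Example 1 (numpy and cv2 imported as before):

import traceback

def write_one_file(size_x, size_y, name_prefix, i):
    try:
        arr = 255*np.random.rand(size_x, size_y)
        filename = name_prefix + str(i) + '.png'
        cv2.imwrite(filename, arr)
    except Exception:
        # print reaches stdout from worker processes, so the traceback won't vanish
        print("Worker task", i, "raised:\n", traceback.format_exc())
        raise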