So, this is a purely technical one, and it's mostly for me. Apologies in advance.
I've been doing a few experiments with mapping/reducing in Python using the multiprocessing module.
Some things I've learned
Some of these I suspected, but some were surprises to me.

- Passing data between the parent and child processes is slow -- try not to pass big data.
- Splitting across multiple processes actually does speed up computation, as long as you don't have to pass back a lot of info about each thing. For example, this could be a good candidate for mapping to processes:
def square_all(nums):
    return sum([i*i for i in nums])
This, maybe not so much:
def square_all(nums):
    return [i*i for i in nums]
The first returns just a single number for each subsection of data, so very little has to travel back to the parent. The latter returns a potentially large list that has to be shipped back in full.
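To make that concrete, here's a rough, self-contained timing sketch. This is not part of the original experiment; the function names, chunk sizes, and pool size are just illustrative. It maps both variants over the same chunks and prints how long each map takes:

import time
from multiprocessing import Pool

def square_all_sum(nums):
    # Each worker sends back a single integer.
    return sum([i*i for i in nums])

def square_all_list(nums):
    # Each worker sends back a full list of squares.
    return [i*i for i in nums]

if __name__ == '__main__':
    # Four chunks of 250,000 numbers each.
    chunks = [range(i, i + 250000) for i in range(0, 1000000, 250000)]
    with Pool(4) as pool:
        for func in (square_all_sum, square_all_list):
            start = time.time()
            pool.map(func, chunks)
            print(func.__name__, time.time() - start)

As I understand it, the gap between the two mostly reflects the cost of pickling the result lists and sending them back to the parent process.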
- Using processes and queues directly is more flexible, but mapping over pools saves a lot of typing. For example, the single, multi, and pooled functions in the script below accomplish similar things. single just runs the processing on the entire data set in one process. The other two split the work across multiple processes: multi splits the data roughly evenly into n parts and immediately runs each part in its own process, while pooled splits the data into n parts and doles those parts out over a pool of p processes. When n == p, multi and pooled do basically the same thing; a minimal sketch of both call patterns appears just below.
Note that, in order to accommodate the Queue pattern in multi, I had to modify the function being called:
def square_all(nums, q=None):
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])
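For reference, here is a minimal sketch of the two call patterns side by side. The two-chunk split and the pool size here are made up for illustration; the full versions live in the script below.

from multiprocessing import Pool, Process, Queue

def square_all(nums, q=None):
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])

if __name__ == '__main__':
    # A made-up two-part split of a small range.
    parts = [range(0, 500), range(500, 1000)]

    # Process/Queue pattern: one process and one queue per part.
    queues = [Queue() for _ in parts]
    processes = [Process(target=square_all, args=(part, q))
                 for part, q in zip(parts, queues)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(sum(q.get() for q in queues))

    # Pool pattern: the same map/reduce with much less bookkeeping.
    with Pool(2) as pool:
        print(sum(pool.map(square_all, parts)))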
- Performance of the process pools was more erratic than that of the manually handled processes and queues. This may be due to memory constraints; for some reason, when my script got down to looping over the pools, memory usage went way up. I took a screenshot of my system monitor running on half as much data.
- My computer performed about equally well with anything above 3 processes when they were handled manually. I expected that 8 processes would not work out well, but I was wrong.
The script
Here's the source of the script that I used to run the experiment:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time

from itertools import chain
from math import floor
from multiprocessing import Pool, Process, Queue


# ---------------------------------------------------------------------------
# The data to process. Why a global? No reason really.

full_count = 1000000
full_range = range(full_count)


# ---------------------------------------------------------------------------
# The processing function. Something simple.

def square_all(nums, q=None):
    """ Simple function to loop over and square all the numbers in ``nums``. """
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])


# ---------------------------------------------------------------------------
# The processing wrappers.

def single():
    """ Run the function on the full range of values. """
    result = square_all(full_range)
    return result


def multi(n):
    """ Partition the range into n parts and run each in a separate process. """
    # Partition
    parts = _partition(full_range, n)

    # Map
    queues = [Queue() for _ in range(n)]
    processes = [Process(target=square_all, args=(part, queue))
                 for part, queue in zip(parts, queues)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    partial_results = [queue.get() for queue in queues]

    # Reduce
    result = _combine(partial_results)
    return result


def pooled(n, p):
    """ Partition the range into n parts and run on a pool of p processes. """
    # Partition
    parts = _partition(full_range, n)

    # Map
    pool = Pool(p)
    partial_results = pool.map(square_all, parts)

    # Reduce
    result = _combine(partial_results)
    return result


# ---------------------------------------------------------------------------
# A few utilities for partitioning, combining, and timing.

def _partition(l, n):
    """ Partition the list l into n parts, and return a list of the parts. """
    count = len(l)
    return [l[floor(i / n * count):floor((i + 1) / n * count)]
            for i in range(n)]


def _combine(partial_results):
    """ Combine the list of partial results into a final result. """
    result = sum(partial_results)
    return result


def _time(f, reps=10, args=tuple(), kwargs=dict()):
    """ Call f reps times and print how long each call takes. """
    for _ in range(reps):
        t = time.time()
        f(*args, **kwargs)
        print(' ', time.time() - t)


# ---------------------------------------------------------------------------
# Here's the script to generate the results.

if __name__ == '__main__':
    print('Time for single process:')
    _time(single)

    for n in range(2, 9):
        print('Time for {} processes:'.format(n))
        _time(multi, args=(n,))

    for p in (2, 3, 4):
        for n in (p, 2*p):
            print('Time for {} partitions over {} processes:'.format(n, p))
            _time(pooled, args=(n, p))