So, this is a purely technical one, and it's mostly for me. Apologies in advance.
I've been doing a few experiments with mapping/reducing in Python using the multiprocessing module.
Some things I've learned
Some of these I suspected, but some were surprises to me.

- Passing data between the parent and child processes is slow -- try not to pass big data.
- Splitting across multiple processes actually does speed up computation, as long as you don't have to pass back a lot of info about each thing. For example, this could be a good candidate for mapping to processes:
def square_all(nums):
    return sum([i*i for i in nums])

This, maybe not so much:
def square_all(nums):
    return [i*i for i in nums]

The first returns just a single number for each subsection of data. The latter returns a potentially large list, all of which has to be pickled and sent back to the parent. (There's a rough timing sketch of the difference right after this list.)
- Using processes and queues directly is more flexible, but mapping over pools saves a lot of typing. For example, the three wrappers in the script below accomplish similar things. single just runs the processing on the entire data set in one process. The next two run the data in multiple processes: multi splits the data roughly evenly into n parts and immediately runs each part in its own process, while pooled splits the data into n parts and doles them out to a pool of p processes. When n == p, multi and pooled do basically the same thing.
Note that, in order to accommodate the Queue pattern in multi, I had to modify the function being called -- a child Process can't hand a return value back directly, so it puts its result on a queue instead:
def square_all(nums, q=None):
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])

- Performance of the process pools was more erratic than the manually handled processes and queues. This may be due to memory constraints; for some reason, when my script got down to looping over the pools, the memory usage went way up. I took a screenshot of my system monitor running on half as much data. (One possible culprit, and a variant that avoids it, is sketched after the script below.)
- My computer performed about equally well at anything above 3 processes when they were handled manually. I expected that 8 processes would not work out well, but I was wrong. (The core count printed by the sketch below is the number to compare against.)
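To make the data-passing point concrete, here's a minimal, separate sketch (the function names and sizes are just for illustration and aren't part of the script below) that times the same squaring job when the workers hand back a single sum versus the whole squared list. It also prints the machine's core count, which is the number to compare the process counts above against:

import time
from multiprocessing import Pool, cpu_count

def reduced(nums):
    # The child does the reduction; only a single int crosses the
    # process boundary.
    return sum([i*i for i in nums])

def unreduced(nums):
    # The child returns the whole list; every element has to be
    # pickled and sent back to the parent.
    return [i*i for i in nums]

if __name__ == '__main__':
    print('Cores available:', cpu_count())
    parts = [range(i * 250000, (i + 1) * 250000) for i in range(4)]
    for func in (reduced, unreduced):
        with Pool(4) as pool:
            t = time.time()
            pool.map(func, parts)
            print(func.__name__, time.time() - t)

The gap between the two is essentially the cost of pickling the results and shipping them back to the parent.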
The script
Here's the source of the script that I used to run the experiment:

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
from itertools import chain
from math import floor
from multiprocessing import Pool, Process, Queue

# ---------------------------------------------------------------------------
# The data to process. Why a global? No reason really.
full_count = 1000000
full_range = range(full_count)

# ---------------------------------------------------------------------------
# The processing function. Something simple.
def square_all(nums, q=None):
    """
    Simple function to loop over and square all the numbers in ``nums``.
    """
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])
# ---------------------------------------------------------------------------
# The processing wrappers.
def single():
    """
    Run the function on the full range of values.
    """
    result = square_all(full_range)
    return result

def multi(n):
    """
    Partition the range into n parts and run each in a separate process.
    """
    # Partition
    parts = _partition(full_range, n)
    # Map
    queues = [Queue() for _ in range(n)]
    processes = [Process(target=square_all, args=(part, queue))
                 for part, queue in zip(parts, queues)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    partial_results = [queue.get() for queue in queues]
    # Reduce
    result = _combine(partial_results)
    return result
def pooled(n, p):
    """
    Partition the range into n parts and run on a pool of p processes.
    """
    # Partition
    parts = _partition(full_range, n)
    # Map
    pool = Pool(p)
    partial_results = pool.map(square_all, parts)
    # Reduce
    result = _combine(partial_results)
    return result
# ---------------------------------------------------------------------------
# A few utilities for partitioning and combining.
def _partition(l, n):
    """
    Partition the list l into n parts, and return a list of the parts.
    """
    count = len(l)
    return [l[floor(i / n * count):floor((i + 1) / n * count)]
            for i in range(n)]

def _combine(partial_results):
    """
    Combine the list of partial results into a final result.
    """
    result = sum(partial_results)
    return result

def _time(f, reps=10, args=tuple(), kwargs=dict()):
    """
    Call f(*args, **kwargs) reps times, printing the elapsed time of each call.
    """
    for _ in range(reps):
        t = time.time()
        f(*args, **kwargs)
        print(' ', time.time() - t)
# ---------------------------------------------------------------------------
# Here's the script to generate the results.
if __name__ == '__main__':
    import time
    print('Time for single process:')
    _time(single)
    for n in range(2, 9):
        print('Time for {} processes:'.format(n))
        _time(multi, args=(n,))
    for p in (2, 3, 4):
        for n in (p, 2*p):
            print('Time for {} partitions over {} processes:'.format(n, p))
            _time(pooled, args=(n, p))
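One more note on the erratic pool behaviour: pooled() never closes the pool it creates, so every timing rep spins up a fresh set of worker processes that hang around until they happen to be garbage collected, which may be part of the memory climb I saw. I haven't re-run the numbers this way, but a variant that cleans up after itself would look something like this (it reuses square_all, _partition, _combine, and full_range from the script above):

def pooled_closed(n, p):
    """
    Like pooled(), but uses the pool as a context manager so its worker
    processes are shut down before the next timing rep starts.
    """
    # Partition
    parts = _partition(full_range, n)
    # Map
    with Pool(p) as pool:
        partial_results = pool.map(square_all, parts)
    # Reduce
    return _combine(partial_results)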