Friday, February 10, 2012

Playing around with mapping & reducing


So, this is a purely technical one, and it's mostly for me.  Apologies in advance.
I've been doing a few experiments with mapping/reducing in Python using the multiprocessing module.

Some things I've learned

Some of these I suspected, but some were surprises to me.
  • Passing data between the parent and child processes is slow -- try not to pass big data (see the first sketch after this list).
  • Splitting across multiple processes actually does speed up computation, as long as you don't have to pass back a lot of info about each thing.
    For example, this could be a good candidate for mapping to processes:
    def square_all(nums):
        return sum([i*i for i in nums])
    
    This, maybe not so much:
    def square_all(nums):
        return [i*i for i in nums]
    
    The first returns just a single number for each subsection of the data; the second returns a potentially large list.
  • Using processes and queues directly is more flexible, but mapping over pools saves a lot of typing. For example, the three functions in the script below accomplish similar things: single runs the processing on the entire data set in one process; multi splits the data roughly evenly into n parts and immediately runs each part in its own process; and pooled splits the data into n parts and doles them out over a pool of p processes. When n == p, multi and pooled do basically the same thing.
    Note that, in order to accommodate the Queue pattern in multi, I had
    to modify the function being called:
    def square_all(nums, q=None):
        if q:
            q.put(sum([i*i for i in nums]))
        else:
            return sum([i*i for i in nums])
    
  • You cannot pass plain generator expressions (e.g., (i*i for i in nums)) between processes, because they cannot be pickled. There are ways around this (see the second sketch after this list).
  • Performance of the process pools was more erratic than that of the manually handled processes and queues. This may be due to memory constraints: for some reason, when my script got down to looping over the pools, memory usage went way up. I took a screenshot of my system monitor while running on half as much data.
  • When I handled the processes manually, my computer performed about equally well with anything above 3 processes. I expected that 8 processes would not work out well, but I was wrong.
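To make the first point concrete, here's a minimal sketch (my own toy benchmark, not part of the experiment below; the function names and the million-element size are just placeholders) that times shipping a big list to a child process versus sending only the range bounds and building the data in the child:

from multiprocessing import Process, Queue
import time

def sum_squares_of_list(nums, q):
    # The whole list is pickled and piped over to the child to get here.
    q.put(sum(i * i for i in nums))

def sum_squares_of_bounds(lo, hi, q):
    # Only two integers are sent; the data is built inside the child.
    q.put(sum(i * i for i in range(lo, hi)))

def timed_run(target, args):
    q = Queue()
    p = Process(target=target, args=args + (q,))
    t = time.time()
    p.start()
    result = q.get()   # the result is a single number, so get before join
    p.join()
    return time.time() - t

if __name__ == '__main__':
    nums = list(range(1000000))
    print('big list argument:', timed_run(sum_squares_of_list, (nums,)))
    print('bounds only:      ', timed_run(sum_squares_of_bounds, (0, 1000000)))

Most of the gap between the two timings should be the cost of pickling and piping the list itself, which is exactly the overhead you want to avoid.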
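And on the generator point, here's a second quick sketch showing the failure and one simple workaround (square_all_lazy is just an illustrative name, not something from the script below):

import pickle
from multiprocessing import Pool

def square_all_lazy(bounds):
    # Workaround: pass picklable arguments (here, range bounds) and build the
    # generator inside the worker instead of trying to ship the generator.
    lo, hi = bounds
    return sum(i * i for i in range(lo, hi))

if __name__ == '__main__':
    gen = (i * i for i in range(10))
    try:
        pickle.dumps(gen)
    except TypeError as err:
        print('cannot pickle a generator:', err)

    pool = Pool(2)
    print(pool.map(square_all_lazy, [(0, 500000), (500000, 1000000)]))
    pool.close()
    pool.join()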

The script

Here's the source of the script that I used to run the experiment:
#!/usr/bin/env python3
#-*- coding:utf-8 -*-

import time
from math import floor
from multiprocessing import Pool, Process, Queue

# ---------------------------------------------------------------------------
# The data to process.  Why a global?  No reason really.

full_count = 1000000
full_range = range(full_count)

# ---------------------------------------------------------------------------
# The processing function.  Something simple.

def square_all(nums, q=None):
    """
    Simple function to loop over and square all the numbers in ``nums``.
    """
    if q:
        q.put(sum([i*i for i in nums]))
    else:
        return sum([i*i for i in nums])

# ---------------------------------------------------------------------------
# The processing wrappers.

def single():
    """
    Run the function on the full range of values.
    """
    result = square_all(full_range)
    return result

def multi(n):
    """
    Partition the range into n parts and run each in a separate process.
    """
    # Partition
    parts = _partition(full_range, n)

    # Map
    queues = [Queue() for _ in range(n)]
    processes = [Process(target=square_all, args=(part, queue))
                 for part, queue in zip(parts, queues)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

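    # Each child puts only a single number on its queue, so joining before
    # draining the queues is safe here. With large results you would want to
    # get() from the queues before join() to avoid blocking the children.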
    partial_results = [queue.get() for queue in queues]

    # Reduce
    result = _combine(partial_results)
    return result

def pooled(n, p):
    """
    Partition the range into n parts and run on a pool of p processes.
    """
    # Partition
    parts = _partition(full_range, n)

    # Map
    pool = Pool(p)
    partial_results = pool.map(square_all, parts)
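    # Note: the pool's worker processes are left running when this returns;
    # pool.close() followed by pool.join() would shut them down cleanly.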

    # Reduce
    result = _combine(partial_results)
    return result

# ---------------------------------------------------------------------------
# A few utilities for partitioning and combining.

def _partition(l, n):
    """
    Partition the list l into n parts, and return a list of the parts.
    """
    count = len(l)
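    # Proportional slice boundaries keep the part sizes within one element of
    # each other, even when count is not evenly divisible by n.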
    return [l[floor(i / n * count):floor((i + 1) / n * count)]
            for i in range(n)]

def _combine(partial_results):
    """
    Combine the list of partial results into a final result.
    """
    result = sum(partial_results)
    return result

def _time(f, reps=10, args=tuple(), kwargs=dict()):
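    """
    Call ``f`` ``reps`` times and print the wall-clock time of each call.
    """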
    for _ in range(reps):
        t = time.time()
        f(*args, **kwargs)
        print(' ', time.time() - t)

# ---------------------------------------------------------------------------
# Here's the script to generate the results.

if __name__ == '__main__':

    print('Time for single process:')
    _time(single)

    for n in range(2, 9):
        print('Time for {} processes:'.format(n))
        _time(multi, args=(n,))

    for p in (2, 3, 4):
        for n in (p, 2 * p):
            print('Time for {} partitions over {} processes:'.format(n, p))
            _time(pooled, args=(n, p))