Source code for dorado.parallel_routing
"""Parallel functionality using the native Python multiprocessing library.
Parallel functionality using multiprocessing (included with Python).
For local parallelization using CPU cores.
Project Homepage: https://github.com/passaH2O/dorado
"""
from __future__ import division, print_function, absolute_import
from builtins import range, map
import numpy as np
import sys
import os
import re
import string
from multiprocessing import Pool
# convert run script into a function that returns a single dictionary 'data'
[docs]def run_iter(pobj):
"""Wrapper run script for the particle iterations.
This is an internal function that should not be called directly. Rather it
is wrapped by the `parallel_routing` function and mapped to the CPU cores
to allow for particle routing in parallel. Uses a small `parallel_obj`
class to hold a bunch of attributes and do the particle generation and
routing.
**Inputs** :
pobj : :obj:`parallel_obj`
Special parallel object.
**Outputs** :
all_walk_data : `list`
Nested list of all x and y locations and travel times, with
details same as input previous_walk_data
"""
pobj.particles.generate_particles(pobj.Np_tracer, pobj.seed_xloc,
pobj.seed_yloc)
# do iterations
for i in list(range(0, pobj.num_iter)):
all_walk = pobj.particles.run_iteration()
return all_walk
[docs]def combine_result(par_result):
"""Combine results from each core.
Take the parallel resulting list and combine the entries so that a single
dictionary with the beginning/end indices and travel times for all
particles is returned as opposed to a list with one dictionary per
parallel process
**Inputs** :
par_result : `list`
List of length(num_cores) with a dictionary of the beg/end indices
and travel times for each particle computed by that process/core
**Outputs** :
single_result : `list`
Nested list that matches 'all_walk_data'
"""
# initiate final results dictionary
single_result = dict()
single_result['x_inds'] = []
single_result['y_inds'] = []
single_result['travel_times'] = []
# populate the dictionary
# loop through results for each core
for i in range(0, len(par_result)):
# append results for each category
for j in range(0, len(par_result[i][0])):
single_result['x_inds'].append(par_result[i][0][j])
single_result['y_inds'].append(par_result[i][1][j])
single_result['travel_times'].append(par_result[i][2][j])
return single_result
[docs]class parallel_obj:
"""
Empty class to hold parallel parameters.
This is an internal class used by `parallel_routing` that never needs to be
defined outside of that function.
"""
[docs] def __init__(self):
"""Initialize attributes."""
parallel_obj.particles = None
parallel_obj.num_iter = None
parallel_obj.Np_tracer = None
parallel_obj.seed_xloc = None
parallel_obj.seed_yloc = None
[docs]def parallel_routing(particles, num_iter, Np_tracer, seed_xloc, seed_yloc,
num_cores):
"""Do the parallel routing of particles.
Function to do parallel routing of particles. Does this by duplicating the
call to *Particle.run_iteration()* across different processes.
**Inputs** :
particle : :obj:`dorado.particle_track.Particles`
An initialized :obj:`particle_track.Particles` object with some
generated particles.
num_iter : `int`
Number of iterations to route particles for
Np_tracer : `int`
Number of particles to generate.
seed_xloc : `list`
List of x-coordinates over which to initially distribute the
particles.
seed_yloc : `list`
List of y-coordinates over which to initially distribute the
particles.
num_cores : `int`
Number of processors/cores to use for parallel part
**Outputs** :
par_result : `list`
List of length(num_cores) with a dictionary of the beg/end indices
and travel times for each particle computed by that process/core
"""
# make parallel object to assign to function
pobj = parallel_obj
pobj.particles = particles
pobj.num_iter = num_iter
pobj.Np_tracer = Np_tracer
pobj.seed_xloc = seed_xloc
pobj.seed_yloc = seed_yloc
# define list to pass to actual function that will be mapped to in parallel
p_list = [pobj] * num_cores
# create the parallel pool and run the process
p = Pool(processes=num_cores)
par_result = p.map(run_iter, p_list)
p.terminate()
return par_result