# -*- coding: utf-8 -*-
"""
Core functions to handle the Lagrangian random walk movement of the particles.
Project Homepage: https://github.com/passaH2O/dorado
"""
from __future__ import division, print_function, absolute_import
from builtins import range, map
from math import cos
import numpy as np
from numpy.random import random
from numpy import maximum, nansum
[docs]def random_pick_seed(choices, probs=None):
"""Randomly pick a number from array of choices.
**Inputs** :
choices : `ndarray`
Array of possible values to draw from
probs : `ndarray`
*Optional*, can add weighted probabilities to draw
**Outputs** :
choices[idx] : `int`
The randomly chosen value
"""
# randomly pick tracer drop cell to use given a list of potential spots
if not probs:
probs = np.array([1 for i in list(range(len(choices)))])
# find the corresp. index value from input 'choices' list of indices
cutoffs = np.cumsum(probs)
idx = cutoffs.searchsorted(np.random.uniform(0, cutoffs[-1]))
return choices[idx]
[docs]def big_sliding_window(raster):
"""Creates 3D array organizing local neighbors at every index
Returns a raster of shape (L,W,9) which organizes (along the third
dimension) all of the neighbors in the original raster at a given
index, in the order [NW, N, NE, W, 0, E, SW, S, SE]. Outputs are
ordered to match np.ravel(), so it functions similarly to a loop
applying ravel to the elements around each index.
For example, the neighboring values in raster indexed at (i,j) are
raster(i-1:i+2, j-1:j+2).ravel(). These 9 values have been mapped to
big_ravel(i,j,:) for ease of computations. Helper function for make_weight.
**Inputs** :
raster : `ndarray`
2D array of values (e.g. stage, qx)
**Outputs** :
big_ravel : `ndarray`
3D array which sorts the D8 neighbors at index (i,j) in
raster into the 3rd dimension at (i,j,:)
"""
big_ravel = np.zeros((raster.shape[0],raster.shape[1],9))
big_ravel[1:-1,1:-1,0] = raster[0:-2,0:-2]
big_ravel[1:-1,1:-1,1] = raster[0:-2,1:-1]
big_ravel[1:-1,1:-1,2] = raster[0:-2,2:]
big_ravel[1:-1,1:-1,3] = raster[1:-1,0:-2]
big_ravel[1:-1,1:-1,4] = raster[1:-1,1:-1]
big_ravel[1:-1,1:-1,5] = raster[1:-1,2:]
big_ravel[1:-1,1:-1,6] = raster[2:,0:-2]
big_ravel[1:-1,1:-1,7] = raster[2:,1:-1]
big_ravel[1:-1,1:-1,8] = raster[2:,2:]
return big_ravel
[docs]def tile_local_array(local_array, L, W):
"""Take a local array [[NW, N, NE], [W, 0, E], [SW, S, SE]]
and repeat it into an array of shape (L,W,9), where L, W are
the shape of the domain, and the original elements are ordered
along the third axis. Helper function for make_weight.
**Inputs** :
local_array : `ndarray`
2D array of represnting the D8 neighbors around
some index (e.g. ivec, jvec)
L : `int`
Length of the domain
W : `int`
Width of the domain
**Outputs** :
tiled_array : `ndarray`
3D array repeating local_array.ravel() LxW times
"""
return np.tile(local_array.ravel(), (L, W, 1))
[docs]def tile_domain_array(raster):
"""Repeat a large 2D array 9 times along the third axis.
Helper function for make_weight.
**Inputs** :
raster : `ndarray`
2D array of values (e.g. stage, qx)
**Outputs** :
tiled_array : `ndarray`
3D array repeating raster 9 times
"""
return np.repeat(raster[:, :, np.newaxis], 9, axis=2)
[docs]def clear_borders(tiled_array):
"""Set to zero all the edge elements of a vertical stack
of 2D arrays. Helper function for make_weight.
**Inputs** :
tiled_array : `ndarray`
3D array repeating raster (e.g. stage, qx) 9 times
along the third axis
**Outputs** :
tiled_array : `ndarray`
The same 3D array as the input, but with the borders
in the first and second dimension set to 0.
"""
tiled_array[0,:,:] = 0.
tiled_array[:,0,:] = 0.
tiled_array[-1,:,:] = 0.
tiled_array[:,-1,:] = 0.
return
[docs]def make_weight(Particles):
"""Create the weighting array for particle routing
Function to compute the routing weights at each index, which gets
stored inside the :obj:`dorado.particle_track.Particles` object
for use when routing. Called when the object gets instantiated.
**Inputs** :
Particles : :obj:`dorado.particle_track.Particles`
A :obj:`dorado.particle_track.Particles` object
**Outputs** :
Updates the weights array inside the
:obj:`dorado.particle_track.Particles` object
"""
L, W = Particles.stage.shape
# calculate surface slope weights
weight_sfc = (tile_domain_array(Particles.stage) \
- big_sliding_window(Particles.stage))
weight_sfc /= tile_local_array(Particles.distances, L, W)
weight_sfc[weight_sfc <= 0] = 0
clear_borders(weight_sfc)
# calculate inertial component weights
weight_int = (tile_domain_array(Particles.qx)*tile_local_array(Particles.jvec, L, W)) \
+ (tile_domain_array(Particles.qy)*tile_local_array(Particles.ivec, L, W))
weight_int /= tile_local_array(Particles.distances, L, W)
weight_int[weight_int <= 0] = 0
clear_borders(weight_int)
# get depth and cell types for neighboring cells
depth_ind = big_sliding_window(Particles.depth)
ct_ind = big_sliding_window(Particles.cell_type)
# set weights for cells that are too shallow, or invalid 0
weight_sfc[(depth_ind <= Particles.dry_depth) | (ct_ind == 2)] = 0
weight_int[(depth_ind <= Particles.dry_depth) | (ct_ind == 2)] = 0
# if sum of weights is above 0 normalize by sum of weights
norm_sfc = np.nansum(weight_sfc, axis=2)
idx_sfc = tile_domain_array((norm_sfc > 0))
weight_sfc[idx_sfc] /= tile_domain_array(norm_sfc)[idx_sfc]
norm_int = np.nansum(weight_int, axis=2)
idx_int = tile_domain_array((norm_int > 0))
weight_int[idx_int] /= tile_domain_array(norm_int)[idx_int]
# define actual weight by using gamma, and weight components
weight = Particles.gamma * weight_sfc + \
(1 - Particles.gamma) * weight_int
# modify the weight by the depth and theta weighting parameter
weight = depth_ind ** Particles.theta * weight
# if the depth is below the minimum depth then set weight to 0
weight[(depth_ind <= Particles.dry_depth) | (ct_ind == 2)] = 0
# if it's a dead end with only nans and 0's, choose deepest cell
zero_weights = tile_domain_array((np.nansum(weight, axis=2) <= 0))
deepest_cells = (depth_ind == tile_domain_array(np.max(depth_ind,axis=2)))
choose_deep_cells = (zero_weights & deepest_cells)
weight[choose_deep_cells] = 1.0
# Final checks, eliminate invalid choices
clear_borders(weight)
# set weight in the true weight array
Particles.weight = weight
[docs]def get_weight(Particles, ind):
"""Choose new cell location given an initial location.
Function to randomly choose 1 of the surrounding 8 cells around the
current index using the routing weights from make_weight.
**Inputs** :
Particles : :obj:`dorado.particle_track.Particles`
A :obj:`dorado.particle_track.Particles` object
ind : `tuple`
Tuple (x,y) with the current location indices
**Outputs** :
new_cell : `int`
New location given as a value between 1 and 8 (inclusive)
"""
# randomly pick the new cell for the particle to move to using the
# random_pick function and the set of weights
if Particles.steepest_descent is not True:
new_cell = random_pick(Particles.weight[ind[0], ind[1], :])
elif Particles.steepest_descent is True:
new_cell = steep_descent(Particles.weight[ind[0], ind[1], :])
return new_cell
[docs]def calculate_new_ind(ind, new_cell, iwalk, jwalk):
"""Add new cell location (1-8 value) to the previous index.
**Inputs** :
ind : `tuple`
Tuple (x,y) of old particle location
new_cell : `int`
Integer 1-8 indicating new cell location relative to the old
one in a D-8 sense
iwalk : `ndarray`
A D8 array with the positive and negative x directions
jwalk : `ndarray`
A D8 array with the positive and negative y directions
**Outputs** :
new_ind : `tuple`
tuple (x,y) of the new particle location
"""
# add the index and the flattened x and y walk component
# x,y walk component is related to the next cell chosen as a
# 1-8 location
new_ind = (ind[0] + jwalk.flat[new_cell], ind[1] + iwalk.flat[new_cell])
return new_ind
[docs]def step_update(new_cell, distances, dx):
"""Get distance to new particle location.
Function to check new location is some distance away from old one,
also provides way to track the travel distance of the particles
**Inputs** :
new_cell : `int`
Integer 1-8 indicating new location in D-8 way
distances : `ndarray`
D8 distances between cells
dx : `float`
Length along one square cell face
**Outputs** :
dist : `float`
Distance between current (old) and new particle location
"""
# compute the step distance to be taken
dist = distances.flat[new_cell]*float(dx)
return dist
[docs]def calc_travel_times(Particles, new_cell, ind, new_ind, dist):
"""Calculate travel time for particle to get to the new location.
Function to calculate the travel time for the particle to get from the
current location to the new location. Calculated by taking the inverse
of the velocity at the old and new locations.
**Inputs** :
Particles : :obj:`dorado.particle_track.Particles`
A :obj:`dorado.particle_track.Particles` object
new_cell : `int`
Integer 1-8 indicating new location in D-8 way
ind : `tuple`
Tuple (x,y) of the current location
new_ind : `tuple`
Tuple (x,y) of the new location
dist : `float`
Distance between current (old) and new particle location
**Outputs** :
trav_time : `float`
Travel time it takes the particle to get from the current
location to the new proposed location using the inverse of the
average velocity
"""
# make sure the new location is different from the current one
if ind != new_ind:
# get old position velocity value
old_vel = Particles.velocity[ind[0], ind[1]]
# new position velocity value
new_vel = Particles.velocity[new_ind[0], new_ind[1]]
# Compute diffusion term
# (sample uniform distribution centered at 0 and diff_coeff wide)
if Particles.diff_coeff > 0:
diff = (0.5 - random())*Particles.diff_coeff
else:
diff = 0.0
# Compute distance traveled in the orientation of mean flow path
# If we moved backwards/orthogonal, step was instantaneous
projected_dist = dist*max(0, cos(Particles.angles.flat[new_cell] -
Particles.velocity_angle[ind[0], ind[1]]))
# Compute average velocity over step
trav_time = 0.5*projected_dist*(1/old_vel + 1/new_vel)*(1+diff)
else:
trav_time = 0 # particle did not move
return trav_time
[docs]def check_for_boundary(new_inds, current_inds, cell_type):
"""Ensure new location is not a boundary cell.
Function to make sure particle is not exiting the boundary with the
proposed new location.
**Inputs** :
new_inds : `list`
List [] of tuples (x,y) of new indices
current_inds : `list`
List [] of tuples (x,y) of old indices
cell_type : `numpy.ndarray`
Array of the different types of cells in the domain where 2 = land,
1 = channel, 0 = ocean, and -1 = edge. If not explicitly defined
then the values are estimated based on the depth array and the
defined dry_depth
**Outputs** :
new_inds : `list`
list [] of tuples (x,y) of new indices where any proposed
indices outside of the domain have been replaced by the old
indices so those particles will not travel this iteration
"""
# Check if the new indices are on an edge (type==-1)
# If so, then stop moving particle
for i in range(0, len(new_inds)):
# If particle borders an edge, cancel out any additional steps
if -1 in cell_type[current_inds[i][0]-1:current_inds[i][0]+2,
current_inds[i][1]-1:current_inds[i][1]+2]:
new_inds[i][0] = current_inds[i][0]
new_inds[i][1] = current_inds[i][1]
return new_inds
[docs]def random_pick(probs):
"""Pick value from weighted probability array.
Randomly pick a number weighted by array probs (len 8)
Return the index of the selected weight in array probs
**Inputs** :
probs : `list`
8 values indicating the probability (weight) associated with
the surrounding cells for the random walk
**Outputs** :
idx : `int`
1-8 value chosen randomly based on the weighted probabilities
"""
probs[np.isnan(probs)] = 0 # any nans are assigned as 0
cutoffs = np.cumsum(probs) # cumulative sum of all probabilities
# randomly pick indices from cutoffs based on uniform distribution
idx = cutoffs.searchsorted(np.random.uniform(0, cutoffs[-1]))
return idx
[docs]def steep_descent(probs):
"""Choose value with greatest probability, no longer random.
Pick the array value with the greatest probability, no longer a
stochastic process, instead just choosing the steepest descent
**Inputs** :
probs : `float`
8 values indicating probability (weight) associated with the
surrounding cells
**Outputs** :
idx : `int`
1-8 value chosen by greatest probs
"""
max_val = np.nanmax(probs)
# remove initial location (index 4) from consideration
probs[4] = 0
# remove any locations from consideration beneath max value
probs[probs < max_val] = 0
# any nans become ignored too
probs[np.isnan(probs)] = 0
# will pick either the index corresponding to the max value if there
# is just 1, or it will randomly choose between values in the event
# of a tie
cutoffs = np.cumsum(probs) # cumulative sum of all probabilities
idx = cutoffs.searchsorted(np.random.uniform(0, cutoffs[-1]))
return idx
[docs]def particle_stepper(Particles, current_inds, travel_times):
"""Step particles a single iteration.
**Inputs** :
Particles: :obj:`dorado.particle_track.Particles`
A :obj:`dorado.particle_track.Particles` object.
current_inds : `list`
List of tuples of the current particle (x,y) locations in space
travel_times : `list`
List of initial travel times for the particles
**Outputs** :
new_inds : `list`
List of the new particle locations after the single iteration
travel_times : `list`
List of the travel times associated with the particle movements
"""
inds = current_inds # get indices as coordinates in the domain
# split the indices into tuples
inds_tuple = [(inds[i][0], inds[i][1]) for i in range(len(inds))]
# for each particle index get the weights
new_cells = [get_weight(Particles, x)
if x != (0, 0) else 4 for x in inds_tuple]
# for each particle get the new index
new_inds = list(map(lambda x, y:
calculate_new_ind(x, y, Particles.iwalk,
Particles.jwalk)
if y != 4 else x, inds_tuple, new_cells))
# move each particle to the new index
dist = list(map(lambda x: step_update(x, Particles.distances,
Particles.dx),
new_cells))
# put new indices into array
new_inds = np.array(new_inds, dtype=np.int32)
new_inds[np.array(dist) == 0] = 0
# see if the indices are at boundaries
new_inds = check_for_boundary(new_inds, inds, Particles.cell_type)
# transform from np array to list
new_inds = new_inds.tolist()
# add the travel times
temp_travel = list(map(lambda w, x, y, z:
calc_travel_times(Particles, w, x, y, z),
new_cells, current_inds, new_inds, dist))
# add to existing times
travel_times = [travel_times[i] + temp_travel[i]
for i in range(0, len(travel_times))]
travel_times = list(travel_times)
return new_inds, travel_times