Source code for imagine.tools.mpi_helper

"""
This MPI helper module is designed for parallel computing and data handling.

For the testing suites, please turn to "imagine/tests/tools_tests.py".
"""

# %% IMPORTS
# Built-in imports
from copy import deepcopy
import logging as log

# Package imports
from e13tools import add_to_all
from mpi4py import MPI
import numpy as np

# GLOBALS
comm = MPI.COMM_WORLD
mpisize = comm.Get_size()
mpirank = comm.Get_rank()

# All declaration
__all__ = []


# %% FUNCTION DEFINITIONS
@add_to_all
def mpi_arrange(size):
    """
    With known global size, number of mpi nodes, and current rank,
    returns the begin and end index for distributing the global size.

    Parameters
    ----------
    size : integer (positive)
        The total size of target to be distributed.
        It can be a row size or a column size.

    Returns
    -------
    result : numpy.uint64
        The begin and end index [begin, end] for slicing the target.
    """
    log.debug('@ mpi_helper::mpi_arrange')
    assert (size > 0)
    res = min(mpirank, size%mpisize)
    ave = size//mpisize
    if (ave == 0):
        raise ValueError('over distribution')
    return np.uint64(res + mpirank*ave), \
        np.uint64(res + (mpirank+1)*ave + np.uint64(mpirank < size%mpisize))
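# Illustrative usage sketch, not part of the original module: it shows how
# mpi_arrange splits a global size into per-rank [begin, end) index ranges.
# The helper name _demo_mpi_arrange is hypothetical; it assumes the module-level
# imports above and a launch under MPI, e.g. `mpirun -np 2 python script.py`,
# with no more processes than global_size.
def _demo_mpi_arrange(global_size=10):
    begin, end = mpi_arrange(global_size)
    # e.g. with 2 processes and global_size=10, rank 0 owns [0, 5), rank 1 owns [5, 10)
    print('rank %d owns rows [%d, %d)' % (mpirank, begin, end))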
@add_to_all
def mpi_shape(data):
    """
    Returns the global number of rows and columns of given distributed data.

    Parameters
    ----------
    data : numpy.ndarray
        The distributed data.

    Returns
    -------
    result : numpy.uint64
        Global row and column number.
    """
    global_row = np.array(0, dtype=np.uint64)
    comm.Allreduce([np.array(data.shape[0], dtype=np.uint64), MPI.LONG],
                   [global_row, MPI.LONG], op=MPI.SUM)
    global_column = np.array(data.shape[1], dtype=np.uint64)
    return global_row, global_column
@add_to_all
def mpi_prosecutor(data):
    """
    Checks if the data is distributed in the correct way:
    the covariance matrix must be distributed in exactly the same manner
    as the multi-realization data; if not, an error is raised.

    Parameters
    ----------
    data : numpy.ndarray
        The distributed data to be examined.
    """
    log.debug('@ mpi_helper::mpi_prosecutor')
    assert isinstance(data, np.ndarray)
    # get the global shape
    local_rows = np.empty(mpisize, dtype=np.uint64)
    local_cols = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(data.shape[0], dtype=np.uint64), MPI.LONG],
                   [local_rows, MPI.LONG])
    comm.Allgather([np.array(data.shape[1], dtype=np.uint64), MPI.LONG],
                   [local_cols, MPI.LONG])
    check_begin, check_end = mpi_arrange(np.sum(local_rows))
    if (data.shape[0] != check_end - check_begin):
        raise ValueError('incorrect data allocation')
    if np.any((local_cols-local_cols[0]).astype(bool)):
        raise ValueError('incorrect data allocation')
@add_to_all
def mpi_mean(data):
    """
    Calculates the mean of a distributed array along the distributed (row)
    direction, so an (m, n) ensemble yields a (1, n) mean.
    Note that the numerical values will be converted into double.

    Parameters
    ----------
    data : numpy.ndarray
        Distributed data.

    Returns
    -------
    result : numpy.ndarray
        Copied data mean, which means the mean is copied to all nodes.
    """
    log.debug('@ mpi_helper::mpi_mean')
    assert (len(data.shape) == 2)
    assert isinstance(data, np.ndarray)
    # get the global shape
    local_rows = np.empty(mpisize, dtype=np.uint64)
    local_cols = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(data.shape[0], dtype=np.uint64), MPI.LONG],
                   [local_rows, MPI.LONG])
    comm.Allgather([np.array(data.shape[1], dtype=np.uint64), MPI.LONG],
                   [local_cols, MPI.LONG])
    # do summation first before averaging out
    partial_sum = np.empty(data.shape[1], dtype=np.float64)
    partial_sum = np.sum(data, axis=0).reshape((data.shape[1],))
    total_sum = np.empty(data.shape[1], dtype=np.float64)
    comm.Allreduce([partial_sum, MPI.DOUBLE], [total_sum, MPI.DOUBLE], op=MPI.SUM)
    avg = (total_sum / np.sum(local_rows)).reshape((1, data.shape[1]))
    return avg
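# Illustrative usage sketch, not part of the original module: each rank holds a
# block of rows of a global ensemble, and mpi_mean returns the same (1, n) mean
# on every rank. The helper name _demo_mpi_mean is hypothetical; it assumes no
# more MPI processes than global_rows.
def _demo_mpi_mean(global_rows=6, columns=4):
    begin, end = mpi_arrange(global_rows)
    # fabricate the local block of a (global_rows, columns) ensemble
    local_block = np.ones((int(end - begin), columns)) * np.arange(columns)
    return mpi_mean(local_block)  # identical (1, columns) array on every rank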
@add_to_all
def mpi_trans(data):
    """
    Transposes distributed data;
    note that the numerical values will be converted into double.

    Parameters
    ----------
    data : numpy.ndarray
        Distributed data.

    Returns
    -------
    result : numpy.ndarray
        Transposed data in distribution.
    """
    log.debug('@ mpi_helper::mpi_trans')
    assert (len(data.shape) == 2)
    assert isinstance(data, np.ndarray)
    # get the global shape before transpose
    local_rows = np.empty(mpisize, dtype=np.uint64)
    local_cols = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(data.shape[0], dtype=np.uint64), MPI.LONG],
                   [local_rows, MPI.LONG])
    comm.Allgather([np.array(data.shape[1], dtype=np.uint64), MPI.LONG],
                   [local_cols, MPI.LONG])
    # the algorithm cuts local data into sub-pieces and passes them to the corresponding nodes,
    # which means we need to calculate the arrangement of pre-trans "columns" into post-trans "rows"
    cut_col_begin, cut_col_end = mpi_arrange(local_cols[0])
    cut_col_begins = np.empty(mpisize, dtype=np.uint64)
    cut_col_ends = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(cut_col_begin, dtype=np.uint64), MPI.LONG],
                   [cut_col_begins, MPI.LONG])
    comm.Allgather([np.array(cut_col_end, dtype=np.uint64), MPI.LONG],
                   [cut_col_ends, MPI.LONG])
    # prepare empty post-trans local data shape
    new_data = np.empty((cut_col_end-cut_col_begin, np.sum(local_rows)), dtype=np.float64)
    # send and receive sub-pieces
    for target in range(mpisize):
        if (target != mpirank):  # send to other ranks
            # the np.array(..., dtype=np.float64) is to ensure contiguous memory and the correct data type
            # np.transpose won't work here, but changing the memory order will (cross-node copy)
            local_sent_buf = np.array(data[:, cut_col_begins[target]:cut_col_ends[target]],
                                      dtype=np.float64, order='F')
            comm.Isend([local_sent_buf, MPI.DOUBLE], dest=target, tag=target)
        else:  # recv from self
            # note that transpose works here (in-node copy)
            new_col_begin = np.sum(local_rows[0:mpirank])
            new_col_end = new_col_begin + local_rows[mpirank]
            new_data[:, new_col_begin:new_col_end] = \
                np.transpose((data[:, cut_col_begin:cut_col_end]).astype(np.float64))
    for source in range(mpisize):
        if (source != mpirank):  # recv from other ranks
            local_recv_buf = np.empty((cut_col_ends[mpirank]-cut_col_begins[mpirank], local_rows[source]),
                                      dtype=np.float64)
            comm.Recv([local_recv_buf, MPI.DOUBLE], source=source, tag=mpirank)
            new_data[:, np.sum(local_rows[0:source]):np.sum(local_rows[0:source+1])] = local_recv_buf
    return new_data
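# Illustrative usage sketch, not part of the original module: transposing a
# distributed block twice reproduces the original values (as float64).
# The helper name _demo_mpi_trans is hypothetical; it assumes no more MPI
# processes than min(global_rows, columns).
def _demo_mpi_trans(global_rows=6, columns=4):
    begin, end = mpi_arrange(global_rows)
    local_block = np.random.rand(int(end - begin), columns)
    round_trip = mpi_trans(mpi_trans(local_block))
    return np.allclose(round_trip, local_block)  # True on every rank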
@add_to_all
def mpi_mult(left, right):
    """
    Calculates the matrix multiplication of two distributed data sets;
    the result, left*right, is kept in multi-node distribution.
    Note that the numerical values will be converted into double.
    We send the distributed right rows to the other nodes (a.k.a. the Cannon method).

    Parameters
    ----------
    left : numpy.ndarray
        Distributed left side data.
    right : numpy.ndarray
        Distributed right side data.

    Returns
    -------
    result : numpy.ndarray
        Distributed multiplication result.
    """
    log.debug('@ mpi_helper::mpi_mult')
    assert (len(left.shape) == 2)
    assert (len(right.shape) == 2)
    assert isinstance(left, np.ndarray)
    assert isinstance(right, np.ndarray)
    # know the total rows
    result_global_row = np.array(0, dtype=np.uint64)
    comm.Allreduce([np.array(left.shape[0], dtype=np.uint64), MPI.LONG],
                   [result_global_row, MPI.LONG], op=MPI.SUM)
    # prepare the distributed result
    result = np.zeros((left.shape[0], result_global_row), dtype=np.float64)
    # collect left and right matrix row info
    left_rows = np.empty(mpisize, dtype=np.uint64)
    right_rows = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(left.shape[0], dtype=np.uint64), MPI.LONG],
                   [left_rows, MPI.LONG])
    comm.Allgather([np.array(right.shape[0], dtype=np.uint64), MPI.LONG],
                   [right_rows, MPI.LONG])
    assert (np.sum(right_rows) == left.shape[1])  # ensure left*right is legal
    # local self mult
    left_col_begin = np.sum(right_rows[:mpirank])
    left_col_end = left_col_begin + right_rows[mpirank]
    left_block = np.array(left[:, left_col_begin:left_col_end], dtype=np.float64)
    result += np.dot(left_block, right)
    # local mult with right cannons
    # allocate fixed bufs
    for itr in range(1, mpisize):
        target = (mpirank + itr) % mpisize
        source = (mpirank - itr) % mpisize
        # fire cannons
        comm.Isend([right.astype(np.float64), MPI.DOUBLE], dest=target, tag=target)
        # receive cannons
        local_recv_buf = np.zeros((right_rows[source], right.shape[1]), dtype=np.float64)
        comm.Recv([local_recv_buf, MPI.DOUBLE], source=source, tag=mpirank)
        # accumulate local mult
        left_col_begin = np.sum(right_rows[0:source])
        left_col_end = left_col_begin + right_rows[source]
        left_block = np.array(left[:, left_col_begin:left_col_end], dtype=np.float64)
        result += np.dot(left_block, local_recv_buf)
    return result
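# Illustrative usage sketch, not part of the original module: multiplying a
# distributed square matrix by the distributed identity returns the matrix
# itself. The helper name _demo_mpi_mult is hypothetical; as in the rest of
# this module, the global matrix is assumed square (e.g. a covariance matrix)
# and the number of MPI processes does not exceed global_size. Note that
# mpi_eye is defined further below in this module.
def _demo_mpi_mult(global_size=6):
    begin, end = mpi_arrange(global_size)
    left = np.random.rand(int(end - begin), global_size)  # distributed square matrix
    product = mpi_mult(left, mpi_eye(global_size))
    return np.allclose(product, left)  # multiplying by the identity is a no-op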
@add_to_all
def mpi_trace(data):
    """
    Computes the trace of the given distributed data.

    Parameters
    ----------
    data : numpy.ndarray
        Array of data distributed over different processes.

    Returns
    -------
    result : numpy.float64
        Copied trace of given data.
    """
    log.debug('@ mpi_helper::mpi_trace')
    assert (len(data.shape) == 2)
    assert isinstance(data, np.ndarray)
    local_acc = np.array(0, dtype=np.float64)
    local_row_begin, local_row_end = mpi_arrange(data.shape[1])
    for i in range(local_row_end - local_row_begin):
        eye_pos = local_row_begin + np.uint64(i)
        local_acc += np.float64(data[i, eye_pos])
    result = np.array(0, dtype=np.float64)
    comm.Allreduce([local_acc, MPI.DOUBLE], [result, MPI.DOUBLE], op=MPI.SUM)
    return result
@add_to_all
def mpi_diag(data):
    """
    Gets the diagonal of a distributed matrix.

    Parameters
    ----------
    data : numpy.ndarray
        Array of data distributed over different processes.

    Returns
    -------
    result : numpy.ndarray
        Diagonal of the distributed matrix, copied to all nodes.
    """
    log.debug('@ mpi_helper::mpi_diag')
    assert (len(data.shape) == 2)
    assert isinstance(data, np.ndarray)
    local_row_begin, local_row_end = mpi_arrange(data.shape[1])
    local_diagonal = data[:, local_row_begin:local_row_end].diagonal()
    diagonal = comm.allgather(local_diagonal)
    return np.concatenate(diagonal)
@add_to_all
def mpi_new_diag(data):
    """
    Constructs a distributed matrix with a given diagonal.

    Parameters
    ----------
    data : numpy.ndarray
        The diagonal values (available on each process).

    Returns
    -------
    result : numpy.ndarray
        Distributed matrix carrying the given diagonal.
    """
    log.debug('@ mpi_helper::mpi_new_diag')
    assert (len(data.shape) == 1)
    assert isinstance(data, np.ndarray)
    size = data.size
    local_row_begin, local_row_end = mpi_arrange(size)
    local_matrix = np.zeros((local_row_end - local_row_begin, size))
    for i in range(local_row_end - local_row_begin):
        diag_pos = int(local_row_begin + i)
        local_matrix[i, diag_pos] = data[diag_pos]
    return local_matrix
@add_to_all
def mpi_eye(size):
    """
    Produces an identity (eye) matrix of shape (size, size),
    distributed over the various running MPI processes.

    Parameters
    ----------
    size : integer
        Distributed matrix size.

    Returns
    -------
    result : numpy.ndarray, double data type
        Distributed eye matrix.
    """
    log.debug('@ mpi_helper::mpi_eye')
    local_row_begin, local_row_end = mpi_arrange(size)
    local_matrix = np.zeros((local_row_end - local_row_begin, size))
    for i in range(local_row_end - local_row_begin):
        eye_pos = int(local_row_begin + i)
        local_matrix[i, eye_pos] = 1.0
    return local_matrix
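# Illustrative usage sketch, not part of the original module: the trace of the
# distributed identity equals its size, and mpi_diag recovers its unit diagonal.
# The helper name _demo_mpi_eye is hypothetical; it assumes no more MPI
# processes than global_size.
def _demo_mpi_eye(global_size=6):
    identity = mpi_eye(global_size)
    assert mpi_trace(identity) == global_size
    assert np.all(mpi_diag(identity) == 1.0)
    return identity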
@add_to_all
def mpi_distribute_matrix(full_matrix):
    """
    Distributes a full (copied) square matrix over the running MPI processes.

    Parameters
    ----------
    full_matrix : numpy.ndarray
        The full square matrix to be distributed.

    Returns
    -------
    result : numpy.ndarray
        The local block of rows of the distributed matrix.
    """
    size, s = full_matrix.shape
    assert size == s
    local_row_begin, local_row_end = mpi_arrange(size)
    local_matrix = full_matrix[local_row_begin:local_row_end, :]
    return local_matrix
@add_to_all
def mpi_lu_solve(operator, source):
    """
    Simple LU Gauss method WITHOUT pivot permutation.

    Parameters
    ----------
    operator : distributed numpy.ndarray
        Matrix representation of the left-hand-side operator.
    source : copied numpy.ndarray
        Vector representation of the right-hand-side source.

    Returns
    -------
    result : numpy.ndarray, double data type
        Copied solution to the linear algebra problem.
    """
    log.debug('@ mpi_helper::mpi_lu_solve')
    assert isinstance(operator, np.ndarray)
    assert isinstance(source, np.ndarray)
    global_rows = operator.shape[1]
    assert (source.shape == (1, global_rows))
    u = deepcopy(operator.astype(np.float64))
    x = deepcopy(source.astype(np.float64))
    # split x
    xsplit_begin, xsplit_end = mpi_arrange(global_rows)
    xsplit = np.array(x[0, xsplit_begin:xsplit_end])
    # collect local rows for each node
    local_rows = np.empty(mpisize, dtype=np.uint64)
    xsplit_begins = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(u.shape[0], dtype=np.uint64), MPI.LONG],
                   [local_rows, MPI.LONG])
    comm.Allgather([np.array(xsplit_begin, dtype=np.uint64), MPI.LONG],
                   [xsplit_begins, MPI.LONG])
    # start gauss method
    # goes column by column
    for c in range(global_rows-1):
        # find the pivot rank and local row
        pivot_rank = np.array(0, dtype=np.uint64)
        pivot_r = np.uint64(c)  # local row index hosting the pivot
        for i in range(len(local_rows)):
            if (pivot_r >= local_rows[i]):
                pivot_r -= local_rows[i]
            else:
                pivot_rank = np.uint64(i)
                break
        # propagate pivot rank and pivot row
        if mpirank == pivot_rank:
            pivot_row = np.array(u[pivot_r, :])
        else:
            pivot_row = np.empty(global_rows, dtype=np.float64)
        comm.Bcast([np.array(pivot_rank), MPI.LONG], root=pivot_rank)
        comm.Bcast([pivot_row, MPI.DOUBLE], root=pivot_rank)
        # gauss elimination
        max_row = max(local_rows)
        for local_r in range(max_row):
            if (local_r + xsplit_begin > c and local_r < local_rows[mpirank]):
                ratio = u[local_r, c]/pivot_row[c]
                u[local_r, :] -= ratio*pivot_row
                xsplit[local_r] -= ratio*x[0, c]  # manipulate split x instead of x
        # gather xsplit
        comm.Allgatherv([xsplit, MPI.DOUBLE],
                        [x, local_rows, xsplit_begins, MPI.DOUBLE])
    # solve Ux=b
    for i in range(mpisize):
        op_rank = mpisize - 1 - i  # operational rank
        if (mpirank == op_rank):
            for j in range(local_rows[mpirank]):
                local_r = np.uint64(local_rows[mpirank] - 1 - j)
                local_c = np.uint64(xsplit_begin + local_r)
                local_c_plus = np.uint64(local_c + 1)
                x[0, local_c] = (x[0, local_c] -
                                 np.dot(u[local_r, local_c_plus:],
                                        x[0, local_c_plus:]))/u[local_r, local_c]
        # update x
        comm.Bcast([x, MPI.DOUBLE], root=op_rank)
    return x
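# Illustrative usage sketch, not part of the original module: solving a system
# whose operator is a distributed diagonal matrix, so the solution is known
# analytically. The helper name _demo_mpi_lu_solve is hypothetical; since the
# method is pivot-free, the operator must not produce zero pivots (a constant
# diagonal of 2 is safe).
def _demo_mpi_lu_solve(global_size=6):
    operator = 2.0*mpi_eye(global_size)        # distributed 2*I
    source = np.ones((1, global_size))         # copied right-hand side
    solution = mpi_lu_solve(operator, source)  # solution copied to every rank
    return np.allclose(solution, 0.5)          # (2*I) x = 1  =>  x = 0.5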
@add_to_all
def mpi_slogdet(data):
    """
    Computes the log determinant according to the simple LU Gauss method
    WITHOUT pivot permutation.

    Parameters
    ----------
    data : numpy.ndarray
        Array of data distributed over different processes.

    Returns
    -------
    sign : numpy.ndarray
        Single element numpy array containing the sign of the determinant (copied to all nodes).
    logdet : numpy.ndarray
        Single element numpy array containing the log of the determinant (copied to all nodes).
    """
    log.debug('@ mpi_helper::mpi_slogdet')
    assert isinstance(data, np.ndarray)
    global_rows = data.shape[1]
    u = deepcopy(data.astype(np.float64))
    # collect local rows for each node
    local_rows = np.empty(mpisize, dtype=np.uint64)
    comm.Allgather([np.array(u.shape[0], dtype=np.uint64), MPI.LONG],
                   [local_rows, MPI.LONG])
    # start gauss method
    # the hidden global row count in other nodes
    global_row_begin = np.sum(local_rows[0:mpirank])
    # goes column by column
    for c in range(global_rows-1):
        # find the pivot rank and local row
        pivot_rank = np.array(0, dtype=np.uint64)
        pivot_r = np.uint64(c)  # local row index hosting the pivot
        for i in range(len(local_rows)):
            if (pivot_r >= local_rows[i]):
                pivot_r -= local_rows[i]
            else:
                pivot_rank = np.uint64(i)
                break
        # propagate pivot rank and pivot row
        if mpirank == pivot_rank:
            pivot_row = np.array(u[pivot_r, :])
        else:
            pivot_row = np.empty(global_rows, dtype=np.float64)
        comm.Bcast([np.array(pivot_rank), MPI.LONG], root=pivot_rank)
        comm.Bcast([pivot_row, MPI.DOUBLE], root=pivot_rank)
        # gauss elimination
        max_row = max(local_rows)
        for local_r in range(max_row):
            if (local_r + global_row_begin > c and local_r < local_rows[mpirank]):
                ratio = u[local_r, c]/pivot_row[c]
                u[local_r, :] -= ratio*pivot_row
    # calculate diagonal mult in the upper matrix
    sign = np.array(1.0, dtype=np.float64)
    logdet = np.array(0.0, dtype=np.float64)
    local_sign = np.array(1.0, dtype=np.float64)
    local_logdet = np.array(0.0, dtype=np.float64)
    for local_r in range(local_rows[mpirank]):
        local_c = np.uint64(local_r + global_row_begin)
        target = u[local_r, local_c]
        target_sign = 2.0*np.float64(target > 0) - 1.0
        local_sign *= target_sign
        local_logdet += np.log(target*target_sign)
    # reduce local diagonal element mult
    comm.Allreduce([local_logdet, MPI.DOUBLE], [logdet, MPI.DOUBLE], op=MPI.SUM)
    comm.Allreduce([local_sign, MPI.DOUBLE], [sign, MPI.DOUBLE], op=MPI.PROD)
    assert (logdet != 0 and sign != 0)
    return sign, logdet
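# Illustrative usage sketch, not part of the original module: the log-determinant
# of the distributed 2*I matrix is global_size*log(2) with positive sign.
# The helper name _demo_mpi_slogdet is hypothetical.
def _demo_mpi_slogdet(global_size=6):
    sign, logdet = mpi_slogdet(2.0*mpi_eye(global_size))
    return (sign == 1.0) and np.isclose(logdet, global_size*np.log(2.0))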
@add_to_all
def mpi_global(data):
    """
    Gathers data spread across different processes.

    Parameters
    ----------
    data : numpy.ndarray
        Array of data distributed over different processes.

    Returns
    -------
    global array : numpy.ndarray
        The root process returns the gathered data, other processes return `None`.
    """
    local_rows = np.array(data.shape[0], dtype=np.uint64)
    global_rows = np.array(0, dtype=np.uint64)
    comm.Allreduce([local_rows, MPI.LONG], [global_rows, MPI.LONG], op=MPI.SUM)
    local_row_begin, local_row_end = mpi_arrange(global_rows)
    if not mpirank:
        global_array = np.empty((global_rows, data.shape[1]), dtype=np.float64)
        row_begins = np.empty(mpisize, dtype=np.uint64)
        row_ends = np.empty(mpisize, dtype=np.uint64)
    else:
        global_array = None
        row_begins = None
        row_ends = None
    comm.Gather([np.array(local_row_begin, dtype=np.uint64), MPI.LONG],
                [row_begins, MPI.LONG], root=0)
    comm.Gather([np.array(local_row_end, dtype=np.uint64), MPI.LONG],
                [row_ends, MPI.LONG], root=0)
    if not mpirank:
        global_array[:local_row_end, :] = data
        for source in range(1, mpisize):
            comm.Recv([global_array[row_begins[source]:row_ends[source], :], MPI.DOUBLE],
                      source=source, tag=source)
    else:
        comm.Isend([np.array(data, dtype=np.float64), MPI.DOUBLE], dest=0, tag=mpirank)
    return global_array
    """
    # the alternative way
    global_array = comm.gather(data, root=root)
    if global_array is not None:
        global_array = np.vstack(global_array)
    return global_array
    """
@add_to_all
def mpi_local(data):
    """
    Distributes data over the available processes.

    Parameters
    ----------
    data : numpy.ndarray
        Array of data to be distributed over the available processes.

    Returns
    -------
    local array : numpy.ndarray
        Returns the distributed array on all processes.
    """
    if not mpirank:
        global_shape = np.array(data.shape, dtype=np.uint64)
        row_begins = np.empty(mpisize, dtype=np.uint64)
        row_ends = np.empty(mpisize, dtype=np.uint64)
    else:
        global_shape = np.empty(2, dtype=np.uint64)
        row_begins = None
        row_ends = None
    comm.Bcast([global_shape, MPI.LONG], root=0)
    local_row_begin, local_row_end = mpi_arrange(global_shape[0])
    comm.Gather([np.array(local_row_begin, dtype=np.uint64), MPI.LONG],
                [row_begins, MPI.LONG], root=0)
    comm.Gather([np.array(local_row_end, dtype=np.uint64), MPI.LONG],
                [row_ends, MPI.LONG], root=0)
    # start slicing
    if not mpirank:
        local_data = np.array(data[local_row_begin:local_row_end, :], dtype=np.float64)
        for target in range(1, mpisize):
            sendbuf = np.array(data[row_begins[target]:row_ends[target], :], dtype=np.float64)
            comm.Send([sendbuf, MPI.DOUBLE], dest=target, tag=target)
    else:
        local_data = np.empty((local_row_end-local_row_begin, global_shape[1]), dtype=np.float64)
        comm.Recv([local_data, MPI.DOUBLE], source=0, tag=mpirank)
    return local_data
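# Illustrative usage sketch, not part of the original module: mpi_local scatters
# a root-held array into row blocks and mpi_global gathers them back on the root.
# The helper name _demo_mpi_local_global is hypothetical; it assumes no more MPI
# processes than global_rows.
def _demo_mpi_local_global(global_rows=6, columns=4):
    full = None
    if not mpirank:  # only the root rank needs to hold the full array
        full = np.arange(global_rows*columns, dtype=np.float64).reshape(global_rows, columns)
    local_block = mpi_local(full)       # every rank receives its row slice
    gathered = mpi_global(local_block)  # rank 0 gets the full array back, others get None
    if not mpirank:
        return np.allclose(gathered, full)
    return gathered is None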