From 984c422e434862eef004ea032ca6a6eb8b8bcf39 Mon Sep 17 00:00:00 2001 From: Joe Hamman Date: Sun, 9 Feb 2014 19:28:39 -0800 Subject: [PATCH] Add C convolution routine for speedup -Also changes to structure of ring and unit hydrographs. Changes are internal and are backward compatable. -Fixed bug in agg_tsteps logic --- rvic/c_convolve.c | 52 ++++++++++++++ rvic/convolution_wrapper.py | 101 +++++++++++++++++++++++++++ rvic/history.py | 23 ++++--- rvic/read_forcing.py | 9 ++- rvic/share.py | 2 +- rvic/variables.py | 132 +++++++++++++++++++++++++----------- rvic_model.py | 24 ++++--- 7 files changed, 281 insertions(+), 62 deletions(-) create mode 100644 rvic/c_convolve.c create mode 100644 rvic/convolution_wrapper.py diff --git a/rvic/c_convolve.c b/rvic/c_convolve.c new file mode 100644 index 0000000..d79c9d2 --- /dev/null +++ b/rvic/c_convolve.c @@ -0,0 +1,52 @@ +#include +#include + +void c_convolve(const int nsources, /*scalar - number of sources*/ + const int noutlets, /*scalar - length of subset*/ + const int subset_length, /*scalar - length of subset*/ + const int x_size, + const int* source2outlet_ind, /*1d array - source to outlet mapping*/ + const int* source_y_ind, /*1d array - source y location*/ + const int* source_x_ind, /*1d array - source x location*/ + const int* source_time_offset, /*1d array - source time offset*/ + const double* unit_hydrograph, /*2d array[times][sources] - unit hydrographs*/ + const double* aggrunin, /*2d array[ysize][xsize] - vic runoff flux*/ + double* ring) /*2d array[times][outlets] - convolution ring*/ +{ + const double* runin; /*pointer to sources runoff flux*/ + int s, i, j; /*counters*/ + int y, x, offset, outlet; /*2d indicies*/ + int xyind, rind, uhind; /*1d indicies*/ + + /*Loop through all sources*/ + for (s = 0; s < nsources; s++) { + + outlet = source2outlet_ind[s]; + y = source_y_ind[s]; + x = source_x_ind[s]; + offset = source_time_offset[s]; + + //1d index location + //2d-->1d indexing goes like this: ind = y*x_size + x + xyind = y*x_size + x; + + runin = &aggrunin[xyind]; + + // if (*runin > 10000.0) { + // printf("runin is big: %f\n", *runin); + // printf("xyind: %i\n", xyind); + // printf("y: %i\n", y); + // printf("x: %i\n", x); + + /* Do the convolution */ + for (i = 0; i < subset_length; i++) { + j = i + offset; + + //1d index locations + rind = j * noutlets + outlet; + uhind = i * nsources + s; + + ring[rind] += unit_hydrograph[uhind] * *runin; + } + } +} diff --git a/rvic/convolution_wrapper.py b/rvic/convolution_wrapper.py new file mode 100644 index 0000000..f78f0c6 --- /dev/null +++ b/rvic/convolution_wrapper.py @@ -0,0 +1,101 @@ +""" +convolution_wrapper.py + +ctypes wrapper for convolve.c + +gcc -shared -o c_convolve.so c_convolve.c +""" + +import numpy as np +import ctypes + +import os +# os.system('gcc -shared -o c_convolve.so c_convolve.c') + +try: + path_to_file = os.path.split(__file__)[0] + + _convolution = np.ctypeslib.load_library('c_convolve.so', path_to_file) + args = [ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + np.ctypeslib.ndpointer(np.int32), + np.ctypeslib.ndpointer(np.int32), + np.ctypeslib.ndpointer(np.int32), + np.ctypeslib.ndpointer(np.int32), + np.ctypeslib.ndpointer(np.float64), + np.ctypeslib.ndpointer(np.float64), + np.ctypeslib.ndpointer(np.float64)] + _convolution.c_convolve.argtypes = args + _convolution.c_convolve.restype = None + + def rvic_convolve(*args): + """args: + + nsources, + noutlets, + subset_length, + xsize, + source2outlet_ind, + source_y_ind, + source_x_ind, + source_time_offset, + unit_hydrograph, + aggrunin, + ring + """ + _convolution.c_convolve(*args) + + return +except: + print('Using brodcasting convolution method because there was a problem ' + 'loading c_convolve.c') + + def rvic_convolve(nsources, noutlets, subset_length, xsize, + source2outlet_ind, source_y_ind, source_x_ind, + source_time_offset, unit_hydrograph, aggrunin, ring): + # numpy brodcasting method + for s, outlet in enumerate(source2outlet_ind): + y = source_y_ind[s] + x = source_x_ind[s] + i = source_time_offset[s] + j = i + subset_length + ring[i:j, outlet] += np.squeeze(unit_hydrograph[:, s] * aggrunin[y, x]) + + # # pure python convolution + # for s, outlet in enumerate(source2outlet_ind): + # y = source_y_ind[s] + # x = source_x_ind[s] + # for i in xrange(subset_length): + # j = i + source_time_offset[s] + # ring[j, outlet] += (unit_hydrograph[i, s] * aggrunin[y, x]) + return + + +def test(): + nsources = 20 + subset_length = 10 + full_time_length = 15 + noutlets = 4 + source2outlet_ind = np.linspace(0, noutlets, nsources, + endpoint=False).astype(np.int32) + + ysize = 12 + xsize = 15 + + source_y_ind = np.random.randint(0, ysize-1, nsources).astype(np.int32) + source_x_ind = np.random.randint(0, xsize-1, nsources).astype(np.int32) + + source_time_offset = np.random.randint(0, 4, nsources).astype(np.int32) + + aggrunin = np.random.uniform(size=(ysize, xsize)) + unit_hydrograph = np.zeros((subset_length, nsources), dtype=np.float64) + unit_hydrograph[0:4, :] = 0.25 + ring = np.zeros((full_time_length, noutlets), dtype=np.float64) + + for i in xrange(10): + aggrunin = np.random.uniform(size=(ysize, xsize)) + rvic_convolve(nsources, noutlets, subset_length, xsize, + source2outlet_ind, source_y_ind, source_x_ind, + source_time_offset, unit_hydrograph, aggrunin, ring) diff --git a/rvic/history.py b/rvic/history.py index 8bb382b..e9216bc 100644 --- a/rvic/history.py +++ b/rvic/history.py @@ -3,10 +3,12 @@ Summary: This is the core history file module for the rvic model. - The core of the module is the Tape class. The basic procedure is as follows: + The core of the module is the Tape class. The basic procedure is as + follows: - initialization: sets tape options, determines filenames, etc. - update: method that incorporates new fluxes into the history tape. - - __next_update_out_data: method to determine when to update the outdata container + - __next_update_out_data: method to determine when to update the + outdata container """ import os @@ -17,9 +19,11 @@ from logging import getLogger from log import LOG_NAME from share import SECSPERDAY, HOURSPERDAY, TIMEUNITS, NC_INT, NC_FLOAT -from share import NC_DOUBLE, NC_CHAR, WATERDENSITY +from share import NC_DOUBLE, WATERDENSITY +# from share import NC_CHAR, RVIC_TRACERS import share + # -------------------------------------------------------------------- # # create logger log = getLogger(LOG_NAME) @@ -60,8 +64,8 @@ def __init__(self, time_ord, caseid, Rvar, tape_num=0, self._glob_ats = glob_ats self._out_data_i = 0 # position counter for out_data array - self._out_times = np.zeros(self._mfilt, type=np.float64) - self._out_time_bnds = np.zeros((self._mfilt, 2), type=np.float64) + self._out_times = np.zeros(self._mfilt, dtype=np.float64) + self._out_time_bnds = np.zeros((self._mfilt, 2), dtype=np.float64) self.__get_rvar(Rvar) # Get the initial Rvar fields self._grid_shape = grid_area.shape @@ -189,8 +193,9 @@ def update(self, data2tape, time_ord): # ------------------------------------------------------------ # # Update the fields for field in self._fincl: + tracer = 'LIQ' log.debug('updating {0}'.format(field)) - fdata = data2tape[field] + fdata = data2tape[field][tracer] if self._avgflag == 'A': self._temp_data[field] += fdata elif self._avgflag == 'I': @@ -237,7 +242,9 @@ def __update_out_data(self): if self._outtype == 'grid': # ---------------------------------------------------- # # Grid the fields - self._out_data[field][self._out_data_i, self._outlet_y_ind, self._outlet_x_ind] = self._temp_data[field][:] + self._out_data[field][self._out_data_i, + self._outlet_y_ind, + self._outlet_x_ind] = self._temp_data[field][:] # ---------------------------------------------------- # else: self._out_data[field][self._out_data_i, :] = self._temp_data[field] @@ -265,7 +272,7 @@ def __update_out_data(self): # ---------------------------------------------------------------- # # Get import rvar fields def __get_rvar(self, rvar): - """ Get the rvar Fields that are useful in writing output """ + """ Get the rvar Fields that are useful for writing output """ self._dt = rvar.unit_hydrograph_dt self._num_outlets = rvar.n_outlets self._outlet_decomp_ind = rvar.outlet_decomp_ind diff --git a/rvic/read_forcing.py b/rvic/read_forcing.py index f85dffc..14bdf2a 100644 --- a/rvic/read_forcing.py +++ b/rvic/read_forcing.py @@ -249,18 +249,21 @@ def read(self, timestamp): if i == 0: forcing['LIQ'] = self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld] else: - forcing['LIQ'] += self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld] + forcing['LIQ'] += self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld].astype(np.float64) for i, fld in enumerate(self.ice_flds): self.current_fhdl.variables[fld].set_auto_maskandscale(False) if i == 0: - forcing['ICE'] = self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld] + forcing['ICE'] = self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld].astype(np.float64) else: - forcing['ICE'] += self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld] + forcing['ICE'] += self.current_fhdl.variables[fld][self.current_tind, :, :] * self.fld_mult[fld].astype(np.float64) # move forward one step self.current_tind += 1 + for field in forcing: + forcing[field] = forcing[field].astype(np.float64) + return forcing # ------------------------------------------------------------ # # ---------------------------------------------------------------- # diff --git a/rvic/share.py b/rvic/share.py index 448a16a..0e346fa 100644 --- a/rvic/share.py +++ b/rvic/share.py @@ -53,7 +53,7 @@ RPOINTER = 'rpointer' # tracers -RVIC_TRACERS = ('LIQ',) +RVIC_TRACERS = ('LIQ',) # Before changing, update history module # Calendar key number for linking with CESM CALENDAR_KEYS = {0: ['None'], diff --git a/rvic/variables.py b/rvic/variables.py index 65ef606..e9ab138 100644 --- a/rvic/variables.py +++ b/rvic/variables.py @@ -10,6 +10,7 @@ from share import TIMEUNITS, NC_INT, NC_DOUBLE, NC_CHAR from share import RVIC_TRACERS, NcGlobals, SECSPERDAY, MAX_NC_CHARS from share import CALENDAR_KEYS, REFERENCE_DATE, REFERENCE_TIME +from convolution_wrapper import rvic_convolve import share # -------------------------------------------------------------------- # @@ -42,12 +43,14 @@ def __init__(self, lat='', lon='', gridx='', gridy='', routx='', self.name = 'outlet_{:3.4f}_{:3.4f}'.format(self.lat, self.lon) def __str__(self): - return "Point({}, {:3.4f}, {:3.4f}, {:3.4f}, {:3.4f})".format(self.name, self.lat, self.lon, - self.gridy, self.gridx) + return ("Point({}, {:3.4f}, {:3.4f}, {:3.4f}, " + "{:3.4f})".format(self.name, self.lat, self.lon, self.gridy, + self.gridx)) def __repr__(self): - return "Point({}, {:3.4f}, {:3.4f}, {:3.4f}, {:3.4f})".format(self.name, self.lat, self.lon, - self.gridy, self.gridx) + return ("Point({}, {:3.4f}, {:3.4f}, {:3.4f}, " + "{:3.4f})".format(self.name, self.lat, self.lon, self.gridy, + self.gridx)) # -------------------------------------------------------------------- # @@ -78,7 +81,17 @@ def __init__(self, param_file, case_name, calendar, out_dir, file_format): self.outlet_lat = f.variables['outlet_lat'][:] self.outlet_mask = f.variables['outlet_mask'][:] self.outlet_decomp_ind = f.variables['outlet_decomp_ind'][:] - self.unit_hydrograph = f.variables['unit_hydrograph'][:] + self.unit_hydrograph = {} + for tracer in RVIC_TRACERS: + tname = 'unit_hydrograph_{0}'.format(tracer) + try: + self.unit_hydrograph[tracer] = f.variables[tname][:] + except KeyError: + log.warning('Could not find unit hydrograph var %s', tname) + log.warning('trying var name unit_hydrograph') + self.unit_hydrograph[tracer] = f.variables['unit_hydrograph'][:] + except: + raise ValueError('Cant find Unit Hydrograph Variable') self.outlet_name = f.variables['outlet_name'][:] self.RvicDomainFile = f.RvicDomainFile self.RvicPourPointsFile = f.RvicPourPointsFile @@ -91,13 +104,17 @@ def __init__(self, param_file, case_name, calendar, out_dir, file_format): RvicFdrFile=f.RvicFdrFile, RvicDomainFile=f.RvicDomainFile, casename=case_name) + f.close() # ------------------------------------------------------------ # # Initialize state variables - self.ring = np.zeros((self.full_time_length, - self.n_outlets, - len(RVIC_TRACERS)), dtype=np.float64) + self.ring = {} + for tracer in RVIC_TRACERS: + + self.ring[tracer] = np.zeros((self.full_time_length, + self.n_outlets,), + dtype=np.float64) # ------------------------------------------------------------ # self._calendar = calendar @@ -113,13 +130,13 @@ def __init__(self, param_file, case_name, calendar, out_dir, file_format): # ---------------------------------------------------------------- # # Check that grid file matches - def check_grid_file(self, domain_file): + def _check_domain_file(self, domain_file): """ Confirm that the grid files match in the parameter and domain files """ input_file = os.path.split(domain_file)[1] - log.info('domain_file: %s' % input_file) - log.info('Parameter RvicDomainFile: %s' % self.RvicDomainFile) + log.info('domain_file: %s', input_file) + log.info('Parameter RvicDomainFile: %s', self.RvicDomainFile) if input_file == self.RvicDomainFile: log.info('Grid files match in parameter and domain file') @@ -128,13 +145,32 @@ def check_grid_file(self, domain_file): 'domain file') # ---------------------------------------------------------------- # + def set_domain(self, dom_data, domain): + """ Set the domain size """ + self._check_domain_file(domain['FILE_NAME']) + + self.domain_shape = dom_data[domain['LAND_MASK_VAR']].shape + + self.ysize = self.domain_shape[0] + self.xsize = self.domain_shape[1] + + if self.source_y_ind.max() >= self.ysize: + raise ValueError('source_y_ind.max() ({0}) > domain ysize' + ' ({1})'.format(self.source_y_ind, self.ysize)) + if self.source_x_ind.max() >= self.xsize: + raise ValueError('source_x_ind.max() ({0}) > domain xsize' + ' ({1})'.format(self.source_x_ind, self.xsize)) + log.info('set domain') + # ---------------------------------------------------------------- # # Initilize State def init_state(self, state_file, run_type, timestamp): if run_type in ['startup', 'restart']: log.info('reading state_file: %s', state_file) f = Dataset(state_file, 'r') - self.ring = f.variables['ring'][:] + for tracer in RVIC_TRACERS: + self.ring[tracer] = f.variables['{0}_ring'.format(tracer)][:] + file_timestamp = ord_to_datetime(f.variables['time'][:], f.variables['time'].units, calendar=f.variables['time'].calendar) @@ -199,29 +235,36 @@ def convolve(self, aggrunin, time_ord): # ------------------------------------------------------------ # - # ------------------------------------------------------------ # - # First update the ring - log.debug('rolling the ring') - # Zero out current ring - self.ring[0, :, 0] = 0 - # Equivalent to Fortran 90 cshift function - self.ring = np.roll(self.ring, -1, axis=0) - # ------------------------------------------------------------ # - # ------------------------------------------------------------ # # Do the convolution log.debug('convolving') - # this matches the fortran implementation, it may be faster to use - # np.convolve but testing can be done later - # also this is where the parallelization could happen - # loop over all source points - for nt, tracer in enumerate(RVIC_TRACERS): - for s, outlet in enumerate(self.source2outlet_ind): - y = self.source_y_ind[s] - x = self.source_x_ind[s] - for i in xrange(self.subset_length): - j = i + self.source_time_offset[s] - self.ring[j, outlet, nt] += (self.unit_hydrograph[i, s, nt] * aggrunin[tracer][y, x]) + + for tracer in RVIC_TRACERS: + # -------------------------------------------------------- # + # First update the ring + log.debug('rolling the ring') + + # Zero out current ring + self.ring[tracer][0, :] = 0. + + # Equivalent to Fortran 90 cshift function + self.ring[tracer] = np.roll(self.ring[tracer], -1, axis=0) + # -------------------------------------------------------- # + + # -------------------------------------------------------- # + # C convolution call + rvic_convolve(self.n_sources, + self.n_outlets, + self.subset_length, + self.xsize, + self.source2outlet_ind, + self.source_y_ind, + self.source_x_ind, + self.source_time_offset, + self.unit_hydrograph[tracer][:, :], + aggrunin[tracer], + self.ring[tracer][:, :]) + # -------------------------------------------------------- # # ------------------------------------------------------------ # # ------------------------------------------------------------ # @@ -260,13 +303,19 @@ def get_time_mode(self, cpl_secs_per_step): # ---------------------------------------------------------------- # def get_rof(self): """Extract the current rof""" - return self.ring[0, :, 0] # Current timestep flux (units=kg m-2 s-1) + rof = {} + for tracer in RVIC_TRACERS: + rof[tracer] = self.ring[tracer][0, :] + return rof # Current timestep flux (units=kg m-2 s-1) # ---------------------------------------------------------------- # # ---------------------------------------------------------------- # def get_storage(self): """Extract the current storage""" - return self.ring.sum(axis=1) + storage = {} + for tracer in RVIC_TRACERS: + storage[tracer] = self.ring[tracer].sum(axis=1) + return storage # ---------------------------------------------------------------- # # ---------------------------------------------------------------- # @@ -424,13 +473,14 @@ def write_restart(self, current_history_files, history_restart_files): tcoords = ('timesteps', ) + coords - for i, tracer in enumerate(RVIC_TRACERS): - ring = f.createVariable(tracer+'_ring', NC_DOUBLE, tcoords) - ring[:, :] = self.ring[:, :, i] + for tracer in RVIC_TRACERS: + ring = f.createVariable('{0}_ring'.format(tracer), + NC_DOUBLE, tcoords) + ring[:, :] = self.ring[tracer][:, :] - for key, val in share.ring.__dict__.iteritems(): - if val: - setattr(ring, key, val) + for key, val in share.ring.__dict__.iteritems(): + if val: + setattr(ring, key, val) # ------------------------------------------------------------ # # ------------------------------------------------------------ # @@ -443,7 +493,7 @@ def write_restart(self, current_history_files, history_restart_files): # ------------------------------------------------------------ # f.close() - log.info('Finished writing %s' % filename) + log.info('Finished writing %s', filename) return filename # ---------------------------------------------------------------- # diff --git a/rvic_model.py b/rvic_model.py index 9289d8b..65ea2a8 100755 --- a/rvic_model.py +++ b/rvic_model.py @@ -118,7 +118,8 @@ def rvic_mod_init(config_file): rout_var = Rvar(config_dict['PARAM_FILE']['FILE_NAME'], options['CASEID'], options['CALENDAR'], directories['restarts'], options['REST_NCFORM']) - rout_var.check_grid_file(domain['FILE_NAME']) + + rout_var.set_domain(dom_data, domain) # ---------------------------------------------------------------- # # ---------------------------------------------------------------- # @@ -236,8 +237,8 @@ def rvic_mod_run(hist_tapes, data_model, rout_var, dom_data, time_handle, data2tape = {} aggrunin = {} - for t in RVIC_TRACERS: - aggrunin[t] = 0.0 + for tracer in RVIC_TRACERS: + aggrunin[tracer] = 0.0 aggcounter = 0 # ---------------------------------------------------------------- # @@ -261,15 +262,20 @@ def rvic_mod_run(hist_tapes, data_model, rout_var, dom_data, time_handle, # Get this time_handlesteps forcing runin = data_model.read(timestamp) - for t in RVIC_TRACERS: - aggrunin[t] += runin[t] + for tracer in RVIC_TRACERS: + aggrunin[tracer] += runin[tracer] aggcounter += 1 + # ------------------------------------------------------------ # # ------------------------------------------------------------ # # Do the Convolution # (end_timestamp is the timestamp at the end of the convolution period) - end_timestamp = rout_var.convolve(aggrunin, time_ord) + if aggcounter == rout_var.agg_tsteps: + end_timestamp = rout_var.convolve(aggrunin, time_ord) + for tracer in RVIC_TRACERS: + aggrunin[tracer][:] = 0.0 + aggcounter = 0 # ------------------------------------------------------------ # # ------------------------------------------------------------ # @@ -279,7 +285,7 @@ def rvic_mod_run(hist_tapes, data_model, rout_var, dom_data, time_handle, # Update the history Tape(s) for tapename, tape in hist_tapes.iteritems(): - log.debug('Updating Tape:%s' % tapename) + log.debug('Updating Tape: %s', tapename) tape.update(data2tape, time_ord) # ------------------------------------------------------------ # @@ -290,8 +296,8 @@ def rvic_mod_run(hist_tapes, data_model, rout_var, dom_data, time_handle, history_files = [] history_restart_files = [] for tapename, tape in hist_tapes.iteritems(): - log.debug('Writing Restart File for Tape:%s' % tapename) - hist_fname, rest_fname = tape.write_restart() + log.debug('Writing Restart File for Tape: %s', tapename) + # hist_fname, rest_fname = tape.write_restart() history_files.append(tape.filename) history_restart_files.append(tape.rest_filename)