
Develop WDMClass #34

Open
wants to merge 23 commits into base: develop
Conversation

ptomasula
Member

Pull request to start a dialog about next steps for the WDMReader class.

ptomasula and others added 23 commits February 19, 2021 14:43
Adds new function 'process_groups' which replaces the 'getfloats' function.

This new function processes WDM files by blocks which consist of a control word (32bit integer) and then one or more float32 values which contain the data for the block. This approach will provide support for Timeseries records with irregular timestep intervals.
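The block walk described above can be sketched roughly as follows. `iter_blocks` and the `nval_of` extractor are hypothetical names, and the control word's real bit layout (which in actual WDM files also encodes timestep and compression information) is deliberately left abstract:

```python
import numpy as np

def iter_blocks(words, nval_of):
    """Walk a record's 32-bit words as (control word, float data) blocks.

    `words` is an int32 view of the record; `nval_of` extracts the number
    of float32 data values from a block control word (the exact bit
    layout is an assumption here -- consult the WDM format for the real
    fields)."""
    i = 0
    while i < len(words):
        control = int(words[i])                       # block control word
        nval = nval_of(control)                       # float32 values that follow
        data = words[i + 1:i + 1 + nval].view(np.float32)
        yield control, data
        i += 1 + nval                                 # next control word
```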
Adds an additional WDM test file, rpo772.wdm. This file contains a single timeseries with an irregular timestep.
My understanding is that when working with numpy arrays, allocating the array size up front is very important for performance, because expanding the array later essentially means copying it to a larger array in a new block of memory. With the block processing approach there is presently no way to determine the exact size of the resultant array until you read through the groups, and allocating an array of a fixed size can cause files to fail with an IndexError. A quick solution was to implement a chunk allocation approach, which allocates numpy arrays in large 100 MB chunks and only expands an array when processing the next block would exceed the boundaries of the existing array. This solution is far from perfect, but it at least resolves the IndexErrors. We should consult with Jack to see if there is a way to determine the number of elements prior to processing the timeseries, or alternatively consider switching to lists, because appending an element to a list is amortized constant time and should perform better than this approach.
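A minimal sketch of the chunk-allocation idea described above. The `append_block` helper is a hypothetical name for illustration, and the chunk size here is a small constant rather than the ~100 MB the PR actually uses:

```python
import numpy as np

CHUNK = 1_000_000  # elements per growth step (the PR uses ~100 MB chunks)

def append_block(arr, used, block):
    """Copy `block` into `arr` starting at index `used`, growing `arr`
    in CHUNK-sized steps only when the block would overrun the current
    bounds. Returns the (possibly reallocated) array and new used count."""
    need = used + len(block)
    if need > len(arr):
        grown = np.empty(max(need, len(arr) + CHUNK), dtype=arr.dtype)
        grown[:used] = arr[:used]   # one copy per growth, not per element
        arr = grown
    arr[used:need] = block
    return arr, need
```

The trade-off mentioned above is visible here: each growth copies everything written so far, whereas `list.append` amortizes that cost away.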
Blocks in a timeseries consist of a minimum of 2 elements: the block control word and one or more float data values. When a block ends, there is either another block, the end of the group, or the end of the record (meaning we go to the next record in the chain). Some of our test WDM files have a block that ends on the 511th element of a record, and in these example files the 512th element also does not parse to a valid block control word. At this time I'm not sure of the significance of the 512th element when a block ends there, but processing that element as a control word causes a series of errors which throw off the accuracy of all subsequent blocks processed. We'll need to confirm the significance of these elements with Jack, but for now I implemented logic to skip to the next record if a block ends on the 511th element of the record.
From @PaulDudaRESPEC, added to new `docs` directory that we'll want to build out over time. Connects to #20 & #21.
# 1. used lists to replace numpy matrix;
# 2. added a loop to iterate each group and used ending date as the ending condition
The rewrite to process WDM files as groups led to the deprecation of the getfloats and adjustNval functions. Additionally, Hua refactored the original process_groups method in a previous commit as process_groups2. The original process_groups method was removed and process_groups2 was renamed to process_group.
A single leading underscore is one of the conventions used in Python (see PEP 8) to denote internal classes and functions. Internal functions come with no guarantee of backward compatibility. We want to update the naming of supporting functions that we do not want the public to interface with.
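A small illustration of the convention (the method names below are placeholders, not the class's real API):

```python
class WDMReader:
    """Sketch of the naming convention: public methods keep plain names,
    internal helpers get a single leading underscore (PEP 8)."""

    def read_dsn(self, dsn):
        # public, stable entry point callers may rely on
        return self._decode(dsn)

    def _decode(self, x):
        # internal helper; no backward-compatibility guarantee,
        # free to change or disappear between releases
        return x
```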
General refactoring to clean up code by removing old comments, plus slight restructuring to increase readability. Also replaced print statements for errors with raised exceptions.
Merge updates to readUCI and GQUAL from respec/HSPsquared - develop branch
The constraints of Numba mean that datetime conversion cannot occur within the main block processing loop (the _process_groups function). This commit replaces the previous use of Python datetime objects with a bit-packing approach in which the date components are stored in a single integer whose individual bits can be parsed into the time step components. Conversion to a datetime object now occurs outside of the processing loop, prior to output to HDF.
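The bit-packing idea can be sketched as follows. The field widths below are illustrative assumptions, not the exact layout used in the commit, and the packed value is kept as int64 so the year bits survive the shifts (see the next commit note):

```python
import numpy as np

# Assumed field widths (illustrative only): second and minute 6 bits each,
# hour and day 5 bits each, month 4 bits, with the remaining high bits
# holding the year.
def pack_date(year, month, day, hour, minute, second):
    x = np.int64(year)                       # int64 so the year is not truncated
    for part, bits in ((month, 4), (day, 5), (hour, 5),
                       (minute, 6), (second, 6)):
        x = (x << bits) | part
    return x

def unpack_date(x):
    parts = []
    for bits in (6, 6, 5, 5, 4):             # second, minute, hour, day, month
        parts.append(int(x & ((1 << bits) - 1)))
        x >>= bits
    second, minute, hour, day, month = parts
    return int(x), month, day, hour, minute, second
```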
The datetime functions added to support numba in commit e5d64a1 require that the integers input into them are 64 bits, or year information will be lost during bit manipulations. The previous implementation left the integer type up to numba and in some instances could produce an int32 object. This commit makes the integer conversion explicitly int64 so that year information is not lost.
Even with datetime conversions removed from the group processing loop, the conversion time using datetime.datetime() remains slow. After attempting several datetime conversion approaches with pandas, I was still unable to achieve a significant performance boost.

Numba does not support the creation of datetime objects, but it does support datetime arithmetic. This commit adds a numba-compatible datetime conversion function which calculates a date's offset from the epoch and adds the appropriate timedelta64 objects to return a datetime64 object.
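A sketch of this style of conversion using only numpy datetime arithmetic (the function name and the exact epoch handling are assumptions, not the commit's code):

```python
import numpy as np

def to_datetime64(year, month, day, hour, minute, second):
    """Build a datetime64 purely by offset-from-epoch arithmetic,
    avoiding datetime.datetime construction entirely."""
    # Whole months since the 1970-01 epoch, then day/time deltas on top.
    months = (year - 1970) * 12 + (month - 1)
    first_of_month = np.datetime64('1970-01') + np.timedelta64(months, 'M')
    return (first_of_month.astype('datetime64[s]')
            + np.timedelta64(day - 1, 'D')
            + np.timedelta64(hour, 'h')
            + np.timedelta64(minute, 'm')
            + np.timedelta64(second, 's'))
```

Going through a `datetime64[M]` intermediate sidesteps the fact that month-sized timedeltas cannot be mixed directly with second-resolution datetimes in numpy.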
I missed committing 3 line deletions which remove the old pandas.apply-based datetime conversion approach.
@ptomasula left a comment (Member Author)

I think this is a solid first effort, so thanks to @htaolimno for carrying this forward. Here are some general suggestions on next steps to further the development of this class. Most suggestions relate to restructuring and further breaking the functions up into smaller pieces.


store.put('TIMESERIES/SUMMARY', self.dfsummary, format='t', data_columns=True)

#@njit

We need to ensure that numba is supported in our group processing functions to get the performance we're after. See if the new commits allow these to be re-enabled, and if we encounter issues, let's discuss.


#@njit
@staticmethod
def bits_to_date(x):

I think I was the one who introduced this method, but let's update the naming of these internal methods to start with an underscore (i.e. '_bits_to_date'). The single underscore is an additional signal that this is an internal interface and doesn't guarantee backwards compatibility.


#@njit
@staticmethod
def date_to_bits(year, month, day, hour, minute, second):

Same as comment on line 223.


#@njit
@staticmethod
def increment_date(date, timecode, timestep):

Same as comment on line 223.


#@njit
@staticmethod
def correct_date(year, month, day, hour, minute, second):

Same as comment on line 223.

self.freq = {7:'100YS', 6:'YS', 5:'MS', 4:'D', 3:'H', 2:'min', 1:'S'} # pandas date_range() frequency by TCODE, TGROUP


def readWDM(self): #, hdffile, compress_output=True):

I think we could split this main function into a few smaller pieces. The first part, lines 39-100, could become some sort of initialize-or-validate-the-WDM function. Then the logic in the for loop that iterates over the individual DSN records could be its own function. We will ultimately need a way to read a single DSN from a WDM file once we start implementing the IO abstraction class, and that approach would also make parallelization easier.
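The suggested split might look roughly like this skeleton (all names and the exact division of responsibilities are hypothetical):

```python
class WDMReader:
    """Sketch of the proposed restructuring: validate the file once,
    then expose per-DSN reads so a single DSN can be processed (and
    eventually parallelized) on its own."""

    def __init__(self, wdmfile):
        self.wdmfile = wdmfile
        self.dsns = self._initialize()   # roughly the old lines 39-100

    def _initialize(self):
        # open/validate the WDM file and collect the available DSN numbers
        return []

    def read_dsn(self, dsn):
        # body of the old per-record for loop, scoped to one DSN
        raise NotImplementedError

    def readWDM(self):
        # original behavior becomes a thin loop over read_dsn
        return {dsn: self.read_dsn(dsn) for dsn in self.dsns}
```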

store.put('TIMESERIES/SUMMARY', self.dfsummary, format='t', data_columns=True)

#@njit
@staticmethod

Nice use of the staticmethod decorator.

dates_converted = date_convert(dates, date_epoch, dt_year, dt_month, dt_day, dt_hour, dt_minute, dt_second)
series = pd.Series(values, index=dates_converted)

self.datasets[dsn] = series

I'm concerned that storing all of the DSN series in a dictionary will be too memory intensive for large WDM files. See my comment above about splitting this function into smaller pieces. I think we should have a function which processes a single DSN and returns the series, which is then immediately written to a file (or better yet passed to the eventual IO abstraction class for output into the desired output format).
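One hedged way to express the streaming idea, assuming a reader with a hypothetical `read_dsn` method, is a generator so only one series is in memory at a time:

```python
def iter_series(reader):
    """Yield one (dsn, series) pair at a time so the caller can write
    each to disk immediately instead of accumulating every DSN in a
    dictionary like reader.datasets."""
    for dsn in reader.dsns:
        yield dsn, reader.read_dsn(dsn)

# usage sketch:
# for dsn, series in iter_series(reader):
#     store.put(f'TIMESERIES/TS{dsn}', series)
```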

#return dfsummary

# export data to HD5 file
def WriteHD5(self, hdffile, compress_output=True):

Good call on isolating the write process! This will eventually be replaced with the IO abstraction class, so it is great that you are already isolating the output method.

Comment on lines +357 to +369
reader = WDMReader("../WDM/rpo772.wdm")
#reader.readWDM("../WDM/rpo772.wdm", "rpo772.h5", True)
def func():
reader.readWDM()
reader.WriteHD5("rpo772.h5", True)
# reader = WDMReader("../WDM/rpo772.wdm")
# reader.readWDM("../WDM/rpo772.wdm", "rpo772.h5", True)
# #reader.readWDM("../WDM/test.wdm", "test.h5")
# #reader.readWDM("../WDM/2_RPO_SWMM48LINKS2017_CBOD.wdm", "2_RPO_SWMM48LINKS2017_CBOD.h5")

execution_time = timeit.timeit(func, number=1)

print(execution_time)

I gather this is all for testing, which is fine. We'll just need to make sure we remove it before we merge this back into the development branch.

@aufdenkampe
Member

@ptomasula, thanks for setting up this PR and for providing a detailed review!

We should merge PR #35 first, after we fix a potential bug with running HSP2 from HDF5 files created with the updated readWDM file.

return nval
return False

@njit

When I resolved the merge conflicts in commit d64c25a, I failed to make this a class method. As currently written, it will produce a runtime error. Fix by either adding the staticmethod decorator or adding a self argument.
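A minimal illustration of the failure mode and the decorator fix (method name and body are placeholders, not the actual function from the PR):

```python
class Broken:
    def nval(dsn):              # no self, no decorator
        return dsn * 2

class Fixed:
    @staticmethod               # instance calls no longer inject self
    def nval(dsn):
        return dsn * 2

# Broken().nval(3) raises TypeError: Python passes the instance as `dsn`
# and the extra positional argument 3 has nowhere to go.
```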
