Source code for grids._ts

"""
Author: Riley Hales
Copyright: Riley Hales, RCH Engineering, 2021
License: BSD 3-Clause Clear License
All rights reserved
"""
import datetime
import os
import warnings

import affine
import geopandas as gpd
import h5py
import netCDF4 as nc
import numpy as np
import pandas as pd
import rasterio.features as riof
import requests
import xarray as xr
from pydap.cas.urs import setup_session

try:
    import pygrib
except ImportError:
    pygrib = None

from ._coords import _map_coords_to_slice

from ._utils import _assign_eng
from ._utils import _guess_time_var
from ._utils import _array_by_eng
from ._utils import _array_to_stat_list
from ._utils import _attr_by_eng
from ._utils import _delta_to_time
from ._utils import _gen_stat_list

from ._consts import ALL_ENGINES
from ._consts import SPATIAL_X_VARS
from ._consts import SPATIAL_Y_VARS

__all__ = ['TimeSeries', ]


class TimeSeries:
    """
    Creates a time series of values from arrays contained in netCDF, grib, hdf, or geotiff formats.
    Values in the series are extracted by specifying coordinates of a point, a range of coordinates,
    a spatial data file, or by computing statistics for the entire array.

    Args:
        files (list): A list (even if len == 1) of either absolute file paths to netcdf, grib, hdf5,
            or geotiff files, or URLs to an OPeNDAP service (but beware the data transfer speed
            bottleneck).
        var (str or int or list or tuple): The name of the variable(s) to query as they are stored
            in the file (e.g. often 'temp' or 'T' instead of Temperature) or the band number if you
            are using grib files *and* you specify the engine as pygrib. If the var is contained in
            a group, include the group name as a unix style path, e.g. 'group_name/var'.
        dim_order (tuple): A tuple of the names of the dimensions/coordinate variables for all of
            the variables listed in the "var" parameter, listed in order. If the coordinate
            variables are contained in a group, include the group name as a unix style path,
            e.g. 'group_name/coord_var'.

    Keyword Args:
        t_var (str): Name of the time dimension/coordinate variable if it is used in the data.
            Grids will attempt to guess if it is not "time" and you do not specify one with this
            argument.
        x_var (str): Name of the x dimension/coordinate variable if it is used in the data. Grids
            will attempt to guess if it is not specified and not a standard name.
        y_var (str): Name of the y dimension/coordinate variable if it is used in the data. Grids
            will attempt to guess if it is not specified and not a standard name.
        engine (str): The python package used to power the file reading. Defaults to the best
            choice for the type of input data. The options include 'xarray', 'opendap',
            'auth-opendap', 'netcdf4', 'cfgrib', 'pygrib', 'h5py', and 'rasterio'.
        xr_kwargs (dict): A dictionary of kwargs that you might need when opening complex grib
            files with xarray.
        user (str): A username used for authenticating remote datasets, if required by your remote
            data source.
        pswd (str): A password used for authenticating remote datasets, if required by your remote
            data source.
        session (requests.Session): A requests Session object preloaded with credentials/tokens
            for authentication.
        stats (str or tuple): How to reduce arrays of values to a single value in the series
            dataframe. Provide a list of strings (e.g. ['mean', 'max']) or a comma separated
            string (e.g. 'mean,max,min'). Options include: mean, median, max, min, sum, std, or a
            percentile (e.g. '25%'). Or, provide 'all' which is interpreted as
            (mean, median, max, min, sum, std, ). Or, provide 'box' or 'boxplot' which is
            interpreted as (max, 75%, median, mean, 25%, min, ). Or, provide 'values' to get a
            flat list of all non-null values in the query so you can compute other stats.
        fill_value (int): The value used for filling no_data/null values in the variable's array.
            Default: -9999.0.
        interp_units (bool): If your data conforms to the CF NetCDF standard for time data, choose
            True to convert the values in the time variable to datetime strings in the pandas
            output. The units string for the time variable of each file is checked separately
            unless you specify it in the unit_str parameter.
        origin_format (str): A datetime.strptime string for extracting the origin time from the
            units string.
        unit_str (str): A CF Standard conforming string indicating the spacing and origin of the
            time values. This is helpful if your files do not contain a units string. Only specify
            this if ALL files that you query use the same units string. Usually this looks like
            "step_size since YYYY-MM-DD HH:MM:SS", such as "days since 2000-01-01 00:00:00".
        strp_filename (str): A datetime.strptime string for extracting datetimes from patterns in
            file names. Only for datasets which contain 1 time step per file.

    Methods:
        point: Extracts a time series of values at a point for a given coordinate pair
        multipoint: Extracts a time series of values for several points given a series of
            coordinate values
        bound: Extracts a time series of values with a bounding box for each requested statistic
        range: Alias for TimeSeries.bound()
        shape: Extracts a time series of values on a line or within a polygon for each requested
            statistic
    """
    # core parameters from user
    files: list
    var: tuple
    dim_order: tuple

    # handling non-standard dimension names or organizations
    t_var: str
    x_var: str
    y_var: str

    # help opening data
    engine: str
    xr_kwargs: dict
    user: str
    pswd: str
    session: requests.Session

    # reducing arrays to numbers
    stats: str or list or tuple or np.ndarray
    fill_value: int or float or bool

    # how to handle the time data
    interp_units: bool
    origin_format: str
    unit_str: str
    strp_filename: str
    t_index: int  # derived from dim_order and t_var
    t_var_in_dims: bool  # derived from dim_order and t_var

    def __init__(self, files: list, var: str or int or list or tuple, dim_order: tuple, **kwargs):
        # parameters configuring how the data is interpreted
        self.files = (files,) if isinstance(files, str) else files
        self.variables = (var,) if isinstance(var, str) else var
        assert len(self.variables) >= 1, 'specify at least 1 variable'
        self.dim_order = dim_order

        # optional parameters describing how to access the data
        self.engine = kwargs.get('engine', _assign_eng(self.files[0]))
        assert self.engine in ALL_ENGINES, f'engine "{self.engine}" not recognized'
        self.xr_kwargs = kwargs.get('xr_kwargs', None)

        # optional parameters modifying how dimension/coordinate variable names are interpreted
        self.t_var = kwargs.get('t_var', None)
        self.x_var = kwargs.get('x_var', None)
        self.y_var = kwargs.get('y_var', None)

        if self.x_var is None:
            for a in self.dim_order:
                if a in SPATIAL_X_VARS:
                    self.x_var = a
        if self.y_var is None:
            for a in self.dim_order:
                if a in SPATIAL_Y_VARS:
                    self.y_var = a
        if self.t_var is None:
            self.t_var = _guess_time_var(dim_order)

        self.t_var_in_dims = self.t_var in self.dim_order
        self.t_index = self.dim_order.index(self.t_var) if self.t_var_in_dims else False

        # additional options describing how the time data should be interpreted
        self.interp_units = kwargs.get('interp_units', False)
        self.strp_filename = kwargs.get('strp_filename', False)
        self.unit_str = kwargs.get('unit_str', None)
        self.origin_format = kwargs.get('origin_format', '%Y-%m-%d %X')

        # optional parameter modifying which statistics to process
        self.stats = _gen_stat_list(kwargs.get('stats', ('mean',)))
        self.fill_value = kwargs.get('fill_value', -9999.0)

        # optional authentication for remote datasets
        self.user = kwargs.get('user', None)
        self.pswd = kwargs.get('pswd', None)
        self.session = kwargs.get('session', False)
        if 'opendap' in self.engine and not self.session and self.user is not None and self.pswd is not None:
            a = requests.Session()
            a.auth = (self.user, self.pswd)
            self.session = a

        # validate that some parameters are compatible
        if self.engine == 'rasterio':
            assert isinstance(self.variables, int), 'GeoTIFF variables must be integer band numbers'
            if not self.dim_order == ('y', 'x'):
                warnings.warn('For GeoTIFFs, the correct dim order is ("y", "x")')
                self.dim_order = ('y', 'x')
        elif self.engine == 'pygrib':
            if pygrib is None:
                raise ModuleNotFoundError('pygrib engine only available if optional pygrib dependency is installed')
            assert isinstance(self.variables, int), 'pygrib engine variables must be integer band numbers'

    def __bool__(self):
        return True

    def __str__(self):
        string = 'grids.TimeSeries'
        for p in vars(self):
            if p == 'files':
                string += f'\n\t{p}: {len(self.__getattribute__(p))}'
            else:
                string += f'\n\t{p}: {self.__getattribute__(p)}'
        return string

    def __repr__(self):
        return self.__str__()
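    # A minimal usage sketch (illustrative, not part of the class): construct a
    # TimeSeries over several files. The file paths, the variable name 'temp', and
    # the dimension names below are hypothetical examples.
    #
    #   series = TimeSeries(files=['/path/to/jan.nc', '/path/to/feb.nc'],
    #                       var='temp',
    #                       dim_order=('time', 'lat', 'lon'),
    #                       stats='mean,max',
    #                       interp_units=True)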
    def point(self,
              *coords: int or float or None, ) -> pd.DataFrame:
        """
        Extracts a time series at a point for a given series of coordinate values

        Args:
            coords (int or float or None): provide a coordinate value (integer or float) for each
                dimension of the array which you are creating a time series for. You need to
                provide exactly the same number of coordinates as there are dimensions.

        Returns:
            pandas.DataFrame with an index, a column named datetime, and a column of values for
            each variable requested.
        """
        assert len(self.dim_order) == len(coords), 'Specify 1 coordinate for each dimension of the array'

        # make the return item
        results = dict(datetime=[])
        for var in self.variables:
            results[var] = []

        # map coordinates -> cell indices -> python slice() objects
        slices = self._gen_dim_slices(coords, 'point')

        # iterate over each file extracting the value and time for each
        for num, file in enumerate(self.files):
            # open the file
            opened_file = self._open_data(file)
            tsteps, tslices = self._handle_time(opened_file, file, (coords, coords))
            results['datetime'] += list(tsteps)
            slices[self.t_index] = tslices if self.t_var_in_dims else slices[self.t_index]
            for var in self.variables:
                # extract the appropriate values from the variable
                vs = _array_by_eng(opened_file, var, tuple(slices))
                if vs.ndim == 0:
                    if vs == self.fill_value:
                        vs = np.nan
                    results[var].append(vs)
                elif vs.ndim == 1:
                    vs[vs == self.fill_value] = np.nan
                    for v in vs:
                        results[var].append(v)
                else:
                    raise ValueError('Too many dimensions remain after slicing')
            if self.engine != 'pygrib':
                opened_file.close()

        # return the data stored in a dataframe
        return pd.DataFrame(results)
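    # For example, continuing the hypothetical series above (None at the time
    # index selects every time step):
    #
    #   df = series.point(None, 40.0, -105.0)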
    def multipoint(self,
                   *coords: list,
                   labels: list = None, ) -> pd.DataFrame:
        """
        Extracts a time series at many points for a given series of coordinate values. Each point
        should have the same time coordinate and different coordinates for each other dimension.

        Args:
            coords (list): a list of coordinate tuples or a 2D numpy array. Each coordinate pair
                in the list should provide a coordinate value (integer or float) for each dimension
                of the array, i.e. len(coordinate_pair) == len(dim_order). See TimeSeries.point for
                more explanation.
            labels (list): an optional list of strings which label each of the coordinates
                provided. len(labels) should be equal to len(coords).

        Returns:
            pandas.DataFrame with an index, a column named datetime, and a column of values for
            each variable at each point.
        """
        assert len(self.dim_order) == len(coords[0]), 'Specify 1 coordinate for each dimension of the array'
        if labels is None:
            labels = [f'point{i}' for i in range(len(coords))]
        assert len(labels) == len(coords), 'You must provide a label for each point or use auto numbering'

        datalabels = []
        for label in labels:
            for var in self.variables:
                datalabels.append(f'{var}_{label}')

        # make the return item
        results = dict(datetime=[])
        for datalabel in datalabels:
            results[datalabel] = []

        # map coordinates -> cell indices -> python slice() objects
        slices = self._gen_dim_slices(coords, 'multipoint')

        # iterate over each file extracting the value and time for each
        for file in self.files:
            opened_file = self._open_data(file)
            tsteps, tslices = self._handle_time(opened_file, file, (coords[0], coords[0]))
            results['datetime'] += list(tsteps)
            for var in self.variables:
                for i, slc in enumerate(slices):
                    slc[self.t_index] = tslices if self.t_var_in_dims else slc[self.t_index]
                    # extract the appropriate values from the variable
                    vs = _array_by_eng(opened_file, var, tuple(slc))
                    if vs.ndim == 0:
                        if vs == self.fill_value:
                            vs = np.nan
                        results[f'{var}_{labels[i]}'].append(vs)
                    elif vs.ndim == 1:
                        vs[vs == self.fill_value] = np.nan
                        for v in vs:
                            results[f'{var}_{labels[i]}'].append(v)
                    else:
                        raise ValueError('There are too many dimensions after slicing')
            if self.engine != 'pygrib':
                opened_file.close()

        # return the data stored in a dataframe
        return pd.DataFrame(results)
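    # For example, with hypothetical coordinates and labels (one tuple per point,
    # ordered to match dim_order):
    #
    #   df = series.multipoint((None, 40.0, -105.0), (None, 41.5, -111.8),
    #                          labels=['denver', 'slc'])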
    def bound(self,
              min_coords: tuple,
              max_coords: tuple,
              stats: str or tuple = None, ) -> pd.DataFrame:
        """
        Extracts a time series of values within a bounding box for each requested statistic.

        Args:
            min_coords (tuple): a tuple containing the minimum coordinates of a bounding box range,
                given in the order of the dimensions of the source arrays.
            max_coords (tuple): a tuple containing the maximum coordinates of a bounding box range,
                given in the order of the dimensions of the source arrays.
            stats (str or tuple): How to reduce arrays of values to a single value for the series.
                See the class docstring.

        Returns:
            pandas.DataFrame with an index, a datetime column, and a column named for each
            statistic specified
        """
        assert len(self.dim_order) == len(min_coords) == len(max_coords), \
            'Specify 1 min and 1 max coordinate for each dimension'

        # handle the optional arguments
        self.stats = _gen_stat_list(stats) if stats is not None else self.stats

        # make the return item
        results = dict(datetime=[])

        # add a list for each stat requested
        for var in self.variables:
            for stat in self.stats:
                results[f'{var}_{stat}'] = []

        # map coordinates -> cell indices -> python slice() objects
        slices = self._gen_dim_slices((min_coords, max_coords), 'range')

        # iterate over each file extracting the value and time for each
        for file in self.files:
            # open the file
            opened_file = self._open_data(file)
            tsteps, tslices = self._handle_time(opened_file, file, (min_coords, max_coords))
            results['datetime'] += list(tsteps)
            slices[self.t_index] = tslices if self.t_var_in_dims else slices[self.t_index]
            for var in self.variables:
                # slice the variable's array, returns array with shape corresponding to dimension order and size
                vs = _array_by_eng(opened_file, var, tuple(slices))
                vs[vs == self.fill_value] = np.nan
                for stat in self.stats:
                    results[f'{var}_{stat}'] += _array_to_stat_list(vs, stat)
            if self.engine != 'pygrib':
                opened_file.close()

        # return the data stored in a dataframe
        return pd.DataFrame(results)
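    # For example, with hypothetical bounding coordinates ordered to match
    # dim_order; the stats argument overrides the stats chosen at init:
    #
    #   df = series.bound((None, 35.0, -110.0), (None, 42.0, -102.0), stats='mean,max')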
    def range(self,
              min_coordinates: tuple,
              max_coordinates: tuple,
              stats: str or tuple = None, ) -> pd.DataFrame:
        """
        Alias for TimeSeries.bound(). Refer to documentation for the bound method.
        """
        return self.bound(min_coordinates, max_coordinates, stats)
    def shape(self,
              mask: str or np.ndarray,
              time_range: tuple = (None, None),
              behavior: str = 'dissolve',
              label_attr: str = None,
              feature: str = None,
              stats: str or tuple = None, ) -> pd.DataFrame:
        """
        Applicable only to source data with exactly 2 spatial dimensions, x and y, and a time
        dimension.

        Args:
            mask (str): path to any spatial polygon file, e.g. shapefile or geojson, which can be
                read by geopandas.
            time_range: a tuple of the min and max time range to query a time series for
            behavior (str): determines how the vector data is used to mask the arrays. Options are:
                dissolve, feature, features
                - dissolve: treats all features as if they were 1 feature and masks the entire set
                  of polygons in 1 grid
                - feature: masks only the features whose label_attr attribute matches the value
                  given in the feature argument
                - features: treats each feature as a separate entity, must specify an attribute
                  shared by each feature with unique values for each feature used to label the
                  resulting series
            label_attr: The name of the attribute in the vector data features used to label the
                several outputs
            feature: A value of the label_attr attribute for 1 or more features found in the
                provided shapefile
            stats (str or tuple): How to reduce arrays of values to a single value for the series.
                See the class docstring.

        Returns:
            pandas.DataFrame with an index, a datetime column, and a column named for each
            statistic specified
        """
        if not len(self.dim_order) == 3:
            raise RuntimeError('You can only extract by polygon if the data is exactly 3 dimensional: time, y, x')

        # cache the behavior and organization parameters
        self.stats = _gen_stat_list(stats) if stats is not None else self.stats

        if isinstance(mask, str):
            masks = self._create_spatial_mask_array(mask, behavior, label_attr, feature)
        elif isinstance(mask, np.ndarray):
            masks = [('masked', mask), ]  # a single (label, mask) pair
        else:
            raise ValueError('Unusable data provided for the "mask" argument')

        # make the return item
        results = dict(datetime=[])
        for mask in masks:
            for stat in self.stats:
                for var in self.variables:
                    results[f'{var}_{mask[0]}_{stat}'] = []

        # slice data on all dimensions
        slices = [slice(None), ] * len(self.dim_order)

        # iterate over each file extracting the value and time for each
        for file in self.files:
            # open the file
            opened_file = self._open_data(file)
            tsteps, tslices = self._handle_time(opened_file, file, (time_range[0], time_range[1]))
            results['datetime'] += list(tsteps)
            slices[self.t_index] = tslices if self.t_var_in_dims else slices[self.t_index]
            num_time_steps = len(tsteps)

            for var in self.variables:
                # slice the variable's array, returns array with shape corresponding to dimension order and size
                for i in range(num_time_steps):
                    slices[self.t_index] = slice(i, i + 1)
                    vals = _array_by_eng(opened_file, var, tuple(slices))
                    for mask in masks:
                        masked_vals = np.where(mask[1], vals, np.nan).squeeze()
                        masked_vals[masked_vals == self.fill_value] = np.nan
                        for stat in self.stats:
                            results[f'{var}_{mask[0]}_{stat}'] += _array_to_stat_list(masked_vals, stat)
            if self.engine != 'pygrib':
                opened_file.close()

        # return the data stored in a dataframe
        return pd.DataFrame(results)
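    # For example, with a hypothetical shapefile and attribute name, producing one
    # labeled series per polygon feature:
    #
    #   df = series.shape('/path/to/watersheds.shp', behavior='features',
    #                     label_attr='basin_name', stats='mean')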
    def _gen_dim_slices(self,
                        coords: tuple,
                        slice_style: str):
        if self.engine == 'pygrib':
            revert_engine = self.engine
            self.engine = 'cfgrib'
        else:
            revert_engine = False

        slices = []
        tmp_file = self._open_data(self.files[0])

        if slice_style in ('point', 'range'):
            for index, dim in enumerate(self.dim_order):
                if dim == self.t_var:
                    slices.append(None)
                    continue
                vals = _array_by_eng(tmp_file, dim)
                if slice_style == 'point':
                    slices.append(_map_coords_to_slice(vals, coords[index], coords[index], dim))
                else:
                    slices.append(_map_coords_to_slice(vals, coords[0][index], coords[1][index], dim))
        elif slice_style == 'multipoint':
            for index, dim in enumerate(self.dim_order):
                if dim == self.t_var:
                    slices.append([None, ] * len(coords))
                    continue
                vals = _array_by_eng(tmp_file, dim)
                dim_slices = []
                for coord in coords:
                    dim_slices.append(_map_coords_to_slice(vals, coord[index], coord[index], dim))
                slices.append(dim_slices)
            slices = np.transpose(slices)
        else:
            raise RuntimeError('Slice behavior not implemented')

        if revert_engine:
            self.engine = revert_engine
        return slices

    def _create_spatial_mask_array(self, vector: str, behavior: str, label_attr: str, feature: str) -> list:
        if self.x_var is None or self.y_var is None:
            raise ValueError('Unable to determine x and y dimensions')
        sample_data = self._open_data(self.files[0])
        x = _array_by_eng(sample_data, self.x_var)
        y = _array_by_eng(sample_data, self.y_var)
        if self.engine != 'pygrib':
            sample_data.close()

        # catch the case when people use improper 2d instead of proper 1d coordinate dimensions
        if x.ndim == 2:
            x = x[0, :]
        if y.ndim == 2:
            y = y[:, 0]

        # check if you need to vertically invert the array mask (if y vals go from small to large)
        # or if you need to transpose the mask (if the dimensions go x then y, should be y then x - think of the shape)
        invert = y[-1] > y[0]
        transpose = self.dim_order.index(self.x_var) < self.dim_order.index(self.y_var)

        # read the shapefile
        vector_gdf = gpd.read_file(vector)

        # set up the variables for creating and storing masks
        masks = []
        # the shape of the grid to be masked
        gshape = (y.shape[0], x.shape[0],)
        # calculate the affine transformation of the grid to be masked
        aff = affine.Affine(np.abs(x[1] - x[0]), 0, x.min(), 0, np.abs(y[1] - y[0]), y.min())

        # create a binary/boolean mask of the shapefile
        # in the same crs, over the affine transform area, for a certain masking behavior
        if behavior == 'dissolve':
            m = riof.geometry_mask(vector_gdf.geometry, gshape, aff, invert=invert)
            if transpose:
                m = np.transpose(m)
            masks.append(('shape', m))
        elif behavior == 'feature':
            assert label_attr in vector_gdf.keys(), \
                'label_attr parameter not found in attributes list of the vector data'
            assert feature is not None, \
                'Provide a value for the feature argument to query for certain features'
            vector_gdf = vector_gdf[vector_gdf[label_attr] == feature]
            assert not vector_gdf.empty, f'No features have value "{feature}" for attribute "{label_attr}"'
            m = riof.geometry_mask(vector_gdf.geometry, gshape, aff, invert=invert)
            if transpose:
                m = np.transpose(m)
            masks.append((feature, m))
        elif behavior == 'features':
            assert label_attr in vector_gdf.keys(), \
                'label_attr parameter not found in attributes list of the vector data'
            for idx, row in vector_gdf.iterrows():
                m = riof.geometry_mask(gpd.GeoSeries(row.geometry), gshape, aff, invert=invert)
                if transpose:
                    m = np.transpose(m)
                masks.append((row[label_attr], m))
        return masks

    def _handle_time(self, opened_file, file_path: str, time_range: tuple) -> tuple:
        if self.strp_filename:  # strip the datetime from the file name
            tvals = [datetime.datetime.strptime(os.path.basename(file_path), self.strp_filename), ]
        elif self.engine == 'pygrib':
            tvals = [opened_file[self.variables].validDate, ]
        else:
            tvals = _array_by_eng(opened_file, self.t_var)
            if isinstance(tvals, np.datetime64):
                tvals = [tvals]
            if tvals.ndim == 0:
                ...
            else:
                tvals = [t for t in tvals]
            # convert the time variable array's numbers to datetime representations
            if self.interp_units:
                if self.engine == 'xarray':
                    ...
                elif self.unit_str is None:
                    tvals = _delta_to_time(tvals, _attr_by_eng(opened_file, self.t_var, 'units'), self.origin_format)
                else:
                    tvals = _delta_to_time(tvals, self.unit_str, self.origin_format)

        tvals = np.array(tvals)

        # if the variable depends on time then there should be a coordinate provided for it
        if self.t_var_in_dims:
            t1 = time_range[0]
            t2 = time_range[1]
            if isinstance(t1, (list, tuple)):
                t1 = t1[self.t_index]
            if isinstance(t2, (list, tuple)):
                t2 = t2[self.t_index]
        # otherwise, no time coordinates were provided
        else:
            t1 = None
            t2 = None

        time_slices = _map_coords_to_slice(tvals, t1, t2, 'time')
        return tvals[(time_slices,)], time_slices

    def _open_data(self, path):
        if self.engine == 'xarray':
            return xr.open_dataset(path, backend_kwargs=self.xr_kwargs)
        elif self.engine == 'opendap':
            try:
                if self.session:
                    return xr.open_dataset(xr.backends.PydapDataStore.open(path, session=self.session))
                else:
                    return xr.open_dataset(path)
            except ConnectionRefusedError as e:
                raise e
            except Exception as e:
                print('Unexpected Error')
                raise e
        elif self.engine == 'auth-opendap':
            return xr.open_dataset(xr.backends.PydapDataStore.open(
                path, session=setup_session(self.user, self.pswd, check_url=path)))
        elif self.engine == 'netcdf4':
            return nc.Dataset(path, 'r')
        elif self.engine == 'cfgrib':
            return xr.open_dataset(path, engine='cfgrib', backend_kwargs=self.xr_kwargs)
        elif self.engine == 'pygrib':
            a = pygrib.open(path)
            return a.read()
        elif self.engine == 'h5py':
            return h5py.File(path, 'r')
        elif self.engine == 'rasterio':
            return xr.open_rasterio(path)
        else:
            raise ValueError(f'Unable to open file, unsupported engine: {self.engine}')
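# A usage sketch for authenticated remote data (illustrative only: the URL and
# credentials are hypothetical). With engine='opendap', __init__ builds a
# requests.Session from user/pswd when no session is supplied; 'auth-opendap'
# instead authenticates through pydap's setup_session.
#
#   series = TimeSeries(files=['https://example.com/opendap/dataset.nc'],
#                       var='temp', dim_order=('time', 'lat', 'lon'),
#                       engine='auth-opendap', user='username', pswd='password')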