Source code for subsettools.subsetting

"""Functions to subset gridded files from national datasets in HydroData.

The following functions can be used to subset gridded input files to set up a
ParFlow simulation.
    - subset static model inputs
    - subset meteorological forcings
    - subset initial pressure data
    - subset gridded CLM inputs (vegm)
"""

# pylint: disable=C0301,R0913,R0914,R0917
import os
from datetime import datetime, timedelta
import threading
import re
import warnings
import numpy as np
import hf_hydrodata
from parflow.tools.io import write_pfb
from ._common import (
    get_utc_time,
    get_hf_gridded_data,
)
from ._error_checking import (
    _validate_grid,
    _validate_dir,
    _validate_grid_bounds,
    _validate_date,
)


_HOURS_PER_FORCING_FILE = 24


[docs] def subset_static( ij_bounds, dataset, write_dir, var_list=( "slope_x", "slope_y", "pf_indicator", "mannings", "pf_flowbarrier", "pme", "ss_pressure_head", ), ): """Subset static input files from national datasets in HydroData. The subset values will be written as ParFlow binary files (pfbs) in write_dir. By default the following variables will be subset. - Slope in the east/west direction (slope_x) - Slope in the north/south direction (slope_y) - Subsurface units indicator file (pf_indicator) - Mannings roughness coefficients (mannings) - Depth to bedrock (pf_flowbarrier) - Long term average precipitation minus evaporation (i.e. recharge) (pme) - Steady state pressure head used to initialize transient simulations (ss_pressure_head) Note that some datasets might not contain all 7 static input variables. In that case, the subset_static function is going to raise a ValueError for any variables that do not exist in the dataset. The default variable list contains the necessary static variables for the CONUS2 grid. For CONUS1-based datasets, "mannings" and "pf_flowbarrier" should be removed from the list. Args: ij_bounds (tuple[int]): bounding box for subset. This should be given as i,j index values where 0,0 is the lower left hand corner of a domain. ij_bounds are given relative to whatever grid is being used for the subset. dataset (str): static inputs dataset name from the HydroData catalog e.g. "conus1_domain" write_dir (str): directory where the subset files will be written var_list (tuple[str]): tuple of variables to subset from the dataset. By default all 7 variables above will be subset. The user can specify a subset of these variables or list additional variables that are available in their dataset of choice. Returns: A dictionary mapping the static variable names to the corresponding file paths where the subset data were written. Example: .. code-block:: python # Subsetting static variables for a CONUS1 workflow # We need to remove "pf_flowbarrier" and "mannings" from the list filepaths = subset_static( ij_bounds=(375, 239, 487, 329), dataset="conus1_domain", write_dir="/path/to/your/chosen/directory", var_list=("slope_x", "slope_y", "pf_indicator", "pme", "ss_pressure_head") ) # Subsetting static variables for a CONUS2 workflow # Note that we can use the default var_list here filepaths = subset_static( ij_bounds=(3701, 1544, 3792, 1633), dataset="conus2_domain", write_dir="/path/to/your/chosen/directory", ) """ warnings.warn( "Note that for subsettools versions >= 2.0.0, this function will raise " "a ValueError if a variable in var_list is not supported in the " "dataset. (In older versions, it just printed an error message and " "continued executing normally). You can check in the HydroData " "documentation which variables are contained in each dataset " "(https://hf-hydrodata.readthedocs.io/en/latest/available_data.html).", DeprecationWarning, stacklevel=2, ) _validate_grid_bounds(ij_bounds) if not isinstance(dataset, str): raise TypeError("dataset name must be a string.") _validate_dir(write_dir) if not all(isinstance(var, str) for var in var_list): raise TypeError("All variable names should be strings.") file_paths = {} options = { "dataset": dataset, "file_type": "pfb", "temporal_resolution": "static", "grid_bounds": ij_bounds, } for var in var_list: options["variable"] = var subset_data = get_hf_gridded_data(options) file_path = os.path.join(write_dir, f"{var}.pfb") write_pfb(file_path, subset_data, dist=False) file_paths[var] = file_path print(f"Wrote {var}.pfb in specified directory.") return file_paths
[docs] def subset_press_init(ij_bounds, dataset, date, write_dir, time_zone="UTC"): """Subset a pressure file from a national dataset in HydroData. This function will select the pressure file for midnight on the date provided and subset the selected pressure file to the ij_bounds provided. The subset data will be written out as a ParFlow binary file (pfb) to be used as an initial pressure file for a ParFlow simulation. Args: ij_bounds (tuple[int]): bounding box for subset. This should be given as i,j index values where 0,0 is the lower left hand corner of a domain. ij_bounds are given relative to whatever grid is being used for the subset. dataset (str): dataset name from the HydroData catalog that the pressure file will be subset from e.g. "conus1_baseline_mod" date (str): The date of the pressure file that you would like to subset, in the form 'yyyy-mm-dd' write_dir (str): directory where the subset file will be written time_zone (str): timezone information for subset date. Data will be subset at midnight in the specified timezone. This should be a zoneinfo-supported time zone. Defaults to "UTC". Returns: The filepath of the subset file, which includes datetime information, so that it can be used by later functions (e.g. edit_runscript_for_subset). Example: .. code-block:: python filepath = subset_press_init( ij_bounds=(375, 239, 487, 329), dataset="conus1_baseline_mod", date="2005-12-15", write_dir="/path/to/your/chosen/directory", time_zone="EST" ) """ _validate_grid_bounds(ij_bounds) if not isinstance(dataset, str): raise TypeError("dataset name must be a string.") _validate_date(date) _validate_dir(write_dir) if not isinstance(time_zone, str): raise TypeError("time_zone must be a string.") new_date = get_utc_time(date, time_zone) print(f"UTC Date: {new_date}") date_string = new_date.strftime("%Y.%m.%d:%H.%M.%S_UTC0") options = { "dataset": dataset, "variable": "pressure_head", "file_type": "pfb", "temporal_resolution": "hourly", "grid_bounds": ij_bounds, "start_time": new_date, } subset_data = get_hf_gridded_data(options) file_path = os.path.join(write_dir, f"{dataset}_{date_string}_press.pfb") write_pfb(file_path, subset_data[0, :, :, :], dist=False) print(f"Wrote {file_path} in specified directory.") return file_path
[docs] def subset_forcing( ij_bounds, grid, start, end, dataset, write_dir, time_zone="UTC", forcing_vars=( "precipitation", "downward_shortwave", "downward_longwave", "specific_humidity", "air_temp", "atmospheric_pressure", "east_windspeed", "north_windspeed", ), dataset_version=None, ): """Subset forcing files from national datasets in HydroData. Subset forcing data will be written out as pfb files formatted for a ParFlow run with 24 hours per forcing file. Per ParFlow-CLM convention separate files will be written for each variable following the standard clm variable naming convention. Forcing file outputs will be numbered starting with 0000 and data will start at midnight local time for the timezone that has been provided. If no timezone is provided it will default to midnight UTC. Args: ij_bounds (tuple[int]): bounding box for subset. This should be given as i,j index values where 0,0 is the lower left hand corner of a domain. ij_bounds are given relative to whatever grid is being used for the subset. grid (str): The spatial grid that the ij indices are calculated relative to and that the subset data will be returned on. Possible values: "conus1" or "conus2" start (str): start date (inclusive), in the form 'yyyy-mm-dd' end (str): end date (exlusive), in the form 'yyyy-mm-dd' dataset (str): forcing dataset name from the HydroData catalog that the forcing files will be subset from e.g. "NLDAS2". write_dir (str): directory where the subset files will be written time_zone (str): timezone information for start and end dates. Data will be subset starting at midnight in the specified timezone. This should be a zoneinfo-supported time zone. Defaults to "UTC". forcing_vars (tuple[str]): tuple of forcing variables to subset. By default all 8 variables needed to run ParFlow-CLM will be subset. dataset_version (str): version of the forcing dataset. By default the latest version of a dataset will be returned. Returns: A dictionary mapping the forcing variable names to the corresponding file paths where the subset data were written. Example: .. code-block:: python filepaths = subset_forcing( ij_bounds=(1225, 1738, 1347, 1811), grid="conus2", start="2005-11-01", end="2005-12-01", dataset="CW3E", write_dir="/path/to/your/chosen/directory", forcing_vars=("precipitation", "air_temp"), dataset_version="0.9", ) """ _validate_grid_bounds(ij_bounds) _validate_grid(grid) _validate_date(start) _validate_date(end) if not isinstance(dataset, str): raise TypeError("dataset name must be a string.") _validate_dir(write_dir) if not isinstance(time_zone, str): raise TypeError("time_zone must be a string.") if not all(isinstance(var, str) for var in forcing_vars): raise TypeError("All variable names should be strings.") forcing_vars = tuple(set(forcing_vars)) outputs = {} start_date = get_utc_time(start, time_zone) end_date = get_utc_time(end, time_zone) exit_event = threading.Event() lock = threading.Lock() threads = [ threading.Thread( target=_subset_forcing_variable, args=( variable, ij_bounds, grid, start_date, end_date, dataset, write_dir, dataset_version, outputs, exit_event, lock, ), ) for variable in forcing_vars ] for thread in threads: thread.start() try: for thread in threads: thread.join() except KeyboardInterrupt: print("Interrupted. Stopping threads...") exit_event.set() for thread in threads: thread.join() print("All threads stopped.") if exit_event.is_set(): raise ValueError("One or more threads were interrupted.") return outputs
def _subset_forcing_variable( variable, ij_bounds, grid, start_date, end_date, dataset, write_dir, dataset_version, outputs, exit_event, lock, ): """ Helper to read forcing data of one variable and write it daily files in write_dir folder. """ (base_file_path, hf_filter_options) = _get_forcing_file_basename( dataset, variable, grid, dataset_version, ij_bounds ) # Allocate a numpy array to buffer 24 hours (1 day) of data to be written to each file block_day_np = np.full( (24, ij_bounds[3] - ij_bounds[1], ij_bounds[2] - ij_bounds[0]), np.nan ) # compute timezone hours offset between UTC time and requested timezone time start_date_midnight = datetime(start_date.year, start_date.month, start_date.day) timezone_offset = (start_date - start_date_midnight).total_seconds() / 3600 # Get optimal hours to read at time from hf_hydrodata based on grid size block_hours_per_read = _get_read_block_size(ij_bounds) # Read data in blocks of data using hf_hydrodata start_block_date = start_date_midnight day = 1 hour = 0 skipped_offset_in_day_1 = False write_paths = [] while start_block_date < end_date and not exit_event.is_set(): (subset_data, end_block_date) = _read_hf_hydrodata_block( hf_filter_options, start_block_date, end_date, block_hours_per_read ) (block_day_np, hour, day, skipped_offset_in_day_1) = _write_block_to_files( subset_data, block_day_np, hour, day, skipped_offset_in_day_1, ij_bounds, timezone_offset, base_file_path, write_dir, write_paths, ) # increment start_block_date (in hours) for next hf_hydrodata block read start_block_date = end_block_date if not exit_event.is_set(): with lock: outputs[variable] = write_paths print(f"Finished writing {variable} to folder") def _read_hf_hydrodata_block( hf_filter_options, start_block_date, end_date, block_hours_per_read ): """ Read a block of data from hydrodata to support subset_forcing function. Parameters: hf_filter_options: The hf_hydrodata filter options to be used for the read. start_block_date: The value to put put in start_time options of hf_filter options. end_date: The final end date for the subset_forcing function. block_hours_per_read: The number of hours to read to be used to set end_time option in read. Returns: A tuple (subset_data, end_block_date) The subset data is numpy array containing dimensions (z, y, x) where z is the hours of read data. The end_block_date is the start date of the next call to this function. The date range read may be a range of 24 hour UTC periods or may be range of hours less than 24 hours. This depends on the ij_bound size. """ block_delta_per_read = timedelta(hours=block_hours_per_read) if block_hours_per_read >= 24: # The block is more than a day so read the block_hours_per_read hours end_block_date = min(start_block_date + block_delta_per_read, end_date) else: # If the block is less than 24 hours then the start/end must be in the same UTC day end_block_date = min(start_block_date + block_delta_per_read, end_date) start_block_date_midnight = datetime( start_block_date.year, start_block_date.month, start_block_date.day ) end_block_date_midnight = datetime( end_block_date.year, end_block_date.month, end_block_date.day ) if start_block_date_midnight != end_block_date_midnight: # The end block date must in the same UTC day as the start date end_block_date = end_block_date_midnight # Read the block using get_gridded_data hf_filter_options["start_time"] = start_block_date hf_filter_options["end_time"] = end_block_date subset_data = get_hf_gridded_data(hf_filter_options) return (subset_data, end_block_date) def _write_block_to_files( subset_data, block_day_np, hour, day, skipped_offset, ij_bounds, timezone_offset, base_filename, write_dir, write_paths, ): """ Process the subset_data of one block and write the data to daily forcing files. Parameters: subset_data: The numpy array of a block returned by get_gridded_data. block_day_np: A numpy buffer the size of 24 hours if ij_bounds to be written to file. hour: The next hour to be added to the block_day_np buffer (number from 0-23). day: The next day to be used to for the next daily file to be written. skipped_offset: True if we already skipped the first hours of the block for timezone reads. ij_bounds: The grid bounds of the requested data [min_x, min_y, max_x, max_y]. timezone_offset:The number of hours of the timezone from UTC time (is 0 for UTC). base_filename: The base file name of the daily forcing files to be written. write_dir: The directory to write the daily forcing files. write_paths: An array of full file paths of files written to write_dir. Returns: block_day_np, hour, day, skipped_offset) The returned block_day_np may the the same as the one passed in and filled with new hours or it may be a newly allocated buffer if the buffer was just written to a file. If the block_day_np buffer is filled to 24 hours then the buffer is written to a daily forcing file and the hours set back to 0 to be filled by the next hours from subset_data. The subset_data block may contain many days of files so this may write many daily files or if the bounds is large this function may write no files, but only partiall fill the buffer. The hour, day and skipped offset are returned as updated values after processing the hours data in the subset_data and updating the block_day_np. """ if timezone_offset == 0: # There is no timezone offset, but subset_data might be 1 hour or 24 hours or > 24 hours # Loop over all hours in the hf_hydrodata result (z is the hour dimension of the result) for z in range(0, subset_data.shape[0]): block_day_np[hour, :, :] = subset_data[z, :, :] hour = hour + 1 if hour == 24: _write_day_to_file( block_day_np, base_filename, day, write_dir, write_paths ) block_day_np = np.full( (24, ij_bounds[3] - ij_bounds[1], ij_bounds[2] - ij_bounds[0]), np.nan, ) day = day + 1 hour = 0 else: # There is a timezone offset # Loop over all hours in the hf_hydrodata result (z is the hour dimension of the result) for z in range(0, subset_data.shape[0]): if day == 1 and hour < timezone_offset and not skipped_offset: # These are the first hours before timezone offset of the first day # Skip these rows, until we read all the hours before timezone offset hour = hour + 1 if hour == timezone_offset: skipped_offset = True hour = 0 elif hour < _HOURS_PER_FORCING_FILE - timezone_offset: # These are the first hours of an output day, this does not fill day block_day_np[hour, :, :] = subset_data[z, :, :] hour = hour + 1 elif hour < 24: # These are hours from the next 24 hour UTC block to fill up the output day block_day_np[hour, :, :] = subset_data[z, :, :] hour = hour + 1 if hour == 24: _write_day_to_file( block_day_np, base_filename, day, write_dir, write_paths ) block_day_np = np.full( ( 24, ij_bounds[3] - ij_bounds[1], ij_bounds[2] - ij_bounds[0], ), np.nan, ) day = day + 1 hour = 0 else: # This should not happen because hour should be zero after writting 24 hour day raise ValueError("Too many hours in a day for file.") return (block_day_np, hour, day, skipped_offset) def _get_forcing_file_basename(dataset, variable, grid, dataset_version, ij_bounds): """ Get the base file name to be used to write daily forcing files by subset_forcing function. Parameters: dataset: The hf_hydrodata dataset version of the forcing file. variable: The hf_hydrodata variable name of the focing file. grid: The grid of the requested forcing data dataset_version:The hf_hydrodata version of the dataset of the forcing file. ij_bounds: The grid_bounds of the request to put into the return hf_filter_options. Returns: A tuple (base_filename, hf_filter_options). The base_filename is the base file name of the forcing file stored in /hydrodata. The hf_filter_options are the options to be passed to get_gridded_data to get forcing data. """ # Get base path name of output forcing file using hf_hydrodata path hf_filter_options = { "dataset": dataset, "variable": variable, "grid": grid, "file_type": "pfb", "grid_bounds": ij_bounds, "mask": "false", "temporal_resolution": "hourly", "dataset_version": dataset_version, } path = hf_hydrodata.get_paths(hf_filter_options)[0] base_filename = os.path.basename(path) return (base_filename, hf_filter_options) def _get_read_block_size(ij_bounds): """ Get the number of hours of data to be read by the blocking reads to hf_hydrodata to optimize the performance of reads for the subset_forcing function. Parameters: ij_bounds: This is the grid bounds of the request [min_x, min_y, max_x, max_y]. Returns: The number of hours of data to return each each block call for subset_forcing function. This may return many days of data for a small subgrid or only a few hours of data for a large subgrid that is too large to return 24 hours of data in a single hf_hydrodata call. """ float64_byte_size = 8 max_chunking_memory_bytes = 1500000000 # Compute the time delta between hf_hydrodata chunking block reads of data to optimize performance # The block of time we can read depends on the ij_bounds of the requested subset of data # First try to find the number of days of data we can read using the max bytes per hf_hydrodata read subgrid_byte_size = ( abs(ij_bounds[0] - ij_bounds[2]) * abs(ij_bounds[1] - ij_bounds[3]) * float64_byte_size ) block_days_per_read = int( max_chunking_memory_bytes / (subgrid_byte_size * _HOURS_PER_FORCING_FILE) ) # Limit number of days per read to 1 year block_days_per_read = min(block_days_per_read, 366) # Get number of hours to read per block block_hours_per_read = block_days_per_read * _HOURS_PER_FORCING_FILE if block_days_per_read == 0: # ij_bounds is so large we cannot download 24 hours in one get_gridded_data call # So use the most hours we can at a time instead of blocks of 24 hours in one call block_hours_per_read = int(max_chunking_memory_bytes / (subgrid_byte_size)) return block_hours_per_read def _write_day_to_file(data_np, base_filename, day, write_dir, write_paths): """ Write a file containing 1 day of hourly data to a file. Parameters: data_np: A number array of dimension (z, y, x) where z is 24 hours. base_filename: The base name of the daily file to be written. write_dir: The directory to write the daily file. write_paths: An array of full file paths written. The path is appended to this. """ write_path = os.path.join(write_dir, _adjust_filename_hours(base_filename, day)) write_paths.append(write_path) write_pfb(write_path, data_np, dist=False) def _adjust_filename_hours(filename, day): """Adjust the part of the the forcing filename representing hours. The aim is to match what a parflow simulation expects on each day of the simulation. The first day of the simulation the hours will be “.000001_to_000024.”, the second day the hours will be “.000025_to_000048.” and so on. This is in case the first day of simulation does not coincide with the first day of the water year (Oct 1st), as the dataset filenames assume day 1 is Oct 1st. The input and output filenames must match the regular expression “*.*.[0-9]{6}_to_[0-9]{6}.*” Parameters: filename (str): original forcing filename day (int): day relative to the start date of forcing file subsetting Returns: The new forcing filename with adjusted hours. """ if day < 1: raise ValueError("day must be >= 1") s1, s2, s3, s4 = filename.split(".") if not s1 or not s2 or not s3: raise ValueError("invalid forcing filename") pattern = re.compile("[0-9]{6}_to_[0-9]{6}") if pattern.fullmatch(s3) is None: raise ValueError("invalid forcing filename") start = str(_HOURS_PER_FORCING_FILE * (day - 1) + 1).rjust(6, "0") end = str(_HOURS_PER_FORCING_FILE * day).rjust(6, "0") s3 = start + "_to_" + end if pattern.fullmatch(s3) is None: raise ValueError("invalid forcing filename") return ".".join([s1, s2, s3, s4])