"""Utility functions: unit conversions, path helpers, time-series pivoting, XML parsing."""
import inspect
from numbers import Real
from os import PathLike
from pathlib import Path
from bs4 import BeautifulSoup
import numpy as np
import pandas as pd
[docs]
def script_local_path(filename, must_exist=True, caller_file=None):
    """Build a path anchored at the directory of the calling script.

    An absolute ``filename`` is returned as a ``Path`` without modification.
    A relative ``filename`` is joined onto the resolved parent directory of
    ``caller_file``.

    Args:
        filename: Path string or path-like object; absolute, or relative to
            the script directory.
        must_exist: When True, a target that does not exist raises
            FileNotFoundError.
        caller_file: Script path to anchor on. When None, the file of the
            frame that called this function is used.

    Returns:
        pathlib.Path: The absolute input, or ``<script dir> / filename``.

    Raises:
        TypeError: If ``filename`` is not a str/path-like, or ``must_exist``
            is not a bool.
        ValueError: If ``filename`` is an empty or whitespace-only string.
        FileNotFoundError: If ``must_exist`` is True and the path is missing.
    """
    if not isinstance(filename, (str, PathLike)):
        raise TypeError("Input 'filename' must be a path string or path-like object.")
    if isinstance(filename, str) and filename.strip() == "":
        raise ValueError("Input 'filename' cannot be an empty string.")
    if not isinstance(must_exist, bool):
        raise TypeError("Input 'must_exist' must be a boolean.")

    resolved = Path(filename)
    if not resolved.is_absolute():
        # Stack entry 1 is the frame of whoever called this function. The
        # lookup stays inline here: moving it into a helper would shift the
        # frame offset and pick up the wrong file.
        anchor = inspect.stack()[1].filename if caller_file is None else caller_file
        resolved = Path(anchor).resolve().parent / resolved

    if must_exist and not resolved.exists():
        raise FileNotFoundError(f"File not found: {resolved}")
    return resolved
[docs]
def ensure_dir(path):
    """Make sure a directory exists and hand it back as a ``Path``.

    Missing parent directories are created as well; an already existing
    directory is not an error.

    Args:
        path: Directory location as a string or path-like object.

    Returns:
        pathlib.Path: ``Path(path)`` after the directory has been created.

    Raises:
        TypeError: If ``path`` is not a str or path-like object.
        ValueError: If ``path`` is an empty or whitespace-only string.
    """
    if not isinstance(path, (str, PathLike)):
        raise TypeError("Input 'path' must be a path string or path-like object.")
    if isinstance(path, str) and path.strip() == "":
        raise ValueError("Input 'path' cannot be an empty string.")

    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    return target
def _convert_with_factor(val, factor):
    """Multiply a numeric scalar or vector-like input by a conversion factor.

    Args:
        val: A real number, pandas Series, NumPy array, list, or tuple.
            Booleans are rejected everywhere (scalar, element, or dtype).
        factor: Multiplier applied to every value.

    Returns:
        ``val * factor`` for scalars, Series, and arrays; a new list for list
        input; a new tuple for tuple input.

    Raises:
        TypeError: If ``val`` is of an unsupported type, has a non-numeric or
            boolean dtype, or contains a non-real element.
    """
    # bool is a subclass of int, so it must be rejected before the Real check.
    if isinstance(val, bool):
        raise TypeError("Input 'val' must be numeric (bool is not allowed).")
    if isinstance(val, Real):
        return val * factor

    if isinstance(val, pd.Series):
        dtype = val.dtype
        acceptable = not pd.api.types.is_bool_dtype(dtype) and pd.api.types.is_numeric_dtype(dtype)
        if not acceptable:
            raise TypeError("Input 'val' pandas Series must have a numeric dtype.")
        return val * factor

    if isinstance(val, np.ndarray):
        dtype = val.dtype
        acceptable = not np.issubdtype(dtype, np.bool_) and np.issubdtype(dtype, np.number)
        if not acceptable:
            raise TypeError("Input 'val' NumPy array must have a numeric dtype.")
        return val * factor

    # Lists and tuples share the same element check; only the error text and
    # the type of the rebuilt container differ. list is tested before tuple.
    sequence_kinds = (
        (list, "Input 'val' list must contain only real numbers."),
        (tuple, "Input 'val' tuple must contain only real numbers."),
    )
    for container, message in sequence_kinds:
        if not isinstance(val, container):
            continue
        for item in val:
            if isinstance(item, bool) or not isinstance(item, Real):
                raise TypeError(message)
        return container(item * factor for item in val)

    raise TypeError(
        "Input 'val' must be a real number, pandas Series, numpy array, list, or tuple of real numbers."
    )
[docs]
def convert_cfs_to_af(val):
    """Scale a daily flow in cubic feet per second to a volume in acre-feet.

    Args:
        val: Numeric scalar or vector-like input accepted by
            ``_convert_with_factor``.

    Returns:
        The input multiplied by 1.98347, in the form returned by
        ``_convert_with_factor``.
    """
    # 86400 s/day divided by 43560 ft^3 per acre-foot is approximately 1.98347.
    acre_feet_per_cfs_day = 1.98347
    return _convert_with_factor(val, acre_feet_per_cfs_day)
[docs]
def convert_cfs_to_cms(val):
    """Scale a flow in cubic feet per second to cubic meters per second.

    Args:
        val: Numeric scalar or vector-like input accepted by
            ``_convert_with_factor``.

    Returns:
        The input multiplied by 0.028316831998814504, in the form returned by
        ``_convert_with_factor``.
    """
    # NOTE(review): the exact international-foot factor is 0.3048**3 =
    # 0.028316846592; this constant differs from it at about the 7th
    # significant digit. Kept unchanged - confirm whether it is intentional
    # (e.g. matching another tool) before altering it.
    cubic_meters_per_cubic_foot = 0.028316831998814504
    return _convert_with_factor(val, cubic_meters_per_cubic_foot)
[docs]
def convert_cms_to_mcm(val):
    """Scale a daily flow in cubic meters per second to million cubic meters.

    Args:
        val: Numeric scalar or vector-like input accepted by
            ``_convert_with_factor``.

    Returns:
        The input multiplied by (seconds per day) / 1e6, in the form returned
        by ``_convert_with_factor``.
    """
    seconds_per_day = 24 * 60 * 60
    return _convert_with_factor(val, seconds_per_day / 1e6)
[docs]
def pivot_timeseries_by_year(data, value_column=None, aggfunc="mean"):
    """Pivot a DatetimeIndex time series into day-of-year (rows) by year (columns).

    Args:
        data: pandas Series or DataFrame with a DatetimeIndex.
        value_column: Label of the column to pivot. Required for multi-column
            DataFrames; optional for single-column DataFrames (the only
            column is used, whatever its label type); must be None for a
            Series.
        aggfunc: Aggregation used by pandas pivot_table when duplicate day/year
            entries occur. Common options: ``"mean"`` (good default for rate-like
            data), ``"sum"`` (additive quantities), ``"min"``, ``"max"``,
            ``"median"``, ``"first"``, ``"last"``. Any aggregation string
            accepted by pandas pivot_table is valid.

    Returns:
        pandas.DataFrame: Rows indexed by day of year, one column per calendar
        year, cells holding the aggregated value.

    Raises:
        TypeError: If ``data`` is not a Series/DataFrame or its index is not a
            DatetimeIndex.
        ValueError: If ``value_column`` is given for a Series, is missing for
            a multi-column DataFrame, does not name a column, or if
            ``aggfunc`` is not a non-empty string.
    """
    if not isinstance(data, (pd.Series, pd.DataFrame)):
        raise TypeError("Input 'data' must be a pandas Series or DataFrame.")
    if not isinstance(data.index, pd.DatetimeIndex):
        raise TypeError("Input 'data' must use a pandas DatetimeIndex.")

    if isinstance(data, pd.Series):
        if value_column is not None:
            raise ValueError("Input 'value_column' must be None when 'data' is a Series.")
        # No defensive copy: nothing below mutates the input.
        values = data
    else:
        if value_column is None:
            if len(data.columns) != 1:
                raise ValueError(
                    "Input 'value_column' is required when 'data' is a DataFrame with multiple columns."
                )
            # The sole column is taken as-is. Its label may be any hashable
            # (e.g. the default integer label 0), so no str check applies.
            value_column = data.columns[0]
        else:
            # Column labels need not be strings; validate by membership.
            # Unhashable candidates (lists, dicts) make the lookup raise
            # TypeError, which is reported as the same ValueError.
            try:
                is_known = value_column in data.columns
            except TypeError:
                is_known = False
            if not is_known:
                raise ValueError("Input 'value_column' must name a column in 'data'.")
        values = data[value_column]

    if not isinstance(aggfunc, str) or not aggfunc.strip():
        raise ValueError("Input 'aggfunc' must be a non-empty string.")

    return pd.pivot_table(
        pd.DataFrame({"value": values}),
        values="value",
        index=values.index.dayofyear,
        columns=values.index.year,
        aggfunc=aggfunc,
    )
[docs]
def read_xml(filename):
    """Load an XML file and parse it into a BeautifulSoup XML document.

    The file is read as UTF-8 text and handed to BeautifulSoup with the
    ``"xml"`` parser.

    Args:
        filename: Path string or path-like object pointing at the XML file.

    Returns:
        BeautifulSoup: The parsed document.

    Raises:
        TypeError: If ``filename`` is not a str or path-like object.
        ValueError: If ``filename`` is an empty/whitespace-only string, or the
            file contains only whitespace.
    """
    if not isinstance(filename, (str, PathLike)):
        raise TypeError("Input 'filename' must be a path string or path-like object.")
    if isinstance(filename, str) and filename.strip() == "":
        raise ValueError("Input 'filename' cannot be an empty string.")

    with open(filename, mode="r", encoding="utf-8") as handle:
        markup = handle.read()

    # Reject blank files up front instead of returning an empty document.
    if markup.strip() == "":
        raise ValueError(f"XML file '{filename}' is empty.")
    return BeautifulSoup(markup, "xml")
[docs]
def process_xml(filename, include_objective_directions=False, default_objective_direction="minimize"):
    """Extract decision-variable names, objective names, and optionally objective directions from XML.

    Decision variables come from the ``<name>`` of each ``<rwInput>`` element;
    objectives come from the ``<name>`` of each ``<objective>`` element. An
    objective's direction is its ``<sense>`` text (stripped, lower-cased), or
    the default when ``<sense>`` is absent or blank.

    Args:
        filename: Path to XML configuration file.
        include_objective_directions: If True, the objective directions are
            returned as a third element.
        default_objective_direction: Direction used for objectives without a
            usable ``<sense>`` tag. Must be "minimize" or "maximize"
            (case and surrounding whitespace are ignored).

    Returns:
        tuple: ``(decision_variable_names, objective_names)``, or
        ``(decision_variable_names, objective_names, objective_directions)``
        when ``include_objective_directions`` is True. Each element is a list
        of strings.

    Raises:
        TypeError: If ``include_objective_directions`` is not a bool.
        ValueError: If the default direction is invalid, an ``<objective>`` or
            ``<rwInput>`` lacks a non-empty ``<name>``, or a ``<sense>`` value
            is not a recognised direction.
    """
    if not isinstance(include_objective_directions, bool):
        raise TypeError("Input 'include_objective_directions' must be a boolean.")
    if not isinstance(default_objective_direction, str) or not default_objective_direction.strip():
        raise ValueError("Input 'default_objective_direction' must be a non-empty string.")

    allowed = {"minimize", "maximize"}
    fallback = default_objective_direction.strip().lower()
    if fallback not in allowed:
        raise ValueError("Input 'default_objective_direction' must be 'minimize' or 'maximize'.")

    def required_name(entry, message):
        """Return the stripped <name> text of ``entry`` or raise ValueError(message)."""
        tag = entry.find("name")
        if tag is None or not tag.text or not tag.text.strip():
            raise ValueError(message)
        return tag.text.strip()

    document = read_xml(filename)
    input_entries = document.find_all("rwInput")

    objective_names = []
    objective_directions = []
    # Objectives are validated before decision variables, so an invalid
    # objective is reported first.
    for objective in document.find_all("objective"):
        objective_names.append(
            required_name(objective, "Each <objective> entry must contain a non-empty <name> tag.")
        )
        sense_tag = objective.find("sense")
        if not (sense_tag and sense_tag.text and sense_tag.text.strip()):
            # Missing or blank <sense>: use the caller-supplied default.
            objective_directions.append(fallback)
            continue
        sense = sense_tag.text.strip().lower()
        if sense not in allowed:
            raise ValueError(
                f"Objective sense '{sense_tag.text}' is invalid. Use 'Minimize' or 'Maximize'."
            )
        objective_directions.append(sense)

    decision_variable_names = [
        required_name(entry, "Each <rwInput> entry must contain a non-empty <name> tag.")
        for entry in input_entries
    ]

    if include_objective_directions:
        return decision_variable_names, objective_names, objective_directions
    return decision_variable_names, objective_names