Source code for phydrus.read

"""
The read module contains methods that can be used to read in- and output
files. The methods can be used stand-alone but are also available from the
Model object. All the methods return the data as Pandas DataFrames.

Examples
--------
>>> import phydrus as ps
>>> ps.read.read_obs_node("OBS_NODE.OUT", nodes=[1])

or, from an existing Model instance:

>>> ml.read_obs_node()

"""

from pandas import read_csv, DataFrame, to_numeric

from .decorators import check_file_path


def read_profile(path="PROFILE.OUT"):
    """Method to read the PROFILE.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the profile output file. Default is
        "PROFILE.OUT".

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the profile data.

    """
    data = _read_file(path=path, start="depth", idx_col="n")
    return data
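
# Usage sketch (illustration only, not part of the original module): assuming
# a finished Hydrus simulation has written "PROFILE.OUT" to the working
# directory, the profile data can be read and inspected as follows.
#
# >>> profile = read_profile(path="PROFILE.OUT")
# >>> profile.head()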

def read_run_inf(path="RUN_INF.OUT", usecols=None):
    """Method to read the RUN_INF.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the run_inf output file. Default is
        "RUN_INF.OUT".
    usecols: list of str, optional
        List with the names of the columns to import. Default is all
        columns.

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the run_inf data.

    """
    data = _read_file(path=path, start="TLevel", idx_col="TLevel",
                      usecols=usecols)
    return data
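
# Usage sketch (illustration only): assuming "RUN_INF.OUT" is present, a
# subset of columns could be imported via usecols. The column names "Time"
# and "dt" are assumptions based on a typical RUN_INF.OUT header.
#
# >>> run_inf = read_run_inf(usecols=["TLevel", "Time", "dt"])
# >>> run_inf["dt"].describe()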

@check_file_path
def read_i_check(path="I_CHECK.OUT"):
    """Method to read the I_CHECK.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the I_CHECK output file. Default is
        "I_CHECK.OUT".

    Returns
    -------
    data: dict
        Dictionary with the node as a key and a Pandas DataFrame as a
        value.

    """
    names = ["theta", "h", "log_h", "C", "K", "log_K", "S", "Kv"]
    end = []
    start = 0

    with open(path) as file:
        # Find the starting line and the end of each data block
        for i, line in enumerate(file.readlines()):
            if "theta" in line:
                start = i
            elif "end" in line and start > 0:
                end.append(i)

        data = {}
        for i, e in enumerate(end):
            file.seek(0)  # Go back to start of file
            # Read data into a Pandas DataFrame
            nrows = e - start - 2
            data[i] = read_csv(file, skiprows=start + 1, nrows=nrows,
                               skipinitialspace=True, delim_whitespace=True,
                               names=names, dtype=float)
            start = e

    return data
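
# Usage sketch (illustration only): read_i_check returns a dictionary of
# DataFrames, one per block found in "I_CHECK.OUT". Assuming the file exists,
# the first block could be inspected like this.
#
# >>> checks = read_i_check(path="I_CHECK.OUT")
# >>> checks[0][["theta", "h", "K"]].head()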

def read_tlevel(path="T_LEVEL.OUT", usecols=None):
    """Method to read the T_LEVEL.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the t_level output file. Default is
        "T_LEVEL.OUT".
    usecols: list of str, optional
        List with the names of the columns to import. By default only the
        real fluxes are imported and not the cumulative fluxes. Options
        are: "rTop", "rRoot", "vTop", "vRoot", "vBot", "sum(rTop)",
        "sum(rRoot)", "sum(vTop)", "sum(vRoot)", "sum(vBot)", "hTop",
        "hRoot", "hBot", "RunOff", "sum(RunOff)", "Volume", "sum(Infil)",
        "sum(Evap)", "TLevel", "Cum(WTrans)", "SnowLayer".

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the t_level data.

    """
    data = _read_file(path=path, start="rTop", idx_col="Time",
                      remove_first_row=True, usecols=usecols)
    data = data.set_index(to_numeric(data.index, errors="coerce"))
    return data
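
# Usage sketch (illustration only): assuming "T_LEVEL.OUT" exists, the actual
# top and bottom fluxes could be read and plotted with pandas. "Time" is
# included in usecols because it is used as the index column.
#
# >>> tlevel = read_tlevel(usecols=["Time", "vTop", "vBot"])
# >>> tlevel.plot()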

def read_alevel(path="A_LEVEL.OUT", usecols=None):
    """Method to read the A_LEVEL.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the a_level output file. Default is
        "A_LEVEL.OUT".
    usecols: list of str, optional
        List with the names of the columns to import. Default is all
        columns.

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the a_level data.

    """
    data = _read_file(path=path, start="Time", idx_col="Time",
                      remove_first_row=True, usecols=usecols)
    return data
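
# Usage sketch (illustration only): assuming "A_LEVEL.OUT" exists in the
# working directory, the a_level data can be read in full.
#
# >>> alevel = read_alevel(path="A_LEVEL.OUT")
# >>> alevel.tail()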

def read_solute(path="SOLUTE1.OUT"):
    """Method to read a SOLUTE output file (e.g. SOLUTE1.OUT).

    Parameters
    ----------
    path: str, optional
        String with the name of the solute output file. Default is
        "SOLUTE1.OUT".

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the solute data.

    """
    data = _read_file(path=path, start="Time", idx_col="Time",
                      remove_first_row=True)
    return data
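
# Usage sketch (illustration only): assuming a solute transport simulation
# wrote "SOLUTE1.OUT", the solute data can be read like this.
#
# >>> solute = read_solute(path="SOLUTE1.OUT")
# >>> solute.head()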

@check_file_path
def _read_file(path, start, end="end", usecols=None, idx_col=None,
               remove_first_row=False):
    """Internal method to read Hydrus output files.

    Parameters
    ----------
    path: str
        String with the filepath.
    start: str
        String that determines the start of the data to be imported.
    end: str, optional
        String that determines the end of the data to be imported.
    usecols: list, optional
        List with the names of the columns to import. Default is all
        columns.
    idx_col: str, optional
        String with the name used for the index column.
    remove_first_row: bool, optional
        Remove the first row if True. Default is False.

    Returns
    -------
    data: pandas.DataFrame
        Pandas DataFrame with the imported data.

    """
    with open(path) as file:
        # Find the starting and ending lines of the data block
        for i, line in enumerate(file.readlines()):
            if start in line:
                s = i
            elif end in line:
                e = i
                break

        file.seek(0)  # Go back to start of file
        # Read data into a Pandas DataFrame
        data = read_csv(file, skiprows=s, nrows=e - s - 2, usecols=usecols,
                        index_col=idx_col, skipinitialspace=True,
                        delim_whitespace=True)

    if remove_first_row:
        data = data.drop(index=data.index[0]).apply(to_numeric,
                                                    errors="ignore")
    else:
        data = data.apply(to_numeric, errors="ignore")
    return data
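
# Note (illustration only): most of the public readers in this module
# delegate to _read_file. For example, read_run_inf(path) is essentially
# equivalent to the call below.
#
# >>> data = _read_file(path="RUN_INF.OUT", start="TLevel", idx_col="TLevel")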

@check_file_path
def read_obs_node(path="OBS_NODE.OUT", nodes=None, conc=False, cols=None):
    """Method to read the OBS_NODE.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the OBS_NODE output file. Default is
        "OBS_NODE.OUT".
    nodes: list of int, optional
        List with the numbers of the observation nodes to import.
    conc: bool, optional
        If True, also import the concentration ("Conc") column.
    cols: list of str, optional
        List of strings with the columns to read. Default is
        ["h", "theta", "Temp"].

    Returns
    -------
    data: dict
        Dictionary with the node as a key and a Pandas DataFrame as a
        value.

    """
    data = {}

    with open(path) as file:
        # Find the lines where the data starts and ends
        for i, line in enumerate(file.readlines()):
            if "time" in line:
                start = i
            elif "end" in line:
                end = i
                break

    df1 = read_csv(path, skiprows=start, index_col=0, nrows=end - start - 1,
                   skipinitialspace=True, delim_whitespace=True, engine="c")

    if cols is None:
        cols = ["h", "theta", "Temp"]
    if conc:
        cols.append("Conc")

    for i, node in enumerate(nodes):
        if i > 0:
            usecols = [f"{c}.{i}" for c in cols]
        else:
            usecols = cols
        df = df1.loc[:, usecols]
        df.columns = cols
        data[node] = df

    return data
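
# Usage sketch (illustration only): the node numbers below are assumptions
# and must match the observation nodes defined in the Hydrus project that
# produced "OBS_NODE.OUT".
#
# >>> obs = read_obs_node(path="OBS_NODE.OUT", nodes=[1, 2])
# >>> obs[1]["theta"].plot()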

@check_file_path
def read_nod_inf(path="NOD_INF.OUT", times=None):
    """Method to read the NOD_INF.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the NOD_INF output file. Default is
        "NOD_INF.OUT".
    times: list of float, optional
        Times for which a DataFrame with the nodal values of the pressure
        head, the water content, the solution and sorbed concentrations,
        the temperature, etc. is created. Default is None, which imports
        all available times.

    Returns
    -------
    data: dict
        Dictionary with the time as a key and a Pandas DataFrame as a
        value. If only one time is imported, the DataFrame is returned
        directly.

    """
    use_times = []
    start = []
    end = []

    with open(path) as file:
        # Find the times and the start and end line of each data block
        for i, line in enumerate(file.readlines()):
            if "Time" in line and "Date" not in line:
                time = line.replace(" ", "").split(":")[1].replace("\n", "")
                use_times.append(float(time))
            elif "Node" in line:
                start.append(i)
            elif "end" in line:
                end.append(i)

        if times is None:
            times = use_times

        # Read the data into a Pandas DataFrame per time
        data = {}
        for s, e, time in zip(start, end, use_times):
            if time in times:
                file.seek(0)  # Go back to start of file
                data[time] = read_csv(file, skiprows=s, skipinitialspace=True,
                                      delim_whitespace=True, nrows=e - s - 2)
                data[time] = data[time].drop([0])
                data[time] = data[time].apply(to_numeric)

    if len(data) == 1:
        return next(iter(data.values()))
    else:
        return data
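
# Usage sketch (illustration only): the print time 1.0 below is a made-up
# value and has to match one of the print times written to "NOD_INF.OUT";
# with times=None a dictionary keyed by time is returned instead.
#
# >>> nod_inf = read_nod_inf(path="NOD_INF.OUT", times=[1.0])
# >>> nod_inf.head()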

@check_file_path
def read_balance(path="BALANCE.OUT", usecols=None):
    """Method to read the BALANCE.OUT output file.

    Parameters
    ----------
    path: str, optional
        String with the name of the balance output file. Default is
        "BALANCE.OUT".
    usecols: list of str, optional
        List with the names of the columns to import. Default is
        ["Area", "W-volume", "In-flow", "h Mean", "Top Flux", "Bot Flux",
        "WatBalT", "WatBalR"].

    Returns
    -------
    data: dict
        Dictionary with the time as a key and a Pandas DataFrame as a
        value.

    """
    if usecols is None:
        usecols = ["Area", "W-volume", "In-flow", "h Mean", "Top Flux",
                   "Bot Flux", "WatBalT", "WatBalR"]

    with open(path) as file:
        lines = file.readlines()

    use_times = []
    start = []
    end = [16]

    for i, line in enumerate(lines):
        for x in usecols:
            if x in line:
                # Keep the variable name and its last two values
                line1 = x
                line2 = line.replace("\n", "").split(" ")[-1]
                line3 = line.replace("  ", " ").split(" ")[-2]
                lines[i] = [line1, line2, line3]
        if "Time" in line and "Date" not in line:
            time = float(
                line.replace(" ", "").split("]")[1].replace("\n", ""))
            use_times.append(time)
        if "Area" in line:
            start.append(i)
        if "WatBalR" in line:
            end.append(i + 1)
        if "Sub-region" in line:
            subreg = line.replace("  ", " ").replace("\n", "").split(" ")[-1]

    data = {}
    for s, e, time in zip(start, end, use_times):
        df = DataFrame(lines[s:e]).set_index(0).T
        index = {}
        for x in range(int(subreg) + 1):
            index[x + 1] = x
        df = df.rename(index=index)
        data[time] = df

    return data
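
# Usage sketch (illustration only): assuming "BALANCE.OUT" exists, the water
# balance terms can be collected per print time; each value of the returned
# dictionary is a DataFrame with the selected balance terms as columns.
#
# >>> balance = read_balance(path="BALANCE.OUT")
# >>> sorted(balance.keys())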