Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions src/ewatercycle/observation/grdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,249 @@ def _extract_metadata(lines, key, cast=str, default="NA"):
return default
warnings.warn(f"{key} not found, set to {default}", stacklevel=2)
return default


##------- Monthly GRDC data ----------
def get_grdc_data_monthly(
station_id: str,
start_time: str,
end_time: str,
data_home: str | None = None,
column1: str = "original streamflow",
column2: str = "calculated streamflow",
column3: str = "flag",
) -> xr.Dataset:
"""Load monthly GRDC discharge data for a specific station.

This function is similar to `get_grdc_data` but specifically for **monthly data**.
It returns a Dataset with three time series columns:
- column1: original streamflow (default: "original streamflow")
- column2: calculated streamflow (default: "calculated streamflow")
- column3: flag (default: "flag")

Currently, this function only reads GRDC `.txt` files.
NetCDF support (.nc) is not implemented.

Args:
station_id : str
GRDC station identifier.
start_time : str
Start of the period to extract, in ISO format (e.g., "YYYY-MM-DDTHH:MMZ").
end_time : str
End of the period to extract, in ISO format (e.g., "YYYY-MM-DDTHH:MMZ").
data_home : str | None, optional
Path to the directory containing the GRDC files. Defaults to None.
If None, falls back to `CFG.grdc_location`.
column1 : str, optional
Name for the first data column (original streamflow).
column2 : str, optional
Name for the second data column (calculated streamflow).
column3 : str, optional
Name for the third data column (flag).

Returns:
grdc data in a xarray dataset.
Shaped like a filtered version of the GRDC daily NetCDF file.
Data description:
Original - original (provided) data
Calculated - GRDC calculated from daily data
Flag - percentage of valid values used for calculation from daily data

Raises:
ValueError: If no data for the requested station id
and period could not be found.
NotImplementedError
If a NetCDF (.nc) file is found (support not implemented).
"""
if data_home:
data_path = to_absolute_path(data_home)
elif CFG.grdc_location:
data_path = to_absolute_path(CFG.grdc_location)
else:
msg = (
"Provide the grdc path using `data_home` argument"
"or using `grdc_location` in ewatercycle configuration file."
)
raise ValueError(msg)

if not data_path.exists():
msg = f"The grdc directory {data_path} does not exist!"
raise ValueError(msg)

# # Read the NetCDF file
nc_file = data_path / "GRDC-Monthly.nc"
if nc_file.exists():
msg = ".nc support not implemented at this point"
raise NotImplementedError(msg)

# Read the text data
raw_file = data_path / f"{station_id}_Q_Month.txt"
if not raw_file.exists():
if nc_file.exists():
msg = f"The grdc station {station_id} is not in the {nc_file} file and {raw_file} does not exist!" # noqa: E501
raise ValueError(msg)
msg = f"The grdc file {raw_file} does not exist!"
raise ValueError(msg)

# Convert the raw data to an dataframe
metadata, df = _grdc_read_monthly(
raw_file,
start=get_time(start_time).date(),
end=get_time(end_time).date(),
column1=column1,
column2=column2,
column3=column3,
)

return xr.Dataset.from_dict(
{
"coords": {
"time": {
"dims": ("time",),
"attrs": {"long_name": "time"},
"data": df.index.to_numpy(),
},
"id": {
"dims": (),
"attrs": {"long_name": "grdc number"},
"data": int(station_id),
},
},
"dims": {
"time": len(df.index),
},
"attrs": {
"title": metadata["dataSetContent"],
"Conventions": "CF-1.7",
"references": "grdc.bafg.de",
"institution": "GRDC",
"history": f"Converted from {raw_file.name} of {metadata['file_generation_date']} to netcdf by eWaterCycle Python package", # noqa: E501
"missing_value": "-999.000",
},
"data_vars": {
"Original discharge": {
"dims": ("time",),
"attrs": {
"units": "m3/s",
"long_name": "Mean monthly discharge (MQ)",
},
"data": df[column1].to_numpy(),
},
"Calculated discharge": {
"dims": ("time",),
"attrs": {
"units": "m3/s",
"long_name": "Mean monthly discharge (MQ)",
},
"data": df[column2].to_numpy(),
},
"Flag": {
"dims": ("time",),
"attrs": {
"units": "%",
"long_name": "percentage of valid daily values used for calculation", # noqa: E501
},
"data": df[column3].to_numpy(),
},
"area": {
"dims": (),
"attrs": {"units": "km2", "long_name": "catchment area"},
"data": metadata["grdc_catchment_area_in_km2"],
},
"country": {
"dims": (),
"attrs": {
"long_name": "country name",
"iso2": "ISO 3166-1 alpha-2 - two-letter country code",
},
"data": metadata["country_code"],
},
"geo_x": {
"dims": (),
"attrs": {
"units": "degree_east",
"long_name": "station longitude (WGS84)",
},
"data": metadata["grdc_longitude_in_arc_degree"],
},
"geo_y": {
"dims": (),
"attrs": {
"units": "degree_north",
"long_name": "station latitude (WGS84)",
},
"data": metadata["grdc_latitude_in_arc_degree"],
},
"geo_z": {
"dims": (),
"attrs": {
"units": "m",
"long_name": "station altitude (m above sea level)",
},
"data": metadata["altitude_masl"],
},
"owneroforiginaldata": {
"dims": (),
"attrs": {"long_name": "Owner of original data"},
"data": metadata["Owner of original data"],
},
"river_name": {
"dims": (),
"attrs": {"long_name": "river name"},
"data": metadata["river_name"],
},
"station_name": {
"dims": (),
"attrs": {"long_name": "station name"},
"data": metadata["station_name"],
},
"timezone": {
"dims": (),
"attrs": {
"units": "00:00",
"long_name": "utc offset, in relation to the national capital",
},
"data": nan,
},
},
}
)


def _grdc_read_monthly(grdc_station_path, start, end, column1, column2, column3):
"""Private helper function for reading monthly grdc data."""
with grdc_station_path.open("r", encoding="cp1252", errors="ignore") as file:
data = file.read()

metadata = _grdc_metadata_reader(grdc_station_path, data)

all_lines = data.split("\n")
header = 0
for i, line in enumerate(all_lines):
if line.startswith("# DATA"):
header = i + 1
break

# Import GRDC data into dataframe and modify dataframe format
grdc_data = pd.read_csv(
grdc_station_path,
encoding="cp1252",
skiprows=header,
delimiter=";",
parse_dates=["YYYY-MM-DD"],
na_values="-999",
)
grdc_station_df = pd.DataFrame(
{
column1: grdc_data[" Original"].array,
column2: grdc_data[" Calculated"].array,
column3: grdc_data[" Flag"].array,
},
index=grdc_data["YYYY-MM-DD"].array,
)
grdc_station_df.index.rename("time", inplace=True) # noqa: PD002

# Select GRDC station data that matches the forecast results Date
grdc_station_select = grdc_station_df.loc[start:end]

return metadata, grdc_station_select
Loading
Loading