Skip to content
Snippets Groups Projects
data_hubeau.py 3.71 KiB
Newer Older
from dateutil.parser import parse
from source.data_source import *
from typing import Any


class DataHubeau(DataSource):

    def parse_response_hubeau_without_value(self, times):
        self.variables[2].values = np.zeros(times.shape[0])
        self.variables[2].times = times
        self.variables[3].values = np.zeros(times.shape[0])
        self.variables[3].times = times
        self.variables[4].values = np.zeros(times.shape[0])
        self.variables[4].times = times

    def parse_response_hubeau_with_value(self, variable_idx, values, times):
        sediment_first_idx = 2 * (variable_idx + 1)
        sediment_modelisation = self.variables[sediment_first_idx].modelisation
        assert sediment_modelisation is not None
        sediment_values = sediment_modelisation(values)
        self.variables[sediment_first_idx].values = sediment_values
        self.variables[sediment_first_idx].times = times
        self.variables[sediment_first_idx + 1].values = sediment_values
        self.variables[sediment_first_idx + 1].times = times

    def parse_dict(self, api_response: dict[str, Any], variable: Variable,
                   mode: DataSourceMode) -> None:
        """Parse the HubEau API response to store the result in the source variables

        :param api_response: the api response
        :param VariableFrLiq variable: Garonne or Dordogne
        :param DataSourceMode mode: historical or forecast mode
        """
        logging.info(f"-- Parsing Hub'Eau datas for variable : {variable} ")
        if mode == DataSourceMode.FORECAST:
            result_key = "resultat_obs"
            date_key = "date_obs"
        else:
            result_key = "resultat_obs_elab"
            date_key = "date_obs_elab"

        # value need to be converted from l/s to m3/s
        nr_data = len(api_response["data"])
        values = np.zeros(nr_data, dtype=np.float64)
        times = np.zeros(len(api_response["data"]), dtype="datetime64[s]")
        for idx, data in enumerate(api_response["data"]):
            # divide by 1000 to convert to g/l
            values[idx] = data[result_key] / 1000.0
            times[idx] = parse(data[date_key]).replace(tzinfo=None)

        variable_idx = self.variables.index(variable)
        self.variables[variable_idx].values = values
        self.variables[variable_idx].times = times

        # 3D model, Compute Sediment values
        if self.name == "hub-eau-3d":
            self.parse_response_hubeau_with_value(variable_idx, values, times)
        elif variable_idx == 0:
            self.parse_response_hubeau_without_value(times)

    def download(self, start_date: datetime, end_date: datetime) -> bool:
        """Download forecast api_response from hub eau api
        Data are requested from a city specified by code_entite

        :param HubEauSource3D | HubEauSource2D self: hub eau source
        :param datetime start_date: start date
        :param datetime end_date: end date
        :returns: bool
        """
        logging.info("Downloading Hub'Eau datas")
        real_start_date, real_end_date, mode = self.compute_real_dates(start_date, end_date)
        if mode == DataSourceMode.FORECAST:
            date_debut_field = "date_debut_obs"
            date_fin_field = "date_fin_obs"
        else:
            date_debut_field = "date_debut_obs_elab"
            date_fin_field = "date_fin_obs_elab"

        for idx, param in enumerate(self.api_params[mode]):
            req_url = (
                f"{self.url[mode]}?{param}"
                f"&{date_debut_field}={real_start_date:%Y-%m-%d}"
                f"&{date_fin_field}={real_end_date:%Y-%m-%d}"
            )
            response = get_curl_json(req_url)
            self.parse_dict(response, self.variables[idx], mode)

        return True