Skip to content
Snippets Groups Projects
simulation_parameters.py 12.3 KiB
Newer Older
from pydantic import BaseModel
from source.forcage import *
import logging
from datetime import datetime


class SimulationParameters(BaseModel):
    model: str
    project_id: int
    sources: list[str]
    start_date: datetime
    end_date: datetime
    naos_token: str
    write_in_naos: bool

    def __init__(self, model: str, project_id: int, sources: list[str], start_date: str, end_date: str,
                 naos_token: str, write_in_naos: bool):
        BaseModel.__init__(self, model=model, project_id=project_id, sources=sources, start_date=start_date,
                           end_date=end_date, naos_token=naos_token, write_in_naos=write_in_naos)
        self.model = model
        self.project_id = project_id
        self.sources = sources
        self.start_date = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%S.%fZ")
        self.end_date = datetime.strptime(end_date, "%Y-%m-%dT%H:%M:%S.%fZ")
        self.naos_token = naos_token
        self.write_in_naos = write_in_naos

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    def prepare_forcages_houle_2d(self, start_date: datetime, end_date: datetime) \
            -> tuple[list[Forcage], list[SteeringFile]]:

        # Create the forcages
        atmos_sources = []
        debits_sources = []
        houle_sources = []
        forcages = []
        model_timestep_str = ""

        for source_name in self.sources:
            source = SOURCES_NAMES[source_name]
            if source.variables[0] in VariableAtmospheric:
                atmos_sources.append(source)
            elif source.variables[0] in VariableWave:
                houle_sources.append(source)
            else:
                debits_sources.append(source)

        if atmos_sources:
            forcages.append(
                Forcage("paramètres atmosphériques",
                        atmos_sources,
                        "forcage_atmos_OM_headers.dat",
                        "meteo_estuaire.dat"))
        if houle_sources:
            forcages.append(
                Forcage("houle",
                        houle_sources,
                        "input_tomawac_headers.dat",
                        "input_tomawac.dat"))
        if debits_sources:
            forcages.append(
                Forcage("débits",
                        debits_sources,
                        "debits_headers.dat",
                        "debits.dat"))

        # Retrieve the steering file for gitlab repository
        steering_files = get_steering_files(self.project_id, self.naos_token)
        if not steering_files:
            raise ValueError(f"Unable to get steering file of {self.project_id}")

        # Update the steering file content
        # Only the t2d steering file has the TIME STEP variable
        for steering_file in steering_files:
            if steering_file.filename.lower().startswith("t2d"):
                t2d_steering_file = steering_file
                try:
                    model_timestep_str = t2d_steering_file.find_in_steering_file(
                        r"(?<=TIME STEP)\s+=\s+(?P<content>[0-9]+.?[0-9]?)")
                except ValueError as err:
                    raise ValueError(f"Unable to update steering file. Reason: {err}") from err
                break

        model_timestep = int(float(model_timestep_str))

        forecast_duration_days = (end_date - start_date).days
        forecast_duration_seconds = int((end_date - start_date).total_seconds())
        nr_steps = forecast_duration_seconds // model_timestep

        templates = FORCAGE_TEMPLATES[self.model]
        start_date_forcage_fmt = start_date.strftime("%Y%m%d")

        try:
            for steering_file in steering_files:

                if steering_file.filename.lower().startswith("t2d"):
                    result_file = templates["result-file-t2d"] % (start_date_forcage_fmt, forecast_duration_days)
                    result_regex = "RESULTS FILE"
                    time_regex = "NUMBER OF TIME STEPS"
                    date_regex = "ORIGINAL DATE OF TIME"
                    date = "{dt.year};{dt.month};{dt.day}".format(dt=start_date)
                else:
                    result_file = templates["result-file-tom"] % (start_date_forcage_fmt, forecast_duration_days)
                    result_regex = "FICHIER DES RESULTATS 2D"
                    time_regex = "NOMBRE DE PAS DE TEMPS"
                    date_regex = "DATE DE DEBUT DU CALCUL"
                    date = start_date.strftime("%Y%m%d") + "0000"

                steering_file.update_steering_file(rf"{result_regex}\s+=\s+(?P<content>[^\r\n]+)", result_file)
                steering_file.update_steering_file(rf"(?<={time_regex})\s+=\s+(?P<content>\S+)", str(nr_steps))
                steering_file.update_steering_file(rf"(?<={date_regex})\s+=\s+(?P<content>\S+)", date)

        except ValueError as err:
            raise err
        return forcages, steering_files

    def prepare_forcages_gironde_xl_3d(self, start_date: datetime, end_date: datetime) -> \
            tuple[list[Forcage], list[SteeringFile]]:
        # Create the forcages
        atmos_sources = []
        frliq_sources = []
        forcages = []
        for source_name in self.sources:
            source = SOURCES_NAMES[source_name]
            if source.variables[0] in VariableAtmospheric:
                atmos_sources.append(source)
            else:
                frliq_sources.append(source)

        if atmos_sources:
            forcages.append(
                Forcage("paramètres atmosphériques", atmos_sources, "forcage_atmos_OM_headers.dat",
                        "forcage_atmos_OM.dat"))
        if frliq_sources:
            forcages.append(
                Forcage("frontière liquide", frliq_sources, "forcage_FRLIQ_ST_headers.dat", "forcage_FRLIQ_ST.dat"))

        # Retrieve the steering file for gitlab repository
        steering_files = get_steering_files(self.project_id, self.naos_token)
        if not steering_files:
            raise ValueError(f"Unable to get steering file of {self.project_id}")

        # Gironde-Xl-3d has only one steering_file
        steering_file = steering_files[0]

        # Update the steering file content
        try:
            model_timestep_str = steering_file.find_in_steering_file(
                r"(?<=TIME STEP)\s+=\s+(?P<content>[0-9]+.?[0-9]?)")
        except ValueError as err:
            raise ValueError(f"Unable to update steering file. Reason: {err}") from err

        model_timestep = int(float(model_timestep_str))

        forecast_duration_days = (end_date - start_date).days
        forecast_duration_seconds = int((end_date - start_date).total_seconds())
        nr_steps = forecast_duration_seconds // model_timestep

        templates = FORCAGE_TEMPLATES[self.model]

        start_date_forcage_fmt = start_date.strftime("%Y%m%d")

        result_file_3d = templates["result-file-3d"] % (start_date_forcage_fmt, forecast_duration_days)
        result_file_2d = templates["result-file-2d"] % (start_date_forcage_fmt, forecast_duration_days)

        steering_file.update_steering_file(r"3D RESULT FILE  =[\r\n]+(?P<content>[^\r\n]+)", result_file_3d)
        steering_file.update_steering_file(r"2D RESULT FILE  =[\r\n]+(?P<content>[^\r\n]+)", result_file_2d)
        steering_file.update_steering_file(r"(?<=NUMBER OF TIME STEPS =)\s?(?P<content>\S+)", str(nr_steps))
        steering_file.update_steering_file(r"(?<=ORIGINAL DATE OF TIME =)\s?(?P<content>\S+)",
                                           "{dt.year};{dt.month};{dt.day}".format(dt=start_date))

        return forcages, [steering_file]

    def prepare_forcages(self, start_date: datetime, end_date: datetime) \
            -> tuple[list[Forcage], list[SteeringFile]]:
        model_used = self.model
        if model_used not in FORCAGE_TEMPLATES.keys():
            raise ValueError(f"{model_used} is not a valid model name")

        try:
            if model_used == "gironde-xl-3d":
                return self.prepare_forcages_gironde_xl_3d(start_date, end_date)
            else:
                return self.prepare_forcages_houle_2d(start_date, end_date)
        except ValueError as err:
            raise ValueError(f"Unable to prepare forcages. Reason: {err}") from err

    def update_repository(self, steering_files: list[SteeringFile], forcages: list[Forcage]) -> bool:
        """Update the gitlab repository with new parameter files to run a simulation.

        First, this checks if some previous parameters files need to be
        deleted. Then, this updates the repository by uploading the new
        parameter files and deleting the previous ones.

        :param list[SteeringFile] steering_files : a list of SteeringFile
        :param list[Forcage] forcages: a list of forcage files
        :returns: False on error

        """
        # Retrieve existing files from the forecast directory
        try:
            existing_paths = [elt["path"] for elt in list_files(self.project_id, self.naos_token)]
        except ValueError:
            return False

        commit_actions = []

        # Add the generated forcages and the Telemac3d file to the commit
        for forcage in forcages:
            file_path = forcage.filename
            existing_path = file_path in existing_paths
            action = "update" if existing_path else "create"
            commit_actions.append({"action": action, "file_path": file_path, "content": forcage.content})

        # Add the steering files to the commit
        for steering_file in steering_files:
            steering_path = steering_file.filename
            existing_steering = steering_path in existing_paths
            steering_action = "update" if existing_steering else "create"
            commit_actions.append(
                {"action": steering_action, "file_path": steering_path, "content": steering_file.content})

        # Do the commit
        today = datetime.now()
        commit_msg = f"forcage - {today:%Y-%m-%d}"
        try:
            commit(self.project_id, commit_actions, commit_msg, self.naos_token)
        except ValueError:
            return False

        return True

    def create_simulation_parameters(self) -> None:
        start_date = self.start_date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
        end_date = self.end_date.replace(tzinfo=None)

        # If the end date is not set at midnight, the next day at midnight
        # must be used to have a full coverage of the last day.
        if (end_date.microsecond != 0
                or end_date.second != 0
                or end_date.minute != 0
                or end_date.hour != 0):
            end_date = (end_date + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)

        forcages, steering_files = self.prepare_forcages(start_date, end_date)

        # fill headers and download forecast for each forcage
        module_dir = os.path.abspath('')

        forecast_data_dir = os.path.join(module_dir, "data", "simulation")

        model_used = self.model
        for forcage in forcages:
            for source in forcage.sources:
                try:
                    source.download(start_date, end_date)
                except Exception as err:
                    raise ValueError(err) from err
            forcage.fill_content(model_used, forecast_data_dir, start_date, end_date)

        # push files to gitlab
        if self.write_in_naos:
            updated = self.update_repository(steering_files, forcages)
            if not updated:
                raise ValueError("Error uploading forcing files")

    def create_simulation(self) -> dict:
        diff_date = self.end_date - self.start_date
        if diff_date.days > 30:
            return {"error": 1,
                    "result": f"Requested simulation is too long: {diff_date.days} days."
                    }

        try:
            self.create_simulation_parameters()
        except Exception as err:
            return {"error": 1,
                    "result": f"Creation of new simulation parameters failed : {err}"
                    }

        if self.write_in_naos:
            return {"error": 0,
                    "result": f"Creation of new simulation parameters OK : "
                              f"{get_result_url(self.project_id, self.naos_token)}"
                    }
        else:
            return {"error": 0,
                    "result": "Creation of new simulation parameters OK "
                    }