Newer
Older

Karine PARRA
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from datetime import datetime, timedelta
from enum import IntEnum
from source.variable import *
class DataSourceMode(IntEnum):
    """Mode in which a data source is queried for a given period.

    ``DataSource.compute_real_dates`` returns one of these members as the
    third element of its result.
    """

    # The period is not older than the source's history limit.
    FORECAST = 0

    # The period ends before the source's history limit.
    HISTORICAL = 1
@dataclass
class DataSource:
    """Describe a data source and the date range it is able to serve.

    The constructor only stores its arguments. ``compute_real_dates`` turns a
    requested period into whole-day bounds and selects the ``DataSourceMode``
    to use for it.

    Attributes:
        name: Label of the source; this is what ``str()`` returns.
        forecast_duration: Number of seconds of forecast available after the
            end of the current day. Must be set before calling
            ``compute_real_dates``.
        history_duration: Iterable of durations in seconds. Each duration
            defines a limit ``now - duration``; a period ending before such a
            limit is served in historical mode. ``None`` means no limit.
        url: Stored as given; not used by this class (presumably the endpoint
            of the source -- confirm against callers).
        api_params: Stored as given; not used by this class.
        variables: Stored as given; not used by this class.

    NOTE(review): no annotated fields are declared, so if ``dataclass`` is the
    standard-library decorator, the ``__eq__`` and ``__repr__`` it generates
    see zero fields. Confirm the decorator is intended.
    """

    # Not referenced in this class; presumably a time step in seconds
    # (one hour) -- TODO confirm with the code that reads it.
    constants_time_step = 3600

    def __init__(self, name="", forecast_duration=None, history_duration=None, url=None, api_params=None,
                 variables=None) -> None:
        """Store the settings of the source without validating them."""
        self.name = name
        self.forecast_duration = forecast_duration
        self.history_duration = history_duration
        self.url = url
        self.api_params = api_params
        self.variables = variables

    def __str__(self) -> str:
        """Return the name of the source."""
        return self.name

    @staticmethod
    def download(start_date, end_date) -> bool:
        """Placeholder download hook: ignores its arguments and returns ``False``.

        Presumably meant to be replaced by concrete sources -- confirm.
        """
        return False

    def compute_real_dates(
            self, start_date: datetime, end_date: datetime
    ) -> tuple[datetime, datetime, DataSourceMode]:
        """Align a requested period on whole days and pick the query mode.

        The end date is first capped at the end of the current day plus
        ``forecast_duration`` seconds, then rounded up to the next midnight
        unless it already falls exactly on midnight. The start date is rounded
        down to midnight.

        Args:
            start_date: Requested start of the period.
            end_date: Requested end of the period. It is compared with
                ``datetime.now()``, which is a naive datetime, so it has to be
                naive as well.

        Returns:
            ``(start, end, mode)`` where ``start`` and ``end`` are the
            day-aligned bounds. ``mode`` is ``DataSourceMode.HISTORICAL`` when
            ``end`` is earlier than ``now - duration`` for at least one
            duration of ``history_duration``, and ``DataSourceMode.FORECAST``
            otherwise, including when ``history_duration`` is ``None`` or
            empty.

        Raises:
            TypeError: If ``forecast_duration`` is ``None``, because
                ``timedelta`` rejects it.
        """
        # Read the clock once so the forecast cap and every history limit are
        # measured from the same instant.
        now = datetime.now()

        # Last instant covered by the forecast: end of today plus the horizon.
        forecast_end_date = now.replace(hour=23, minute=59, second=59, microsecond=0) + timedelta(
            seconds=self.forecast_duration)
        end_date_cut = min(forecast_end_date, end_date)

        # Any end date that is not exactly on midnight is pushed to the next
        # midnight. A date cut by the forecast limit ends at 23:59:59 and is
        # therefore always rounded up.
        end_midnight = end_date_cut.replace(hour=0, minute=0, second=0, microsecond=0)
        if end_date_cut != end_midnight:
            end_date_cut = end_midnight + timedelta(days=1)

        start_date_cut = start_date.replace(hour=0, minute=0, second=0, microsecond=0)

        # The constructor default is None: treat it as "no history limits"
        # instead of failing when iterating over it.
        durations = () if self.history_duration is None else self.history_duration

        # Every configured duration is examined; a single match is enough to
        # select historical mode.
        is_historical = any(
            end_date_cut < now - timedelta(seconds=duration)
            for duration in durations
        )
        mode = DataSourceMode.HISTORICAL if is_historical else DataSourceMode.FORECAST
        return start_date_cut, end_date_cut, mode