bonjour,
j’ai une solution un peu compliquée car je l’utilise avec plusieurs données différentes. je te la propose quand même :
1/ script
script:
- service: python_script.exec
alias: cumul des précipitations sur 24h
data:
new_datage: sensor.macon1_reference_time
new_data: sensor.macon1_precipitation
sensor_cumul_nom: sensor.macon1_precipitation_cumul_24h
sensor_cumul_attribut: données_24
sensor_cumul_heures: 24
sensor_cumul_cle_data: cumul_24h
sensor_cumul_attributs_complementaires: >
{'friendly_name': "mâcon 1h précipitation cumul 24h", 'unit_of_measurement': 'mm', 'icon': 'mdi:weather-rainy',
'alias': 'cumul des précipitations 1 heure sur 24 heures en mm'}
file: include/python_scripts/meteofrance_cumul.py
2/fichier Python
'''
meteofrance_cumul.py
calcul du cumul de pluie, ensoleillement,... sur 1h, 24h,... à partir des données 6mn ou 1h
'''
from datetime import datetime, timedelta
import json
print = logger.debug
print(f'démarrage de meteofrance_cumul.py pour {data.get('sensor_cumul_nom')}')
def ComputeCumul(print, state_datage: datetime, state_last: float, sensor_cumul, attr_historique: str, duree_heures: int) -> [float, list, str]:
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
if state_datage.second != 0: # on ne prend pas en compte les secondes
return None, None, None
# cumul
data_cumul_hh = None
txt_reset = None
if sensor_cumul is not None:
state_cumul = sensor_cumul.state # state
attr_cumul = sensor_cumul.attributes.copy() # attributs
data_cumul_hh = attr_cumul.get(attr_historique) # historique -> [[datetime, float],...]
datage_dernier_cumul = datetime.strptime(attr_cumul.get('datage'), '%d/%m/%Y %H:%M:%S') # datage du dernier passage
# passage à minuit => on reprend à 0
if True:
txt_reset = 'pas de remise à 0 à minuit'
else:
txt_reset = attr_cumul.get('txt_reset') # si pas de modif, on conserve la précédente valeur
if state_datage.day != datage_dernier_cumul.day:
txt_reset = f'day={state_datage.day}, cumul={datage_dernier_cumul.day}, now={datetime.now()}' # nouvelle valeur
data_cumul_hh = None # on reprend avec une liste vide
# cumul HH heures
if data_cumul_hh is not None: # réinitialisation manuelle possible du sensor en supprimant l'attribut 'données_24'
# ajout de la nouvelle donnée
if (len(data_cumul_hh) > 0) and (state_datage != data_cumul_hh[-1][0]): # pour éviter le 'surcumul' en cas de déclenchement manuel
print(f'{duree_heures}/ajouté : {state_datage}, {state_last}')
data_cumul_hh.append([state_datage, state_last])
# calcul du cumul
datage_limite = state_datage - relativedelta(hours=duree_heures)
print(f'{duree_heures}/datage limite={datage_limite}')
cumul_hh = 0
nbre_hh = 0
n_limite = -1
for n, p in enumerate(data_cumul_hh):
# print(f'{duree_heures}/?? : {p}')
if p[0] <= datage_limite:
print(f'{duree_heures}/supprimé : {p[0]}')
n_limite = n
continue
print(f'{duree_heures}/conservé : {p[0]}, {p[1]}')
cumul_hh += p[1]
nbre_hh += 1
data_cumul_hh = data_cumul_hh[n_limite + 1:]
print(f'{duree_heures}/cumul de {nbre_hh} valeurs = {cumul_hh}')
else:
# nouvelle donnée
print(f'{duree_heures}/nouvelle donnée : {state_last}')
data_cumul_hh = [[state_datage, state_last]]
cumul_hh = state_last
nbre_hh = 1
return cumul_hh, data_cumul_hh, txt_reset
# datage de la nouvelle donnée
sensor_datage = hass.states.get(data.get('new_datage'))
attr = data.get('new_datage_attribut')
if attr is not None:
attributs = sensor_datage.attributes.copy() # attributs
datage = attributs.get(attr)
else:
datage = sensor_datage.state
state_datage = datetime.strptime(datage, '%d/%m/%Y %H:%M:%S')
state_datage -= timedelta(seconds=state_datage.second) # on ne prend pas en compte les secondes
# nouvelle donnée
sensor_last = hass.states.get(data.get('new_data'))
state_last = float(sensor_last.state)
# cumul
sensor_cumul_nom = data.get('sensor_cumul_nom')
sensor_cumul_attribut = data.get('sensor_cumul_attribut')
sensor_cumul_heures = data.get('sensor_cumul_heures')
sensor_cumul = hass.states.get(sensor_cumul_nom)
# compléments pour les attributs du sensor de cumul : clés et attributs complémentaires
sensor_cumul_cle_data = data.get('sensor_cumul_cle_data')
sensor_cumul_attributs_complementaires = data.get('sensor_cumul_attributs_complementaires')
sensor_cumul_attributs_complementaires = sensor_cumul_attributs_complementaires.replace("'", "\"") # mise en forme pour json
sensor_cumul_attributs_complementaires = json.loads(sensor_cumul_attributs_complementaires) # passage en dict
# calcul
print(f'calcul de {sensor_cumul_nom} sur {sensor_cumul_heures} heures')
cumul_hh, data_cumul_hh, txt_reset = ComputeCumul(print, state_datage, state_last, sensor_cumul, sensor_cumul_attribut, sensor_cumul_heures)
# mise à jour du sensor de cumul
attr = {'datage': datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
'txt_reset': txt_reset,
sensor_cumul_attribut: data_cumul_hh, sensor_cumul_cle_data: cumul_hh} # nouvelles données
attr = {**attr, **sensor_cumul_attributs_complementaires} # attributs complémentaires
hass.states.set(sensor_cumul_nom, cumul_hh, attr)
print(f'meteofrance_cumul terminé pour {sensor_cumul_nom}')
j’espère que les commentaires sont suffisants !
bon courage