Voici le code
Avec beaucoup d’aide de chat gpt
import os
import json
import logging
from datetime import datetime as dt, time
from requests import session, exceptions
import hassapi as hass
# Configuration du logging
logging.basicConfig(level=logging.DEBUG)
# Constantes
MATIERE_DECHETS_MENAGER = 1
BASE_URL = "https://{cc}.ecocito.com/Usager/Collecte/GetCollecte?skip=0&take=20&requireTotalCount=true&dateDebut={year_start}-01-01T01%3A00%3A00.000Z&dateFin={year_end}-12-31T23%3A59%3A00.000Z&charger=true&idMatiere={type_matiere}"
CHEMIN_FICHIER_SEMESTRE_1 = '/homeassistant/www/ecocito-app/fichier_filtre_semestre_1.json'
CHEMIN_FICHIER_SEMESTRE_2 = '/homeassistant/www/ecocito-app/fichier_filtre_semestre_2.json'
DATE_LIMITE = '2024-06-30T00:00:00'
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
DATE_FORMAT_OUTPUT = '%d-%m-%Y'
class Ecocito(hass.Hass):
def initialize(self):
# Utiliser datetime.time correctement
time_to_run = time(17, 0, 0)
self.connection_payload = {
'Identifiant': self.args["user"],
'MotDePasse': self.args["passw"],
'MaintenirConnexion': 'false',
'FranceConnectActif': 'False'
}
self.cc = self.args["cc"]
self.create_input_numbers()
self.set_sensors_callback()
self.run_daily(self.set_sensors_callback, time_to_run)
# Ajouter un écouteur d'événements pour surveiller les changements de prix_volume et prix_abonnement
self.listen_state(self.recalculate_prices, "input_number.ecocito_prix_volume")
self.listen_state(self.recalculate_prices, "input_number.ecocito_prix_abonnement")
logging.debug("Écouteurs d'événements ajoutés pour input_number.ecocito_prix_volume et input_number.ecocito_prix_abonnement")
def create_input_numbers(self):
"""Créer des input_numbers pour définir les prix."""
input_numbers = {
"input_number.ecocito_prix_volume": 0.42,
"input_number.ecocito_prix_abonnement": 60.00
}
for input_number, initial_value in input_numbers.items():
if not self.entity_exists(input_number):
self.set_state(input_number, state=initial_value)
logging.info(f"Création de {input_number} avec une valeur initiale de {initial_value} €.")
else:
logging.info(f"{input_number} existe déjà.")
def get_year_total(self, tt):
"""Calculer le total de QUANTITE_NETTE pour l'année."""
return sum(elem["QUANTITE_NETTE"] for elem in tt)
def filter_and_save_data(self, data, date_limite, chemin_fichier, condition):
"""Filtrer les données selon la condition et sauvegarder dans un fichier JSON."""
if condition == "before":
filtered_data = [item for item in data if dt.strptime(item['DATE_DONNEE'], DATE_FORMAT) <= date_limite]
else:
filtered_data = [item for item in data if dt.strptime(item['DATE_DONNEE'], DATE_FORMAT) > date_limite]
os.makedirs(os.path.dirname(chemin_fichier), exist_ok=True)
with open(chemin_fichier, 'w') as f:
json.dump({"data": filtered_data}, f, indent=4)
logging.info(f"Les données ont été filtrées et sauvegardées dans '{chemin_fichier}'.")
return filtered_data
def calculate_total(self, data, prix_volume, prix_abonnement):
"""Calculer la somme des QUANTITE_NETTE et le coût total."""
totalQuantNet = sum(item['QUANTITE_NETTE'] for item in data)
totalSommeEuro = round(totalQuantNet * prix_volume + (prix_abonnement / 2), 2)
return totalQuantNet, totalSommeEuro
def set_sensors_callback(self, *kwargs):
# Appeler recalculate_prices avec des arguments factices
self.recalculate_prices("input_number.ecocito_prix_volume", None, None, None, {})
def recalculate_prices(self, entity, attribute, old, new, kwargs):
logging.debug(f"recalculate_prices appelé avec entity={entity}, attribute={attribute}, old={old}, new={new}")
try:
# Récupérer les valeurs des input_numbers
prix_volume_state = self.get_state("input_number.ecocito_prix_volume")
prix_abonnement_state = self.get_state("input_number.ecocito_prix_abonnement")
if prix_volume_state is None or prix_abonnement_state is None:
logging.error("Erreur : La valeur de 'input_number.ecocito_prix_volume' ou 'input_number.ecocito_prix_abonnement' est None.")
return
prix_volume = float(prix_volume_state)
prix_abonnement = float(prix_abonnement_state)
with session() as c:
c.post(F'https://{self.cc}.ecocito.com/Usager/Profil/Connexion', data=self.connection_payload)
response = c.get(BASE_URL.format(cc=self.cc, year_start=dt.now().year, year_end=dt.now().year, type_matiere=MATIERE_DECHETS_MENAGER))
response.raise_for_status()
totalDechetsMenager = response.json()
# Vérifier la présence des données et ajouter un message de confirmation
if 'data' in totalDechetsMenager and totalDechetsMenager['data']:
logging.info("Données récupérées avec succès.")
else:
logging.error("Erreur : Les données récupérées sont vides ou incorrectes.")
return
total_count = totalDechetsMenager['totalCount']
data_entries = totalDechetsMenager['data']
date_donnee = data_entries[0]['DATE_DONNEE']
quantite_nette = data_entries[0]['QuantiteNette']
quantite_nette_val = data_entries[0]['QUANTITE_NETTE']
# Calculer le coût et arrondir à deux décimales
cost = round(quantite_nette_val * prix_volume + (prix_abonnement / 2), 2)
logging.info(f"Coût: {cost}€")
# Convertir la date au format souhaité
formatted_date = dt.strptime(date_donnee, DATE_FORMAT).strftime(DATE_FORMAT_OUTPUT)
logging.info(f"Dernière levée: {formatted_date}")
self.set_state("sensor.ecocito_dechets_menager", state=total_count, attributes={
"Dernière levée": {
"date": formatted_date,
"poid": quantite_nette,
"coût": f"{cost}€",
"test": self.get_year_total(data_entries)
}
})
date_limite = dt.strptime(DATE_LIMITE, DATE_FORMAT)
# Filtrer et sauvegarder les données pour le premier semestre
filtered_data_semestre_1 = self.filter_and_save_data(data_entries, date_limite, CHEMIN_FICHIER_SEMESTRE_1, "before")
totalQuantNet1, totalSommeEuro1 = self.calculate_total(filtered_data_semestre_1, prix_volume, prix_abonnement)
logging.info(f"La somme des QUANTITE_NETTE pour le semestre 1 est : {totalQuantNet1} kg, coût total : {totalSommeEuro1} €")
self.set_state("sensor.ecocito_semestre_1", state=totalQuantNet1, attributes={
"coût total": f"{totalSommeEuro1}€",
"prix poids total": f"{round(totalQuantNet1 * prix_volume, 2)}€",
"nombre de levées": len(filtered_data_semestre_1),
"unit_of_measurement": "kg",
"friendly_name": "Semestre 1"
})
# Filtrer et sauvegarder les données pour le deuxième semestre
filtered_data_semestre_2 = self.filter_and_save_data(data_entries, date_limite, CHEMIN_FICHIER_SEMESTRE_2, "after")
totalQuantNet2, totalSommeEuro2 = self.calculate_total(filtered_data_semestre_2, prix_volume, prix_abonnement)
logging.info(f"La somme des QUANTITE_NETTE pour le semestre 2 est : {totalQuantNet2} kg, coût total : {totalSommeEuro2} €")
self.set_state("sensor.ecocito_semestre_2", state=totalQuantNet2, attributes={
"coût total": f"{totalSommeEuro2}€",
"prix poids total": f"{round(totalQuantNet2 * prix_volume, 2)}€",
"nombre de levées": len(filtered_data_semestre_2),
"unit_of_measurement": "kg",
"friendly_name": "Semestre 2"
})
except (exceptions.RequestException, json.JSONDecodeError) as e:
logging.error(f"Erreur lors de la récupération ou du traitement des données: {e}")
except Exception as e:
logging.error(f"Erreur inattendue: {e}")
La dernière entité doit encore être modifiée