Externaliser son historique - Stockage long terme

Hello la communauté !

Juste pour vous tenir au courant, j’ai pas mal avancé et j’ai maintenant une solution fonctionnelle. Il me reste maintenant à peaufiner quelques filtres dans Nodered pour éviter d’injecter des valeurs nulles par exemple.

Je ferai un tuto dédié à ça une fois terminé, dans l’attente je vous laisse voir mes précédents échanges :
https://forum.hacf.fr/t/influxdb-v1-8-x-aller-encore-plus-loin/3673/97?u=neuvidor

Pour résumé ma solution, je tourne sur Proxmox de cette façon :

2 « J'aime »

Bonjour,

Ma base de données commençant à atteindre une taille indécente je m’intéresse fortement à ce sujet. D’autant plus que je trouve l’idée d’externaliser son historique à un endroit non lié à notre système domotique très pratique si on pense au futur et que l’on ne veut pas tout perdre. Et pour ça, comme le fait remarquer @Neuvidor, il faut garder la main aussi sur le formatage de ses données.
Et c’est un peu là mon problème : comment récupérer les valeurs d’un capteurs dans Node-Red pour ensuite formater les données reçues avant de les envoyer vers InfluxDB ? Pour des capteurs remontant dans MQTT je n’ai pas de problème mais pour les capteurs remontant directement via une intégration dans Home Assistant ?
J’ai bien installer le noeud « node-red-contrib-home-assistant-websocket » dans Node-Red mais ensuite impossible de recevoir les valeurs en utilisant « current state » :

Est-ce que quelqu’un a déjà pu mettre en place cela ?

Salut

Petite nuance quand même. La taille importante est généralement plus liée à la quantité des données à court terme.
Ensuite pourquoi passer par nodered pour injecter dans influxdb ? Si c’est pour choisir les valeurs à externaliser : le recorder de base fait pareil. Si c’est pour avoir la main sur le format, en dehors d’un calcul le reste sera imposé par la méthode influx

1 « J'aime »

Salut,

en dehors de la stratégie d stockage long terme dont parle @Pulpy-Luke, ta question sur NodeRed m’interpelle :slight_smile:

Je ne sais pas si c’est juste une illustration ou si ton screenshot est le flux que tu essayes de faire marcher?
Car si c’est tout ton flux, c’est normal, « current state » n’est pas un trigger, il te faut quelque chose avant et les infos de ton entité seront lues au passage.

Le plus simple étant un « inject » qui permet de lancer à la main. Mais en général plutôt un « state changed » ou autre trigger.

2 « J'aime »

Personnellement je procède de la manière ci-dessous qui est certainement perfectible :

STEP 01 : Je déclenche le flow chaque minute comme ceci :

STEP 02 : Je lis les données provenant de HA comme ceci :

  • image

Avec pour définition du serveur HA ceci :

  • image

STEP 03 : J’ordonne mes données comme ceci :

STEP 04 : J’injecte les données dans InfluxDB comme ceci :

  • image

Avec pour définition du serveur InfluxBD ceci :

  • image

Ce qui donne dans InfluxDB :
=> Attention à l’affichage dans InfluxDB, car par défaut cela affiche une moyenne (agregate function)

STEP 05 : Je lis les données InfluxDB sous Grafana ce qui me permet d’obtenir ceci :

from(bucket: "home1_raw_data")
 |> range(start: -1d)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["entity_id"] == "linky_base")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["domain"] == "energie")
  |> filter(fn: (r) => r["instance"] == "prod")
  |> filter(fn: (r) => r["source"] == "capteur")
  |> drop(columns: ["domain", "instance", "source", "_start","_stop", "_field", "_measurement"])
  |> aggregateWindow(every: 1h, fn: spread, createEmpty: false)
|> group() 
|> pivot(rowKey:["_time"], columnKey: ["entity_id"], valueColumn: "_value")
2 « J'aime »

Salut !

Pour le coup je suis d’accord, je pense qu’il faudrait que tu fasses un peu de ménage et vérifier la config de ton recorder. Perso je ne garde que 2 jours d’historique.

Personnellement cela me permet de classer mes données avec les arguments et l’échantillonnage que je souhaite. Si je ne dis pas de bétise ce n’est pas possible via le recorder d’aller autant dans le détails.

1 « J'aime »

Oui ça apporte une souplesse supplémentaire. Mais c’est pas la seule solution : pour l’échantillonnage par exemple, tu le fais aussi directement dans influxdb (min/max/moy)… Dans ton cas pour les conso électriques, tu exploses les données brutes pour les classer. C’est possible de faire ça coté HA et de collecter les sensors individuellement.
ça se défends par certains aspects…

J’ai pas compris ce que tu voulais dire, tu peux m’en dire plus car ton point de vue m’intéresse.

Absolument, il y a rarement qu’une seule façon de faire, l’important c’est que cela convienne au besoin et qu’il y ai une logique derrière la démarche.

Un cas d’usage que je vois bien, c’est si tu disposes d’un linky (avec la conso globale) et de qqs autres mesures (des prises bien placées par exemple) ça pourrait servir à recréer la répartition. L’inverse de ce que tu fais avec l’eau

Par contre juste pour renommer (?) c’est le luxe


Surtout que dans HA un nom moins compliqué c’est bien aussi

Mdr, entièrement d’accord ! Comme il n’apparaît que dans une seule carte j’ai eu la flemme de le renommer…

Sauf erreur de ma part je fais plus que renommer :

J’ai pas le contenu de ton fonction, donc possible

EDIT:
Vu. C’est rigolo que tu mettre une instance ‹ prod ›… Personnellement j’aurai fait un bucket séparé (prod/hors prod), directement dans le recoder d’Ha

Je viens d’éditer mon message et rajouter la fonction, même si elle est simple.

Salut à tous !

Suite à la demande de @Galadan, je vous partage mes tasks sur InfluxDB.
Je tiens à dire que je ne suis absolument pas informaticien donc il est fort probable que le code ne soit pas optimisé. Je suis bien évidemment preneur de toutes vos remarques constructives :slight_smile:

Voici donc la liste de mes tasks avec le code ci-dessous :

# Task « daily_kwh » :

import "date"
import "experimental"
import "math"

option task = {
    name: "daily_kwh",
    cron: "0 1 * * *",
}

TODAY = date.truncate(t: now(), unit: 1d)
YESTERDAY = experimental.addDuration(d: -1d, to: TODAY)

DATA1 = from(bucket: "home1_raw_data")
    |> range(start: YESTERDAY, stop: TODAY)
    |> filter(fn: (r) => r["_measurement"] == "kWh")
    |> filter(fn: (r) => r["_field"] == "value")

DATA1
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
            _time: r._time,
        }),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: YESTERDAY},
    )
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 100.0) / 100.0}))
    |> map(fn: (r) => ({r with min: r.min}))
    |> map(fn: (r) => ({r with min: r.max}))
    |> to(bucket: "home1_daily_data")

# Task « daily_kwh_current_day » :

import "date"
import "experimental"
import "math"

option task = {name: "daily_kwh_current_day", cron: "* * * * *"}

TODAY = date.truncate(t: now(), unit: 1d)

DATA1 = from(bucket: "home1_raw_data")
    |> range(start: TODAY, stop: now())
    |> filter(fn: (r) => r["_measurement"] == "kWh")
    |> filter(fn: (r) => r["_field"] == "value")

DATA1
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
            _time: r._time,
        }),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: now()},
    )
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 100.0) / 100.0}))
    |> to(bucket: "home1_daily_data")

# Task « daily_m3 » :

import "date"
import "experimental"
import "math"

option task = {
    name: "daily_m3",
    cron: "0 1 * * *",
}

TODAY = date.truncate(t: now(), unit: 1d)
YESTERDAY = experimental.addDuration(d: -1d, to: TODAY)

DATA = from(bucket: "home1_raw_data")
    |> range(start: YESTERDAY, stop: TODAY)
    |> filter(fn: (r) => r["_measurement"] == "m3")
    |> filter(fn: (r) => r["_field"] == "value")

DATA
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
            _time: YESTERDAY,
        }),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: YESTERDAY},
    )
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 1000.0) / 1000.0}))
    |> to(bucket: "home1_daily_data")

DATA
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
            _time: YESTERDAY,
        }),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: YESTERDAY},
    )
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 1000.0) / 1000.0}))
    |> pivot(rowKey: ["_time"], columnKey: ["entity_id"], valueColumn: "_value")
    |> map(fn: (r) => ({r with _value: math.round(x: (r.eau_adoucie - r.eau_chaude) * 1000.0) / 1000.0}))
    |> set(key: "entity_id", value: "eau_froide")
    |> to(bucket: "home1_daily_data")

# Task « daily_m3_current_day » :

import "date"
import "experimental"
import "math"

option task = {name: "daily_m3_current_day", cron: "* * * * *"}

TODAY = date.truncate(t: now(), unit: 1d)

DATA = from(bucket: "home1_raw_data")
    |> range(start: TODAY, stop: now())
    |> filter(fn: (r) => r["_measurement"] == "m3")
    |> filter(fn: (r) => r["_field"] == "value")

DATA
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({min: if r._value < accumulator.min then r._value else accumulator.min, max: if r._value > accumulator.max then r._value else accumulator.max, _time: TODAY}),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: TODAY},
    )
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 1000.0) / 1000.0}))
    |> to(bucket: "home1_daily_data")
DATA
    |> set(key: "source", value: "calculated")
    |> reduce(
        fn: (r, accumulator) => ({min: if r._value < accumulator.min then r._value else accumulator.min, max: if r._value > accumulator.max then r._value else accumulator.max, _time: TODAY}),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: TODAY},
    )
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> map(fn: (r) => ({r with _value: math.round(x: (r.max - r.min) * 1000.0) / 1000.0}))
    |> pivot(rowKey: ["_time"], columnKey: ["entity_id"], valueColumn: "_value")
    |> map(fn: (r) => ({r with _value: math.round(x: (r.eau_adoucie - r.eau_chaude) * 1000.0) / 1000.0}))
    |> set(key: "entity_id", value: "eau_froide")
    |> to(bucket: "home1_daily_data")

# Task « daily_temperature » :

import "date"
import "experimental"
import "math"

option task = {name: "daily_temperature", cron: "0 1 * * *"}

TODAY = date.truncate(t: now(), unit: 1d)
YESTERDAY = experimental.addDuration(d: -1d, to: TODAY)
DATA = from(bucket: "home1_raw_data")
    |> range(start: YESTERDAY, stop: TODAY)
    |> filter(fn: (r) => r["_measurement"] == "°C")
    |> filter(fn: (r) => r["_field"] == "value")

DATA
    |> mean()
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> set(key: "statistics", value: "mean")
    |> set(key: "source", value: "calculated")
    |> map(fn: (r) => ({r with _value: math.round(x: r._value * 10.0) / 10.0}))
    |> to(bucket: "home1_daily_data")
DATA
    |> min()
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> set(key: "statistics", value: "min")
    |> set(key: "source", value: "calculated")
    |> to(bucket: "home1_daily_data")
DATA
    |> max()
    |> map(fn: (r) => ({r with _time: YESTERDAY}))
    |> set(key: "statistics", value: "max")
    |> set(key: "source", value: "calculated")
    |> to(bucket: "home1_daily_data")

# Task « daily_temperature_current_day » :

import "date"
import "experimental"
import "math"

option task = {name: "daily_temperature_current_day", cron: "* * * * *"}

TODAY = date.truncate(t: now(), unit: 1d)

DATA = from(bucket: "home1_raw_data")
    |> range(start: TODAY, stop: now())
    |> filter(fn: (r) => r["_measurement"] == "°C")
    |> filter(fn: (r) => r["_field"] == "value")

DATA
    |> mean()
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> set(key: "statistics", value: "mean")
    |> set(key: "source", value: "calculated")
    |> map(fn: (r) => ({r with _value: math.round(x: r._value * 10.0) / 10.0}))
    |> to(bucket: "home1_daily_data")
DATA
    |> min()
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> set(key: "statistics", value: "min")
    |> set(key: "source", value: "calculated")
    |> to(bucket: "home1_daily_data")
DATA
    |> max()
    |> map(fn: (r) => ({r with _time: TODAY}))
    |> set(key: "statistics", value: "max")
    |> set(key: "source", value: "calculated")
    |> to(bucket: "home1_daily_data")
2 « J'aime »

Merci beaucoup @Neuvidor et @AlexHass pour vos retours. Ca marche désormais parfaitement. Il ne me reste plus qu’à mettre tout cela en forme !

@Pulpy-Luke : je m’embête peut-être un peu mais dans le cas présent j’aime bien choisir ce que je vais envoyer mais surtout le format dans lequel je l’envoie. J’ai fait un test en envoyant directement de HA et je n’ai alors pas la main sur les colonnes. Si un jour je veux envoyer les infos d’un autre système alors cela ne correspondra pas forcément.
Là je peux envoyer les donner en provenance de MQTT, de HA ou de n’importe quel autre source. Et je trouve cela confortable.

1 « J'aime »

Salut.

Oui, par contre justement c’est assez normé influxdb, pour conserver un modèle de données homogènes et léger:

  • field key
  • field set
  • field value
  • measurement
  • point

Pour le reste, c’est les tags qui servent.
Quant à utiliser NR pour traiter l’import des données « autres », pourquoi pas mais il y a des connecteurs pour ça aussi (au même titre qu’il te faut un node NR)
Une simple question de gouts et de complexité

2 « J'aime »

Hello!

La documentation d’InfluxDB m’avait pas mal aidé à comprendre comment les données doivent être organisés, même si c’est souvent une histoire de compromis.

https://docs.influxdata.com/influxdb/v2.1/write-data/best-practices/schema-design/

1 « J'aime »

Merci pour le script.
Pour la consommation électrique journalière, j’ai fait une petite modification dans le reduce. J’exclus les valeurs à zéro pour éviter d’avoir un range énorme.

    |> reduce(
        fn: (r, accumulator) =>
            ({
                min:
                    if r._value < accumulator.min and r._value != 0.0 then
                        r._value
                    else
                        accumulator.min,
                max: if r._value > accumulator.max then r._value else accumulator.max,
                _time: r._time,
            }),
        identity: {min: 10000000000.0, max: -10000000000.0, _time: yday},
    )
    |> map(fn: (r) => ({r with _time: yday}))
    

et dans le map qui renvoi la valeur max il y a une coquille.

  |> map(fn: (r) => ({r with min: r.min}))
  |> map(fn: (r) => ({r with max: r.max}))

Superbe post!!
Je veux installer, comme vous l’avez fait, influxDB et Node-red dans des contener Proxmox, pouvez vous m’indiquer un tutoriel pour cela???
Ca m’aidera à gagner du temps sans passer par des tutos pas toujours adaptés

Il doit même y avoir moyen de se passer de tuto

Merci, pour le lien. Je n’aurais pas trouver tout seul.
Par contre je n’ai jamais créer de container dans Proxmox, uniquement des VMs. Il me manque vraiment les bases pour les containers.