Raspberry pi + capteurs de porte + suivi conso le tout à la sauce python et mqtt

Hello,

Pour vous présenter mon dernier DIY, qui m’a pris un certain temps voir un temps certain et qui fait suite à ras le bol les piles sur le même concept [Concours] Ras le bol des piles

Remontée de 7 capteurs d’ouverture en filaire sur le pi ( ici un pi3 contrairement au pi2 des versions précédentes et tjs en POE ).

utilisation d’un PZEM-004T v3 pour suivre la conso de la maison ( j’ai tenté vainement de faire la même chose via un esp ou les gpio du pi modbus and co pour me rabattre sur la connection usb directe du PZEM-004 en USB

le circuit est un proto miniaturisé de ma V1 donc ça n’est pas forcément des plus esthétique et ça remonte en mqtt via 2 scripts python dans HA

Et accessoirement ça fonctionne :smiley:

en test

en prod quasi définitive

1

Script python pour la conso ( à améliorer )
import serial
import time
import paho.mqtt.client as mqtt

# MQTT configuration
MQTT_BROKER = "xxx"
MQTT_USERNAME = "xxx"
MQTT_PASSWORD = "xxx"
VOLTAGE_TOPIC = "zigbee2mqtt/tableauelecpi3/tension"
CURRENT_TOPIC = "zigbee2mqtt/tableauelecpi3/intensite"
POWER_TOPIC   = "zigbee2mqtt/tableauelecpi3/puissance"

# CRC Modbus

def calculate_crc(data):
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc

# Requête de lecture

def build_modbus_request():
    slave_address = 0x01
    function_code = 0x04
    start_address = 0x0000
    num_registers = 0x0A

    request = bytearray([
        slave_address, function_code,
        (start_address >> 8) & 0xFF, start_address & 0xFF,
        (num_registers >> 8) & 0xFF, num_registers & 0xFF
    ])

    crc = calculate_crc(request)
    request.append(crc & 0xFF)
    request.append((crc >> 8) & 0xFF)
    return request

# Initialisation MQTT
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()

# Port série
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

try:
    request = build_modbus_request()

    while True:
        ser.write(request)
        time.sleep(0.5)
        response = ser.read(25)

        if len(response) < 15:
            print("⚠ Trame trop courte")
            continue

        crc_received = response[-2] + (response[-1] << 8)
        crc_calculated = calculate_crc(response[:-2])
        if crc_received != crc_calculated:
            print("⚠ CRC invalide")
            continue

        voltage_raw = response[3] << 8 | response[4]
        current_raw = response[5] << 8 | response[6] | response[7] << 24 | response[8] << 16

        voltage = voltage_raw * 0.1
        current = current_raw * 0.001
        power = voltage * current

        # Filtrage de valeurs absurdes
        if voltage < 180 or voltage > 250 or current < 0 or current > 100:
            print(f"⚠ Valeur rejetée: V={voltage}V, A={current}A")
            time.sleep(2)
            continue

        print(f"Tension : {voltage:.1f} V → {VOLTAGE_TOPIC}")
        print(f"Courant : {current:.3f} A → {CURRENT_TOPIC}")
        print(f"Puissance : {power:.1f} W → {POWER_TOPIC}")

        client.publish(VOLTAGE_TOPIC, str(voltage))
        client.publish(CURRENT_TOPIC, str(current))
        client.publish(POWER_TOPIC, str(power))

        time.sleep(2)

except KeyboardInterrupt:
    print("Arrêt utilisateur\nFermeture propre.")

finally:
    ser.close()
    client.loop_stop()

cdt

2 « J'aime »

Super boulot @freetronic

Dommage pour le rpi :laughing:

1 « J'aime »

Re,

Ajout du sensor kWh pour intégrer directement dans energie, en test

script modifié avec ajout de kWh
import serial
import time
import os
import paho.mqtt.client as mqtt

# MQTT configuration
MQTT_BROKER = "xxx"
MQTT_USERNAME = "xxx"
MQTT_PASSWORD = "xxx"
VOLTAGE_TOPIC = "zigbee2mqtt/tableauelecpi3/tension"
CURRENT_TOPIC = "zigbee2mqtt/tableauelecpi3/intensite"
POWER_TOPIC   = "zigbee2mqtt/tableauelecpi3/puissance"
ENERGY_TOPIC  = "zigbee2mqtt/tableauelecpi3/energie_kwh"

# Fichier pour stockage local de l'énergie cumulée
ENERGY_FILE = "energie_kwh.txt"
INTERVAL_SEC = 2

# CRC Modbus
def calculate_crc(data):
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc

# Requête de lecture
def build_modbus_request():
    slave_address = 0x01
    function_code = 0x04
    start_address = 0x0000
    num_registers = 0x0A

    request = bytearray([
        slave_address, function_code,
        (start_address >> 8) & 0xFF, start_address & 0xFF,
        (num_registers >> 8) & 0xFF, num_registers & 0xFF
    ])

    crc = calculate_crc(request)
    request.append(crc & 0xFF)
    request.append((crc >> 8) & 0xFF)
    return request

# Initialisation MQTT
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()

# Port série
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

try:
    request = build_modbus_request()

    while True:
        ser.write(request)
        time.sleep(0.5)
        response = ser.read(25)

        if len(response) < 15:
            print("⚠ Trame trop courte")
            continue

        crc_received = response[-2] + (response[-1] << 8)
        crc_calculated = calculate_crc(response[:-2])
        if crc_received != crc_calculated:
            print("⚠ CRC invalide")
            continue

        voltage_raw = response[3] << 8 | response[4]
        current_raw = response[5] << 8 | response[6] | response[7] << 24 | response[8] << 16

        voltage = voltage_raw * 0.1
        current = current_raw * 0.001
        power = voltage * current  # en Watts

        # Filtrage de valeurs absurdes
        if voltage < 180 or voltage > 250 or current < 0 or current > 100:
            print(f"⚠ Valeur rejetée: V={voltage}V, A={current}A")
            time.sleep(INTERVAL_SEC)
            continue

        print(f"Tension : {voltage:.1f} V → {VOLTAGE_TOPIC}")
        print(f"Courant : {current:.3f} A → {CURRENT_TOPIC}")
        print(f"Puissance : {power:.1f} W → {POWER_TOPIC}")

        client.publish(VOLTAGE_TOPIC, str(voltage))
        client.publish(CURRENT_TOPIC, str(current))
        client.publish(POWER_TOPIC, str(power))

        # Lecture énergie cumulée
        if os.path.exists(ENERGY_FILE):
            with open(ENERGY_FILE, "r") as f:
                try:
                    total_kwh = float(f.read())
                except ValueError:
                    total_kwh = 0.0
        else:
            total_kwh = 0.0

        # Calcul énergie ajoutée (Watt -> kWh)
        total_kwh += (power / 1000.0) * (INTERVAL_SEC / 3600.0)

        # Sauvegarde et publication
        with open(ENERGY_FILE, "w") as f:
            f.write(str(total_kwh))

        print(f"Énergie : {total_kwh:.4f} kWh → {ENERGY_TOPIC}")
        client.publish(ENERGY_TOPIC, str(total_kwh))

        time.sleep(INTERVAL_SEC)

except KeyboardInterrupt:
    print("Arrêt utilisateur\nFermeture propre.")

finally:
    ser.close()
    client.loop_stop()

image

Anim1

cdt