huette-energy-consumption-w.../main.py

83 lines
2.3 KiB
Python
Raw Permalink Normal View History

import datetime # Import datetime module
import requests
import paho.mqtt.client as mqtt
import paho.mqtt.enums as mqtt_enums
import os
import json
import time
base_url = "https://prom.c3re.de/api/v1/query"
def get_params(timestamp):
return {
"query": 'energy{location="Hütte", type="Energymeter"}',
"time": str(timestamp),
}
def calc_prometheus_response_value_diff(response_before, response_after):
return int(response_after.json()["data"]["result"][0]["value"][1]) - int(
response_before.json()["data"]["result"][0]["value"][1]
)
def get_energy_consumption_between(b, a):
response_before = requests.get(base_url, params=get_params(b.timestamp()))
response_after = requests.get(base_url, params=get_params(a.timestamp()))
return calc_prometheus_response_value_diff(response_before, response_after)
def get_energy_consumption_1_day(n):
return get_energy_consumption_between(n - datetime.timedelta(days=1), n)
def get_energy_consumption_today():
return get_energy_consumption_between(
datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0),
datetime.datetime.now(),
)
def main():
host = os.getenv("MQTT_HOST")
port = int(os.getenv("MQTT_PORT", 1883))
topic = os.getenv("MQTT_TOPIC")
username = os.getenv("MQTT_USERNAME")
password = os.getenv("MQTT_PASSWORD")
mqttc = mqtt.Client(mqtt_enums.CallbackAPIVersion.VERSION2)
if username:
if not password:
raise ValueError("MQTT_USERNAME is set but MQTT_PASSWORD is not set")
mqttc.username_pw_set(username, password)
mqttc.connect(host, port)
mqttc.loop_start()
while True:
consumptions = []
consumptions.append(get_energy_consumption_today())
n = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
days_back = 1
while days_back <= 7:
consumptions.append(get_energy_consumption_1_day(n))
days_back += 1
n -= datetime.timedelta(days=1)
print("sending data")
print(consumptions)
consumptions_msg = mqttc.publish(topic, json.dumps(consumptions))
consumptions_msg.wait_for_publish()
time.sleep(15)
if __name__ == "__main__":
main()