From 38fdd947920db8bbc7ae2363cd15e13b9d9d6dfd Mon Sep 17 00:00:00 2001 From: Dirk Heilig Date: Wed, 14 Aug 2024 14:51:03 +0200 Subject: [PATCH] init --- .gitignore | 1 + Readme.md | 86 +++++++++++++ docker-compose.yml | 19 +++ mqtt2prom/Dockerfile | 14 ++ mqtt2prom/run | 295 +++++++++++++++++++++++++++++++++++++++++++ webserv/Dockerfile | 7 + webserv/prom.conf | 8 ++ 7 files changed, 430 insertions(+) create mode 100644 .gitignore create mode 100644 Readme.md create mode 100644 docker-compose.yml create mode 100644 mqtt2prom/Dockerfile create mode 100755 mqtt2prom/run create mode 100644 webserv/Dockerfile create mode 100644 webserv/prom.conf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1269488 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +data diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..6dbc4dc --- /dev/null +++ b/Readme.md @@ -0,0 +1,86 @@ +# mqtt2prom + +This service is a mqtt-to-prometheus gateway. + +it is configured with the following ENV-Variables, the ones suffiexed with a `*` are mandatory, default values are after `|`: + +``` +MQTT_HOST* +MQTT_PORT|1883 +MQTT_USER +MQTT_PASS +MQTT_CLIENT_ID|mqtt2prometheus +MQTT_TOPIC|prometheus +MQTT_QOS|2 + +``` + +it will listen on the configured mqtt server at `$MQTT_TOPIC` for metrics in json_format. + +The payload needs to be an object containing only the following keys: + +- `name` (string): the name of the metric +- `value` (number): the value of the metric +- `labels` (object, optional): an object containing the labels for the metric (key-value pairs of strings) + +According to prometheus conventions, the metric name and the label names need to be a lower or upper case letter or a underscore followed by all case leters, numbers or underscores. + +These are all valid payloads: + +```json +{ + "name": "temperature", + "value": 30.7, + "labels": { + "unit": "°C", + "location": "Werkstatt" + } +} +``` + +```json +{ + "name": "temperature", + "value": 30.7, + "labels": {} +} +``` + +```json +{ + "name": "temperature", + "value": 30.7 +} +``` + +The service will expose the metrics on the `/metrics` endpoint. +All metrics will be presented for about 5 minutes, and exported with their eventtime as a timestamp. + +Metric types are not currently implemented, all metrics are exported without a type. +This might change when prometheus supports it. + +The service will report errors in received messages to STDERR and to $MQTT_TOPIC/error. +Possible errors are: + +- invalid json +- missing `name` or `value` key +- invalid `name` (prometheus conventions) +- invalid `value` (not a number) +- invalid `labels` (not an object) +- invalid label names (not a sting or not a valid prometheus label name) +- invalid label values (not a string) +- additional information in the payload (not one of `name`, `value`, `labels`) + +The error message describes the error type and repeats the payload that caused the error. + +There is a comment block on top if the metric that shows some metadata for human consumption. + +mem usage and mem usage peak is according to [memory_get_usage()](https://www.php.net/manual/en/function.memory-get-usage.php) and [memory_get_peak_usage()](https://www.php.net/manual/en/function.memory-get-peak-usage.php) respectively. +in both cases it gives 2 values, the first is the acutally used value, the second is tha allocated. + +A new export is written either + +- once oper second if new events are incoming. +- every 15 seconds if no new events are incoming, to update stats and remove stale entries. + +There is a 60 second period on startup where metrics are received but not exported, in case the service crashes and there are previous exported metrics that are not polled by prometheus yet. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5118b34 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: "3" +services: + worker: + build: mqtt2prom + restart: always + environment: + MQTT_HOST: 10.0.10.2 + MQTT_PORT: 1883 + # MQTT_USER: mqtt + # MQTT_PASS: pass + volumes: + - ./data/metrics:/www/metrics + webserv: + build: webserv + restart: always + ports: + - "8080:80" + volumes: + - ./data/metrics:/www/metrics diff --git a/mqtt2prom/Dockerfile b/mqtt2prom/Dockerfile new file mode 100644 index 0000000..f8964e9 --- /dev/null +++ b/mqtt2prom/Dockerfile @@ -0,0 +1,14 @@ +FROM debian:12 +RUN apt-get update && apt-get upgrade -y +RUN apt-get install -y php-dev php-cli php-pear php-mbstring php-curl git build-essential libmosquitto-dev libmosquitto-dev php-sqlite3 +WORKDIR /tmp +RUN git clone https://github.com/nismoryco/Mosquitto-PHP.git +WORKDIR /tmp/Mosquitto-PHP +RUN phpize +RUN ./configure +RUN make +RUN make install +RUN echo "extension=mosquitto.so" > /etc/php/8.2/cli/php.ini +ADD run /usr/local/bin/entrypoint +RUN chmod +x /usr/local/bin/entrypoint +CMD ["/usr/local/bin/entrypoint"] diff --git a/mqtt2prom/run b/mqtt2prom/run new file mode 100755 index 0000000..1c6cb1b --- /dev/null +++ b/mqtt2prom/run @@ -0,0 +1,295 @@ +#!/usr/bin/env php +setCredentials($mqttUser, $mqttPass); + +$usedConfig = [ + "MQTT_HOST" => $mqttHost, + "MQTT_PORT" => $mqttPort, + "MQTT_USER" => $mqttUser, + "MQTT_PASS" => $mqttPass, + "MQTT_CLIENT_ID" => $mqttClientId, + "MQTT_TOPIC" => $mqttTopic, + "MQTT_QOS" => $qos, + "SERVICE_RELOAD_TIME" => $serviceReloadTime, +]; + +$padLength = max(array_map("strlen", array_keys($usedConfig))); + +echo "Using the following config:\n"; +foreach ($usedConfig as $key => $value) { + echo str_pad($key, $padLength) . ": $value\n"; +} + +echo "Connecting to $mqttHost:$mqttPort"; +$mqtt->connect($mqttHost, $mqttPort); +echo "Connected\n"; + +$data = []; +$stats = [ + "messages" => [], + "errors" => [], + "metrics" => [], +]; +$news = true; +$mqtt->onMessage(function ($message) use (&$data, &$stats, &$news) { + $stats["messages"][] = time(); + if (!precheck($message)) { + $stats["errors"][] = time(); + return; + } + $stats["metrics"][] = time(); + $payload = (array) json_decode($message->payload, true); + + $payload["timestamp"] = microtime(true); + $data[] = $payload; + $news = true; +}); + +echo "Subscribing to $mqttTopic with QoS $qos "; +$mqtt->subscribe($mqttTopic, $qos); +echo "Subscribed\n"; + +$timer = 0; +echo "Started listening for incoming messages\n will wait 60 seconds before starting to export data\n"; +$realStart = time() + 60; +echo "waiting till " . + date("Y-m-d H:i:s", $realStart) . + " that's in " . + ($realStart - time()) . + " seconds\n"; +while (time() < $realStart) { + $wait = time() + 5; + $mqtt->loop(5000); + sleep($wait - time()); + echo "Still waiting...\n"; +} +echo "\n Starting to export data\n"; + +while (date("H:i") != $serviceReloadTime) { + $mqtt->loop(1000); + if ($news || time() - $timer > 15) { + output(); + $news = false; + $timer = time(); + } +} + +function error(string $msg): void +{ + global $mqtt, $mqttTopic; + $mqtt->publish($mqttTopic . "/error", $msg, 2); + fwrite(STDERR, "ERROR: " . $msg . "\n"); +} +function precheck(Message $message): bool +{ + $payloadRaw = $message->payload; + $payload = json_decode($payloadRaw, true); + if (json_last_error() != JSON_ERROR_NONE) { + error( + "Invalid json: $message->payload (" . json_last_error_msg() . ")" + ); + return false; + } + if (!is_array($payload)) { + error("Payload is not an object: $payloadRaw"); + return false; + } + + if (!isset($payload["name"]) || !isset($payload["value"])) { + error( + "Invalid payload, at least name and value is needed: $payloadRaw" + ); + return false; + } + if (!preg_match('/^[a-zA-Z_][a-zA-Z0-9_]*$/', $payload["name"])) { + error("Invalid name: " . $payload["name"]); + return false; + } + if (!is_numeric($payload["value"])) { + error("Value is not a number: $payloadRaw"); + return false; + } + $payload["labels"] = $payload["labels"] ?? []; + if (!is_array($payload["labels"])) { + error("Labels must be an object: $payloadRaw"); + return false; + } + foreach ($payload["labels"] as $labelName => $labelValue) { + if (!is_string($labelName) || !is_scalar($labelValue)) { + error("Label names and values must be strings: $payloadRaw"); + return false; + } + if (!preg_match('/^[a-zA-Z_][a-zA-Z0-9_]*$/', $labelName)) { + error("Invalid label name: $labelName"); + return false; + } + } + foreach (array_keys($payload) as $key) { + if (!in_array($key, ["name", "value", "labels"])) { + error("Unknown key: $key"); + return false; + } + } + return true; +} + +function output(): void +{ + filter(); + global $data, $stats; + $t = microtime(true); + + $prom = ""; + $prom .= + "# service started at : " . + STARTED . + " / " . + date("Y-m-d H:i:s", intval(STARTED)) . + "\n"; + $prom .= + "# exported at : $t / " . + date("Y-m-d H:i:s", intval($t)) . + "\n"; + $prom .= "# \n"; + $prom .= + "# mem usage : " . + round(memory_get_usage() / 1024 / 1024, 1) . + " / " . + round(memory_get_usage(true) / 1024 / 1024, 1) . + " MB\n"; + $prom .= + "# mem usage peak : " . + round(memory_get_peak_usage() / 1024 / 1024, 1) . + " / " . + round(memory_get_peak_usage(true) / 1024 / 1024, 1) . + " MB\n"; + $timeframe = [ + 60 => "Minute", + 300 => "5 Minutes", + 900 => "15 Minutes", + 1800 => "30 Minutes", + 3600 => "Hour", + 21600 => "6 Hours", + 43200 => "12 Hours", + 86400 => "Day", + ]; + foreach ($stats as $key => $values) { + $prom .= "# \n"; + foreach ($timeframe as $time => $name) { + $count = count( + array_filter($values, function ($v) use ($time) { + return $v > time() - $time; + }) + ); + $fkey = str_pad($key, 8); + $fname = str_pad($name, 10); + $prom .= "# $fkey in the last $fname: $count\n"; + } + } + foreach ($data as $entry) { + $labels = []; + if (isset($entry["labels"])) { + foreach ($entry["labels"] as $labelName => $labelValue) { + $labels[] = + "$labelName=" . + json_encode( + strval($labelValue), + JSON_UNESCAPED_UNICODE + JSON_UNESCAPED_SLASHES + ); + } + } + $labels = implode(", ", $labels); + if ($labels != "") { + $labels = "{" . $labels . "}"; + } + + $prom .= + $entry["name"] . + $labels . + " " . + $entry["value"] . + " " . + $entry["timestamp"] . + "\n"; + } + file_put_contents("/www/metrics/new.prom", $prom); + rename("/www/metrics/new.prom", "/www/metrics/index.prom"); + // return $prom; +} + +function filter(): void +{ + global $data, $stats; + $data = array_filter($data, function ($entry) { + return $entry["timestamp"] > time() - 5 * 60; // 5 min is the hardcoded value in prometheus + }); + // remove stats older than 24h + foreach ($stats as $key => $values) { + $stats[$key] = array_filter($values, function ($v) { + return $v > time() - 24 * 60 * 60; + }); + } +} + +function getEnvWithDefaultStr(string $key, string $default): string +{ + return strval(getEnvWithDefault($key, $default)); +} + +function getEnvWithDefaultInt(string $key, int $default): int +{ + return intval(getEnvWithDefault($key, $default)); +} +function getEnvWithDefault(string $key, bool|string|int $default): bool|string|int +{ + $value = getenv($key); + if(!is_scalar($value)){ + return $default; + } + if ($value === false) { + return $default; + } + if(is_numeric($default)){ + return intval($value); + } + return strval($value); +} + +function endit(): void +{ + global $mqtt; + error("Exiting"); + $mqtt->loop(100); + $mqtt->disconnect(); + exit(0); +} \ No newline at end of file diff --git a/webserv/Dockerfile b/webserv/Dockerfile new file mode 100644 index 0000000..97a34f1 --- /dev/null +++ b/webserv/Dockerfile @@ -0,0 +1,7 @@ +FROM debian:12 +RUN apt-get update && apt-get install -y nginx-light +ADD prom.conf /etc/nginx/sites-available +RUN rm /etc/nginx/sites-enabled/* +RUN ln -s /etc/nginx/sites-available/prom.conf /etc/nginx/sites-enabled/ +EXPOSE 80 +CMD ["nginx", "-g", "daemon off;"] diff --git a/webserv/prom.conf b/webserv/prom.conf new file mode 100644 index 0000000..f8e2ac9 --- /dev/null +++ b/webserv/prom.conf @@ -0,0 +1,8 @@ +server { + listen 80 default_server; + root /www/metrics; + index index.prom; + types { + text/plain prom; + } +} \ No newline at end of file