master
Dirk Heilig 2024-08-14 14:51:03 +02:00
commit 38fdd94792
7 changed files with 430 additions and 0 deletions

1
.gitignore vendored 100644
View File

@ -0,0 +1 @@
data

86
Readme.md 100644
View File

@ -0,0 +1,86 @@
# mqtt2prom
This service is a mqtt-to-prometheus gateway.
it is configured with the following ENV-Variables, the ones suffiexed with a `*` are mandatory, default values are after `|`:
```
MQTT_HOST*
MQTT_PORT|1883
MQTT_USER
MQTT_PASS
MQTT_CLIENT_ID|mqtt2prometheus
MQTT_TOPIC|prometheus
MQTT_QOS|2
```
it will listen on the configured mqtt server at `$MQTT_TOPIC` for metrics in json_format.
The payload needs to be an object containing only the following keys:
- `name` (string): the name of the metric
- `value` (number): the value of the metric
- `labels` (object, optional): an object containing the labels for the metric (key-value pairs of strings)
According to prometheus conventions, the metric name and the label names need to be a lower or upper case letter or a underscore followed by all case leters, numbers or underscores.
These are all valid payloads:
```json
{
"name": "temperature",
"value": 30.7,
"labels": {
"unit": "°C",
"location": "Werkstatt"
}
}
```
```json
{
"name": "temperature",
"value": 30.7,
"labels": {}
}
```
```json
{
"name": "temperature",
"value": 30.7
}
```
The service will expose the metrics on the `/metrics` endpoint.
All metrics will be presented for about 5 minutes, and exported with their eventtime as a timestamp.
Metric types are not currently implemented, all metrics are exported without a type.
This might change when prometheus supports it.
The service will report errors in received messages to STDERR and to $MQTT_TOPIC/error.
Possible errors are:
- invalid json
- missing `name` or `value` key
- invalid `name` (prometheus conventions)
- invalid `value` (not a number)
- invalid `labels` (not an object)
- invalid label names (not a sting or not a valid prometheus label name)
- invalid label values (not a string)
- additional information in the payload (not one of `name`, `value`, `labels`)
The error message describes the error type and repeats the payload that caused the error.
There is a comment block on top if the metric that shows some metadata for human consumption.
mem usage and mem usage peak is according to [memory_get_usage()](https://www.php.net/manual/en/function.memory-get-usage.php) and [memory_get_peak_usage()](https://www.php.net/manual/en/function.memory-get-peak-usage.php) respectively.
in both cases it gives 2 values, the first is the acutally used value, the second is tha allocated.
A new export is written either
- once oper second if new events are incoming.
- every 15 seconds if no new events are incoming, to update stats and remove stale entries.
There is a 60 second period on startup where metrics are received but not exported, in case the service crashes and there are previous exported metrics that are not polled by prometheus yet.

19
docker-compose.yml 100644
View File

@ -0,0 +1,19 @@
version: "3"
services:
worker:
build: mqtt2prom
restart: always
environment:
MQTT_HOST: 10.0.10.2
MQTT_PORT: 1883
# MQTT_USER: mqtt
# MQTT_PASS: pass
volumes:
- ./data/metrics:/www/metrics
webserv:
build: webserv
restart: always
ports:
- "8080:80"
volumes:
- ./data/metrics:/www/metrics

View File

@ -0,0 +1,14 @@
FROM debian:12
RUN apt-get update && apt-get upgrade -y
RUN apt-get install -y php-dev php-cli php-pear php-mbstring php-curl git build-essential libmosquitto-dev libmosquitto-dev php-sqlite3
WORKDIR /tmp
RUN git clone https://github.com/nismoryco/Mosquitto-PHP.git
WORKDIR /tmp/Mosquitto-PHP
RUN phpize
RUN ./configure
RUN make
RUN make install
RUN echo "extension=mosquitto.so" > /etc/php/8.2/cli/php.ini
ADD run /usr/local/bin/entrypoint
RUN chmod +x /usr/local/bin/entrypoint
CMD ["/usr/local/bin/entrypoint"]

295
mqtt2prom/run 100755
View File

@ -0,0 +1,295 @@
#!/usr/bin/env php
<?php
use Mosquitto\Message;
define("STARTED", microtime(true));
$mqttHost = getenv("MQTT_HOST");
if (!$mqttHost) {
echo "Please set MQTT_HOST environment variable\n";
exit(1);
}
pcntl_signal(SIGINT, "endit");
pcntl_signal(SIGTERM, "endit");
pcntl_signal(SIGHUP, "endit");
$mqttPort = getEnvWithDefaultInt("MQTT_PORT", 1883);
$mqttUser = getEnvWithDefaultStr("MQTT_USER", "");
$mqttPass = getEnvWithDefaultStr("MQTT_PASS", "");
$mqttClientId = getEnvWithDefaultStr("MQTT_CLIENT_ID", "mqtt2prometheus");
$mqttTopic = getEnvWithDefaultStr("MQTT_TOPIC", "prometheus");
$qos = getEnvWithDefaultInt("MQTT_QOS", 2);
$serviceReloadTime = getEnvWithDefaultStr("SERVICE_RELOAD_TIME", "03:30");
$mqtt = new Mosquitto\Client($mqttClientId);
$data = [];
$mqtt->setCredentials($mqttUser, $mqttPass);
$usedConfig = [
"MQTT_HOST" => $mqttHost,
"MQTT_PORT" => $mqttPort,
"MQTT_USER" => $mqttUser,
"MQTT_PASS" => $mqttPass,
"MQTT_CLIENT_ID" => $mqttClientId,
"MQTT_TOPIC" => $mqttTopic,
"MQTT_QOS" => $qos,
"SERVICE_RELOAD_TIME" => $serviceReloadTime,
];
$padLength = max(array_map("strlen", array_keys($usedConfig)));
echo "Using the following config:\n";
foreach ($usedConfig as $key => $value) {
echo str_pad($key, $padLength) . ": $value\n";
}
echo "Connecting to $mqttHost:$mqttPort";
$mqtt->connect($mqttHost, $mqttPort);
echo "Connected\n";
$data = [];
$stats = [
"messages" => [],
"errors" => [],
"metrics" => [],
];
$news = true;
$mqtt->onMessage(function ($message) use (&$data, &$stats, &$news) {
$stats["messages"][] = time();
if (!precheck($message)) {
$stats["errors"][] = time();
return;
}
$stats["metrics"][] = time();
$payload = (array) json_decode($message->payload, true);
$payload["timestamp"] = microtime(true);
$data[] = $payload;
$news = true;
});
echo "Subscribing to $mqttTopic with QoS $qos ";
$mqtt->subscribe($mqttTopic, $qos);
echo "Subscribed\n";
$timer = 0;
echo "Started listening for incoming messages\n will wait 60 seconds before starting to export data\n";
$realStart = time() + 60;
echo "waiting till " .
date("Y-m-d H:i:s", $realStart) .
" that's in " .
($realStart - time()) .
" seconds\n";
while (time() < $realStart) {
$wait = time() + 5;
$mqtt->loop(5000);
sleep($wait - time());
echo "Still waiting...\n";
}
echo "\n Starting to export data\n";
while (date("H:i") != $serviceReloadTime) {
$mqtt->loop(1000);
if ($news || time() - $timer > 15) {
output();
$news = false;
$timer = time();
}
}
function error(string $msg): void
{
global $mqtt, $mqttTopic;
$mqtt->publish($mqttTopic . "/error", $msg, 2);
fwrite(STDERR, "ERROR: " . $msg . "\n");
}
function precheck(Message $message): bool
{
$payloadRaw = $message->payload;
$payload = json_decode($payloadRaw, true);
if (json_last_error() != JSON_ERROR_NONE) {
error(
"Invalid json: $message->payload (" . json_last_error_msg() . ")"
);
return false;
}
if (!is_array($payload)) {
error("Payload is not an object: $payloadRaw");
return false;
}
if (!isset($payload["name"]) || !isset($payload["value"])) {
error(
"Invalid payload, at least name and value is needed: $payloadRaw"
);
return false;
}
if (!preg_match('/^[a-zA-Z_][a-zA-Z0-9_]*$/', $payload["name"])) {
error("Invalid name: " . $payload["name"]);
return false;
}
if (!is_numeric($payload["value"])) {
error("Value is not a number: $payloadRaw");
return false;
}
$payload["labels"] = $payload["labels"] ?? [];
if (!is_array($payload["labels"])) {
error("Labels must be an object: $payloadRaw");
return false;
}
foreach ($payload["labels"] as $labelName => $labelValue) {
if (!is_string($labelName) || !is_scalar($labelValue)) {
error("Label names and values must be strings: $payloadRaw");
return false;
}
if (!preg_match('/^[a-zA-Z_][a-zA-Z0-9_]*$/', $labelName)) {
error("Invalid label name: $labelName");
return false;
}
}
foreach (array_keys($payload) as $key) {
if (!in_array($key, ["name", "value", "labels"])) {
error("Unknown key: $key");
return false;
}
}
return true;
}
function output(): void
{
filter();
global $data, $stats;
$t = microtime(true);
$prom = "";
$prom .=
"# service started at : " .
STARTED .
" / " .
date("Y-m-d H:i:s", intval(STARTED)) .
"\n";
$prom .=
"# exported at : $t / " .
date("Y-m-d H:i:s", intval($t)) .
"\n";
$prom .= "# \n";
$prom .=
"# mem usage : " .
round(memory_get_usage() / 1024 / 1024, 1) .
" / " .
round(memory_get_usage(true) / 1024 / 1024, 1) .
" MB\n";
$prom .=
"# mem usage peak : " .
round(memory_get_peak_usage() / 1024 / 1024, 1) .
" / " .
round(memory_get_peak_usage(true) / 1024 / 1024, 1) .
" MB\n";
$timeframe = [
60 => "Minute",
300 => "5 Minutes",
900 => "15 Minutes",
1800 => "30 Minutes",
3600 => "Hour",
21600 => "6 Hours",
43200 => "12 Hours",
86400 => "Day",
];
foreach ($stats as $key => $values) {
$prom .= "# \n";
foreach ($timeframe as $time => $name) {
$count = count(
array_filter($values, function ($v) use ($time) {
return $v > time() - $time;
})
);
$fkey = str_pad($key, 8);
$fname = str_pad($name, 10);
$prom .= "# $fkey in the last $fname: $count\n";
}
}
foreach ($data as $entry) {
$labels = [];
if (isset($entry["labels"])) {
foreach ($entry["labels"] as $labelName => $labelValue) {
$labels[] =
"$labelName=" .
json_encode(
strval($labelValue),
JSON_UNESCAPED_UNICODE + JSON_UNESCAPED_SLASHES
);
}
}
$labels = implode(", ", $labels);
if ($labels != "") {
$labels = "{" . $labels . "}";
}
$prom .=
$entry["name"] .
$labels .
" " .
$entry["value"] .
" " .
$entry["timestamp"] .
"\n";
}
file_put_contents("/www/metrics/new.prom", $prom);
rename("/www/metrics/new.prom", "/www/metrics/index.prom");
// return $prom;
}
function filter(): void
{
global $data, $stats;
$data = array_filter($data, function ($entry) {
return $entry["timestamp"] > time() - 5 * 60; // 5 min is the hardcoded value in prometheus
});
// remove stats older than 24h
foreach ($stats as $key => $values) {
$stats[$key] = array_filter($values, function ($v) {
return $v > time() - 24 * 60 * 60;
});
}
}
function getEnvWithDefaultStr(string $key, string $default): string
{
return strval(getEnvWithDefault($key, $default));
}
function getEnvWithDefaultInt(string $key, int $default): int
{
return intval(getEnvWithDefault($key, $default));
}
function getEnvWithDefault(string $key, bool|string|int $default): bool|string|int
{
$value = getenv($key);
if(!is_scalar($value)){
return $default;
}
if ($value === false) {
return $default;
}
if(is_numeric($default)){
return intval($value);
}
return strval($value);
}
function endit(): void
{
global $mqtt;
error("Exiting");
$mqtt->loop(100);
$mqtt->disconnect();
exit(0);
}

View File

@ -0,0 +1,7 @@
FROM debian:12
RUN apt-get update && apt-get install -y nginx-light
ADD prom.conf /etc/nginx/sites-available
RUN rm /etc/nginx/sites-enabled/*
RUN ln -s /etc/nginx/sites-available/prom.conf /etc/nginx/sites-enabled/
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]

View File

@ -0,0 +1,8 @@
server {
listen 80 default_server;
root /www/metrics;
index index.prom;
types {
text/plain prom;
}
}