mqtt2prom/mqtt2prom/run

371 lines
9.8 KiB
PHP
Executable File

#!/usr/bin/env php
<?php
use Mosquitto\Message;
// ---------------------------------------------------------------------------
// Entry point: reads the configuration from the environment, connects to the
// MQTT broker, collects incoming metric messages and periodically exports
// them through output().
// ---------------------------------------------------------------------------
define("STARTED", time());

// Seconds to keep collecting messages before the first export is written.
$warmupSeconds = 15;

$mqttHost = getenv("MQTT_HOST");
if (!$mqttHost) {
    echo "Please set MQTT_HOST environment variable\n";
    exit(1);
}

$mqttPort = getEnvWithDefaultInt("MQTT_PORT", 1883);
$mqttUser = getEnvWithDefaultStr("MQTT_USER", "");
$mqttPass = getEnvWithDefaultStr("MQTT_PASS", "");
$mqttClientId = getEnvWithDefaultStr("MQTT_CLIENT_ID", "mqtt2prometheus");
$mqttTopic = getEnvWithDefaultStr("MQTT_TOPIC", "prometheus");
$qos = getEnvWithDefaultInt("MQTT_QOS", 2);
$ignoreRetained = getEnvWithDefaultInt("IGNORE_RETAINED", 1) ? 1 : 0;

$mqtt = new Mosquitto\Client($mqttClientId);
$mqtt->setCredentials($mqttUser, $mqttPass);

$usedConfig = [
    "MQTT_HOST" => $mqttHost,
    "MQTT_PORT" => $mqttPort,
    "MQTT_USER" => $mqttUser,
    // Never echo the secret itself; only show whether one is configured.
    "MQTT_PASS" => $mqttPass === "" ? "" : "***",
    "MQTT_CLIENT_ID" => $mqttClientId,
    "MQTT_TOPIC" => $mqttTopic,
    "MQTT_QOS" => $qos,
    "IGNORE_RETAINED" => $ignoreRetained,
];
$padLength = max(array_map("strlen", array_keys($usedConfig)));
echo "Using the following config:\n";
foreach ($usedConfig as $key => $value) {
    echo str_pad($key, $padLength) . ": $value\n";
}

echo "Connecting to $mqttHost:$mqttPort";
$mqtt->connect($mqttHost, $mqttPort);
echo "Connected\n";

// Shared state, mutated by the message callback and read by output()/filter().
$data = [];
$stats = [
    "messages" => [],
    "errors" => [],
    "metrics" => [],
];
$news = true;
$isRunning = true;
$globalCounter = 0;

// The handlers are installed only once the client exists, because endit()
// publishes through it. pcntl_async_signals(true) is required: without it
// (or ticks / pcntl_signal_dispatch()) the handlers are queued but never run,
// which makes the process ignore SIGTERM/SIGINT/SIGHUP entirely.
pcntl_async_signals(true);
pcntl_signal(SIGINT, "endit");
pcntl_signal(SIGTERM, "endit");
pcntl_signal(SIGHUP, "endit");

$mqtt->onMessage(function ($message) use (
    &$data,
    &$stats,
    &$news,
    &$globalCounter,
    &$isRunning,
    $mqttTopic
) {
    // A message on "<topic>/restart" stops the main loop.
    if ($mqttTopic . "/restart" === $message->topic) {
        $isRunning = false;
        return;
    }

    $globalCounter++;
    $stats["messages"][] = time();
    if (!precheck($message)) {
        $stats["errors"][] = time();
        return;
    }
    $stats["metrics"][] = time();

    $payload = (array) json_decode($message->payload, true);
    // precheck() validates its own decoded copy, so the default lifetime has
    // to be applied to the stored entry as well; otherwise filter() finds no
    // "valid_seconds" and discards the metric right away.
    $payload["valid_seconds"] ??= 60;
    $payload["timestamp"] = time();
    $data[] = $payload;
    $news = true;
});

echo "Subscribing to $mqttTopic with QoS $qos ";
$mqtt->subscribe($mqttTopic, $qos);
$mqtt->subscribe($mqttTopic . "/restart", $qos);
echo "Subscribed\n";

$timer = 0;
echo "Started listening for incoming messages\n will wait $warmupSeconds seconds before starting to export data\n";
$realStart = time() + $warmupSeconds;
echo "waiting till " .
    date("Y-m-d H:i:s", $realStart) .
    " that's in " .
    ($realStart - time()) .
    " seconds\n";

// Warm-up: collect messages without exporting. A stop request ends it early.
while ($isRunning && time() < $realStart) {
    $wait = time() + 5;
    $mqtt->loop(5000);
    // loop() may already have used up the whole slice; sleep() throws a
    // ValueError for negative arguments on PHP 8.
    sleep(max(0, $wait - time()));
    echo "Still waiting...\n";
}

echo "\n Starting to export data\n";
while ($isRunning) {
    $mqtt->loop(1000);
    // Export when something new arrived, or at least every 15 seconds.
    if ($news || time() - $timer > 15) {
        output();
        $news = false;
        $timer = time();
    }
}

$mqtt->disconnect();
$mqtt->loop();
exit();
/**
 * Reports a problem: publishes the text to "<MQTT_TOPIC>/error" through the
 * global client (third publish argument 2) and writes it, prefixed with
 * "ERROR: ", to STDERR.
 *
 * @param string $msg Text of the error.
 */
function error(string $msg): void
{
    global $mqtt, $mqttTopic;

    $errorTopic = "{$mqttTopic}/error";
    $mqtt->publish($errorTopic, $msg, 2);

    fwrite(STDERR, "ERROR: {$msg}\n");
}
/**
 * Validates an incoming MQTT message before it is accepted as a metric.
 *
 * Each rejection is reported through error(). The payload must be a JSON
 * object containing only the keys "name", "value", "labels" and
 * "valid_seconds":
 *  - "name": string matching [a-zA-Z_][a-zA-Z0-9_]*
 *  - "value": numeric
 *  - "labels": optional object of label name => scalar value, label names
 *    following the same pattern as "name"
 *  - "valid_seconds": optional number between 1 and 48 hours
 *
 * Defaults assigned here only affect the local copy used for validation.
 *
 * @param Message $message Message as delivered by the MQTT client.
 *
 * @return bool true when the message may be stored, false otherwise.
 */
function precheck(Message $message): bool
{
    global $ignoreRetained;

    // \A...\z instead of ^...$: "$" would also match before a trailing
    // newline, letting a name such as "foo\n" corrupt the exported file.
    $identifierPattern = '/\A[a-zA-Z_][a-zA-Z0-9_]*\z/';

    if ($ignoreRetained && $message->retain) {
        error("Ignoring retained message");
        return false;
    }

    $payloadRaw = $message->payload;
    $payload = json_decode($payloadRaw, true);
    if (json_last_error() !== JSON_ERROR_NONE) {
        error(
            "Invalid json: $message->payload (" . json_last_error_msg() . ")"
        );
        return false;
    }
    if (!is_array($payload)) {
        error("Payload is not an object: $payloadRaw");
        return false;
    }
    if (!isset($payload["name"]) || !isset($payload["value"])) {
        error(
            "Invalid payload, at least name and value is needed: $payloadRaw"
        );
        return false;
    }
    // A non-string name (e.g. a JSON array) would otherwise raise a
    // TypeError in preg_match() and terminate the whole process.
    if (!is_string($payload["name"])) {
        error("Name must be a string: $payloadRaw");
        return false;
    }
    if (!preg_match($identifierPattern, $payload["name"])) {
        error("Invalid name: " . $payload["name"]);
        return false;
    }
    if (!is_numeric($payload["value"])) {
        error("Value is not a number: $payloadRaw");
        return false;
    }

    $payload["labels"] = $payload["labels"] ?? [];
    if (!is_array($payload["labels"])) {
        error("Labels must be an object: $payloadRaw");
        return false;
    }
    foreach ($payload["labels"] as $labelName => $labelValue) {
        if (!is_string($labelName) || !is_scalar($labelValue)) {
            error("Label names and values must be strings: $payloadRaw");
            return false;
        }
        if (!preg_match($identifierPattern, $labelName)) {
            error("Invalid label name: $labelName");
            return false;
        }
    }

    if (!isset($payload["valid_seconds"])) {
        $payload["valid_seconds"] = 60;
    }
    if (!is_numeric($payload["valid_seconds"])) {
        error("valid_seconds must be a number");
        return false;
    }
    if ($payload["valid_seconds"] < 1) {
        error("valid_seconds must be at least 1");
        return false;
    }
    if ($payload["valid_seconds"] > 60 * 60 * 48) {
        error("valid_seconds must be at most 48 hours");
        return false;
    }

    // Reject anything that is not part of the documented payload shape.
    $allowedKeys = ["name", "value", "labels", "valid_seconds"];
    foreach (array_keys($payload) as $key) {
        if (!in_array($key, $allowedKeys, true)) {
            error("Unknown key: $key");
            return false;
        }
    }

    return true;
}
/**
 * Renders the currently valid metrics and publishes them as files.
 *
 * Steps:
 *  1. filter() drops expired metrics and statistics older than 24 hours.
 *  2. A comment header is built with start/export time, memory usage and
 *     message/error/metric counts for several time windows.
 *  3. Metrics are de-duplicated by name plus sorted labels; the entry that
 *     comes last in $data wins.
 *  4. The text is written to /www/metrics/index.prom and the JSON variant to
 *     /www/metrics/metrics.json. Each file is written under a temporary name
 *     first and then renamed into place.
 *
 * Write failures are reported on STDERR only. They are not sent to error(),
 * because this function runs on every export cycle.
 */
function output(): void
{
    filter();
    global $data, $stats, $globalCounter;

    $t = time();
    $json = [];

    // Bytes -> megabytes with one decimal, as shown in the header.
    $toMb = static fn (int $bytes): float => round($bytes / 1024 / 1024, 1);

    $prom = "";
    $prom .=
        "# service started at : " .
        STARTED .
        " / " .
        date("Y-m-d H:i:s", intval(STARTED)) .
        "\n";
    $prom .=
        "# exported at : $t / " .
        date("Y-m-d H:i:s", intval($t)) .
        "\n";
    $prom .= "# \n";
    $prom .=
        "# mem usage : " .
        $toMb(memory_get_usage()) .
        " / " .
        $toMb(memory_get_usage(true)) .
        " MB\n";
    $prom .=
        "# mem usage peak : " .
        $toMb(memory_get_peak_usage()) .
        " / " .
        $toMb(memory_get_peak_usage(true)) .
        " MB\n";

    $timeframe = [
        60 => "Minute",
        300 => "5 Minutes",
        900 => "15 Minutes",
        1800 => "30 Minutes",
        3600 => "Hour",
        21600 => "6 Hours",
        43200 => "12 Hours",
        86400 => "Day",
    ];
    foreach ($stats as $key => $values) {
        $prom .= "# \n";
        $fkey = str_pad($key, 8);
        foreach ($timeframe as $seconds => $name) {
            $threshold = $t - $seconds;
            $count = count(
                array_filter(
                    $values,
                    static fn ($v): bool => $v > $threshold
                )
            );
            $fname = str_pad($name, 10);
            $prom .= "# $fkey in the last $fname: $count\n";
        }
    }
    $prom .= "# \n";
    $prom .= "# global counter : $globalCounter\n\n";

    // Keep one entry per (sorted labels, name) combination; later entries
    // overwrite earlier ones.
    $filteredData = [];
    foreach ($data as $entry) {
        $sort = $entry["labels"] ?? [];
        ksort($sort);
        $sort[] = $entry["name"];
        $hash = md5(strval(json_encode($sort)));
        $filteredData[$hash] = $entry;
    }

    // The Prometheus text format allows exactly three escapes inside a label
    // value: \\, \" and \n. json_encode() would additionally produce \t, \r
    // or \uXXXX sequences, which are not valid there.
    $labelEscapes = ["\\" => "\\\\", "\"" => "\\\"", "\n" => "\\n"];

    foreach ($filteredData as $entry) {
        $labels = [];
        if (isset($entry["labels"])) {
            ksort($entry["labels"]);
            foreach ($entry["labels"] as $labelName => $labelValue) {
                $labels[] =
                    $labelName .
                    '="' .
                    strtr(strval($labelValue), $labelEscapes) .
                    '"';
            }
        }
        $json[] = [
            "name" => $entry["name"],
            "labels" => $entry["labels"] ?? [],
            "value" => $entry["value"],
        ];
        $labels = implode(", ", $labels);
        if ($labels != "") {
            $labels = "{" . $labels . "}";
        }
        $prom .= $entry["name"] . $labels . " " . $entry["value"] . "\n";
    }

    $prom .= "\n\n# internals\n";
    $prom .=
        "memory_usage_current{unit=\"bytes\"} " . memory_get_usage() . "\n";
    $prom .=
        "memory_usage_peak{unit=\"bytes\"} " . memory_get_peak_usage() . "\n";
    $prom .=
        "memory_usage_current_allocated{unit=\"bytes\"} " .
        memory_get_usage(true) .
        "\n";
    $prom .=
        "memory_usage_peak_allocated{unit=\"bytes\"} " .
        memory_get_peak_usage(true) .
        "\n";

    // Writes $content to $tmp and moves it over $final. The previous $final
    // is left untouched whenever encoding or writing failed, so a reader
    // never sees an empty or partial file.
    $publish = static function (
        string $tmp,
        string $final,
        string|false $content
    ): void {
        if ($content === false) {
            fwrite(STDERR, "ERROR: could not encode content for $final\n");
            return;
        }
        if (file_put_contents($tmp, $content) === false) {
            fwrite(STDERR, "ERROR: could not write $tmp\n");
            return;
        }
        if (!rename($tmp, $final)) {
            fwrite(STDERR, "ERROR: could not rename $tmp to $final\n");
        }
    };

    $publish("/www/metrics/new.prom", "/www/metrics/index.prom", $prom);
    $publish(
        "/www/metrics/new.json",
        "/www/metrics/metrics.json",
        json_encode(
            $json,
            JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE
        )
    );
}
/**
 * Removes expired entries from the global $data and $stats arrays.
 *
 * A metric is kept while its "timestamp" is newer than now minus its
 * "valid_seconds". Entries without "valid_seconds" get a lifetime of
 * 60 seconds, the same default precheck() uses, instead of being read as
 * null and discarded immediately. Statistic timestamps are kept for
 * 24 hours. Array keys are preserved by array_filter().
 */
function filter(): void
{
    global $data, $stats;

    $now = time();

    $data = array_filter($data, static function ($entry) use ($now) {
        return $entry["timestamp"] > $now - ($entry["valid_seconds"] ?? 60);
    });

    // remove stats older than 24h
    $oldest = $now - 24 * 60 * 60;
    foreach ($stats as $key => $values) {
        $stats[$key] = array_filter(
            $values,
            static fn ($v) => $v > $oldest
        );
    }
}
/**
 * Looks up an environment variable through getEnvWithDefault() and returns
 * the result converted to a string.
 *
 * @param string $key     Name of the environment variable.
 * @param string $default Value handed to getEnvWithDefault() as fallback.
 */
function getEnvWithDefaultStr(string $key, string $default): string
{
    $resolved = getEnvWithDefault($key, $default);

    return (string) $resolved;
}
/**
 * Looks up an environment variable through getEnvWithDefault() and returns
 * the result converted to an integer.
 *
 * @param string $key     Name of the environment variable.
 * @param int    $default Value handed to getEnvWithDefault() as fallback.
 */
function getEnvWithDefaultInt(string $key, int $default): int
{
    $resolved = getEnvWithDefault($key, $default);

    return (int) $resolved;
}
/**
 * Reads an environment variable, falling back to $default when it is unset.
 *
 * The type of $default selects the conversion: an integer default converts
 * the value with intval(); any other default returns the raw string.
 * Only a real int triggers the conversion, so a string default that happens
 * to look numeric (e.g. "42") no longer turns a value like "abc" into 0.
 *
 * @param string          $key     Name of the environment variable.
 * @param bool|string|int $default Returned as-is when the variable is unset.
 *
 * @return bool|string|int The default, the integer value, or the raw string.
 */
function getEnvWithDefault(
    string $key,
    bool|string|int $default
): bool|string|int {
    // getenv() with a name yields either a string or false (not set).
    $value = getenv($key);
    if ($value === false) {
        return $default;
    }

    return is_int($default) ? intval($value) : $value;
}
/**
 * Signal handler: clears the global $isRunning flag that the main loop
 * checks, then reports "Exiting" through error().
 */
function endit(): void
{
    global $isRunning;

    $isRunning = false;

    error("Exiting");
}