MQTT input
The MQTT input subscribes to one or more MQTT topics and forwards messages into a pipeline.
Both mqtt:// and mqtts:// brokers are supported.
MQTT label rules follow the same ordered model as the ROS inputs:
- static labels are applied first
- payload field labels are applied after static labels and can override them
- MQTT v5 property labels are also applied in order and can override earlier labels
For MQTT v5 property labels:
property = "content_type"reads the built-in MQTT v5content_typeproperty- any other
property = "<name>"reads the MQTT v5 user property named<name>
Configuration
[inputs.mqtt.main]
broker = "mqtts://broker.example.com:8883"
client_id = "reduct-bridge"
version = "v5"
qos = 1
username = "bridge"
password = "${MQTT_PASSWORD}"
entry_prefix = "/mqtt"
[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
entry_name = "telemetry"
content_type = "application/json"
# Optional label rules (default = []):
# 1) Dynamic field label:
# { field = "device_id", label = "device" }
# 2) Static labels:
# { static = { source = "mqtt", site = "lab" } }
# 3) MQTT v5 property label:
# { property = "content_type", label = "mime" }
# { property = "tenant", label = "tenant" } # MQTT v5 user property
labels = [
{ field = "device_id", label = "device" },
{ field = "site", label = "site" },
{ static = { source = "mqtt" } },
{ property = "content_type", label = "mime" },
{ property = "tenant", label = "tenant" }
]
[[inputs.mqtt.main.topics]]
name = "factory/+/events"
labels = [
{ static = { source = "mqtt-events" } }
]
entry_name, content_type, and labels are configured per topic. content_type is used as the default record content type when an MQTT v5 publish does not provide a content_type property; for MQTT v3 it is used directly.
Use bare names such as tenant, status, or id for MQTT v5 user properties.
MQTT v3 example
[inputs.mqtt.legacy]
broker = "mqtt://broker.example.com:1883"
client_id = "reduct-bridge-legacy"
version = "v3"
qos = 0
username = "legacy_user"
password = "${LEGACY_MQTT_PASSWORD}"
entry_prefix = "/mqtt"
[[inputs.mqtt.legacy.topics]]
name = "legacy/+/data"
entry_name = "legacy"
content_type = "application/json"
labels = [
{ field = "line", label = "line" },
{ static = { source = "mqtt-v3" } }
]