MQTT input
The MQTT input subscribes to one or more MQTT topics and forwards messages into a pipeline.
Both mqtt:// and mqtts:// brokers are supported.
Labels are applied with the following rules:
- Labels are processed in the order they appear in labels in TOML.
- For conflicts, later rules override earlier rules for the same label key.
- This ordering behavior applies to all rule types: static, field, and property.
- Payload field labels (field = "...") work with JSON and protobuf payloads.
- MQTT v5 property labels:
  - property = "content_type" reads the built-in MQTT v5 content_type property.
  - property = "<name>" reads the MQTT v5 user property named <name>.
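The override rule is easiest to see when two rules write the same label key. The following is a minimal sketch, assuming a JSON payload that may carry a source field (the topic and field names are illustrative, not taken from a real deployment). The static rule sets source first, and the field rule, listed later, replaces that value.

```toml
[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
content_type = "application/json"
labels = [
  # Rule 1: sets the label "source" to a fixed value.
  { static = { source = "mqtt" } },
  # Rule 2: writes the same label key from the payload field "source".
  # It appears later in the array, so its value wins on conflict.
  { field = "source", label = "source" }
]
```

Reversing the two entries would make the static value take precedence instead.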
Record timestamps can be mapped per topic with timestamp:
- timestamp = { field = "event_time", format = "unix_ms" } reads from a JSON or schema-decoded protobuf payload field.
- timestamp = { property = "event_time", format = "unix_us" } reads from an MQTT v5 user property.
- Supported formats are unix_s, unix_ms, unix_us, unix_ns, iso8601, and ros_stamp.
- If timestamp extraction fails, the input falls back to the ingest time.
Performance: Configuring timestamp = { field = "..." } enables payload decoding for each message, which may increase CPU usage. If you already use field-based label rules, no additional cost is incurred since the decoded payload is shared.
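The property-based variant can be sketched as follows. This assumes an MQTT v5 publisher that attaches a user property named event_time, and that unix_us means microseconds since the Unix epoch; the topic and entry names are illustrative.

```toml
[[inputs.mqtt.main.topics]]
name = "factory/+/events"
entry_name = "events"
# Read the record timestamp from the MQTT v5 user property "event_time".
# Assumption: the publisher sends microseconds since the Unix epoch.
timestamp = { property = "event_time", format = "unix_us" }
labels = [
  { static = { source = "mqtt-events" } }
]
```

If the property is missing or cannot be parsed, the record gets the ingest time, as described in the fallback rule.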
Protobuf support
MQTT input supports protobuf payloads and can map protobuf fields to labels. Choose one of these modes.
- Schema mode (recommended)
  - schema_path must point to a protobuf descriptor set file generated by protoc. Example command: protoc --include_imports --descriptor_set_out=schema.desc schema.proto
  - schema_name is the protobuf message type to decode from the payload, e.g. my.package.Message.
  - Use { field = "...", label = "..." } to extract labels by field path.
  - The schema is emitted as attachment $schema in JSON with fields: encoding (payload encoding, protobuf), topic (MQTT publish topic that produced the record), schema_name (same as configured), and schema (the configured schema file content, base64-encoded).
Use this mode when you want self-describing data and easier downstream inspection.
- No-schema mode
  - Omit both schema_path and schema_name.
  - Use { field_id = <id>, field_type = "<type>", label = "..." } to extract labels from raw protobuf wire fields.
Use this mode for lightweight setups when you only need a few known fields.
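Because field-based timestamps read from schema-decoded protobuf payloads, a schema-mode topic can combine label extraction with a timestamp mapping. A minimal sketch, assuming the factory.PowerReading message has an integer field named measured_at that holds epoch milliseconds (that field name is hypothetical):

```toml
[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power"
entry_name = "power"
content_type = "application/protobuf"
schema_path = "./factory.desc"
schema_name = "factory.PowerReading"
# "measured_at" is a hypothetical field; replace it with a real field path
# from your own message definition.
timestamp = { field = "measured_at", format = "unix_ms" }
labels = [
  { field = "device_id", label = "meter_id" }
]
```

The timestamp description mentions only JSON and schema-decoded payloads, so a field-based timestamp is probably not available in no-schema mode.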
Configuration
Shared input settings
[inputs.mqtt.main]
broker = "mqtts://broker.example.com:8883"
client_id = "reduct-bridge"
version = "v5"
qos = 1
username = "bridge"
password = "${MQTT_PASSWORD}"
entry_prefix = "/mqtt"
JSON topic
[[inputs.mqtt.main.topics]]
name = "factory/+/telemetry"
entry_name = "telemetry"
content_type = "application/json"
timestamp = { field = "event_time", format = "unix_ms" }
labels = [
{ field = "device_id", label = "device" },
{ field = "site", label = "site" },
{ static = { source = "mqtt" } },
{ property = "content_type", label = "mime" },
{ property = "tenant", label = "tenant" }
]
Protobuf topic with schema (recommended)
[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power"
entry_name = "power"
content_type = "application/protobuf"
schema_path = "./factory.desc"
schema_name = "factory.PowerReading"
labels = [
{ field = "device_id", label = "meter_id" },
{ field = "panel", label = "panel" }
]
Protobuf topic without schema
[[inputs.mqtt.main.topics]]
name = "factory/electrical/+/power/raw"
content_type = "application/protobuf"
labels = [
{ field_id = 1, field_type = "string", label = "meter_id" }
]
Static-only topic
[[inputs.mqtt.main.topics]]
name = "factory/+/events"
labels = [
{ static = { source = "mqtt-events" } }
]
entry_name, content_type, and labels are configured per topic. content_type
is used as the default record content type when an MQTT v5 publish does not provide
a content_type property; for MQTT v3 it is used directly. Use bare names such as
tenant, status, or id for MQTT v5 user properties.
schema_path and schema_name must be set together.
MQTT v3 example
[inputs.mqtt.legacy]
broker = "mqtt://broker.example.com:1883"
client_id = "reduct-bridge-legacy"
version = "v3"
qos = 0
username = "legacy_user"
password = "${LEGACY_MQTT_PASSWORD}"
entry_prefix = "/mqtt"
[[inputs.mqtt.legacy.topics]]
name = "legacy/+/data"
entry_name = "legacy"
content_type = "application/json"
labels = [
{ field = "line", label = "line" },
{ static = { source = "mqtt-v3" } }
]