Version: Next

Zenoh API

ReductStore can receive and serve time-series data directly over Zenoh, using its native pub/sub and query primitives. This is an alternative ingestion and retrieval path that runs alongside the existing HTTP API. Both work at the same time and share the same stored data.

Refer to the Zenoh API Settings section for configuration details.

info

The Zenoh API is compiled under the zenoh-api Cargo feature flag. Official Docker images ship with this feature enabled. If you build from source, add --features zenoh-api to your cargo build invocation.

How It Works

ReductStore opens a Zenoh session on startup and registers a subscriber (for writes) and a queryable (for reads). Both talk directly to the storage engine without going through HTTP.

| Zenoh concept | ReductStore mapping |
|---|---|
| Subscriber | Write path. Samples published to keys matching the configured key expression are ingested. |
| Queryable | Read path. Queries whose selector matches the configured key expression retrieve records. |
| Key | The concrete address of a sample (e.g. robot/arm/joint1). Becomes the entry name in the bucket. |
| Key expression | A wildcard pattern (e.g. robot/**). Used to configure which keys the subscriber and queryable respond to. |
| Encoding | Mapped to the record's content type. |
| Attachment | JSON-encoded map of labels (key-value metadata). |
| Timestamp | Zenoh HLC timestamp converted to Unix microseconds. |

info

All data is stored in a single ReductStore bucket. The bucket is created automatically if it does not exist.

Key Expressions

In Zenoh, a key is a concrete address for a value, such as organizationA/building8/room275/sensor3/temperature. A key expression is a pattern defined using the Key Expression Language that can match a set of keys.

RS_ZENOH_SUB_KEYEXPRS and RS_ZENOH_QUERY_KEYEXPRS take key expressions. They define which keys the subscriber and queryable respond to. The key of each individual sample (the concrete address it was published to) is what becomes the entry name in ReductStore.

A key is a /-separated sequence of non-empty UTF-8 segments, similar to a filesystem path. Individual key segments may not contain *, $, ?, or #.

Key expressions extend plain keys with three wildcards. The following table and examples are taken from the Zenoh abstractions documentation:

| Wildcard | Matches | Example |
|---|---|---|
| * | Any single segment (not including /). Must be surrounded by / or be the whole chunk | robot1/*/temperature matches robot1/imu/temperature but not robot1/a/b/temperature |
| $* | Any substring within a single segment, including an empty string | robot$*/temperature matches robot1/temperature and robotA/temperature |
| ** | Any number of segments, including none. Must appear after / or at the start | factory/**/temperature matches all temperature keys under factory at any depth |

tip

Prefer * and ** over $* when designing your key space. According to Zenoh's documentation, $* is slower and is usually only needed when different identifiers are mixed within the same segment, e.g. robot12 and pc18. Structuring keys as robot/12 and pc/18 instead avoids this.
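
To make the wildcard rules in the table above concrete, here is a simplified matcher in plain Python. It is only a sketch of the matching semantics, not Zenoh's implementation: the names `key_matches`, `_match`, and `_chunk_matches` are hypothetical, and the code assumes well-formed input (it does not validate key expressions or canonicalize them).

```python
import re


def _chunk_matches(pattern: str, chunk: str) -> bool:
    """Match one key-expression chunk against one key segment."""
    if pattern == "*":
        return True  # "*" accepts any single segment
    # "$*" stands for any substring (possibly empty) inside the segment.
    regex = ".*".join(re.escape(part) for part in pattern.split("$*"))
    return re.fullmatch(regex, chunk) is not None


def _match(patterns: list, chunks: list) -> bool:
    if not patterns:
        return not chunks
    head, rest = patterns[0], patterns[1:]
    if head == "**":
        # "**" consumes zero or more whole segments.
        return any(_match(rest, chunks[i:]) for i in range(len(chunks) + 1))
    if not chunks:
        return False
    return _chunk_matches(head, chunks[0]) and _match(rest, chunks[1:])


def key_matches(keyexpr: str, key: str) -> bool:
    """Return True if the concrete key is matched by the key expression."""
    return _match(keyexpr.split("/"), key.split("/"))


print(key_matches("robot1/*/temperature", "robot1/imu/temperature"))   # True
print(key_matches("robot1/*/temperature", "robot1/a/b/temperature"))   # False
print(key_matches("factory/**/temperature", "factory/a/b/temperature"))  # True
```

A helper like this can be handy for checking a planned key space against the expressions you intend to configure before deploying them.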

Selectors

When querying, Zenoh uses a selector, which is a key expression optionally followed by ? and a list of parameters. The following syntax is from the Zenoh abstractions documentation:

factory/**/temperature?start=1700000000000000;stop=1700086400000000
^                     ^^                                         ^
|--- key expression --||-------------- parameters ---------------|

The key expression part is used by Zenoh routers to route the query to the right queryable. The parameters are passed through to ReductStore and interpreted as query filters (see Time-range Query below). Parameters are ;-separated key=value pairs.
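
As an illustration of the selector syntax, the following sketch builds and splits selector strings in plain Python. The helpers `build_selector` and `parse_selector` are hypothetical names introduced here, not part of the Zenoh or ReductStore APIs, and the sketch assumes parameter values contain no `;`, `=`, or `?` characters, so no escaping is performed.

```python
def _format_value(value) -> str:
    # Render booleans in lowercase, as in "last=true".
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_selector(keyexpr: str, **params) -> str:
    """Join a key expression and ;-separated key=value parameters."""
    if not params:
        return keyexpr
    query = ";".join(f"{name}={_format_value(value)}" for name, value in params.items())
    return f"{keyexpr}?{query}"


def parse_selector(selector: str):
    """Split a selector into its key expression and a dict of parameters."""
    keyexpr, _, query = selector.partition("?")
    params = {}
    for pair in query.split(";"):
        if pair:
            name, _, value = pair.partition("=")
            params[name] = value
    return keyexpr, params


selector = build_selector(
    "factory/**/temperature",
    start=1700000000000000,
    stop=1700086400000000,
)
print(selector)
print(parse_selector(selector))
```

The resulting string can be passed directly as the first argument of `session.get()`.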

Write Path

When a Zenoh publisher sends a sample to a key that matches one of the key expressions in RS_ZENOH_SUB_KEYEXPRS:

  1. The sample's key becomes the entry name (leading and trailing / are stripped).
  2. The sample payload is written to that entry in RS_ZENOH_BUCKET.
  3. The sample's encoding is mapped to the record's content type.
  4. If the sample carries a Zenoh timestamp, it is converted from HLC to Unix microseconds and used as the record timestamp. Otherwise the server clock is used.
  5. If the sample carries a Zenoh attachment, it is decoded as a JSON object and stored as record labels.
  6. Two additional labels are automatically added to every record:
    • zenoh_source_id: Identifies the Zenoh node that published the sample. Useful for tracing data provenance in distributed systems.
    • zenoh_ts_ntp64: The original Zenoh HLC timestamp in NTP64 format, preserving full precision before conversion to Unix microseconds.
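
The mapping in steps 1 to 5 can be sketched in plain Python as follows. This is an illustration under stated assumptions, not ReductStore's actual code: `sample_to_record` and `ntp64_to_unix_us` are hypothetical names, the NTP64 value is assumed to hold whole seconds since the Unix epoch in its upper 32 bits and a binary fraction of a second in its lower 32 bits, and the two automatic labels from step 6 are left out because their exact string format is not described here.

```python
import json
import time


def ntp64_to_unix_us(ntp64: int) -> int:
    """Convert an NTP64 value to Unix microseconds (assumed bit layout)."""
    seconds = ntp64 >> 32            # upper 32 bits: whole seconds
    fraction = ntp64 & 0xFFFFFFFF    # lower 32 bits: fraction of a second
    return seconds * 1_000_000 + ((fraction * 1_000_000) >> 32)


def sample_to_record(key, payload, encoding=None, attachment=None, ntp64=None):
    """Map the parts of a Zenoh sample to the fields of a stored record."""
    entry = key.strip("/")                                  # step 1
    labels = json.loads(attachment) if attachment else {}   # step 5
    if ntp64 is not None:
        timestamp = ntp64_to_unix_us(ntp64)                 # step 4
    else:
        timestamp = time.time_ns() // 1_000                 # local clock fallback
    return {
        "entry": entry,
        "timestamp": timestamp,
        "content_type": encoding,                           # step 3
        "labels": labels,
        "payload": payload,                                 # step 2
    }


record = sample_to_record(
    "/factory/line1/camera/",
    b"<binary payload>",
    encoding="application/octet-stream",
    attachment=b'{"robot": "alpha"}',
    ntp64=(1700000000 << 32) | 0x80000000,  # 1700000000.5 seconds
)
print(record["entry"], record["timestamp"], record["labels"])
```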

Encoding and Content Type

Zenoh samples carry an optional encoding that describes the format of their payload. ReductStore stores this as the record's content type, making it available when reading back the data over HTTP or Zenoh.

import zenoh

KEY = "factory/line1/status"
CONSOLIDATION = zenoh.ConsolidationMode.NONE

with zenoh.open(zenoh.Config()) as session:
    # Publish a JSON payload; the encoding is stored as the record's content type.
    session.put(
        KEY,
        b'{"state": "running", "speed": 120}',
        encoding=zenoh.Encoding.APPLICATION_JSON,
    )

    # Read the latest record back over Zenoh.
    for reply in session.get(
        f"{KEY}?last=true",
        timeout=5.0,
        consolidation=CONSOLIDATION,
    ):
        if reply.ok:
            print(reply.ok.payload.to_bytes())
            break

Sending Labels

Zenoh samples can also carry an optional attachment, which is a binary blob that can be used to store arbitrary metadata. ReductStore expects this attachment to be a JSON-encoded map of string key-value pairs, which it stores as record labels.

import json
import zenoh

KEY = "factory/line1/camera"
PAYLOAD = b"<binary payload>"
LABELS = {"robot": "alpha", "status": "ok"}

with zenoh.open(zenoh.Config()) as session:
    # The attachment carries the labels as a JSON-encoded object.
    session.put(
        KEY,
        PAYLOAD,
        attachment=json.dumps(LABELS).encode(),
    )
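
Because the attachment is expected to be a map of string keys to string values, it can help to normalize labels on the client before encoding them. The helper below is a minimal sketch: `encode_labels` is a hypothetical name, and converting non-string values to strings is a choice made for this example, since how the server treats non-string values is not stated here.

```python
import json


def encode_labels(labels: dict) -> bytes:
    """Return a JSON attachment whose keys and values are all strings."""
    normalized = {}
    for name, value in labels.items():
        if not isinstance(name, str):
            raise TypeError(f"label name must be a string, got {type(name).__name__}")
        if isinstance(value, bool):
            value = "true" if value else "false"
        normalized[name] = str(value)
    return json.dumps(normalized).encode()


attachment = encode_labels({"robot": "alpha", "speed": 120, "calibrated": True})
print(attachment)
```

The returned bytes can be passed as the `attachment` argument of `session.put()`.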

Read Path

When a Zenoh client sends a query whose selector matches a key expression in RS_ZENOH_QUERY_KEYEXPRS, ReductStore looks up the matching records and replies with them.

The query takes the form of a selector: the key expression routes the query to the right queryable, and the parameters after ? control which records are returned. Parameters are ;-separated key=value pairs.

tip

When querying for multiple records, set consolidation=zenoh.ConsolidationMode.NONE. This ensures all matching records are returned individually rather than merged.

Point-in-time Lookup

Fetch the single record at an exact timestamp:

| Parameter | Description | Type |
|---|---|---|
| ts | Exact record timestamp in Unix microseconds | Timestamp |

import zenoh

KEY = "factory/line1/camera"
TS = 1700000000000000  # Unix microseconds
CONSOLIDATION = zenoh.ConsolidationMode.NONE

with zenoh.open(zenoh.Config()) as session:
    # Keep only successful replies.
    replies = [
        reply
        for reply in session.get(
            f"{KEY}?ts={TS}",
            timeout=5.0,
            consolidation=CONSOLIDATION,
        )
        if reply.ok
    ]
    for reply in replies:
        print(reply.ok.payload.to_bytes())

Latest Record

Fetch only the most recent record in an entry:

| Parameter | Description | Type | Default |
|---|---|---|---|
| last | Return the latest record | Boolean | False |

import zenoh

KEY = "factory/line1/last-query"
CONSOLIDATION = zenoh.ConsolidationMode.NONE

with zenoh.open(zenoh.Config()) as session:
    # Keep only successful replies.
    replies = [
        reply
        for reply in session.get(
            f"{KEY}?last=true",
            timeout=5.0,
            consolidation=CONSOLIDATION,
        )
        if reply.ok
    ]
    for reply in replies:
        print(reply.ok.payload.to_bytes())

Time-range Query

If neither ts nor last is set, a range query is performed over all records in the entry:

| Parameter | Description | Type | Default |
|---|---|---|---|
| start | Range start timestamp (Unix microseconds, inclusive) | Timestamp | Timestamp of the first record |
| stop | Range stop timestamp (Unix microseconds, exclusive) | Timestamp | Timestamp of the last record |
| strict | Fail the query if the when condition references unknown labels | Boolean | False |

import zenoh

KEY = "factory/line1/range-query"
START = 1700000000000000  # inclusive, Unix microseconds
STOP = 1700000000010000   # exclusive, Unix microseconds
CONSOLIDATION = zenoh.ConsolidationMode.NONE

with zenoh.open(zenoh.Config()) as session:
    # Keep only successful replies.
    replies = [
        reply
        for reply in session.get(
            f"{KEY}?start={START};stop={STOP}",
            timeout=5.0,
            consolidation=CONSOLIDATION,
        )
        if reply.ok
    ]
    for reply in replies:
        print(reply.ok.payload.to_bytes())
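
Since start and stop are expressed in Unix microseconds, a small conversion helper avoids hand-computing the values. The sketch below uses only the Python standard library; `to_unix_us`, `range_params`, and `in_range` are hypothetical helper names, and `in_range` simply restates the inclusive-start, exclusive-stop rule from the table above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_us(moment: datetime) -> int:
    """Convert a timezone-aware datetime to Unix microseconds."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return (moment - EPOCH) // timedelta(microseconds=1)


def range_params(start: datetime, stop: datetime) -> str:
    """Build the ;-separated start/stop parameters for a range query."""
    return f"start={to_unix_us(start)};stop={to_unix_us(stop)}"


def in_range(timestamp_us: int, start_us: int, stop_us: int) -> bool:
    """Half-open interval check: start is inclusive, stop is exclusive."""
    return start_us <= timestamp_us < stop_us


start = datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc)
stop = start + timedelta(days=1)
print(range_params(start, stop))
# start=1700000000000000;stop=1700086400000000
```

Appending the result to a key after `?` gives a selector for a one-day range query.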

Conditional Queries

For any range query you can filter records by label using the full ReductStore conditional query language. Pass a when expression as a JSON attachment on the session.get() call:

attachment = json.dumps({"when": {"&status": {"$eq": "ok"}}}).encode()
session.get(key, attachment=attachment, ...)

The when object follows the same syntax as the HTTP API. Refer to the Conditional Query Reference for the full list of operators.

import json
import zenoh

KEY = "factory/line1/when-query"
CONSOLIDATION = zenoh.ConsolidationMode.NONE
# Only return records whose "status" label equals "ok".
attachment = json.dumps({"when": {"&status": {"$eq": "ok"}}}).encode()

with zenoh.open(zenoh.Config()) as session:
    replies = [
        reply
        for reply in session.get(
            KEY,
            timeout=5.0,
            attachment=attachment,
            consolidation=CONSOLIDATION,
        )
        if reply.ok
    ]

    for reply in replies:
        print(reply.ok.payload.to_bytes())

Reading Labels from Replies

Labels are returned as a JSON attachment on each reply sample.

Along with your custom labels, ReductStore also includes Zenoh metadata labels in replies:

  • zenoh_source_id
  • zenoh_ts_ntp64

import json
import zenoh

KEY = "factory/line1/camera"
CONSOLIDATION = zenoh.ConsolidationMode.NONE

with zenoh.open(zenoh.Config()) as session:
    replies = [
        reply
        for reply in session.get(
            f"{KEY}?last=true",
            timeout=5.0,
            consolidation=CONSOLIDATION,
        )
        if reply.ok
    ]
    for reply in replies:
        sample = reply.ok
        labels = {}
        # Labels arrive as a JSON object in the reply's attachment.
        if sample.attachment is not None:
            labels = json.loads(sample.attachment.to_bytes())

        print(labels)

Current Limitations

  • Single-bucket mode. All Zenoh data maps to one configured bucket. Routing to different buckets based on the key expression is not yet supported.
  • No data deletion. Records, entries, and buckets cannot be deleted through the Zenoh API. Use the HTTP API for data management operations.