Raw Messages
Raw Messages mode lets ReductROS decode ROS messages that are stored as individual records rather
than as MCAP or rosbag archives. It is intended for pipelines that store ROS 1 schema metadata in
the $ros attachment of each entry.
This feature is available under a commercial license. For testing, you can either use a free demo server (extension included) or request a demo license for your own deployment.
Supported input
Raw Messages expects:
- one ROS 1 message per record
- one or more queried entries
- a `$ros` attachment on each queried entry that provides the schema metadata
Unlike the MCAP and rosbag archive modes, Raw Messages works directly with individual records. It supports extraction to JSON and export to MCAP.
Required $ros attachment
Raw Messages uses an attachment named $ros on each queried entry. The attachment must contain the
following JSON fields:
| Field | Type | Description |
|---|---|---|
| `encoding` | string | Must be `ros1`. |
| `schema` | string | Full ROS 1 message definition used to decode the binary payload. |
| `topic` | string | Topic name associated with the stored records. |
| `schema_name` | string | ROS 1 message type, for example `std_msgs/String`. |
If the records were ingested with reduct-bridge, this attachment is written automatically.
Example $ros metadata
```json
{
  "encoding": "ros1",
  "schema": "string data",
  "topic": "/chatter",
  "schema_name": "std_msgs/String"
}
```
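If you write records with an SDK yourself rather than through reduct-bridge, the attachment must be uploaded before the entry is queried. The following is a minimal sketch that checks the required fields are present first; the `validate_ros_attachment` helper is illustrative and not part of any SDK:

```python
REQUIRED_FIELDS = {'encoding', 'schema', 'topic', 'schema_name'}


def validate_ros_attachment(attachment: dict) -> dict:
    """Check that a $ros attachment carries every field Raw Messages needs."""
    missing = REQUIRED_FIELDS - attachment.keys()
    if missing:
        raise ValueError(f'$ros attachment is missing fields: {sorted(missing)}')
    if attachment['encoding'] != 'ros1':
        raise ValueError("Raw Messages only supports encoding='ros1'")
    return attachment


# The metadata shown above passes validation.
ros_attachment = validate_ros_attachment({
    'encoding': 'ros1',
    'schema': 'string data',
    'topic': '/chatter',
    'schema_name': 'std_msgs/String',
})
```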
Query format
```
{
  "#ext": {
    "ros": {
      "extract": {
        "topic": "string",    # Optional ROS topic filter when querying multiple entries
        "encode": "object",   # e.g., {"data": "jpeg"} for JPEG encoding
        "as_label": "object"  # e.g., {"label_name": "path_to_json"}
      },
      "export": {
        "format": "string",   # Currently only "mcap"
        "duration": "string", # e.g., "1m" (max episode duration)
        "size": "string"      # e.g., "100MB" (max content length)
      }
    }
  }
}
```
Specify exactly one of extract or export. Both operations resolve schema and topic information
from the $ros attachment of each queried entry.
Data extraction
The extract property tells ReductROS how to decode each raw ROS 1 record and how to shape the
resulting JSON output.
| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| `topic` | string | No | Filter decoded records by ROS topic from the `$ros` attachment. Useful when querying multiple entries. |
| `encode` | object | No | A map of binary fields to conversion formats. |
| `encode.<field>` | string | No | Conversion format for the field. Supported values are `base64` and `jpeg`. |
| `as_label` | object | No | Computed labels extracted from the decoded JSON payload. Use JSON paths without the `$.` prefix. |
If topic is omitted, Raw Messages extracts all queried entries that have a valid $ros attachment.
Encoding binary fields
The encode property controls how byte arrays are represented in the JSON output.
| Format | Description |
|---|---|
| `base64` | Encodes the selected byte array as a base64 string. |
| `jpeg` | Encodes image bytes as a base64-encoded JPEG. |
Currently, JPEG conversion is supported only for `sensor_msgs/Image` messages with encoding set
to `rgb8`, `bgr8`, or `mono8`.
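For byte arrays that are not images, `base64` is the safe choice. A query using it might look like the following; the field name `data` is only an illustration, so substitute whatever byte-array field your message type defines:

```
{
  "#ext": {
    "ros": {
      "extract": {
        "encode": {
          "data": "base64"
        }
      }
    }
  }
}
```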
Timestamps
The extractor uses header.stamp.sec and header.stamp.nsec when they are present in the decoded
message. If the message has no ROS header, the output record keeps the original ReductStore record
timestamp.
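Since ReductStore record timestamps are Unix microseconds, the resolution rule above can be sketched as a small helper. This is an illustrative reimplementation, not the extension's actual code; the stamp values come from the image example later on this page:

```python
def resolve_timestamp(message: dict, record_timestamp: int) -> int:
    """Prefer header.stamp (sec/nsec) when present; otherwise keep the
    original ReductStore record timestamp (Unix microseconds)."""
    stamp = message.get('header', {}).get('stamp')
    if stamp is None:
        return record_timestamp
    return stamp['sec'] * 1_000_000 + stamp['nsec'] // 1_000


# The stamp from the image example below resolves to the printed record timestamp.
msg = {'header': {'stamp': {'sec': 1773324549, 'nsec': 67620754}}}
print(resolve_timestamp(msg, 0))  # 1773324549067620
```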
Data export
The export property converts raw ROS 1 records into MCAP episodes. It can aggregate data from
multiple ReductStore entries into a single MCAP output stream, using the topic and schema from the
$ros attachment of each entry.
| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| `format` | string | Yes | Output format. Currently only `mcap` is supported. |
| `duration` | string | No | Maximum episode duration, for example `30s`, `5m`, or `2h`. |
| `size` | string | No | Maximum uncompressed episode size, for example `100MB` or `1GB`. Minimum supported size is `1KB`. |
When export is provided, the extension:
- Reads raw ROS 1 records from the queried entries.
- Resolves schema and topic metadata for each entry from its `$ros` attachment.
- Streams all matching messages into a new MCAP file.
- Starts a new MCAP episode whenever `duration` or `size` is reached.
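The episode-splitting semantics can be pictured as follows. This is an illustrative sketch, not the extension's actual implementation: messages accumulate in the current episode until the duration or size budget would be exceeded, at which point a new episode begins.

```python
def split_episodes(messages, max_duration_us=None, max_size=None):
    """Group (timestamp_us, payload) pairs into episodes, starting a new
    episode whenever the duration or size budget would be exceeded."""
    episodes, current, start_ts, size = [], [], None, 0
    for ts, payload in messages:
        over_duration = (max_duration_us is not None and start_ts is not None
                         and ts - start_ts >= max_duration_us)
        over_size = max_size is not None and size + len(payload) > max_size
        if current and (over_duration or over_size):
            # Close the current episode and start a fresh one.
            episodes.append(current)
            current, start_ts, size = [], None, 0
        if start_ts is None:
            start_ts = ts
        current.append((ts, payload))
        size += len(payload)
    if current:
        episodes.append(current)
    return episodes


# Three messages spread over 90 seconds with a 60-second budget split into two episodes.
msgs = [(0, b'a'), (30_000_000, b'b'), (90_000_000, b'c')]
print(len(split_episodes(msgs, max_duration_us=60_000_000)))  # 2
```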
Output behavior
- One output record is produced per input raw ROS record during extraction.
- Extraction preserves the input entry name and emits `application/json`.
- Export emits `application/mcap` records.
- Original input labels are preserved.
- Existing computed labels are preserved during extraction.
- Extraction adds the following computed labels: `encoding`, `schema`, `topic`, and labels defined via `as_label`.
Examples
The following examples use raw ROS sample data stored as `application/ros1` records. Although the
examples are written in Python, the same query structure works with any official SDK.
Extracting messages as JSON
This example writes raw geometry_msgs/Point records, stores the schema metadata in the $ros
attachment, and then extracts them as JSON. It also creates a computed label from the decoded x
field.
```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry = f'sensor-pos-{time_ns()}'

        with open(DATA / 'raw_ros_point_ros_attachment.json', 'r', encoding='utf-8') as f:
            ros_attachment = json.load(f)
        with open(DATA / 'raw_ros_point_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        await bucket.write_attachments(entry, {'$ros': ros_attachment})

        for sample in records:
            payload = (DATA / sample['file']).read_bytes()
            await bucket.write(
                entry,
                payload,
                content_length=len(payload),
                timestamp=sample['timestamp'],
                labels=sample['labels'],
                content_type='application/ros1',
            )

        condition = {
            '#ext': {
                'ros': {
                    'extract': {
                        'as_label': {
                            'coord_x': 'x',
                        },
                    },
                },
                'when': {
                    '@coord_x': {'$eq': 2.0},
                },
            }
        }

        async for record in bucket.query(entry, start=records[0]['timestamp'], when=condition):
            print(f'Record timestamp: {record.timestamp}')
            print(f"Record labels: {dict(sorted(record.labels.items()))}")
            payload = await record.read_all()
            message = json.loads(payload.decode('utf-8'))
            print(json.dumps(message, sort_keys=True, separators=(',', ':')))


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```
Expected output
```
Record timestamp: 1773324549469334
Record labels: {'@coord_x': '2.0', '@encoding': 'ros1', '@schema': 'geometry_msgs/Point', '@topic': '/sensor/pos', 'x': '2.0'}
{"x":2.0,"y":2.1,"z":2.2}
```
Explanation
- The example uploads three raw ROS 1 records to a new entry.
- It writes the matching `$ros` attachment before querying the entry.
- The extractor decodes the binary payload into a JSON `geometry_msgs/Point` message.
- The `as_label` option exposes the decoded `x` field as `@coord_x`.
- The `when` filter returns only the record where `@coord_x` equals `2.0`.
Extracting image messages with JPEG encoding
This example writes a raw sensor_msgs/Image record and returns the data field as a JPEG-encoded
string inside the JSON response.
```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry = f'camera-raw-{time_ns()}'

        with open(DATA / 'raw_ros_image_ros_attachment.json', 'r', encoding='utf-8') as f:
            ros_attachment = json.load(f)
        with open(DATA / 'raw_ros_image_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        sample = records[0]
        payload = (DATA / sample['file']).read_bytes()

        await bucket.write_attachments(entry, {'$ros': ros_attachment})
        await bucket.write(
            entry,
            payload,
            content_length=len(payload),
            timestamp=sample['timestamp'],
            labels=sample['labels'],
            content_type='application/ros1',
        )

        condition = {
            '#ext': {
                'ros': {
                    'extract': {
                        'encode': {
                            'data': 'jpeg',
                        },
                    },
                },
            }
        }

        async for record in bucket.query(entry, start=sample['timestamp'], when=condition):
            print(f'Record timestamp: {record.timestamp}')
            print(f"Record labels: {dict(sorted(record.labels.items()))}")
            message = json.loads((await record.read_all()).decode('utf-8'))
            image_info = {k: v for k, v in message.items() if k != 'data'}
            print(f'Image parameters: {json.dumps(image_info, sort_keys=True)}')


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```
Expected output
```
Record timestamp: 1773324549067620
Record labels: {'@encoding': 'ros1', '@schema': 'sensor_msgs/Image', '@topic': '/camra/raw'}
Image parameters: {"encoding": "rgb8", "header": {"frame_id": "", "seq": 1, "stamp": {"nsec": 67620754, "sec": 1773324549}}, "height": 1, "is_bigendian": 0, "step": 9, "width": 3}
```
Explanation
- The example uploads a raw ROS 1 image record and writes the matching `$ros` attachment.
- The `encode.data = jpeg` option converts the binary image buffer into a JPEG string.
- The script prints only the decoded image metadata to keep the output readable.
- The output timestamp comes from the ROS header stamp rather than the original record timestamp.
Exporting multiple raw entries as MCAP
This example writes raw ROS 1 records into two entries and exports both entries as a single MCAP episode.
```python
import json
from pathlib import Path
from time import time_ns

from reduct import Client

HERE = Path(__file__).parent
DATA = HERE / '../data'


async def main():
    async with Client('http://localhost:8383', api_token='my-token') as client:
        bucket = await client.create_bucket('my-bucket', exist_ok=True)
        entry_a = f'sensor-pos-{time_ns()}'
        entry_b = f'sensor-pos-copy-{time_ns()}'

        with open(DATA / 'raw_ros_point_ros_attachment.json', 'r', encoding='utf-8') as f:
            base_attachment = json.load(f)
        attachment_a = dict(base_attachment)
        attachment_b = dict(base_attachment)
        attachment_b['topic'] = '/sensor/pos_copy'

        with open(DATA / 'raw_ros_point_records.json', 'r', encoding='utf-8') as f:
            records = json.load(f)

        await bucket.write_attachments(entry_a, {'$ros': attachment_a})
        await bucket.write_attachments(entry_b, {'$ros': attachment_b})

        for sample in records[:2]:
            payload = (DATA / sample['file']).read_bytes()
            await bucket.write(
                entry_a,
                payload,
                content_length=len(payload),
                timestamp=sample['timestamp'],
                labels=sample['labels'],
                content_type='application/ros1',
            )

        sample = records[2]
        payload = (DATA / sample['file']).read_bytes()
        await bucket.write(
            entry_b,
            payload,
            content_length=len(payload),
            timestamp=sample['timestamp'],
            labels=sample['labels'],
            content_type='application/ros1',
        )

        condition = {
            '#ext': {
                'ros': {
                    'export': {
                        'format': 'mcap',
                        'duration': '1m',
                    },
                },
            }
        }

        async for record in bucket.query('*', start=records[0]['timestamp'], when=condition):
            print(f'Record entry: {record.entry}')
            print(f'Record timestamp: {record.timestamp}')
            data = await record.read_all()
            print(f'Episode file size: {len(data)} bytes')


if __name__ == '__main__':
    import asyncio

    asyncio.run(main())
```
Explanation
- The example writes raw ROS 1 point messages into two ReductStore entries.
- Each entry gets its own `$ros` attachment with the topic and schema metadata.
- `export.format = mcap` tells ReductROS to emit MCAP instead of JSON.
- Because the query targets `*`, both entries are aggregated into the same exported MCAP episode.
- `duration = 1m` enables splitting if the exported stream grows beyond one minute of source data.