Rosbag
Rosbag is the standard ROS 2 recording format (rosbag2), capturing time-series messages across topics with metadata. ReductROS works with rosbag archives stored in ReductStore as ZIP files that include metadata.yaml and use ROS 2 SQLite3 or MCAP storage.
This feature is available under a commercial license. For testing, you can either use a free demo server (extension included) or request a demo license for your own deployment.
Supported input
The ReductROS extension supports ROS 2 rosbag2 archives in SQLite3 or MCAP storage formats.
Upload the rosbag directory as a ZIP archive that contains metadata.yaml at the root (or one level below).
Store it in ReductStore with the content type application/rosbag.
Query format
A user can use the ext query parameter to activate the ros extension and define the parameters for extracting and transforming rosbag data in the following format:
{
"ext": {
"ros": {
"rosbag": {},
"extract": {
"topic": "string", # ROS topic name to extract from
"encode": "object", # e.g., {"data":"jpeg"} for JPEG encoding
"as_label": "object" # e.g., {"label_name": "path_to_json"}
},
"transform": {
"include": "string[]", # e.g., ["/topic-.*"]
"exclude": "string[]", # e.g., ["/topic-b", "/ext-.*"]
"duration": "string", # e.g., "1m" (max episode duration)
"size": "string" # e.g., "100MB" (max content length)
}
}
}
}
You must specify either extract or transform for a rosbag request.
Data extraction
The extract property allows you to select a single topic from a rosbag and convert it to JSON.
The extension reads the rosbag, decodes the messages for the specified topic, and returns each message as a JSON record with
timestamps from the message header (when present).
| Parameter | Type | Mandatory | Description |
|---|---|---|---|
topic | string | Yes | The ROS topic name to extract messages from. This must match the topic names in the rosbag. |
encode | object | No | A dictionary specifying how to encode binary data in the messages. For example, {"data": "jpeg"} for JPEG encoding. |
encode.<field> | string | No | The encoding format for binary fields in the message. Supported values are base64 and jpeg. If not specified, the field will be returned as a JSON list. |
as_label | object | No | An object that specifies dynamic computed labels that will be delivered to the client. Can be a basis for output filtering. |
Encoding binary data
The encode property allows you to specify how to handle binary data in the extracted messages.
You can choose to encode binary fields in the following formats:
| Format | Description |
|---|---|
base64 | Encodes binary data as a base64 string, suitable for text-based formats like JSON. |
jpeg | Encodes binary data as a JPEG image encoded to a base64 string, suitable for image data. |
Currently, the extension supports encoding only sensor_msgs/msg/Image messages with encoding set to rgb8, bgr8, or mono8.
Extract output
Each output record contains:
- JSON payload for the decoded ROS 2 message
timestampfrom the message headercomputed_labelswithtopic,schema, andencoding
Example
{
"ext": {
"ros": {
"rosbag": {},
"extract": {
"topic": "/test/geometry_msgs/accel"
}
}
}
}
Data transformation
The transform property allows you to filter topics from a rosbag and re-pack them into new rosbag episodes.
The output is a ZIP archive with content type application/rosbag.
| Field | Type | Mandatory | Description |
|---|---|---|---|
include | string[] | No | List of topics to include. Supports regular expressions. If omitted, all topics are considered. |
exclude | string[] | No | List of topics to exclude. Applied after include. |
duration | string | No | Maximum duration per episode (e.g., 5m, 2h, 1d). |
size | string | No | Maximum content length per episode (e.g., 100MB, 1GB). |
If a topic pattern does not contain regex tokens (like .*, ^, $), it is treated as an exact match.
Transforming rosbag
When transform is provided, the extension:
- Reads the source rosbag.
- Applies topic filters (
includefirst, thenexclude). - Streams matching messages into a new rosbag.
- Starts a new episode whenever
durationorsizeis reached.
Example
{
"ext": {
"ros": {
"rosbag": {},
"transform": {
"include": ["/camera/.*", "^/imu/data$"],
"exclude": ["/camera/debug"],
"duration": "10m",
"size": "500MB"
}
}
}
}
Result
- All topics starting with
/camera/are included (because of/camera/.*). - The topic
/imu/datais included exactly (because of^/imu/data$). - The topic
/camera/debugis excluded, even though it matches the/camera/.*rule. - Output is a sequence of rosbag episodes, each ≤ 10 minutes or ≤ 500 MB (whichever comes first).
Examples
The following examples demonstrate how to use the ReductROS extension to extract and transform rosbag data. Although these examples are written in Python, they can be run using any of the official SDKs.
Extracting messages as JSON
This example demonstrates how to extract a single topic from a rosbag stored in ReductStore and convert it to JSON.
- Python
from time import time_ns
from pathlib import Path
from reduct import Client
HERE = Path(__file__).parent
async def main():
async with Client("http://localhost:8383", api_token="my-token") as client:
bucket = await client.create_bucket(
"my-bucket",
exist_ok=True,
)
# Write a rosbag ZIP archive with timestamps
now = time_ns() // 1000
with open(f"{HERE}/../data/rosbag_test.zip", "rb") as f:
data = f.read()
await bucket.write(
"rosbag",
data,
content_length=len(data),
timestamp=now,
content_type="application/rosbag",
)
# Prepare the query with the 'ros' extension
condition = {
"#ext": {
"ros": {
"rosbag": {},
"extract": {
"topic": "/test/geometry_msgs/accel",
"as_label": {
"lin_x": "linear.x",
},
},
},
"when": {
"@lin_x": {"$gt": 0.0},
},
}
}
# Query the data with the 'ros' extension
async for record in bucket.query("rosbag", start=now, when=condition):
print(f"Record entry: {record.entry}")
print(f"Record timestamp: {record.timestamp}")
print(f"Record labels: {record.labels}")
json = await record.read_all()
print(json.decode("utf-8").strip())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Expected output
The expected output of the above code is as follows:
Record timestamp: 1768398396081426
Record labels: {'encoding': 'cdr', 'schema': 'geometry_msgs/Accel', 'topic': '/test/geometry_msgs/accel', 'lin_x': '0.1'}
{"angular":{"x":0.0,"y":0.0,"z":0.1},"linear":{"x":0.1,"y":0.2,"z":9.8}}
Transforming rosbag with splitting and topic filtering
This example demonstrates how to filter topics from a rosbag and emit new rosbag episodes.
- Python
from time import time_ns
from pathlib import Path
from reduct import Client
HERE = Path(__file__).parent
async def main():
async with Client("http://localhost:8383", api_token="my-token") as client:
bucket = await client.create_bucket(
"my-bucket",
exist_ok=True,
)
# Write a rosbag ZIP archive with timestamps
now = time_ns() // 1000
with open(f"{HERE}/../data/rosbag_test.zip", "rb") as f:
data = f.read()
await bucket.write(
"rosbag",
data,
content_length=len(data),
timestamp=now,
content_type="application/rosbag",
)
# Prepare the query with the 'ros' extension
condition = {
"#ext": {
"ros": {
"rosbag": {},
"transform": {
"include": ["/test/geometry_msgs/accel"],
"duration": "1m",
"size": "100KB",
},
}
}
}
# Query the data with the 'ros' extension
async for record in bucket.query("rosbag", start=now, when=condition):
print(f"Record entry: {record.entry}")
print(f"Record timestamp: {record.timestamp}")
# Each record corresponds to a new rosbag episode
data = await record.read_all()
print(f"Episode file size: {len(data)} bytes")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Expected output
The expected output of the above code is as follows:
Record timestamp: 1768398396081426
Episode file size: 1783 bytes