Version: 1.18.x

Rosbag

Rosbag is the standard ROS 2 recording format (rosbag2), capturing time-series messages across topics with metadata. ReductROS works with rosbag archives stored in ReductStore as ZIP files that include metadata.yaml and use ROS 2 SQLite3 or MCAP storage.

License Information

This feature is available under a commercial license. For testing, you can either use a free demo server (extension included) or request a demo license for your own deployment.

Supported input

The ReductROS extension supports ROS 2 rosbag2 archives in SQLite3 or MCAP storage formats. Upload the rosbag directory as a ZIP archive that contains metadata.yaml at the root (or one level below). Store it in ReductStore with the content type application/rosbag.
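One way to produce such an archive is sketched below with Python's standard zipfile module. The helper name pack_rosbag and the paths passed to it are hypothetical. The only requirement taken from the text above is that metadata.yaml ends up at the root of the ZIP.

```python
import zipfile
from pathlib import Path


def pack_rosbag(bag_dir: str, zip_path: str) -> list[str]:
    """Zip a rosbag2 directory so that metadata.yaml sits at the archive root."""
    bag = Path(bag_dir)
    if not (bag / "metadata.yaml").is_file():
        raise FileNotFoundError(f"metadata.yaml not found in {bag}")

    names = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(bag.rglob("*")):
            if path.is_file():
                # Store paths relative to the bag directory, not absolute ones
                name = path.relative_to(bag).as_posix()
                archive.write(path, arcname=name)
                names.append(name)
    return names
```

Write the ZIP file outside the bag directory so that the archive does not try to include itself.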

Query format

Use the ext query parameter to activate the ros extension and to define how rosbag data is extracted or transformed. The parameter has the following format:

{
  "ext": {
    "ros": {
      "rosbag": {},

      "extract": {
        "topic": "string",     # ROS topic name to extract from
        "encode": "object",    # e.g., {"data":"jpeg"} for JPEG encoding
        "as_label": "object"   # e.g., {"label_name": "path_to_json"}
      },

      "transform": {
        "include": "string[]", # e.g., ["/topic-.*"]
        "exclude": "string[]", # e.g., ["/topic-b", "/ext-.*"]
        "duration": "string",  # e.g., "1m" (max episode duration)
        "size": "string"       # e.g., "100MB" (max content length)
      }
    }
  }
}
Info: You must specify either extract or transform for a rosbag request.
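The rule above can be checked on the client before a request is sent. The following is a minimal sketch: build_ros_query is a hypothetical helper, not part of any SDK, and it assumes that exactly one of the two sections is given per request, which the text implies but does not state outright.

```python
import json
from typing import Optional


def build_ros_query(extract: Optional[dict] = None,
                    transform: Optional[dict] = None) -> dict:
    """Build the ext payload for a rosbag request (hypothetical helper)."""
    # Assumption: exactly one of the two sections is expected per request
    if (extract is None) == (transform is None):
        raise ValueError("specify either 'extract' or 'transform'")

    ros = {"rosbag": {}}
    if extract is not None:
        # 'topic' is the only mandatory extract parameter
        if "topic" not in extract:
            raise ValueError("'extract' requires a 'topic'")
        ros["extract"] = extract
    else:
        ros["transform"] = transform
    return {"ext": {"ros": ros}}


query = build_ros_query(extract={"topic": "/test/geometry_msgs/accel"})
print(json.dumps(query, indent=2))
```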

Data extraction

The extract property allows you to select a single topic from a rosbag and convert it to JSON. The extension reads the rosbag, decodes the messages for the specified topic, and returns each message as a JSON record with timestamps from the message header (when present).

| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| topic | string | Yes | The ROS topic name to extract messages from. It must match a topic name in the rosbag. |
| encode | object | No | A dictionary specifying how to encode binary data in the messages. For example, {"data": "jpeg"} for JPEG encoding. |
| encode.<field> | string | No | The encoding format for a binary field in the message. Supported values are base64 and jpeg. If not specified, the field is returned as a JSON list. |
| as_label | object | No | An object that maps label names to JSON paths in the decoded message. The resulting computed labels are delivered to the client and can be used to filter the output. |
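To make the as_label mapping concrete, the sketch below resolves a dotted path such as linear.x against a decoded message. It only mimics the idea on the client side: resolve_path is a hypothetical helper, and the path syntax actually accepted by the extension may be richer than plain dotted keys.

```python
from typing import Any


def resolve_path(message: dict, path: str) -> Any:
    """Follow a dotted path (e.g. "linear.x") through nested dictionaries."""
    value: Any = message
    for key in path.split("."):
        value = value[key]
    return value


message = {"angular": {"x": 0.0, "y": 0.0, "z": 0.1},
           "linear": {"x": 0.1, "y": 0.2, "z": 9.8}}
as_label = {"lin_x": "linear.x"}

# Label values are rendered as strings in this sketch
labels = {name: str(resolve_path(message, path)) for name, path in as_label.items()}
print(labels)  # {'lin_x': '0.1'}
```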

Encoding binary data

The encode property allows you to specify how to handle binary data in the extracted messages. You can choose to encode binary fields in the following formats:

| Format | Description |
|---|---|
| base64 | Encodes binary data as a base64 string, suitable for text-based formats like JSON. |
| jpeg | Encodes image data as a JPEG image and returns it as a base64 string. |
Info: Currently, encoding is supported only for sensor_msgs/msg/Image messages whose encoding field is rgb8, bgr8, or mono8.
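On the client side, an encoded field has to be turned back into bytes. The sketch below assumes that the encoded field appears in the JSON payload as a plain base64 string under its original field name (here data). The helper decode_binary_field and the synthetic record are illustrative only.

```python
import base64
import json


def decode_binary_field(payload: bytes, field: str = "data") -> bytes:
    """Return the raw bytes of a base64-encoded field from a JSON record."""
    message = json.loads(payload)
    return base64.b64decode(message[field])


# Synthetic record for illustration: only a JPEG start marker is encoded
fake_jpeg = b"\xff\xd8\xff\xe0"
payload = json.dumps({
    "height": 1,
    "width": 1,
    "data": base64.b64encode(fake_jpeg).decode("ascii"),
}).encode("utf-8")

image_bytes = decode_binary_field(payload)
print(image_bytes[:2] == b"\xff\xd8")  # True
```

With the jpeg format, the decoded bytes could be written straight to a .jpg file. With base64, they are the raw field contents.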

Extract output

Each output record contains:

  • JSON payload for the decoded ROS 2 message
  • timestamp from the message header
  • computed_labels with topic, schema, and encoding

Example

{
  "ext": {
    "ros": {
      "rosbag": {},
      "extract": {
        "topic": "/test/geometry_msgs/accel"
      }
    }
  }
}

Data transformation

The transform property allows you to filter topics from a rosbag and re-pack them into new rosbag episodes. The output is a ZIP archive with content type application/rosbag.

| Field | Type | Mandatory | Description |
|---|---|---|---|
| include | string[] | No | List of topics to include. Supports regular expressions. If omitted, all topics are considered. |
| exclude | string[] | No | List of topics to exclude. Applied after include. |
| duration | string | No | Maximum duration per episode (e.g., 5m, 2h, 1d). |
| size | string | No | Maximum content length per episode (e.g., 100MB, 1GB). |
Info: If a topic pattern does not contain regex tokens (like .*, ^, $), it is treated as an exact match.

Transforming rosbag

When transform is provided, the extension:

  1. Reads the source rosbag.
  2. Applies topic filters (include first, then exclude).
  3. Streams matching messages into a new rosbag.
  4. Starts a new episode whenever duration or size is reached.
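The filtering step can be pictured with the following sketch. It is not the extension's implementation: the set of characters treated as regex tokens and the use of a full match against the topic name are assumptions made for illustration.

```python
import re

# Assumption: these characters mark a pattern as a regular expression
REGEX_TOKENS = set(".*^$+?[](){}|\\")


def matches(pattern: str, topic: str) -> bool:
    """Exact comparison for plain names, full regex match otherwise."""
    if not REGEX_TOKENS.intersection(pattern):
        return pattern == topic
    return re.fullmatch(pattern, topic) is not None


def filter_topics(topics, include=None, exclude=None):
    """Apply include first (all topics if omitted), then exclude."""
    selected = [t for t in topics
                if not include or any(matches(p, t) for p in include)]
    return [t for t in selected
            if not any(matches(p, t) for p in (exclude or []))]


topics = ["/camera/image", "/camera/debug", "/imu/data", "/imu/data_raw", "/gps"]
print(filter_topics(topics,
                    include=["/camera/.*", "^/imu/data$"],
                    exclude=["/camera/debug"]))
# ['/camera/image', '/imu/data']
```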

Example

{
  "ext": {
    "ros": {
      "rosbag": {},
      "transform": {
        "include": ["/camera/.*", "^/imu/data$"],
        "exclude": ["/camera/debug"],
        "duration": "10m",
        "size": "500MB"
      }
    }
  }
}

Result

  • All topics starting with /camera/ are included (because of /camera/.*).
  • The topic /imu/data is included exactly (because of ^/imu/data$).
  • The topic /camera/debug is excluded, even though it matches the /camera/.* rule.
  • Output is a sequence of rosbag episodes, each at most 10 minutes long and at most 500 MB in size. A new episode starts as soon as either limit is reached.
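The splitting rule can be sketched as a small grouping function. The limits are passed as plain numbers (seconds and bytes) to avoid guessing how the extension parses strings such as 10m or 500MB, and the exact behavior at the boundary is an assumption of this sketch.

```python
def split_into_episodes(messages, max_duration_s, max_size_bytes):
    """Group (timestamp_s, size_bytes) pairs into episodes bounded by both limits."""
    episodes, current = [], []
    start, size = None, 0
    for timestamp, nbytes in messages:
        # Close the running episode once either limit would be hit
        too_long = bool(current) and timestamp - start >= max_duration_s
        too_big = bool(current) and size + nbytes > max_size_bytes
        if too_long or too_big:
            episodes.append(current)
            current, size = [], 0
        if not current:
            start = timestamp
        current.append((timestamp, nbytes))
        size += nbytes
    if current:
        episodes.append(current)
    return episodes


# Four 200-byte messages, one every 30 s, with limits of 60 s and 500 bytes
messages = [(0, 200), (30, 200), (60, 200), (90, 200)]
episodes = split_into_episodes(messages, max_duration_s=60, max_size_bytes=500)
print([len(e) for e in episodes])  # [2, 2]
```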

Examples

The following examples demonstrate how to use the ReductROS extension to extract and transform rosbag data. The examples are written in Python, but the same queries can be issued from any of the official SDKs.

Extracting messages as JSON

This example demonstrates how to extract a single topic from a rosbag stored in ReductStore and convert it to JSON.

from time import time_ns
from pathlib import Path

from reduct import Client

HERE = Path(__file__).parent


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write a rosbag ZIP archive with timestamps
        now = time_ns() // 1000

        with open(f"{HERE}/../data/rosbag_test.zip", "rb") as f:
            data = f.read()

        await bucket.write(
            "rosbag",
            data,
            content_length=len(data),
            timestamp=now,
            content_type="application/rosbag",
        )

        # Prepare the query with the 'ros' extension
        condition = {
            "#ext": {
                "ros": {
                    "rosbag": {},
                    "extract": {
                        "topic": "/test/geometry_msgs/accel",
                        "as_label": {
                            "lin_x": "linear.x",
                        },
                    },
                },
                "when": {
                    "@lin_x": {"$gt": 0.0},
                },
            }
        }

        # Query the data with the 'ros' extension
        async for record in bucket.query("rosbag", start=now, when=condition):
            print(f"Record entry: {record.entry}")
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")

            json = await record.read_all()
            print(json.decode("utf-8").strip())


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Expected output

The expected output of the above code is as follows:

Record timestamp: 1768398396081426
Record labels: {'encoding': 'cdr', 'schema': 'geometry_msgs/Accel', 'topic': '/test/geometry_msgs/accel', 'lin_x': '0.1'}
{"angular":{"x":0.0,"y":0.0,"z":0.1},"linear":{"x":0.1,"y":0.2,"z":9.8}}
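The payload and labels shown above can be processed with the standard json module. The short sketch below copies the values from the expected output rather than querying a server. Note that the computed label lin_x is printed as a string, so it is converted before any numeric comparison.

```python
import json

# Values copied from the expected output above
labels = {"encoding": "cdr", "schema": "geometry_msgs/Accel",
          "topic": "/test/geometry_msgs/accel", "lin_x": "0.1"}
payload = b'{"angular":{"x":0.0,"y":0.0,"z":0.1},"linear":{"x":0.1,"y":0.2,"z":9.8}}'

message = json.loads(payload)
lin_x = float(labels["lin_x"])  # the label is a string, so convert before comparing

print(message["linear"]["z"])           # 9.8
print(lin_x == message["linear"]["x"])  # True
```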

Transforming rosbag with splitting and topic filtering

This example demonstrates how to filter topics from a rosbag and emit new rosbag episodes.

from time import time_ns
from pathlib import Path

from reduct import Client

HERE = Path(__file__).parent


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write a rosbag ZIP archive with timestamps
        now = time_ns() // 1000

        with open(f"{HERE}/../data/rosbag_test.zip", "rb") as f:
            data = f.read()

        await bucket.write(
            "rosbag",
            data,
            content_length=len(data),
            timestamp=now,
            content_type="application/rosbag",
        )

        # Prepare the query with the 'ros' extension
        condition = {
            "#ext": {
                "ros": {
                    "rosbag": {},
                    "transform": {
                        "include": ["/test/geometry_msgs/accel"],
                        "duration": "1m",
                        "size": "100KB",
                    },
                }
            }
        }

        # Query the data with the 'ros' extension
        async for record in bucket.query("rosbag", start=now, when=condition):
            print(f"Record entry: {record.entry}")
            print(f"Record timestamp: {record.timestamp}")

            # Each record corresponds to a new rosbag episode
            data = await record.read_all()
            print(f"Episode file size: {len(data)} bytes")


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Expected output

The expected output of the above code is as follows:

Record timestamp: 1768398396081426
Episode file size: 1783 bytes
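Because each episode is delivered as a ZIP archive, its contents can be inspected in memory with the standard zipfile module. In the sketch below, list_episode_files is a hypothetical helper, and the synthetic archive only stands in for the bytes returned by record.read_all(). The file names inside a real episode are not documented here and are assumptions.

```python
import io
import zipfile


def list_episode_files(data: bytes) -> list[str]:
    """Return the file names inside a rosbag episode delivered as ZIP bytes."""
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        return archive.namelist()


# Synthetic stand-in for an episode; real file names may differ
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("metadata.yaml", "rosbag2_bagfile_information: {}\n")
    archive.writestr("episode_0.mcap", b"")

print(list_episode_files(buffer.getvalue()))  # ['metadata.yaml', 'episode_0.mcap']
```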