MCAP
MCAP is a container format for recorded robotics data. In ROS 2, it commonly stores CDR-encoded messages across many topics with metadata like timestamps and schemas. ReductROS can extract and transform ROS 2 messages from MCAP files stored in ReductStore (content type application/mcap).
This feature is available under a commercial license. For testing, you can either use a free demo server (extension included) or request a demo license for your own deployment.
Query format
Use the ext query parameter to activate the ros extension and to define how ROS data is extracted and transformed. The parameter has the following format:
{
  "ext": {
    "ros": {
      "extract": {
        "topic": "string",       # ROS topic name to extract from
        "encode": "object",      # e.g., {"data":"jpeg"} for JPEG encoding
        "as_label": "object"     # e.g., {"label_name": "path_to_json"}
      },
      "transform": {
        "include": "string[]",   # e.g., ["/topic-.*"]
        "exclude": "string[]",   # e.g., ["/topic-b", "/ext-.*"]
        "duration": "string",    # e.g., "1m" (max episode duration)
        "size": "string"         # e.g., "100MB" (max content length)
      }
    }
  }
}
Data extraction
The extract property allows you to specify the ROS topic from which to extract messages and convert them to JSON format.
The extension will read the MCAP file, decode the messages from the specified topic, and return each message as a JSON record with
the timestamp from the message header (when present).
| Parameter | Type | Mandatory | Description |
|---|---|---|---|
| topic | string | Yes | The name of the ROS topic to extract messages from. This should match the topic names used in the MCAP file. |
| encode | object | No | A dictionary specifying how to encode binary data in the messages. For example, {"data": "jpeg"} for JPEG encoding. |
| encode.<field> | string | No | The encoding format for binary fields in the message. Supported values are base64 and jpeg. If not specified, the field will be returned as a JSON list. |
| as_label | object | No | An object that specifies dynamically computed labels that will be delivered to the client. These labels can be used as a basis for output filtering. |
Encoding binary data
The encode property allows you to specify how to handle binary data in the extracted messages.
You can choose to encode binary fields in the following formats:
| Format | Description |
|---|---|
| base64 | Encodes binary data as a base64 string, suitable for text-based formats like JSON. |
| jpeg | Encodes binary data as a JPEG image encoded to a base64 string, suitable for image data. |
Currently, the extension supports encoding only sensor_msgs/msg/Image messages with encoding set to rgb8, bgr8, or mono8.
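On the client side, a binary field can therefore arrive in two shapes: a JSON list when no encode rule is given for it, or a base64 string when base64 or jpeg is requested. The following is a minimal sketch of a helper that normalizes both shapes to raw bytes. The helper name field_to_bytes and the sample record bodies are hypothetical and only serve as illustration.

```python
import base64
import json


def field_to_bytes(value):
    """Return raw bytes for a binary field taken from an extracted JSON record."""
    if isinstance(value, str):
        # With encode set to "base64" or "jpeg", the field is a base64 string.
        return base64.b64decode(value)
    # Without an encode rule, the field is a JSON list of byte values (0-255).
    return bytes(value)


# Hypothetical record bodies, as they might look without and with `encode`.
as_list = json.loads('{"data": [104, 105]}')
as_base64 = json.loads('{"data": "aGk="}')

print(field_to_bytes(as_list["data"]))    # b'hi'
print(field_to_bytes(as_base64["data"]))  # b'hi'
```

For the jpeg format, the returned bytes are the JPEG file content and can be written to disk directly.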
Data transformation
The transform property allows you to generate new MCAP episodes with topic filtering and size/duration limits.
| Field | Type | Mandatory | Description |
|---|---|---|---|
| include | string[] | No | List of topics to include. Supports regular expressions. If omitted, all topics are considered. |
| exclude | string[] | No | List of topics to exclude. Applied after include. |
| duration | string | No | Maximum duration per episode (e.g., 5m, 2h, 1d). |
| size | string | No | Maximum content length per episode (e.g., 100MB, 1GB). |
The size parameter refers to the uncompressed size of the episode. The actual file size may be smaller due to MCAP's chunk compression.
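To make the duration and size strings concrete, here is a rough sketch of how such values could be converted into seconds and bytes. It is not the extension's parser: the unit tables are assumptions (in particular the s suffix and the decimal interpretation 1 MB = 1,000,000 bytes), and the function names are hypothetical.

```python
import re

# Assumed unit tables; the extension's own parser may accept other suffixes
# or use binary multiples for sizes.
DURATION_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
SIZE_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_quantity(text, units):
    """Split a string such as "10m" or "500MB" into number and unit, then scale."""
    match = re.fullmatch(r"(\d+)\s*([A-Za-z]+)", text.strip())
    if match is None or match.group(2) not in units:
        raise ValueError(f"unsupported value: {text!r}")
    return int(match.group(1)) * units[match.group(2)]


def duration_seconds(text):
    """Convert a duration string to seconds."""
    return parse_quantity(text, DURATION_UNITS)


def size_bytes(text):
    """Convert a size string to bytes (decimal units assumed)."""
    return parse_quantity(text, SIZE_UNITS)


print(duration_seconds("10m"))  # 600
print(size_bytes("500MB"))      # 500000000
```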
Transforming MCAP
When transform is provided, the extension:
- Reads the source MCAP files.
- Applies topic filters (include first, then exclude).
- Streams matching messages into a new MCAP file.
- Starts a new episode whenever duration or size is reached.
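The splitting step can be pictured with the following sketch. It is an illustration under assumptions, not the extension's implementation: messages are modeled as (timestamp in microseconds, payload bytes) pairs, an episode is closed when the next message would reach the duration limit or exceed the size limit, and the function name split_into_episodes is hypothetical.

```python
def split_into_episodes(messages, max_duration_us, max_size_bytes):
    """Group (timestamp_us, payload) pairs into episodes bounded by duration and size."""
    episodes = []
    current = []
    start_us = None
    size = 0
    for timestamp_us, payload in messages:
        # Close the running episode if this message would cross either limit.
        if current and (
            timestamp_us - start_us >= max_duration_us
            or size + len(payload) > max_size_bytes
        ):
            episodes.append(current)
            current, size = [], 0
        if not current:
            # The first message of an episode defines its start time.
            start_us = timestamp_us
        current.append((timestamp_us, payload))
        size += len(payload)
    if current:
        episodes.append(current)
    return episodes


# Five 10-byte messages, 30 seconds apart, split into 1-minute episodes.
messages = [(i * 30_000_000, b"x" * 10) for i in range(5)]
episodes = split_into_episodes(messages, max_duration_us=60_000_000, max_size_bytes=1000)
print([len(e) for e in episodes])  # [2, 2, 1]
```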
Example
{
  "ext": {
    "ros": {
      "transform": {
        "include": ["/camera/.*", "^/imu/data$"],
        "exclude": ["/camera/debug"],
        "duration": "10m",
        "size": "500MB"
      }
    }
  }
}
Result
- All topics starting with /camera/ are included (because of /camera/.*).
- The topic /imu/data is included exactly (because of ^/imu/data$).
- The topic /camera/debug is excluded, even though it matches the /camera/.* rule.
- Output is a sequence of MCAP episodes, each ≤ 10 minutes or ≤ 500 MB (whichever comes first).
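The include-then-exclude order can be reproduced with a few lines of Python. This is a sketch only: it assumes unanchored regular-expression matching (re.search), which is consistent with the result described above but is not confirmed as the extension's exact matching rule. The function name select_topics and the sample topic list are hypothetical.

```python
import re


def select_topics(topics, include=None, exclude=None):
    """Keep topics that match any include pattern and no exclude pattern."""
    selected = []
    for topic in topics:
        # If include is omitted, all topics are considered.
        if include and not any(re.search(p, topic) for p in include):
            continue
        # exclude is applied after include.
        if exclude and any(re.search(p, topic) for p in exclude):
            continue
        selected.append(topic)
    return selected


# Hypothetical topic names for illustration.
topics = ["/camera/front", "/camera/debug", "/imu/data", "/imu/data_raw", "/gps/fix"]
print(select_topics(
    topics,
    include=["/camera/.*", "^/imu/data$"],
    exclude=["/camera/debug"],
))
# ['/camera/front', '/imu/data']
```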
Examples
The following examples demonstrate how to use the ReductROS extension to extract and transform ROS messages. Although these examples are written in Python, the same queries can be run using any of the official SDKs.
Extracting messages as JSON
This example demonstrates how to use the ROS extension to extract a topic from an MCAP file stored in ReductStore,
convert it to JSON format, and use as_label plus when for filtering.
- Python
from time import time_ns
from pathlib import Path
from reduct import Client
HERE = Path(__file__).parent
async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write a mcap file with timestamps
        now = time_ns() // 1000
        data = b""
        with open(f"{HERE}/../data/file.mcap", "rb") as f:
            data = f.read()
        await bucket.write("mcap", data, content_length=len(data), timestamp=now, content_type="application/mcap")
        # Prepare the query with the 'ros' extension
        condition = {
            "#ext": {
                "ros": {  # name of the extension to use
                    "extract": {
                        "topic": "/test",  # Specify the topic to extract from the mcap file
                        "as_label": {
                            "data1": "data",
                        },
                    },
                },
                "when": {
                    "@data1": {"$eq": "hello"},
                },
            }
        }
        # Query the data with the 'ros' extension
        async for record in bucket.query("mcap", start=now, when=condition):
            print(f"Record entry: {record.entry}")
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")
            json = await record.read_all()
            print(json.decode("utf-8").strip())
# Run the main function
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Expected output
The expected output of the above code is as follows:
Record timestamp: 24
Record labels: {'@encoding': 'cdr', '@schema': 'std_msgs/String', '@topic': '/test', '@data1': 'hello'}
{"data":"hello"}
Explanation
- The extension extracts ROS 2 messages from an .mcap file stored in ReductStore.
- Only messages from the topic /test are selected using the topic filter in the ros.extract configuration.
- The content of each message is CDR-encoded and decoded by the extension.
- The decoded message is returned as JSON with the field data, matching the std_msgs/String schema.
- Each record corresponds to one ROS 2 message and includes:
  - The decoded JSON payload, e.g., {"data":"hello"}
  - Message metadata as labels, including:
    - topic: /test
    - schema: std_msgs/String
    - encoding: cdr
- The as_label option adds @data1 based on the JSON path data.
- The when filter selects only records where @data1 equals hello.
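The interplay of as_label and when happens on the server, but it can be mimicked locally to see what the two options do. The sketch below makes two assumptions that the text above does not confirm: paths are treated as dot-separated keys, and only the $eq operator from the example is handled. All function names are hypothetical.

```python
import json


def resolve_path(message, path):
    """Follow a dot-separated path (an assumed syntax) through nested JSON objects."""
    value = message
    for key in path.split("."):
        value = value[key]
    return value


def computed_labels(message, as_label):
    """Build "@"-prefixed labels from an as_label mapping of label name to path."""
    return {f"@{name}": resolve_path(message, path) for name, path in as_label.items()}


def matches(labels, when):
    """Evaluate only the {"@label": {"$eq": value}} form used in the example."""
    return all(labels.get(name) == rule["$eq"] for name, rule in when.items())


# Two decoded std_msgs/String messages; only the first passes the filter.
messages = [json.loads('{"data": "hello"}'), json.loads('{"data": "world"}')]
as_label = {"data1": "data"}
when = {"@data1": {"$eq": "hello"}}

kept = [m for m in messages if matches(computed_labels(m, as_label), when)]
print(kept)  # [{'data': 'hello'}]
```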
Extracting messages as JSON with JPEG encoding
This example demonstrates how to use the ROS extension to extract a topic from an MCAP file stored in ReductStore and convert it to JSON format, while also encoding binary image data in JPEG format.
- Python
import base64
import json
from time import time_ns
from pathlib import Path
from reduct import Client
HERE = Path(__file__).parent
async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write a mcap file with timestamps
        now = time_ns() // 1000
        data = b""
        with open(f"{HERE}/../data/camera_bag_0.mcap", "rb") as f:
            data = f.read()
        await bucket.write("mcap", data, content_length=len(data), timestamp=now, content_type="application/mcap")
        # Prepare the query with the 'ros' extension
        condition = {
            "#ext": {
                "ros": {  # name of the extension to use
                    "extract": {
                        "topic": "/image_raw",
                        # encode the data field in http://docs.ros.org/en/noetic/api/sensor_msgs/html/msg/Image.html
                        "encode": {
                            "data": "jpeg",
                        },
                    },
                },
                "when": {  # optional filter to apply
                    "$limit": 1,  # return only one record
                }
            }
        }
        # Query the data with the 'ros' extension
        async for record in bucket.query("mcap", start=now, when=condition):
            print(f"Record entry: {record.entry}")
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")
            content = await record.read_all()
            # Record content is a JSON object with metadata and base64-encoded data
            obj = json.loads(content)
            # Decode the base64-encoded data and save it as a JPEG file
            with open("output.jpg", "wb") as f:
                jpeg_bytes = base64.decodebytes(obj["data"].encode("ascii"))
                f.write(jpeg_bytes)
            # Print the image parameters without the data field
            del obj["data"]
            print(f"Image parameters: {obj}")
# Run the main function
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Expected output
The expected output of the above code is as follows:
Record timestamp: 1753341400522732
Record labels: {'@encoding': 'cdr', '@schema': 'sensor_msgs/Image', '@topic': '/image_raw'}
Image parameters: {'height': 720, 'width': 1280, 'is_bigendian': 0, 'encoding': 'rgb8', 'step': 3840, 'header': {'frame_id': 'camera', 'stamp': {'sec': 1753341400, 'nanosec': 522732248}}}
Explanation
- The extension extracts ROS 2 messages from an .mcap file stored in ReductStore.
- Only messages from the topic /image_raw are selected using the topic filter in the ros.extract configuration.
- The content of each message is CDR-encoded and decoded by the extension.
- The decoded message is returned as JSON with the field data, which contains the image data encoded in JPEG format.
- Each record corresponds to one ROS 2 message and includes:
  - The decoded image parameters, such as height, width, encoding, and step.
  - The timestamp from the message header.
  - Message metadata as labels, including:
    - topic: /image_raw
    - schema: sensor_msgs/Image
    - encoding: cdr
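The record timestamp in the expected output can be related to the header stamp of the message with a short calculation. The sketch below converts the sec and nanosec fields to microseconds, truncating the sub-microsecond part. The truncation is an assumption, although it agrees with the values shown above, and the function name is hypothetical.

```python
def stamp_to_microseconds(stamp):
    """Convert a ROS 2 header stamp ({"sec", "nanosec"}) to microseconds since the epoch."""
    return stamp["sec"] * 1_000_000 + stamp["nanosec"] // 1_000


# Header stamp taken from the expected output above.
stamp = {"sec": 1753341400, "nanosec": 522732248}
print(stamp_to_microseconds(stamp))  # 1753341400522732
```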
Transforming MCAP with splitting and topic filtering
This example demonstrates how to use the ROS extension to create new MCAP episodes while including and excluding topics using regular expressions.
- Python
from time import time_ns
from pathlib import Path
from reduct import Client
HERE = Path(__file__).parent
async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write an MCAP file with timestamps
        now = time_ns() // 1000
        with open(f"{HERE}/../data/multi_topic_5min.mcap", "rb") as f:
            data = f.read()
        await bucket.write(
            "mcap",
            data,
            content_length=len(data),
            timestamp=now,
            content_type="application/mcap",
        )
        # Prepare the query with the 'ros' extension (transform)
        condition = {
            "#ext": {
                "ros": {  # name of the extension to use
                    "transform": {
                        "include": ["/topic-.*"],
                        "exclude": ["/topic-b", "/ext-.*"],
                        "duration": "1m",
                        "size": "100MB"
                    },
                }
            }
        }
        # Query the data with the 'ros' extension
        async for record in bucket.query("mcap", start=now, when=condition):
            print(f"Record entry: {record.entry}")
            print(f"Record timestamp: {record.timestamp}")
            # Each record corresponds to a new MCAP episode
            data = await record.read_all()
            print(f"Episode file size: {len(data)} bytes")
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Expected output
The expected output of the above code is as follows:
Record timestamp: 1755592486439270
Episode file size: 1602 bytes
Record timestamp: 1755592546439270
Episode file size: 1586 bytes
Record timestamp: 1755592606439270
Episode file size: 1582 bytes
Record timestamp: 1755592666439270
Episode file size: 1585 bytes
Record timestamp: 1755592726439270
Episode file size: 1558 bytes
Explanation
- The extension reads an .mcap file stored in ReductStore.
- Topics matching /topic-.* (aside from /topic-b) are included.
- Topics matching /ext-.* are excluded.
- The extension writes a sequence of new MCAP episodes capped at 1 minute or 100 MB, whichever comes first.
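As a quick sanity check, the record timestamps in the expected output can be compared against the duration setting. The sketch below takes the five timestamps listed above (in microseconds) and computes the gaps between consecutive episodes. Each gap is 60,000,000 µs, which is consistent with the 1m limit being the one that triggers each split for this file.

```python
# Record timestamps from the expected output, in microseconds.
timestamps_us = [
    1755592486439270,
    1755592546439270,
    1755592606439270,
    1755592666439270,
    1755592726439270,
]

# Distance between the starts of consecutive episodes.
gaps = [later - earlier for earlier, later in zip(timestamps_us, timestamps_us[1:])]
print(gaps)  # [60000000, 60000000, 60000000, 60000000]
```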