ReductSelect Extension
The ReductSelect extension lets you manipulate columnar data on the storage side. You can select specific columns, filter and aggregate rows, and reorder results before the data is returned to the client.
This feature is available in ReductStore Pro under a commercial license. For testing, you can either use a free demo server or request a demo license for your own deployment.
Query Format
ReductSelect uses SQL to select, filter, aggregate, and reorder rows.
You specify the SQL query and other parameters in the #ext.select object of a conditional query, in JSON format:
{
  # CSV-specific parameters
  "csv": {
    "has_headers": "boolean"  # Indicates if the CSV data has headers, by default inferred from content type
  },
  # Switch to JSON format
  "json": {},
  # Switch to Parquet format
  "parquet": {},
  # SQL query to run against the current record. Default is "SELECT * FROM ENTRY()"
  "sql": "string",
  # Optional output controls
  "export": {
    # Output format: "csv", "json", or "parquet"
    "format": "string",
    # Batching: group SQL result rows into output records with at most this many rows each. Must be greater than 0.
    "rows": "number",
    # Batching: group SQL result rows into output records by source timestamp span, as microseconds
    # or a duration string such as "1s", "1.5s", or "1m".
    "duration": "number|string"
  },
  # Map SQL result columns to computed labels: label name -> SQL column name
  "as_label": {
    "label_name": "column_name"
  }
}
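For example, a select object that forces CSV parsing with headers, filters rows with SQL, and exposes a column as a computed label might look like the following sketch (the column and label names are illustrative):
{
  "#ext": {
    "select": {
      "csv": {
        "has_headers": true
      },
      "sql": "SELECT col1, col2 FROM ENTRY() WHERE col1 > 10",
      "as_label": {
        "col1": "col1"
      }
    }
  }
}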
SQL Source
The sql field contains the query to run against the current record.
The current input record is available through the case-sensitive ENTRY() table function.
If the sql field is not specified, ReductSelect uses the default query:
SELECT * FROM ENTRY()
For CSV data without headers, columns are exposed as column_0, column_1, and so on.
For CSV data with headers, JSON objects, and Parquet data, SQL columns use the original field names.
Nested JSON and Parquet fields can be referenced with dotted paths, for example vector.z or temp.status.
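For example, a query against a record with a nested temp object might look like this (the field names are illustrative):
SELECT temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'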
Parameters
| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| csv | object | No | Inferred from content type | Enables CSV-specific parsing options. Cannot be used together with json or parquet. |
| json | object | No | Inferred from content type | Selects JSON input parsing. Cannot be used together with csv or parquet. |
| parquet | object | No | Inferred from content type | Selects Parquet input parsing. Cannot be used together with csv or json. |
| sql | string | No | SELECT * FROM ENTRY() | SQL query executed against the current input record. |
| export | object | No | Format-specific | Controls output format and batching. |
| as_label | object | No | {} | Maps SQL result columns to computed labels using {"label_name": "column_name"}. Only primitive values are accepted. |
Export Parameters
| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| format | string | No | Input-dependent | Output format. Supported values are csv, json, and parquet. |
| rows | number | No | All rows | Batches SQL result rows into output records with at most this many rows per output record. Must be greater than 0. |
| duration | number or string | No | Not set | Batches SQL result rows into output records by source timestamp span, as microseconds or a duration string such as 1s, 1.5s, or 1m. |
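For example, the following sketch of a select object returns the result as CSV and groups rows into output records that span at most 10 seconds of source timestamps (the column names are illustrative):
{
  "sql": "SELECT value, status FROM ENTRY() WHERE status = 'ok'",
  "export": {
    "format": "csv",
    "duration": "10s"
  }
}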
Computed Labels
The as_label object maps SQL result columns to computed labels.
Use {"label_name": "column_name"} to expose a SQL result column as @label_name.
Only primitive values like strings, numbers, and booleans can be mapped to computed labels.
For example, for the following query:
{
  "#ext": {
    "select": {
      "sql": "SELECT col1 FROM ENTRY() WHERE col1 > 10",
      "as_label": {
        "col1": "col1"
      }
    }
  }
}
The following CSV record:
| col1 | col2 | col3 |
|---|---|---|
| 20 | false | another message |
| 0 | false | message string |
| 30 | true | yet another message |
will be transformed to:
| col1 |
|---|
| 20 |
| 30 |
and the output record has the computed label @col1.
You can find more information about conditional queries in the Conditional Query Reference documentation.
When an output record contains multiple rows, computed labels use the values from the last row in that output record.
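For example, the following query sketch batches rows and still maps a column to a label (the column and label names are illustrative); each output record of up to 100 rows gets @col1 set from its last row:
{
  "#ext": {
    "select": {
      "sql": "SELECT col1 FROM ENTRY()",
      "export": {
        "rows": 100
      },
      "as_label": {
        "col1": "col1"
      }
    }
  }
}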
Data Formats
Currently, the extension supports JSON, comma-separated CSV, and Parquet data.
CSV Support
ReductSelect processes CSV records when the record content type is text/csv or when the query explicitly contains a csv object.
Stored CSV data can be single- or multi-line and can contain any number of columns.
The extension converts CSV rows into a SQL table before running the query.
The CSV format example:
header1,header2,header3
100,true,message string
200,false,another message
300,true,yet another message
Parameters
The csv object contains parameters that specify how to handle CSV data.
It can also be used to force CSV parsing when the record content type is not text/csv.
| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| has_headers | boolean | No | From content type | Indicates if the CSV data has headers. If set, it overrides content-type inference and controls whether the first row is used as column names. |
If has_headers is false, SQL columns are generated as column_0, column_1, and so on.
If has_headers is true, SQL columns use the names from the first CSV row.
SQL Examples
For CSV data without headers, select generated column names:
{
  "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10"
}
For CSV data with headers, enable csv.has_headers and select by header name:
{
  "csv": {
    "has_headers": true
  },
  "sql": "SELECT e FROM ENTRY() WHERE e < 10"
}
If the stored record content type includes header=present, ReductSelect treats the first CSV row as headers.
For example, records with the content type text/csv; header=present do not need the explicit csv.has_headers query parameter.
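A minimal sketch, assuming a bucket and timestamp set up as in the Python examples below (the entry name and header names are illustrative):
# Write CSV data whose content type already declares a header row
await bucket.write(
    "csv",
    "a,b,c\n1,2,3\n4,5,6\n",
    timestamp=now,
    content_type="text/csv; header=present",
)

# No "csv" object is needed; the headers are inferred from the content type
condition = {"#ext": {"select": {"sql": "SELECT a, c FROM ENTRY()"}}}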
JSON Support
ReductSelect processes JSON records when the record content type is application/json or when the query explicitly contains a json object.
Parameters
The json object can be used to force JSON parsing when the record content type is different.
Field Selection
The extension converts JSON objects into a SQL table before running the query. Use SQL to select JSON fields. Nested fields can be addressed with dotted paths.
{
  "json": {},
  "sql": "SELECT temp.status AS status, temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'"
}
Parquet Support
ReductSelect processes Parquet records when the record content type is application/vnd.apache.parquet or when the query explicitly contains a parquet object.
Parameters
The parquet object can be used to force Parquet parsing when the record content type is different.
The extension reads the Parquet schema and exposes its fields as SQL columns.
{
  "parquet": {},
  "sql": "SELECT name, value FROM ENTRY() WHERE value > 10",
  "export": {
    "format": "parquet",
    "rows": 100
  }
}
Parquet input supports the same SQL processing as CSV and JSON, including nested field selection when the source schema contains nested structures.
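The sketch below shows one way to store and query a Parquet record from Python; it assumes the pyarrow package is installed, and the entry name and schema are illustrative. The bucket and timestamp are set up as in the Python examples below.
import io

import pyarrow as pa
import pyarrow.parquet as pq

# Build a small Parquet payload in memory
table = pa.table({"name": ["a", "b", "c"], "value": [5, 20, 30]})
buffer = io.BytesIO()
pq.write_table(table, buffer)

# Store it with the Parquet content type so the extension detects the format
await bucket.write(
    "parquet-data",
    buffer.getvalue(),
    timestamp=now,
    content_type="application/vnd.apache.parquet",
)

# Query it back, keeping only rows with value > 10
condition = {
    "#ext": {
        "select": {
            "sql": "SELECT name, value FROM ENTRY() WHERE value > 10",
        }
    }
}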
Examples
The following examples demonstrate how to use the ReductSelect extension to select specific columns or fields from stored records and filter rows based on their values. Although the examples are written in Python, they can be implemented with any of the official SDKs.
Selecting Columns from CSV Data
This example demonstrates how to use the select extension to extract specific columns from CSV data stored in ReductStore, apply filtering based on column values, and return a transformed dataset.
- Python
from time import time_ns

from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )

        # Write some CSV data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "csv",
            "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now,
            content_type="text/csv",
        )
        await bucket.write(
            "csv",
            "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now + 1,
            content_type="text/csv",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    # Select columns and filter rows with SQL. CSV columns without
                    # headers are exposed as column_0, column_1, and so on.
                    "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("csv", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")
            csv = await record.read_all()
            print(csv.decode("utf-8").strip())


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Expected Output
The expected output of the above code is as follows:
Record timestamp: 1749797653273752
Record labels: {}
column_0,column_2,column_3
1,3,4
6,8,9
Record timestamp: 1749797653273753
Record labels: {}
column_0,column_2,column_3
1,3,4
6,8,9
Explanation
- CSV data without headers is exposed to SQL as generated columns: column_0, column_1, and so on.
- The SQL query selects column_0, column_2, and column_3.
- The SQL WHERE clause filters out rows where column_0 >= 10, removing the row 11,12,13,14,15.
- The result includes only the selected columns of the remaining rows:
  - From 1,2,3,4,5 to 1,3,4
  - From 6,7,8,9,10 to 6,8,9
- The query runs over two records, so the same filtered output appears twice, once per record timestamp.
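As a variation on this query, the filtered column_0 value can also be exposed as a computed label by adding an as_label mapping, as described in the Computed Labels section. A sketch (the label name is illustrative):
condition = {
    "#ext": {
        "select": {
            "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10",
            # Expose column_0 as the computed label @column_0
            "as_label": {"column_0": "column_0"},
        },
    }
}
Each returned record then carries @column_0 with the value taken from the last row of that record.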
Selecting Columns from CSV Data with Headers
This example demonstrates how to use the select extension to work with CSV data that includes a header row. You can extract specific columns by their header names and apply filters based on their values.
- Python
from time import time_ns

from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )

        # Write some CSV data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "csv",
            "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now,
            content_type="text/csv",
        )
        await bucket.write(
            "csv",
            "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now + 1,
            content_type="text/csv",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    "csv": {
                        "has_headers": True,  # Indicate that the CSV data has headers
                    },
                    # Select the "e" column and filter out rows where it is 10 or more.
                    "sql": "SELECT e FROM ENTRY() WHERE e < 10",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("csv", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")
            csv = await record.read_all()
            print(csv.decode("utf-8").strip())


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Expected Output
The expected output of the above code is as follows:
Record timestamp: 1749797694120873
Record labels: {}
e
5
Record timestamp: 1749797694120874
Record labels: {}
e
5
Explanation
- The input CSV includes a header row: a,b,c,d,e.
- The csv.has_headers=true option tells the extension to use the first row as column headers.
- The SQL query selects only the column named e.
- The SQL WHERE clause filters out any row where column e is 10 or more, so the rows with e=10 and e=15 are excluded.
- Only the row 1,2,3,4,5 remains, where e=5.
- The query runs over two records, so the same filtered output appears twice, once per record timestamp.
Selecting Fields from JSON Data
This example demonstrates how to use the select extension to extract specific fields from JSON data stored in ReductStore, apply filtering based on field values, and return a transformed dataset.
- Python
import json
from time import time_ns

from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )

        # Write some JSON data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "data",
            json.dumps({"temp": {"value": 10.0, "status": "ok"}, "src": "#1"}),
            timestamp=now,
            content_type="application/json",
        )
        await bucket.write(
            "data",
            json.dumps({"temp": {"value": -4.0, "status": "bad"}, "src": "#1"}),
            timestamp=now + 1000,
            content_type="application/json",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    "json": {},  # Specify JSON format for the data
                    # Select nested JSON fields with SQL.
                    "sql": "SELECT temp.status AS status, temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("data", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")
            json_data = json.loads(await record.read_all())
            print(f"JSON data: {json_data}")


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Expected Output
The expected output of the above code is as follows:
Record timestamp: 1753776773034850
Record labels: {}
JSON data: [{'status': 'ok', 'value': 10.0}]
Explanation
- The json object specifies that the data is in JSON format.
- The SQL query selects the nested fields temp.status and temp.value.
- The SQL WHERE clause filters out any record where temp.status is not equal to ok.
- The result includes only the selected fields of the remaining records:
  - The record with temp.status equal to ok is returned as a JSON object with status and value fields.
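As a follow-up, the same query could return CSV text instead of JSON by adding an export block, as described in the Export Parameters section. A sketch:
condition = {
    "#ext": {
        "select": {
            "json": {},
            "sql": "SELECT temp.status AS status, temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'",
            # Convert the SQL result to CSV before returning it
            "export": {"format": "csv"},
        },
    }
}
The matching record is then returned as CSV text with status and value columns.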