
ReductSelect Extension

The ReductSelect extension is a powerful tool that enables you to manipulate columnar data on the storage side. It allows you to select specific columns, filter data, and perform various operations before returning the result to the client.

License Information

This feature is available in ReductStore Pro under a commercial license. For testing, you can either use a free demo server or request a demo license for your own deployment.

Query Format

ReductSelect uses SQL to select, filter, aggregate, and reorder rows.

A user can specify the SQL query and other parameters in the #ext.select object of a conditional query in JSON format:

{
  # CSV-specific parameters
  "csv": {
    "has_headers": "boolean" # Indicates if the CSV data has headers; by default inferred from the content type
  },
  # Switch to JSON format
  "json": {},
  # Switch to Parquet format
  "parquet": {},
  # SQL query to run against the current record. Default is "SELECT * FROM ENTRY()"
  "sql": "string",
  # Optional output controls
  "export": {
    # Output format: "csv", "json", or "parquet"
    "format": "string",
    # Batching: group SQL result rows into output records with at most this many rows each. Must be greater than 0.
    "rows": "number",
    # Batching: group SQL result rows into output records by source timestamp span, as microseconds or a duration string such as "1s", "1.5s", or "1m".
    "duration": "number|string"
  },
  # Map SQL result columns to computed labels: label name -> SQL column name
  "as_label": {
    "label_name": "column_name"
  }
}

SQL Source

The sql field contains the query to run against the current record. The current input record is available through the case-sensitive ENTRY() table function.

If the sql field is not specified, ReductSelect uses:

SELECT * FROM ENTRY()

For CSV data without headers, columns are exposed as column_0, column_1, and so on. For CSV data with headers, JSON objects, and Parquet data, SQL columns use the original field names. Nested JSON and Parquet fields can be referenced with dotted paths, for example vector.z or temp.status.
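The dialect also supports aggregation over the current record. The following is a sketch, assuming common SQL aggregate functions such as AVG are available and the record contains a numeric column_0:

```json
{
  "sql": "SELECT AVG(column_0) AS avg_value FROM ENTRY()"
}
```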

Parameters

| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| csv | object | No | Inferred from content type | Enables CSV-specific parsing options. Cannot be used together with json or parquet. |
| json | object | No | Inferred from content type | Selects JSON input parsing. Cannot be used together with csv or parquet. |
| parquet | object | No | Inferred from content type | Selects Parquet input parsing. Cannot be used together with csv or json. |
| sql | string | No | SELECT * FROM ENTRY() | SQL query executed against the current input record. |
| export | object | No | Format-specific | Controls output format and batching. |
| as_label | object | No | {} | Maps SQL result columns to computed labels using {"label_name": "column_name"}. Only primitive values are accepted. |

Export Parameters

| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| format | string | No | Input-dependent | Output format. Supported values are csv, json, and parquet. |
| rows | number | No | All rows | Batches SQL result rows into output records with at most this many rows per output record. Must be greater than 0. |
| duration | number or string | No | Not set | Batches SQL result rows into output records by source timestamp span, as microseconds or a duration string such as 1s, 1.5s, or 1m. |
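For reference, the duration strings above can be expanded to microseconds on the client side. The helper below is purely illustrative and not part of any ReductStore SDK; it only handles the s and m suffixes shown in the examples above:

```python
import re


def duration_to_us(value):
    """Convert an export duration to microseconds.

    Numbers are treated as microseconds already; strings such as "1s",
    "1.5s", or "1m" are converted using their unit suffix. Illustrative
    helper only, not part of the ReductStore SDK.
    """
    if isinstance(value, (int, float)):
        return int(value)
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([sm])", value)
    if not match:
        raise ValueError(f"unsupported duration: {value!r}")
    number, unit = float(match.group(1)), match.group(2)
    return int(number * (1_000_000 if unit == "s" else 60_000_000))


print(duration_to_us("1.5s"))  # 1500000
print(duration_to_us("1m"))    # 60000000
```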

Computed Labels

The as_label object maps SQL result columns to computed labels. Use {"label_name": "column_name"} to expose a SQL result column as @label_name. Only primitive values like strings, numbers, and booleans can be mapped to computed labels.

For example, for the following query:

{
  "#ext": {
    "select": {
      "sql": "SELECT col1 FROM ENTRY() WHERE col1 > 10",
      "as_label": {
        "col1": "col1"
      }
    }
  }
}

The following CSV record:

| col1 | col2 | col3 |
|---|---|---|
| 20 | false | another message |
| 0 | false | message string |
| 30 | true | yet another message |

will be transformed to:

| col1 |
|---|
| 20 |
| 30 |

and the output record has the computed label @col1.

You can find more information about conditional queries in the Conditional Query Reference documentation.

info

When an output record contains multiple rows, computed labels use the values from the last row in that output record.
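For instance, a query that combines batching with a computed label might look like the following sketch: with rows set to 2, each output record holds up to two result rows, and @col1 takes its value from the last row of that record.

```json
{
  "#ext": {
    "select": {
      "sql": "SELECT col1 FROM ENTRY()",
      "export": {
        "rows": 2
      },
      "as_label": {
        "col1": "col1"
      }
    }
  }
}
```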

Data Formats

Currently, the extension supports JSON, comma-separated CSV, and Parquet data.

CSV Support

ReductSelect processes CSV records when the record content type is text/csv or when the query explicitly contains a csv object. Stored CSV data can be single- or multi-line and can contain any number of columns. The extension converts CSV rows into a SQL table before running the query.

The CSV format example:

header1,header2,header3
100,true,message string
200,false,another message
300,true,yet another message

Parameters

The csv object contains parameters that specify how to handle CSV data. It can also be used to force CSV parsing when the record content type is not text/csv.

| Parameter | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| has_headers | boolean | No | From content type | Indicates if the CSV data has headers. If set, it overrides content-type inference and controls whether the first row is used as column names. |

If has_headers is false, SQL columns are generated as column_0, column_1, and so on. If has_headers is true, SQL columns use the names from the first CSV row.

SQL Examples

For CSV data without headers, select generated column names:

{
  "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10"
}

For CSV data with headers, enable csv.has_headers and select by header name:

{
  "csv": {
    "has_headers": true
  },
  "sql": "SELECT e FROM ENTRY() WHERE e < 10"
}
info

If the stored record content type includes header=present, ReductSelect treats the first CSV row as headers. For example, records with the content type text/csv; header=present do not need the explicit csv.has_headers query parameter.

JSON Support

Parameters

ReductSelect processes JSON records when the record content type is application/json or when the query explicitly contains a json object. The json object can be used to force JSON parsing when the record content type is different.

Field Selection

The extension converts JSON objects into a SQL table before running the query. Use SQL to select JSON fields. Nested fields can be addressed with dotted paths.

{
  "json": {},
  "sql": "SELECT temp.status AS status, temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'"
}

Parquet Support

Parameters

ReductSelect processes Parquet records when the record content type is application/vnd.apache.parquet or when the query explicitly contains a parquet object. The parquet object can be used to force Parquet parsing when the record content type is different. The extension reads the Parquet schema and exposes its fields as SQL columns.

{
  "parquet": {},
  "sql": "SELECT name, value FROM ENTRY() WHERE value > 10",
  "export": {
    "format": "parquet",
    "rows": 100
  }
}

Parquet input supports the same SQL processing as CSV and JSON, including nested field selection when the source schema contains nested structures.
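Nested Parquet fields use the same dotted-path syntax shown earlier for JSON. The following sketch queries a hypothetical nested vector column:

```json
{
  "parquet": {},
  "sql": "SELECT vector.z AS z FROM ENTRY() WHERE vector.z > 0"
}
```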

Examples

The following examples demonstrate how to use the ReductSelect extension to select specific columns from a CSV file and filter its rows based on these columns. Although the examples are written in Python, they can be reproduced with any of the official SDKs.

Selecting Columns from CSV Data

This example demonstrates how to use the select extension to extract specific columns from CSV data stored in ReductStore, apply filtering based on column values, and return a transformed dataset.

from time import time_ns
from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write some CSV data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "csv",
            "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now,
            content_type="text/csv",
        )
        await bucket.write(
            "csv",
            "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now + 1,
            content_type="text/csv",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    # Select columns and filter rows with SQL. CSV columns without
                    # headers are exposed as column_0, column_1, and so on.
                    "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("csv", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")

            csv = await record.read_all()
            print(csv.decode("utf-8").strip())


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Expected Output

The expected output of the above code is as follows:

Record timestamp: 1749797653273752
Record labels: {}
column_0,column_2,column_3
1,3,4
6,8,9
Record timestamp: 1749797653273753
Record labels: {}
column_0,column_2,column_3
1,3,4
6,8,9

Explanation

  • CSV data without headers is exposed to SQL as generated columns: column_0, column_1, and so on.
  • The SQL query selects column_0, column_2, and column_3.
  • The SQL WHERE clause filters out rows where column_0 >= 10, removing the row 11,12,13,14,15.
  • The result includes only the selected columns of the remaining rows:
    • From 1,2,3,4,5 to 1,3,4
    • From 6,7,8,9,10 to 6,8,9
  • The query runs over two records, so the same filtered output appears twice, once per record timestamp.
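The same filtering can also be combined with the export object described above to change the output format; the following sketch returns the filtered rows as JSON instead of CSV:

```json
{
  "#ext": {
    "select": {
      "sql": "SELECT column_0, column_2, column_3 FROM ENTRY() WHERE column_0 < 10",
      "export": {
        "format": "json"
      }
    }
  }
}
```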

Selecting Columns from CSV Data with Headers

This example demonstrates how to use the select extension to work with CSV data that includes a header row. You can extract specific columns by their header names and apply filters based on their values.

from time import time_ns
from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write some CSV data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "csv",
            "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now,
            content_type="text/csv",
        )
        await bucket.write(
            "csv",
            "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n",
            timestamp=now + 1,
            content_type="text/csv",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    "csv": {
                        "has_headers": True,  # Indicate that the CSV data has headers
                    },
                    # Select the "e" column and filter out rows where it is 10 or more.
                    "sql": "SELECT e FROM ENTRY() WHERE e < 10",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("csv", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")

            csv = await record.read_all()
            print(csv.decode("utf-8").strip())


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Expected Output

The expected output of the above code is as follows:

Record timestamp: 1749797694120873
Record labels: {}
e
5
Record timestamp: 1749797694120874
Record labels: {}
e
5

Explanation

  • The input CSV includes a header row: a,b,c,d,e.
  • The csv.has_headers=true option tells the extension to use the first row as column headers.
  • The SQL query selects only the column named e.
  • The SQL WHERE clause filters out any row where column e is 10 or more, so the rows with e=10 and e=15 are excluded.
  • Only the row 1,2,3,4,5 remains, where e=5.
  • The query runs over two records, so the same filtered output appears twice, once per record timestamp.

Selecting Fields from JSON Data

This example demonstrates how to use the select extension to extract specific fields from JSON data stored in ReductStore, apply filtering based on field values, and return a transformed dataset.

import json
from time import time_ns
from reduct import Client


async def main():
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket = await client.create_bucket(
            "my-bucket",
            exist_ok=True,
        )
        # Write some JSON data with timestamps
        now = time_ns() // 1000
        await bucket.write(
            "data",
            json.dumps({"temp": {"value": 10.0, "status": "ok"}, "src": "#1"}),
            timestamp=now,
            content_type="application/json",
        )
        await bucket.write(
            "data",
            json.dumps({"temp": {"value": -4.0, "status": "bad"}, "src": "#1"}),
            timestamp=now + 1000,
            content_type="application/json",
        )

        # Prepare the query with the 'select' extension
        condition = {
            "#ext": {
                "select": {  # name of the extension to use
                    "json": {},  # Specify JSON format for the data
                    # Select nested JSON fields with SQL.
                    "sql": "SELECT temp.status AS status, temp.value AS value FROM ENTRY() WHERE temp.status = 'ok'",
                },
            }
        }

        # Query the data with the 'select' extension
        async for record in bucket.query("data", start=now, when=condition):
            print(f"Record timestamp: {record.timestamp}")
            print(f"Record labels: {record.labels}")

            json_data = json.loads(await record.read_all())
            print(f"JSON data: {json_data}")


# Run the main function
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Expected Output

The expected output of the above code is as follows:

Record timestamp: 1753776773034850
Record labels: {}
JSON data: [{'status': 'ok', 'value': 10.0}]

Explanation

  • The json object specifies that the data is in JSON format.
  • The SQL query selects the nested fields temp.status and temp.value.
  • The SQL WHERE clause filters out any record where temp.status is not equal to ok.
  • The result includes only the selected fields of the remaining records:
    • The record with temp.status equal to ok is returned as a JSON object with status and value fields.