ReductSelect Extension
The ReductSelect extension is a powerful tool that enables you to manipulate columnar data on the storage side. It allows you to select specific columns, filter data and perform various operations before returning it to the client.
The extension is only available under a commercial licence. If you wish to use it for non-commercial purposes, please contact us.
Data Format
Currently, the extension only supports comma-separated CSV data with and without headers. Stored CSV data can be single- or multi-line and can contain any number of columns.
The CSV format example:
[header1,header2,header3]
100,true,message string
200,false,another message
300,true,yet another message
Query Format
A user can use the ext
query parameter to specify the select
extension and define the columns to select from the CSV data in the following format:
{
# CSV specific parameters
"csv": {
"has_headers": "boolean" # Indicates if the CSV data has headers, default is false
},
"columns": [
{
# Column index to select
"index": "number",
# Name of the column to select, it can be used only if the CSV data has headers
"name": "string",
# Select a range of columns starting from this index until the end or `to` index
"from": "number",
# Select a range of columns starting from this index until the end or `from` index
"to": "number",
# Name of the label to use in the query condition
"as_label": "string"
}
]
}
CSV Parameters
The csv
object contains parameters that specify how to handle the CSV data. If it is not specified, the extension will use the default values.
Parameter | Type | Mandatory | Default | Description |
---|---|---|---|---|
has_headers | boolean | No | false | Indicates if the CSV data has headers. If set to true , the extension will use the first row as headers. Default is false . |
Column Selection
The columns
array contains objects that specify the columns to select from the columnar data.
Parameter | Type | Mandatory | Description |
---|---|---|---|
index | number | No* | The index of the column to select. It can be used with as_label to map the selected column to a computed label. |
name | string | No* | The name of the column to select. It can be used only if the CSV data has headers. |
from | number | No* | The starting index of the range of columns to select. It can be used with to to specify the end of the range. |
to | number | No* | The ending index of the range of columns to select. It can be used with from to specify the start of the range. |
as_label | string | No | The name of the label to use in the query condition. It can be used with index to map the selected column to a computed label. |
*Each column object must have an index, name or a range of columns to select.
Filtering
If a column is specified with the as_label
property, the extension will return the selected column as a computed label that can be used in the query condition.
The extension will apply the extension when
condition on each row of the columnar data, filtering the records based on the specified labels.
For example, for the following query:
{
"csv": {
"has_headers": true
},
"ext": {
"select": {
"columns": [
{
"name": "col1",
"as_label": "col1"
}
]
},
"when": {
"@col1": {
"$gt": 10
}
}
}
}
The following CSV record:
col1 | col2 | col3 |
---|---|---|
20 | false | another message |
0 | false | message string |
30 | true | yet another message |
will be transformed to:
col1 |
---|
20 |
30 |
You can find more information about the conditional queries in the Conditional Query Reference documentation.
When the extension merges the rows back into the output document, the computed labels retain the values from the first row of the columnar data.
Examples
The following examples demonstrate how to use the ReductSelect extension to select specific columns from a CSV file and filter its rows based on these columns. Although this example is written in Python, it can be run using any of the official SDKs.
Selecting Columns from CSV Data
This example demonstrates how to use the select extension to extract specific columns from CSV data stored in ReductStore, apply filtering based on column values, and return a transformed dataset.
- Python
from time import time_ns
from reduct import Client
async def main():
async with Client("http://localhost:8383", api_token="my-token") as client:
bucket = await client.create_bucket(
"my-bucket",
exist_ok=True,
)
# Write some CSV data with timestamps
now = time_ns() // 1000
await bucket.write("csv", "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n", timestamp=now, content_type="text/csv")
await bucket.write("csv", "1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n", timestamp=now+1, content_type="text/csv")
# Prepare the query with the 'select' extension
ext = {
"select": { # name of the extension to use
"columns": [
# Select the first column and label it as 'col1'
{"index": 0, "as_label": "col1"},
# Select columns from 2nd to 4th column without labeling
{"from": 2, "to": 4},
],
},
"when": {
"@col1": {"$lt": 10}, # Filter out rows where the first column is greater than or equal to 10
},
}
# Query the data with the 'select' extension
async for record in bucket.query("csv", start=now, ext=ext):
print(f"Record timestamp: {record.timestamp}")
print(f"Record labels: {record.labels}")
csv = await record.read_all()
print(csv.decode("utf-8").strip())
# 5. Run the main function
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Expected Output
The expected output of the above code is as follows:
Record timestamp: 1749797653273752
Record labels: {'@col1': '1'}
1,3,4
6,8,9
Record timestamp: 1749797653273753
Record labels: {'@col1': '1'}
1,3,4
6,8,9
Explanation
- The extension extracts:
- Column 0 (1, 6, 11) as @col1, which becomes a label.
- Columns 2 to 4 (3rd to 5th values) to include in the result rows.
- The when clause filters out rows where @col1 >= 10 — removing the row 11,12,13,14,15.
- The result includes only the selected columns of the remaining rows:
- From 1,2,3,4,5: → 1 as label, 3,4 as data → becomes 1,3,4
- From 6,7,8,9,10: → 6 as label, 8,9 as data → becomes 6,8,9
- Each record corresponds to one of the two writes, and both match the filter (@col1 = 1 from the first row of each block).
Selecting Columns with Headers
This example demonstrates how to use the select extension to work with CSV data that includes a header row. You can extract specific columns by their header names and apply filters based on their values.
- Python
from time import time_ns
from reduct import Client
async def main():
async with Client("http://localhost:8383", api_token="my-token") as client:
bucket = await client.create_bucket(
"my-bucket",
exist_ok=True,
)
# Write some CSV data with timestamps
now = time_ns() // 1000
await bucket.write("csv", "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n", timestamp=now, content_type="text/csv")
await bucket.write("csv", "a,b,c,d,e\n1,2,3,4,5\n6,7,8,9,10\n11,12,13,14,15\n", timestamp=now+1, content_type="text/csv")
# Prepare the query with the 'select' extension
ext = {
"select": { # name of the extension to use
"csv": {
"has_headers": True, # Indicate that the CSV data has headers
},
"columns": [
# Select columns corresponding to the "e" header and map it to a label "col_e"
{"name": "e", "as_label": "col_e"},
],
},
"when": {
"@col_e": {"$lt": 10}, # Filter out rows where the "e" column is greater than or equal to 10
},
}
# Query the data with the 'select' extension
async for record in bucket.query("csv", start=now, ext=ext):
print(f"Record timestamp: {record.timestamp}")
print(f"Record labels: {record.labels}")
csv = await record.read_all()
print(csv.decode("utf-8").strip())
# 5. Run the main function
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Expected Output
The expected output of the above code is as follows:
Record timestamp: 1749797694120873
Record labels: {'@col_e': '5'}
e
5
Record timestamp: 1749797694120874
Record labels: {'@col_e': '5'}
e
5
Explanation
- The input CSV includes a header row: a,b,c,d,e.
- The
csv.has_headers=true
option tells the extension to use the first row as column headers. - The query selects only the column named "e" and exposes it as label @col_e.
- The when clause filters out any row where column "e" is 10 or more — so the rows with e=10 and e=15 are excluded.
- Only the row 1,2,3,4,5 remains, where e=5.
- The query runs over two records, so the same filtered output appears twice, once per record timestamp.