Data Management With ReductStore
ReductStore provides a set of methods to manage data in the database. This guide provides an overview of typical data management operations in ReductStore and explains how to perform them using the ReductStore SDKs or the HTTP API.
Changing Labels
ReductStore doesn't allow you to change the contents of records, but it does allow you to change the labels of existing records. You can add, remove, or update labels for a single record, or for a batch of records to avoid the overhead of multiple HTTP requests.
To remove a label, send it with an empty string as its value.
- Python
- JavaScript
- Rust
- C++
- cURL
import time
import asyncio
from reduct import Client, Bucket, Batch
async def main():
    """Write two labelled records, then update their labels.

    The labels are changed first for a single record and then for both
    records in one batch request.
    """
    # Create a client instance, then get or create a bucket
    async with Client("http://127.0.0.1:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

        # Send some records to the "py-example" entry with labels
        ts = time.time()
        await bucket.write(
            "py-example",
            b"Some binary data",
            ts,
            labels={"key1": "value1", "key2": "value2"},
        )
        await bucket.write(
            "py-example",
            b"Some binary data",
            ts + 1,
            labels={"key1": "value1", "key2": "value2"},
        )

        # Update labels of a record: remove "key2" and update "key1".
        # An empty string as the value removes the label.
        await bucket.update("py-example", ts, labels={"key1": "new-value", "key2": ""})

        async with bucket.read("py-example", ts) as record:
            assert record.labels["key1"] == "new-value"
            assert "key2" not in record.labels

        # Update labels in a batch
        batch = Batch()
        batch.add(ts, labels={"key1": "new-value", "key2": ""})
        batch.add(ts + 1, labels={"key3": "value3"})
        errors = await bucket.update_batch("py-example", batch)
        assert not errors


if __name__ == "__main__":
    # asyncio is already imported at the top of the example
    asyncio.run(main())
import { Client } from "reduct-js";
import assert from "node:assert";
// Connect to the server, then get or create the bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");

// Write two labelled records to the "entry-1" entry, one second apart.
// Date.now() is in milliseconds, so it is scaled by 1000 before use.
const firstTs = BigInt(Date.now()) * 1000n;
const secondTs = firstTs + 1000_000n;
for (const [ts, payload] of [
  [firstTs, "Some binary data"],
  [secondTs, "Some more binary data"],
]) {
  const writer = await bucket.beginWrite("entry-1", {
    ts,
    labels: { key1: "value1", key2: "value2" },
  });
  await writer.write(payload);
}

// Single record: change "key1" and remove "key2" (an empty value removes it)
await bucket.update("entry-1", firstTs, { key1: "new-value", key2: "" });

// Read the record back (labels only) and check the result
const updated = await bucket.beginRead("entry-1", firstTs, true);
assert(updated.labels.key1 === "new-value");
assert(updated.labels.key2 === undefined);

// Both records: send the label changes in one batch
const labelBatch = await bucket.beginUpdateBatch("entry-1");
labelBatch.addOnlyLabels(firstTs, { label1: "value1", label2: "" });
labelBatch.addOnlyLabels(secondTs, { label3: "value3" });
const errors = await labelBatch.write();
assert(errors.size === 0);
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use reduct_rs::{RecordBuilder, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Build the client, then get or create the bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;

    // Write two records, one second apart, to the "rs-example" entry,
    // both with the labels "key1=value1" and "key2=value2"
    let first_ts = SystemTime::now();
    let second_ts = first_ts + Duration::from_secs(1);
    for (ts, payload) in [
        (first_ts, "Some binary data"),
        (second_ts, "Some more binary data"),
    ] {
        bucket
            .write_record("rs-example")
            .timestamp(ts)
            .add_label("key1", "value1")
            .add_label("key2", "value2")
            .data(Bytes::from(payload))
            .send()
            .await?;
    }

    // Single record: remove "key2" and change the value of "key1"
    let update = bucket
        .update_record("rs-example")
        .timestamp(first_ts)
        .remove_label("key2")
        .update_label("key1", "value3");
    update.send().await?;

    // Read the record back and check its labels
    let record = bucket
        .read_record("rs-example")
        .timestamp(first_ts)
        .send()
        .await?;
    let labels = record.labels();
    assert_eq!(labels.get("key1"), Some(&"value3".to_string()));
    assert_eq!(labels.get("key2"), None);

    // Both records: describe the label changes per record and send them in one batch
    let relabel = |ts: SystemTime| {
        RecordBuilder::new()
            .timestamp(ts)
            .add_label("key1", "value1")
            .add_label("key2", "") // Remove label "key2"
            .build()
    };
    let errors = bucket
        .update_batch("rs-example")
        .add_records(vec![relabel(first_ts), relabel(second_ts)])
        .send()
        .await?;
    assert_eq!(errors.len(), 0);

    Ok(())
}
#include <reduct/client.h>
#include <cassert>
#include <chrono>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
using std::chrono_literals::operator ""s;
// Writes two labelled records, then updates their labels for a single record
// and for both records in one batch.
int main() {
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);

  // Send some records to the "cpp-example" entry with labels
  IBucket::Time ts = IBucket::Time::clock::now();
  auto err = bucket->Write("cpp-example", {
      .timestamp = ts,
      .labels = {{"key1", "value1"},
                 {"key2", "value2"}}
  }, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);

  err = bucket->Write("cpp-example", {
      .timestamp = ts + 1s,
      .labels = {{"key1", "value1"},
                 {"key2", "value2"}}
  }, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);

  // Update labels of a record: remove "key2" and update "key1"
  err = bucket->Update("cpp-example", {
      .timestamp = ts,
      .labels = {{"key1", "value3"},
                 {"key2", ""}}
  });
  assert(err == Error::kOk);

  err = bucket->Read("cpp-example", ts, [](auto rec) {
    assert(rec.labels["key1"] == "value3");
    // Use count() rather than operator[]: indexing a missing key inserts an
    // empty value, so comparing it with "" would pass even if "key2" was kept
    // with an empty value or never existed.
    assert(rec.labels.count("key2") == 0);
    return true;
  });
  assert(err == Error::kOk);

  // Update labels in a batch
  auto [record_errors, http_err] = bucket->UpdateBatch("cpp-example", [ts](auto batch) {
    batch->AddOnlyLabels(ts, {{"key1", "value4"}});
    batch->AddOnlyLabels(ts + 1s, {{"key1", "value4"}});
  });
  assert(http_err == Error::kOk);
  assert(record_errors.empty());
  return 0;
}
#!/bin/bash
# Writes one labelled record over the HTTP API, then updates its labels.
set -e -x

API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"

# Send a record with the labels "key1" and "key2".
# The timestamp is the current UNIX time in seconds padded to microseconds.
# The URL is quoted so that the shell never treats "?" as a glob character.
TIME=$(date +%s000000)
curl -d "Some binary data" \
  -H "${AUTH_HEADER}" \
  -H "x-reduct-label-key1: value1" \
  -H "x-reduct-label-key2: value2" \
  -H "Content-Type: text/plain" \
  -X POST "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"

# Update the "key1" label to "new_value1" and remove the "key2" label.
# curl does not send a header written as "name:" with nothing after the colon,
# so the empty value that removes "key2" is written with a trailing semicolon.
# The record to update is addressed by its timestamp.
curl -H "${AUTH_HEADER}" \
  -H "x-reduct-label-key1: new_value1" \
  -H "x-reduct-label-key2;" \
  -X PATCH "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"
Deleting Data
The ReductStore API allows you to delete specific records from an entry, an entire entry, or an entire bucket. You can do this using the Client SDKs, the Reduct CLI, or the HTTP API.
Deleting Entry or Bucket
The following examples show how to delete the entire entry or the entire bucket:
- Python
- JavaScript
- Rust
- C++
- CLI
- cURL
from reduct import Client, Bucket
async def main():
    """Remove one entry from a bucket, then remove the bucket itself."""
    server_url = "http://localhost:8383"

    # Open a client for the server URL, authenticated with the API token
    async with Client(server_url, api_token="my-token") as reduct:
        # Look up the bucket that is going to be removed
        doomed_bucket: Bucket = await reduct.get_bucket("bucket-to-remove")

        # First drop only the "example-entry" entry
        await doomed_bucket.remove_entry("example-entry")

        # Then drop the whole bucket
        await doomed_bucket.remove()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
import { Client } from "reduct-js";
import assert from "node:assert";
// Connect to the server with its URL and an API token
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });

// Look up the bucket that is going to be removed
const doomedBucket = await client.getBucket("bucket-to-remove");

// First drop only the "example-entry" entry
await doomedBucket.removeEntry("example-entry");

// Then drop the whole bucket
await doomedBucket.remove();
use reduct_rs::{ErrorCode, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Configure the client with the API URL and API token, then build it
    let builder = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token");
    let client = builder.build();

    // Look up the bucket that is going to be removed
    let doomed_bucket = client.get_bucket("bucket-to-remove").await?;

    // First drop only the "example-entry" entry
    doomed_bucket.remove_entry("example-entry").await?;

    // Then drop the whole bucket
    doomed_bucket.remove().await?;

    Ok(())
}
#include <reduct/client.h>
#include <iostream>
#include <cassert>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
int main() {
// Create a client with the server URL
auto client = IClient::Build("http://127.0.0.1:8383", {
.api_token = "my-token"
});
// Get bucket to remove
auto [bucket, get_err] = client->GetBucket("bucket-to-remove");
assert(get_err == Error::kOk);
// Delete only entry with name "example-entry"
auto remove_entry_err = bucket->RemoveEntry("example-entry");
assert(remove_entry_err == Error::kOk);
// Remove entire bucket
auto remove_err = bucket->Remove();
assert(remove_err == Error::kOk);
}
# Register the server URL and API token under the alias "local"
reduct-cli alias add local -L http://localhost:8383 -t "my-token"
# Delete only entry with name "example-entry"
reduct-cli bucket rm local/bucket-to-remove --only-entries example-entry
# Delete entire bucket without confirmation
reduct-cli bucket rm local/bucket-to-remove -y
#!/bin/bash
# Removes one entry and then the whole bucket over the HTTP API.
set -e -x

API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"

# Remove just the "example-entry" entry from the bucket
curl -a -H "${AUTH_HEADER}" -X DELETE "${API_PATH}/b/bucket-to-remove/example-entry"

# Remove the whole bucket
curl -a -H "${AUTH_HEADER}" -X DELETE "${API_PATH}/b/bucket-to-remove"
Deleting Specific Records
You can remove a single record from an entry or a set of records by specifying their timestamps.
- Python
- JavaScript
- Rust
- C++
- cURL
from time import time
from reduct import Client, Bucket, Batch
async def main():
    """Write two records, then delete them one by one and in a batch."""
    # Open a client for the base URL, authenticated with the API token
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

        # Write two records, one second apart, to the "py-example" entry
        first_ts = time()
        second_ts = first_ts + 1
        for stamp in (first_ts, second_ts):
            await bucket.write("py-example", b"Some binary data", stamp)

        # Remove the first record on its own
        await bucket.remove_record("py-example", first_ts)

        # Remove both records in one batch
        to_remove = Batch()
        to_remove.add(first_ts)  # already deleted, so this error will be in the errors map
        to_remove.add(second_ts)

        errors = await bucket.remove_batch("py-example", to_remove)
        assert len(errors) == 1
        print(errors)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
import { Client } from "reduct-js";
import assert from "node:assert";
// Connect to the server, then get or create the bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");

// Write two records to the "entry-1" entry, one second apart.
// Date.now() is in milliseconds, so it is scaled by 1000 before use.
const firstTs = BigInt(Date.now()) * 1000n;
const secondTs = firstTs + 1000_000n;
for (const [ts, payload] of [
  [firstTs, "Some binary data"],
  [secondTs, "Some more binary data"],
]) {
  const writer = await bucket.beginWrite("entry-1", ts);
  await writer.write(payload);
}

// Remove the first record on its own
await bucket.removeRecord("entry-1", firstTs);

// Remove both records in one batch
const removal = await bucket.beginRemoveBatch("entry-1");
removal.addOnlyTimestamp(firstTs); // already deleted, so this error will be in the errors map
removal.addOnlyTimestamp(secondTs);
const errors = await removal.write();
assert(errors.size === 1);
console.log(errors);
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use reduct_rs::{ErrorCode, RecordBuilder, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Build the client, then get or create the bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;

    // Write two records, one second apart, to the "rs-example" entry
    let first_ts = SystemTime::now();
    let second_ts = first_ts + Duration::from_secs(1);
    for (ts, payload) in [
        (first_ts, "Some binary data"),
        (second_ts, "Some more binary data"),
    ] {
        bucket
            .write_record("rs-example")
            .timestamp(ts)
            .data(Bytes::from(payload))
            .send()
            .await?;
    }

    // Remove the first record on its own
    let single_removal = bucket.remove_record("rs-example").timestamp(first_ts);
    single_removal.send().await?;

    // Remove both records in one batch
    let errors = bucket
        .remove_batch("rs-example")
        .add_timestamp(first_ts) // was already deleted, so this error will be in the errors map
        .add_timestamp(second_ts)
        .send()
        .await?;

    assert_eq!(errors.len(), 1);
    let first_error = errors.values().next().unwrap();
    assert_eq!(first_error.status, ErrorCode::NotFound);

    Ok(())
}
#include <reduct/client.h>
#include <cassert>
#include <chrono>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
using std::chrono_literals::operator ""s;
// Writes two records, then deletes them one by one and in a batch.
int main() {
  // Create a client instance, then get or create a bucket
  auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
  auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
  assert(create_err == Error::kOk);

  // Send some records to the "cpp-example"
  IBucket::Time ts = IBucket::Time::clock::now();
  auto err = bucket->Write("cpp-example", ts, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);

  err = bucket->Write("cpp-example", ts + 1s, [](auto rec) {
    rec->WriteAll("Some binary data");
  });
  assert(err == Error::kOk);

  // Delete a single record
  err = bucket->RemoveRecord("cpp-example", ts);
  assert(err == Error::kOk);

  // Delete a batch of records
  auto [record_errors, http_err] = bucket->RemoveBatch("cpp-example", [ts](auto batch) {
    batch->AddRecord(ts);  // Already deleted, so this error will be in the errors map
    batch->AddRecord(ts + 1s);
  });
  assert(http_err == Error::kOk);

  // Exactly one record (the one deleted above) is expected to fail. Checking
  // the size first also keeps begin() from being dereferenced on an empty map.
  assert(record_errors.size() == 1);
  assert(record_errors.begin()->second.code == 404);
  return 0;
}
#!/bin/bash
# Writes one record over the HTTP API, then deletes it by its timestamp.
set -e -x

API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"

# Send a record (no labels are needed for this example).
# The timestamp is the current UNIX time in seconds padded to microseconds.
# The URLs are quoted so that the shell never treats "?" as a glob character.
TIME=$(date +%s000000)
curl -d "Some binary data" \
  -H "${AUTH_HEADER}" \
  -H "Content-Type: text/plain" \
  -X POST "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"

# Delete a single record
curl -H "${AUTH_HEADER}" \
  -X DELETE "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"
Deleting Records in a Time Range
You can delete records in a specific time range by specifying the start and end timestamps, and you can use query parameters to filter the records:
Parameter | Description | Type | Default |
---|---|---|---|
start | Remove records from | Timestamp | The timestamp of the first record in the entry |
end | Remove records until | Timestamp | The timestamp of the last record in the entry |
include | Remove only records that have the given labels | List of key-value pairs | Disabled |
exclude | Remove only records that do not have the given labels | List of key-value pairs | Disabled |
each_s | Remove one record every S seconds | Float | Disabled |
each_n | Remove only every N-th record | Integer | Disabled |
The following examples show how to delete records in a specific time range:
- Python
- JavaScript
- Rust
- C++
- cURL
from time import time
from reduct import Client, Bucket, Batch
async def main():
    """Write two records, then remove records with time-range queries."""
    # Open a client for the base URL, authenticated with the API token
    async with Client("http://localhost:8383", api_token="my-token") as client:
        bucket: Bucket = await client.create_bucket("my-bucket", exist_ok=True)

        # Write two records, one second apart, to the "py-example" entry
        ts = time()
        for stamp in (ts, ts + 1):
            await bucket.write("py-example", b"Some binary data", stamp)

        # Remove every record inside the time range
        removed_records = await bucket.remove_query("py-example", ts, ts + 2)
        assert removed_records == 2

        # The query can also be narrowed to records with a specific label
        label_filter = {"key1": "value1"}
        await bucket.remove_query("py-example", ts, ts + 2, include=label_filter)

        # ... or to each N-th record
        await bucket.remove_query("py-example", ts, ts + 2, each_n=2)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
import { Client } from "reduct-js";
import assert from "node:assert";
// Connect to the server, then get or create the bucket
const client = new Client("http://127.0.0.1:8383", { apiToken: "my-token" });
const bucket = await client.getOrCreateBucket("bucket");

// Write two records to the "entry-1" entry, one second apart.
// Date.now() is in milliseconds, so it is scaled by 1000 before use.
const rangeStart = BigInt(Date.now()) * 1000n;
for (const [ts, payload] of [
  [rangeStart, "Some binary data"],
  [rangeStart + 1000_000n, "Some more binary data"],
]) {
  const writer = await bucket.beginWrite("entry-1", ts);
  await writer.write(payload);
}

// Remove every record within the time range
const rangeStop = rangeStart + 2000_000n;
const removedRecords = await bucket.removeQuery("entry-1", rangeStart, rangeStop);
assert(removedRecords === 2);

// The query can also be narrowed to records with a specific label
const byLabel = { include: { label1: "value1" } };
await bucket.removeQuery("entry-1", undefined, undefined, byLabel);

// ... or to each N-th record
const byStep = { eachN: 2 };
await bucket.removeQuery("entry-1", undefined, undefined, byStep);
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use reduct_rs::{ErrorCode, RecordBuilder, ReductClient, ReductError};
use tokio;
#[tokio::main]
async fn main() -> Result<(), ReductError> {
    // Build the client, then get or create the bucket
    let client = ReductClient::builder()
        .url("http://127.0.0.1:8383")
        .api_token("my-token")
        .build();
    let bucket = client.create_bucket("test").exist_ok(true).send().await?;

    // Write two records, one second apart, to the "rs-example" entry
    let range_start = SystemTime::now();
    for (ts, payload) in [
        (range_start, "Some binary data"),
        (range_start + Duration::from_secs(1), "Some more binary data"),
    ] {
        bucket
            .write_record("rs-example")
            .timestamp(ts)
            .data(Bytes::from(payload))
            .send()
            .await?;
    }

    // Remove every record within the time range
    let range_stop = range_start + Duration::from_secs(2);
    let range_query = bucket
        .remove_query("rs-example")
        .start(range_start)
        .stop(range_stop);
    let removed_records = range_query.send().await?;
    assert_eq!(removed_records, 2);

    // The query can also be narrowed to records with a specific label
    let label_query = bucket
        .remove_query("rs-example")
        .add_include("label1", "value1");
    label_query.send().await?;

    // ... or to each N-th record
    let step_query = bucket.remove_query("rs-example").each_n(2);
    step_query.send().await?;

    Ok(())
}
#include <reduct/client.h>
#include <cassert>
#include <chrono>
using reduct::IBucket;
using reduct::IClient;
using reduct::Error;
using std::chrono_literals::operator ""s;
int main() {
// Create a client instance, then get or create a bucket
auto client = IClient::Build("http://127.0.0.1:8383", {.api_token="my-token"});
auto [bucket, create_err] = client->GetOrCreateBucket("my-bucket");
assert(create_err == Error::kOk);
// Send some records to the "cpp-example"
IBucket::Time ts = IBucket::Time::clock::now();
auto err = bucket->Write("cpp-example", ts, [](auto rec) {
rec->WriteAll("Some binary data");
});
assert(err == Error::kOk);
err = bucket->Write("cpp-example", ts + 1s, [](auto rec) {
rec->WriteAll("Some binary data");
});
assert(err == Error::kOk);
// Delete all records withing a time range
auto [removed_records, query_err] = bucket->RemoveQuery("cpp-example", ts, ts + 2s, {});
assert(query_err == Error::kOk);
assert(removed_records == 2);
// You can also delete all records with a specific label
bucket->RemoveQuery("cpp-example", {}, {},{
.include = {{"key1", "value1"}}
});
// Or each N-th record
bucket->RemoveQuery("cpp-example", {}, {},{
.each_n = 2
});
}
#!/bin/bash
# Writes one record over the HTTP API, then removes records with queries.
set -e -x

API_PATH="http://127.0.0.1:8383/api/v1"
AUTH_HEADER="Authorization: Bearer my-token"

# Send a record (no labels are needed for this example).
# The timestamp is the current UNIX time in seconds padded to microseconds.
# The URLs are quoted so that the shell never treats "?" as a glob character.
TIME=$(date +%s000000)
curl -d "Some binary data" \
  -H "${AUTH_HEADER}" \
  -H "Content-Type: text/plain" \
  -X POST "${API_PATH}/b/example-bucket/entry_1?ts=${TIME}"

# Delete all records starting from TIME
curl -H "${AUTH_HEADER}" \
  -X DELETE "${API_PATH}/b/example-bucket/entry_1/q?start=${TIME}"

# You can also delete all records with a specific label
curl -H "${AUTH_HEADER}" \
  -X DELETE "${API_PATH}/b/example-bucket/entry_1/q?include-label1=value1"

# Or each N-th record
curl -H "${AUTH_HEADER}" \
  -X DELETE "${API_PATH}/b/example-bucket/entry_1/q?each_n=2"
Deleting records in a time range is a heavy operation and may take a long time to complete because it copies the retained records to new blocks and deletes the old ones.