Skip to main content
Share

Subscribing new records with Reduct C++ SDK

· 5 min read

This article provides an introduction to ReductStore and explains how to use the Reduct C++ SDK to subscribe to data from the database.

Prerequisites

To subscribe to new records, we should use a continuous query, which has been supported by ReductStore since version v1.4. We can use the following Docker command to run it:

docker pull reduct/store:latest
docker run -p 8383:8383 reduct/store:latest

Now, we need to install the Reduct Client SDK for C++. Please refer to these instructions.

Full Example

Now, take a look at the code of the example.

#include <reduct/client.h>

#include <iostream>
#include <thread>

using reduct::IBucket;
using reduct::IClient;

int main() {
auto writer = std::thread([]() {
auto client = IClient::Build("http://127.0.0.1:8383");

auto [bucket, err] = client->GetOrCreateBucket("bucket");
if (err) {
std::cerr << "Error: " << err;
return;
}

for (int i = 0; i < 10; ++i) {
const IBucket::WriteOptions opts{
.timestamp = IBucket::Time::clock::now(),
.labels = {{"good", i % 2 == 0 ? "true" : "false"}},
};

const auto msg = "Hey " + std::to_string(i);
[[maybe_unused]] auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });
std::cout << "Write: " << msg << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});

// Subscribe to good messages
int good_count = 0;
auto client = IClient::Build("http://127.0.0.1:8383");
auto [bucket, err] = client->GetOrCreateBucket("bucket");
if (err) {
std::cerr << "Error: " << err;
return -1;
}

const auto opts = IBucket::QueryOptions{
.include = {{"good", "true"}},
.continuous = true,
.poll_interval = std::chrono::milliseconds{100},
};

// Continuously read messages until we get 3 good ones
auto query_err =
bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
auto [msg, read_err] = record.ReadAll();
if (read_err) {
std::cerr << "Error: " << read_err;
return false;
}
std::cout << "Read: " << msg << std::endl;
return ++good_count != 3;
});

writer.join();

if (query_err) {
std::cerr << "Query error:" << query_err;
return -1;
}
}

To build use this CMakeLists.txt:

cmake_minimum_required(VERSION 3.18)

project(ReductCppExamples)
set(CMAKE_CXX_STANDARD 20)

find_package(ZLIB)
find_package(OpenSSL)

find_package(ReductCpp 1.4.0)

add_executable(subscription subscription.cc)
target_link_libraries(subscription ${REDUCT_CPP_LIBRARIES} ${ZLIB_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)

The example code demonstrates how to use the C++ Reduct SDK to subscribe to new records from a bucket. The program writes 10 records to a bucket, reads records with the label "good" set to "true" or "false", and continuously reads records until it has read 3 records with this label set to "true".

Let's consider the example in detail.

Write New Records with Labels

To communicate with a ReductStore instance, first create a client.

auto client = IClient::Build("http://127.0.0.1:8383");

In this example, we run the database locally with default settings, but we may need to have an API token for authorsization.

Like many other blob storages, ReductStore keeps data in buckets for granular access control and quotas. For read/write operations, we have to get a bucket or create one:

 auto [bucket, err] = client->GetOrCreateBucket("bucket");
if (err) {
std::cerr << "Error: " << err;
return;
}

When creating a bucket, you have the option to provide additional settings. One particularly useful setting is the FIFO quota, which automatically removes old data when the bucket size reaches a certain limit. This feature is especially beneficial for edge devices, as it helps prevent the device from running out of disk space.

Buckets contain entries, you can understand them as topics or folders. ReductStore doesn’t provide tree of entries and they must have unique names. Let’s write a record to entry-1:

const IBucket::WriteOptions opts{
.timestamp = IBucket::Time::clock::now(),
.labels = {{"good", i % 2 == 0 ? "true" : "false"}},
};

const auto msg = "Hey " + std::to_string(i);
auto write_err = bucket->Write("entry-1", opts, [msg](auto rec) { rec->WriteAll(msg); });

ReductStore is a time-series database that stores data as a blob. Each blob is a record that must have a timestamp. However, you can attach additional information to the record, such as labels, and use them for annotation or data filtering. In this case, we assign the label goog with the values true or false.

In this example, we send short text messages for demonstration purposes only. ReductStore is better suited for handling larger data blobs, such as images, sound, or binary data like Protobuf messages.

Continuous Query

After we ran writing labels in a separated thread we can query only good data and wait for first 3:

const auto opts = IBucket::QueryOptions{
.include = {{"good", "true"}},
.continuous = true,
.poll_interval = std::chrono::milliseconds{100},
};

auto query_err =
bucket->Query("entry-1", IBucket::Time::clock::now(), std::nullopt, opts, [&good_count](auto &&record) {
auto [msg, read_err] = record.ReadAll();
if (read_err) {
std::cerr << "Error: " << read_err;
return false;
}
std::cout << "Read: " << msg << std::endl;
return ++good_count != 3;
});

This is a simple example. We use the flag continuous to indicate that we will wait for new records and poll them every 100 ms.

In ReductStore, queries work as iterators. It doesn't matter how many records are stored; we only ask for the next one. When we use a continuous query, we are always asking for a new record, even if we didn't receive a previous one. We use the pool_interval option to specify how often we ask for a new record.

How could it be useful?

ReductStore is an open source database focused on edge computing and AI/ML applications. Its continuous queries work as a publish-subscription communication model, similar to MQTT. You can use the database as a message broker or easily integrate it with your warehouse by creating a program that subscribes to new records and writes them or only labels them to another database.

You can use this feature in a Python or JavaScript application by using the Python and JavaScript client SDKs.

I hope you find this release useful. If you have any questions or feedback, don’t hesitate to reach out in Discord or by opening a discussion on GitHub.

Thanks for using ReductStore!