Reader
A reader represents the subscriber side of a DataStorm publish–subscribe application. You create readers to receive data published by writers.
Readers receive data samples from a specific topic. Each reader is associated with a single topic, but multiple readers can be created for the same topic. The reader’s Key, Value, and UpdateTag template parameters must match the corresponding types of the topic from which it is created.
When a reader is created, DataStorm notifies connected peers. Writers whose configuration matches the reader (topic, keys/filters) will attach so they can send samples to that reader.
You can optionally assign a name to a reader at creation time. This name is surfaced to writers and is used in listener notifications (connected writers/keys).
Readers keep a history (queue) of unread samples. When samples are retrieved (e.g., with getNextUnread or getAllUnread), they are removed from the queue.
Reader Types
Single-key reader — reads samples for a single key.
Multi-key reader — reads samples for a set of predefined keys.
Any-key reader — reads samples for any key (no keys provided at creation).
Filtered-key reader — reads samples for keys matching a key filter (name + criteria value).
You create readers using the corresponding reader class constructors or via makeXxxReader helper functions. Using the makeXxxReader helpers lets the compiler deduce template parameters (Key, Value, UpdateTag) from the topic.
Attachment behavior
Single-key: writers with the same key attach.
Multi-key: writers attach if at least one of the reader’s keys matches.
Any-key: all writers for the topic attach (since any key can be received).
Filtered-key: writers attach if their key matches the reader’s key filter (name + criteria).
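The attachment rules above amount to a predicate over a writer's key. The sketch below is a toy model only: `ToyReader` and `attaches` are hypothetical names, not part of the DataStorm API, and it assumes each writer publishes a single key.

```cpp
#include <functional>
#include <set>
#include <string>

// Toy model of a reader's key selection. Hypothetical type, not part of DataStorm.
struct ToyReader
{
    std::set<std::string> keys;                        // empty: no explicit keys
    std::function<bool(const std::string&)> keyFilter; // empty: no key filter
};

// Returns true if a writer publishing writerKey would attach to the reader.
bool attaches(const ToyReader& reader, const std::string& writerKey)
{
    if (reader.keyFilter)
    {
        return reader.keyFilter(writerKey); // filtered-key reader
    }
    if (reader.keys.empty())
    {
        return true; // any-key reader
    }
    return reader.keys.count(writerKey) > 0; // single-key or multi-key reader
}
```

In this model a single-key reader is a `ToyReader` with one entry in `keys`, and an any-key reader is a default-constructed `ToyReader`.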
Single-Key Reader
// Constructor
Topic<string, float> temperatures{node, "temperatures"};
SingleKeyReader<string, float> reader{
    temperatures,
    "floor1/kitchen",
    "kitchen-reader"};
Or
// Helper (template params deduced)
Topic<string, float> temperatures{node, "temperatures"};
auto reader = makeSingleKeyReader(temperatures, "floor1/kitchen", "kitchen-reader");
Multi-Key Reader
// Constructor
Topic<string, float> temperatures{node, "temperatures"};
MultiKeyReader<string, float> reader{
    temperatures,
    {"floor1/kitchen", "floor1/living-room"},
    "first-floor-reader"};
Or
// Helper (template params deduced)
Topic<string, float> temperatures{node, "temperatures"};
auto reader = makeMultiKeyReader(
    temperatures,
    {"floor1/kitchen", "floor1/living-room"},
    "first-floor-reader");
Any-Key Reader
// Constructor: empty key set
Topic<string, float> temperatures{node, "temperatures"};
MultiKeyReader<string, float> reader{temperatures, {}, "temperature-reader"};
Or
// Helper (template params deduced)
Topic<string, float> temperatures{node, "temperatures"};
auto reader = makeAnyKeyReader(temperatures, "temperature-reader");
Filtered-Key Reader
// Constructor
Topic<string, float> temperatures{node, "temperatures"};
FilteredKeyReader<string, float> reader{
    temperatures,
    Filter<string>("startsWith", "floor1/"),
    "temperature-reader"};
Or
// Helper (template params deduced)
Topic<string, float> temperatures{node, "temperatures"};
auto reader = makeFilteredKeyReader(
    temperatures,
    Filter<string>("startsWith", "floor1/"),
    "temperature-reader");
Sample Filters
All reader types support sample filtering. A reader specifies the sample filter (name + criteria), but the filter is executed by the writer. This minimizes network usage: only samples matching the filter are sent. Unlike key filters (which test only keys), sample filters can inspect the entire sample.
The filter name must match a sample filter registered on the publisher’s topic.
The criteria type must match the type used to register the filter; otherwise, the writer cannot unmarshal the criteria.
Example: receive temperatures greater than 26 °C:
Topic<string, float> temperatures{node, "temperatures"};
auto reader = makeSingleKeyReader(
    temperatures,
    "floor1/kitchen",
    Filter<float>("greater-than", 26.0f),
    "kitchen-reader");
Sample filters are specified on readers but must be defined on the writer’s topic. Criteria types can be any custom type for which your application provides encoding/decoding templates.
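To make the division of labor concrete, here is a toy model of writer-side filtering: the reader supplies only a filter name and a criteria value, and the writer's topic looks up the registered filter and applies it before sending. `ToyWriterTopic`, `samplesToSend`, and the choice to throw on an unknown filter name are assumptions of this sketch, not DataStorm behavior.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Toy model of writer-side sample filtering. Hypothetical types, not part of DataStorm.
using Predicate = std::function<bool(float)>;
using FilterFactory = std::function<Predicate(float)>; // builds a predicate from the criteria

struct ToyWriterTopic
{
    // Filters registered on the writer's topic, indexed by name.
    std::map<std::string, FilterFactory> sampleFilters;

    // Returns the values that would be sent to a reader requesting (name, criteria).
    std::vector<float>
    samplesToSend(const std::vector<float>& published, const std::string& name, float criteria) const
    {
        auto it = sampleFilters.find(name);
        if (it == sampleFilters.end())
        {
            // Assumption of this sketch: an unknown filter name is an error.
            throw std::invalid_argument("unknown sample filter: " + name);
        }
        Predicate accept = it->second(criteria);
        std::vector<float> sent;
        for (float value : published)
        {
            if (accept(value))
            {
                sent.push_back(value);
            }
        }
        return sent;
    }
};
```

Values rejected by the predicate never leave the writer, which is where the network saving comes from.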
Reading Samples
Readers provide methods to retrieve unread samples:
getNextUnread — retrieves the next unread sample (removes it from the queue).
getAllUnread — retrieves all unread samples (removes them from the queue).
waitForUnread — blocks until the given number of unread samples are available.
hasUnread — checks if there are unread samples in the queue.
If the node is shut down, blocking methods (e.g., waitForUnread, getNextUnread) throw NodeShutdownException.
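The consume-on-read behavior of these methods can be pictured with a small stand-in queue. `ToyUnreadQueue` is a hypothetical type, not part of DataStorm; it borrows the method names only to show the semantics, and unlike a real reader it never blocks and returns an empty optional instead.

```cpp
#include <deque>
#include <optional>
#include <vector>

// Toy stand-in for a reader's unread queue. Hypothetical type, not part of DataStorm.
class ToyUnreadQueue
{
public:
    // Simulates a sample arriving from a writer.
    void push(float value) { _unread.push_back(value); }

    bool hasUnread() const { return !_unread.empty(); }

    // Removes and returns the oldest unread value, or nullopt if the queue is empty.
    std::optional<float> getNextUnread()
    {
        if (_unread.empty())
        {
            return std::nullopt;
        }
        float value = _unread.front();
        _unread.pop_front();
        return value;
    }

    // Removes and returns every unread value, oldest first.
    std::vector<float> getAllUnread()
    {
        std::vector<float> all(_unread.begin(), _unread.end());
        _unread.clear();
        return all;
    }

private:
    std::deque<float> _unread;
};
```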
You can register a callback with onSamples. Once registered, if unread samples are already queued, the callback is invoked immediately with those samples. Callbacks are executed by the node’s callback executor. By default, the executor uses a dedicated thread; you can supply a custom executor via NodeOptions::customExecutor when constructing the node.
Reader Configuration
Reader behavior is configurable via DataStorm::ReaderConfig. You can set:
Global defaults — DataStorm.Topic.* properties (e.g., DataStorm.Topic.SampleCount)
Topic-level defaults — by calling Topic::setReaderDefaultConfig
Per-reader configuration — via the reader constructor or makeXxxReader helper
Precedence: more specific levels override more general ones (per-reader config > topic defaults > global properties).
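One way to picture this precedence is as an overlay of optional settings, applied from most general to most specific. The sketch assumes the merge happens field by field; `ToyReaderConfig`, `overlay`, and `effectiveConfig` are hypothetical names, not DataStorm types.

```cpp
#include <optional>

// Toy configuration with two optional settings. Hypothetical type, not DataStorm::ReaderConfig.
struct ToyReaderConfig
{
    std::optional<int> sampleCount;
    std::optional<int> sampleLifetime;
};

// Fields set in `specific` win; unset fields fall back to `general`.
ToyReaderConfig overlay(const ToyReaderConfig& general, const ToyReaderConfig& specific)
{
    ToyReaderConfig result = general;
    if (specific.sampleCount)
    {
        result.sampleCount = specific.sampleCount;
    }
    if (specific.sampleLifetime)
    {
        result.sampleLifetime = specific.sampleLifetime;
    }
    return result;
}

// Applies the three levels in order: global properties, topic defaults, per-reader config.
ToyReaderConfig effectiveConfig(
    const ToyReaderConfig& global,
    const ToyReaderConfig& topicDefaults,
    const ToyReaderConfig& perReader)
{
    return overlay(overlay(global, topicDefaults), perReader);
}
```

With global values of 100 and 5000, a topic default that sets only `sampleCount = 10`, and a per-reader config that sets only `sampleLifetime = 1000`, the effective values in this model are 10 and 1000.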
Options
Sample Count (sampleCount)
How many samples to keep in the unread queue. When the queue is full, the oldest samples are discarded. Default: keep all samples.
Sample Lifetime (sampleLifetime)
How long to keep a sample in the unread queue. Samples older than this duration are automatically removed.
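The two limits above can be sketched as eviction rules on a queue. This is a toy model with hypothetical names (`ToyHistory`, `ToySample`); timestamps are passed in explicitly so the behavior is deterministic, and the point at which expiry is checked is an assumption of the sketch.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <optional>

using Millis = std::chrono::milliseconds;

// Toy sample carrying the time at which it was received. Hypothetical type.
struct ToySample
{
    float value;
    Millis receivedAt;
};

// Toy unread history with optional count and lifetime limits. Not a DataStorm type.
struct ToyHistory
{
    std::optional<std::size_t> sampleCount; // nullopt: keep all samples
    std::optional<Millis> sampleLifetime;   // nullopt: samples never expire
    std::deque<ToySample> unread;

    // Drops samples older than sampleLifetime, measured against `now`.
    void expire(Millis now)
    {
        while (sampleLifetime && !unread.empty() && now - unread.front().receivedAt > *sampleLifetime)
        {
            unread.pop_front();
        }
    }

    // Queues a new sample, then drops the oldest ones if the count limit is exceeded.
    void add(float value, Millis now)
    {
        expire(now);
        unread.push_back(ToySample{value, now});
        while (sampleCount && unread.size() > *sampleCount)
        {
            unread.pop_front();
        }
    }
};
```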
Clear History (clearHistory)
Controls when the reader’s unread sample queue is cleared, based on sample events (ClearHistoryPolicy):
OnAdd — clears the unread queue after receiving an Add sample.
OnRemove — clears the unread queue after receiving a Remove sample.
OnAll — clears the queue after receiving any sample.
OnAllExceptPartialUpdate — clears the queue after receiving any sample except a PartialUpdate.
Never — never clears the queue automatically.
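Each policy reduces to a yes/no decision per incoming sample event. The sketch below encodes that decision as a single function; `ToyEvent`, `ToyClearPolicy`, and `clearsHistory` are hypothetical names that mirror the policy and event names for illustration only.

```cpp
// Toy sample events and clear-history policies. Hypothetical enums, not DataStorm types.
enum class ToyEvent
{
    Add,
    Update,
    PartialUpdate,
    Remove
};

enum class ToyClearPolicy
{
    OnAdd,
    OnRemove,
    OnAll,
    OnAllExceptPartialUpdate,
    Never
};

// Returns true if receiving `event` clears the unread queue under `policy`.
bool clearsHistory(ToyClearPolicy policy, ToyEvent event)
{
    switch (policy)
    {
        case ToyClearPolicy::OnAdd:
            return event == ToyEvent::Add;
        case ToyClearPolicy::OnRemove:
            return event == ToyEvent::Remove;
        case ToyClearPolicy::OnAll:
            return true;
        case ToyClearPolicy::OnAllExceptPartialUpdate:
            return event != ToyEvent::PartialUpdate;
        case ToyClearPolicy::Never:
            return false;
    }
    return false; // unreachable for valid enum values
}
```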
Discard Policy (discardPolicy)
Whether to discard samples on receipt of new samples (DiscardPolicy):
None — never discard
SendTime — discard a new sample if its send timestamp is older than that of the last received sample
Priority — keep only samples from the highest-priority connected writers
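The three policies can be modeled as an accept/reject check applied to each incoming sample. This is a simplified sketch under stated assumptions: `ToyDiscardFilter` and `ToyIncomingSample` are hypothetical, and the highest connected writer priority is passed in as a fixed number rather than tracked as writers connect and disconnect.

```cpp
#include <cstdint>
#include <optional>

enum class ToyDiscardPolicy
{
    None,
    SendTime,
    Priority
};

// Toy incoming sample. Hypothetical type, not part of DataStorm.
struct ToyIncomingSample
{
    std::int64_t sendTimeMs; // time at which the writer sent the sample
    int writerPriority;      // priority of the writer that sent it
};

// Toy accept/reject check for incoming samples. Not a DataStorm type.
class ToyDiscardFilter
{
public:
    ToyDiscardFilter(ToyDiscardPolicy policy, int highestConnectedPriority)
        : _policy(policy),
          _highestPriority(highestConnectedPriority)
    {
    }

    // Returns true if the sample is kept, false if it is discarded.
    bool accept(const ToyIncomingSample& sample)
    {
        switch (_policy)
        {
            case ToyDiscardPolicy::SendTime:
                if (_lastSendTimeMs && sample.sendTimeMs < *_lastSendTimeMs)
                {
                    return false; // sent before the last sample we kept
                }
                _lastSendTimeMs = sample.sendTimeMs;
                return true;
            case ToyDiscardPolicy::Priority:
                return sample.writerPriority >= _highestPriority;
            case ToyDiscardPolicy::None:
                return true;
        }
        return true; // unreachable for valid enum values
    }

private:
    ToyDiscardPolicy _policy;
    int _highestPriority;
    std::optional<std::int64_t> _lastSendTimeMs;
};
```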
Coordination & Listeners
Coordination methods
hasWriters — checks whether any writers are currently connected.
waitForWriters — blocks until writers connect.
waitForNoWriters — blocks until all writers disconnect.
getConnectedKeys — returns the set of keys for which at least one writer is connected.
getConnectedWriters — returns the names of connected writers.
Connected Keys Listener
Use onConnectedKeys(initCallback, updateCallback) to register callbacks that monitor connected keys:
The initCallback is called immediately after registration with the initial set of connected keys.
The updateCallback is called whenever a key is connected or disconnected.
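The init-then-update pattern can be sketched with a small stand-in class. `ToyConnectedKeys` and `ToyReason` are hypothetical names, not DataStorm types, and the toy invokes callbacks synchronously on the calling thread, whereas a real node dispatches them through its callback executor.

```cpp
#include <algorithm>
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum class ToyReason
{
    Connect,
    Disconnect
};

// Toy tracker of connected keys with an init/update listener. Not a DataStorm type.
class ToyConnectedKeys
{
public:
    using Init = std::function<void(const std::vector<std::string>&)>;
    using Update = std::function<void(ToyReason, const std::string&)>;

    // Calls init once with the keys connected so far, then stores update for later changes.
    void onConnectedKeys(Init init, Update update)
    {
        init(_keys);
        _update = std::move(update);
    }

    // Simulates a writer for `key` connecting.
    void connect(const std::string& key)
    {
        _keys.push_back(key);
        if (_update)
        {
            _update(ToyReason::Connect, key);
        }
    }

    // Simulates the writer for `key` disconnecting.
    void disconnect(const std::string& key)
    {
        _keys.erase(std::remove(_keys.begin(), _keys.end(), key), _keys.end());
        if (_update)
        {
            _update(ToyReason::Disconnect, key);
        }
    }

private:
    std::vector<std::string> _keys;
    Update _update;
};
```

Splitting the listener into two callbacks lets the caller build its initial state from one snapshot and then apply incremental changes, without having to compare successive snapshots.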
Connected Writers Listener
Use onConnectedWriters(initCallback, updateCallback) to register callbacks that monitor connected writers:
The initCallback is called immediately after registration with the initial set of connected writers.
The updateCallback is called whenever a writer connects or disconnects.