userver: userver/concurrent/conflated_event_channel.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
conflated_event_channel.hpp
1#pragma once
2
3#include <atomic>
4#include <vector>
5
6#include <userver/concurrent/async_event_channel.hpp>
7#include <userver/engine/single_consumer_event.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace concurrent {
13
14/// @ingroup userver_concurrency
15///
16/// @brief A non-blocking version of 'AsyncEventChannel'
17///
18/// The difference is that 'SendEvent' returns immediately, without waiting for
19/// subscribers to finish. If 'SendEvent' is called multiple times while
20/// subscribers are handling the previous event, new events will be conflated
21/// (all events except for the last one will be ignored). This class can be used
22/// instead of 'AsyncEventChannel' when we've got a "heavy" subscriber and we
23/// don't want to slow down the pipeline.
25 public:
26 explicit ConflatedEventChannel(std::string name,
27 OnRemoveCallback on_listener_removal = {});
28 ~ConflatedEventChannel() override;
29
30 /// For convenient forwarding of events from other channels
31 template <typename... Args>
32 void AddChannel(concurrent::AsyncEventSource<Args...>& channel);
33
35
36 /// Subscribes to updates using a member function. Also immediately invokes
37 /// the function with the current config snapshot.
38 template <typename Class>
39 concurrent::AsyncEventSubscriberScope UpdateAndListen(Class* obj,
40 std::string_view name,
41 void (Class::*func)());
42
43 void SendEvent();
44
45 private:
46 template <typename... Args>
47 void OnChannelEvent(Args...);
48
49 std::atomic<bool> stop_flag_;
50 engine::TaskWithResult<void> task_;
51 std::vector<concurrent::AsyncEventSubscriberScope> subscriptions_;
52 engine::SingleConsumerEvent event_;
53};
54
55template <typename... Args>
57 concurrent::AsyncEventSource<Args...>& channel) {
58 subscriptions_.push_back(channel.AddListener(
59 this, Name(), &ConflatedEventChannel::OnChannelEvent<Args...>));
60}
61
62template <typename Class>
63concurrent::AsyncEventSubscriberScope ConflatedEventChannel::UpdateAndListen(
64 Class* obj, std::string_view name, void (Class::*func)()) {
65 return DoUpdateAndListen(obj, name, func, [&] { (obj->*func)(); });
66}
67
68template <typename... Args>
69void ConflatedEventChannel::OnChannelEvent(Args...) {
70 SendEvent();
71}
72
73} // namespace concurrent
74
75USERVER_NAMESPACE_END