userver: userver/concurrent/conflated_event_channel.hpp Source File
Loading...
Searching...
No Matches
conflated_event_channel.hpp
1#pragma once
2
3#include <atomic>
4#include <vector>
5
6#include <userver/concurrent/async_event_channel.hpp>
7#include <userver/engine/single_consumer_event.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace concurrent {
13
14/// @ingroup userver_concurrency
15///
16/// @brief A non-blocking version of 'AsyncEventChannel'
17///
18/// The difference is that 'SendEvent' returns immediately, without waiting for
19/// subscribers to finish. If 'SendEvent' is called multiple times while
20/// subscribers are handling the previous event, new events will be conflated
21/// (all events except for the last one will be ignored). This class can be used
22/// instead of 'AsyncEventChannel' when we've got a "heavy" subscriber and we
23/// don't want to slow down the pipeline.
25 public:
26 explicit ConflatedEventChannel(std::string name,
27 OnRemoveCallback on_listener_removal = {});
28 ~ConflatedEventChannel() override;
29
30 /// For convenient forwarding of events from other channels
31 template <typename... Args>
32 void AddChannel(concurrent::AsyncEventSource<Args...>& channel);
33
35
36 /// Subscribes to updates using a member function. Also immediately invokes
37 /// the function once; the function takes no arguments.
38 template <typename Class>
39 concurrent::AsyncEventSubscriberScope UpdateAndListen(Class* obj,
40 std::string_view name,
41 void (Class::*func)());
42
43 void SendEvent();
44
45 private:
46 template <typename... Args>
47 void OnChannelEvent(Args...);
48
49 std::atomic<bool> stop_flag_;
50 engine::TaskWithResult<void> task_;
51 std::vector<concurrent::AsyncEventSubscriberScope> subscriptions_;
52 engine::SingleConsumerEvent event_;
53};
54
55template <typename... Args>
57 concurrent::AsyncEventSource<Args...>& channel) {
58 subscriptions_.push_back(channel.AddListener(
59 this, Name(), &ConflatedEventChannel::OnChannelEvent<Args...>));
60}
61
62template <typename Class>
63concurrent::AsyncEventSubscriberScope ConflatedEventChannel::UpdateAndListen(
64 Class* obj, std::string_view name, void (Class::*func)()) {
65 return DoUpdateAndListen(obj, name, func, [&] { (obj->*func)(); });
66}
67
68template <typename... Args>
69void ConflatedEventChannel::OnChannelEvent(Args...) {
70 SendEvent();
71}
72
73} // namespace concurrent
74
75USERVER_NAMESPACE_END