userver: userver/concurrent/conflated_event_channel.hpp Source File
Loading...
Searching...
No Matches
conflated_event_channel.hpp
1#pragma once
2
3#include <atomic>
4#include <vector>
5
6#include <userver/concurrent/async_event_channel.hpp>
7#include <userver/engine/single_consumer_event.hpp>
8#include <userver/engine/task/task_with_result.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace concurrent {
13
14/// @ingroup userver_concurrency
15///
16/// @brief A non-blocking version of 'AsyncEventChannel'
17///
18/// The difference is that 'SendEvent' returns immediately, without waiting for
19/// subscribers to finish. If 'SendEvent' is called multiple times while
20/// subscribers are handling the previous event, new events will be conflated
21/// (all events except for the last one will be ignored). This class can be used
22/// instead of 'AsyncEventChannel' when we've got a "heavy" subscriber and we
23/// don't want to slow down the pipeline.
25public:
26 explicit ConflatedEventChannel(std::string name, OnRemoveCallback on_listener_removal = {});
27 ~ConflatedEventChannel() override;
28
29 /// For convenient forwarding of events from other channels
30 template <typename... Args>
31 void AddChannel(concurrent::AsyncEventSource<Args...>& channel);
32
34
35 /// Subscribes to updates using a member function. Also immediately invokes
36 /// the function with the current config snapshot.
37 template <typename Class>
38 concurrent::AsyncEventSubscriberScope UpdateAndListen(Class* obj, std::string_view name, void (Class::*func)());
39
40 void SendEvent();
41
42private:
43 template <typename... Args>
44 void OnChannelEvent(Args...);
45
46 std::atomic<bool> stop_flag_;
47 engine::TaskWithResult<void> task_;
48 std::vector<concurrent::AsyncEventSubscriberScope> subscriptions_;
49 engine::SingleConsumerEvent event_;
50};
51
52template <typename... Args>
53void ConflatedEventChannel::AddChannel(concurrent::AsyncEventSource<Args...>& channel) {
54 subscriptions_.push_back(channel.AddListener(this, Name(), &ConflatedEventChannel::OnChannelEvent<Args...>));
55}
56
57template <typename Class>
58concurrent::AsyncEventSubscriberScope
59ConflatedEventChannel::UpdateAndListen(Class* obj, std::string_view name, void (Class::*func)()) {
60 return DoUpdateAndListen(obj, name, func, [&] { (obj->*func)(); });
61}
62
63template <typename... Args>
64void ConflatedEventChannel::OnChannelEvent(Args...) {
65 SendEvent();
66}
67
68} // namespace concurrent
69
70USERVER_NAMESPACE_END