userver: userver/concurrent/conflated_event_channel.hpp Source File
Loading...
Searching...
No Matches
conflated_event_channel.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/conflated_event_channel.hpp
4/// @brief @copybrief concurrent::ConflatedEventChannel
5
6#include <atomic>
7#include <vector>
8
9#include <userver/concurrent/async_event_channel.hpp>
10#include <userver/engine/single_consumer_event.hpp>
11#include <userver/engine/task/task_with_result.hpp>
12
13USERVER_NAMESPACE_BEGIN
14
15namespace concurrent {
16
17/// @ingroup userver_concurrency
18///
19/// @brief A non-blocking version of 'AsyncEventChannel'
20///
21/// The difference is that 'SendEvent' returns immediately, without waiting for
22/// subscribers to finish. If 'SendEvent' is called multiple times while
23/// subscribers are handling the previous event, new events will be conflated
24/// (all events except for the last one will be ignored). This class can be used
25/// instead of 'AsyncEventChannel' when we've got a "heavy" subscriber and we
26/// don't want to slow down the pipeline.
28public:
29 explicit ConflatedEventChannel(std::string name, OnRemoveCallback on_listener_removal = {});
30 ~ConflatedEventChannel() override;
31
32 /// For convenient forwarding of events from other channels
33 template <typename... Args>
34 void AddChannel(concurrent::AsyncEventSource<Args...>& channel);
35
36 using AsyncEventChannel<>::AddListener;
37
38 /// Subscribes to updates using a member function. Also immediately invokes
39 /// the function with the current config snapshot.
40 template <typename Class>
41 concurrent::AsyncEventSubscriberScope UpdateAndListen(Class* obj, std::string_view name, void (Class::*func)());
42
43 void SendEvent();
44
45private:
46 template <typename... Args>
47 void OnChannelEvent(Args...);
48
49 std::atomic<bool> stop_flag_;
50 engine::TaskWithResult<void> task_;
51 std::vector<concurrent::AsyncEventSubscriberScope> subscriptions_;
52 engine::SingleConsumerEvent event_;
53};
54
55template <typename... Args>
56void ConflatedEventChannel::AddChannel(concurrent::AsyncEventSource<Args...>& channel) {
57 subscriptions_.push_back(channel.AddListener(this, Name(), &ConflatedEventChannel::OnChannelEvent<Args...>));
58}
59
60template <typename Class>
62 Class* obj,
63 std::string_view name,
64 void (Class::*func)()
65) {
66 return DoUpdateAndListen(obj, name, func, [&] { (obj->*func)(); });
67}
68
69template <typename... Args>
70void ConflatedEventChannel::OnChannelEvent(Args...) {
71 SendEvent();
72}
73
74} // namespace concurrent
75
76USERVER_NAMESPACE_END