userver: userver/concurrent/async_event_channel.hpp Source File
Loading...
Searching...
No Matches
async_event_channel.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/async_event_channel.hpp
4/// @brief @copybrief concurrent::AsyncEventChannel
5
6#include <functional>
7#include <string>
8#include <string_view>
9#include <typeindex>
10#include <unordered_map>
11#include <utility>
12#include <vector>
13
14#include <userver/concurrent/async_event_source.hpp>
15#include <userver/concurrent/variable.hpp>
16#include <userver/engine/mutex.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/engine/task/task_with_result.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/async.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24namespace concurrent {
25
26namespace impl {
27
28void WaitForTask(std::string_view name, engine::TaskWithResult<void>& task);
29
30[[noreturn]] void ReportAlreadySubscribed(std::string_view channel_name,
31 std::string_view listener_name);
32
33void ReportNotSubscribed(std::string_view channel_name) noexcept;
34
35void ReportUnsubscribingAutomatically(std::string_view channel_name,
36 std::string_view listener_name) noexcept;
37
38void ReportErrorWhileUnsubscribing(std::string_view channel_name,
39 std::string_view listener_name,
40 std::string_view error) noexcept;
41
42std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
43
44inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
45
46// During the `AsyncEventSubscriberScope::Unsubscribe` call or destruction of
47// `AsyncEventSubscriberScope`, all variables used by callback must be valid
48// (must not be destroyed). A common cause of crashes in this place: there is no
49// manual call to `Unsubscribe`. In this case check the declaration order of the
50// struct fields.
51template <typename Func>
52void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
53 std::function<void(Func&)>& on_listener_removal, Func& listener_func,
54 std::string_view channel_name, std::string_view listener_name) noexcept {
55 if (!on_listener_removal) return;
56 try {
57 on_listener_removal(listener_func);
58 } catch (const std::exception& e) {
59 ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
60 }
61}
62
63} // namespace impl
64
65/// @ingroup userver_concurrency
66///
67/// AsyncEventChannel is an in-process pub-sub with strict FIFO serialization,
68/// i.e. only after the event was processed a new event may appear for
69/// processing, same listener is never called concurrently.
70///
71/// Example usage:
72/// @snippet concurrent/async_event_channel_test.cpp AsyncEventChannel sample
73template <typename... Args>
74class AsyncEventChannel : public AsyncEventSource<Args...> {
75 public:
76 using Function = typename AsyncEventSource<Args...>::Function;
77 using OnRemoveCallback = std::function<void(Function&)>;
78
79 /// @brief The primary constructor
80 /// @param name used for diagnostic purposes and is also accessible with Name
81 explicit AsyncEventChannel(std::string name)
82 : name_(std::move(name)), data_(ListenersData{{}, {}}) {}
83
84 /// @brief The constructor with `AsyncEventSubscriberScope` usage checking.
85 ///
86 /// The constructor with a callback that is called on listener removal. The
87 /// callback takes a reference to `Function' as input. This is useful for
88 /// checking the lifetime of data captured by the listener update function.
89 ///
90 /// @note Works only in debug mode.
91 ///
92 /// @warning Data captured by `on_listener_removal` function must be valid
93 /// until the `AsyncEventChannel` object is completely destroyed.
94 ///
95 /// Example usage:
96 /// @snippet concurrent/async_event_channel_test.cpp OnListenerRemoval sample
97 ///
98 /// @param name used for diagnostic purposes and is also accessible with Name
99 /// @param on_listener_removal the callback used for check
100 ///
101 /// @see impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing
102 AsyncEventChannel(std::string name, OnRemoveCallback on_listener_removal)
103 : name_(std::move(name)),
105
106 /// @brief For use in `UpdateAndListen` of specific event channels
107 ///
108 /// Atomically calls `updater`, which should invoke `func` with the previously
109 /// sent event, and subscribes to new events as if using AddListener.
110 ///
111 /// @see AsyncEventSource::AddListener
112 template <typename UpdaterFunc>
113 AsyncEventSubscriberScope DoUpdateAndListen(FunctionId id,
114 std::string_view name,
115 Function&& func,
116 UpdaterFunc&& updater) {
117 std::lock_guard lock(event_mutex_);
118 std::forward<UpdaterFunc>(updater)();
119 return DoAddListener(id, name, std::move(func));
120 }
121
122 /// @overload
123 template <typename Class, typename UpdaterFunc>
124 AsyncEventSubscriberScope DoUpdateAndListen(Class* obj, std::string_view name,
125 void (Class::*func)(Args...),
126 UpdaterFunc&& updater) {
127 return DoUpdateAndListen(
128 FunctionId(obj), name,
129 [obj, func](Args... args) { (obj->*func)(args...); },
130 std::forward<UpdaterFunc>(updater));
131 }
132
133 /// Send the next event and wait until all the listeners process it.
134 ///
135 /// Strict FIFO serialization is guaranteed, i.e. only after this event is
136 /// processed a new event may be delivered for the subscribers, same
137 /// listener/subscriber is never called concurrently.
138 void SendEvent(Args... args) const {
139 std::lock_guard lock(event_mutex_);
140 auto data = data_.Lock();
141 auto& listeners = data->listeners;
142
143 std::vector<engine::TaskWithResult<void>> tasks;
144 tasks.reserve(listeners.size());
145
146 for (const auto& [_, listener] : listeners) {
147 tasks.push_back(utils::Async(
148 listener.task_name,
149 [&, &callback = listener.callback] { callback(args...); }));
150 }
151
152 std::size_t i = 0;
153 for (const auto& [_, listener] : listeners) {
154 impl::WaitForTask(listener.name, tasks[i++]);
155 }
156 }
157
158 /// @returns the name of this event channel
159 const std::string& Name() const noexcept { return name_; }
160
161 private:
162 struct Listener final {
163 std::string name;
164 Function callback;
165 std::string task_name;
166 };
167
168 struct ListenersData final {
169 std::unordered_map<FunctionId, Listener, FunctionId::Hash> listeners;
170 OnRemoveCallback on_listener_removal;
171 };
172
173 void RemoveListener(FunctionId id, UnsubscribingKind kind) noexcept final {
174 engine::TaskCancellationBlocker blocker;
175 auto data = data_.Lock();
176 auto& listeners = data->listeners;
177 const auto iter = listeners.find(id);
178
179 if (iter == listeners.end()) {
180 impl::ReportNotSubscribed(Name());
181 return;
182 }
183
184 if (kind == UnsubscribingKind::kAutomatic) {
185 if (!data->on_listener_removal) {
186 impl::ReportUnsubscribingAutomatically(name_, iter->second.name);
187 }
188
189 if constexpr (impl::kCheckSubscriptionUB) {
190 // Fake listener call to check
191 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
192 data->on_listener_removal, iter->second.callback, name_,
193 iter->second.name);
194 }
195 }
196 listeners.erase(iter);
197 }
198
199 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name,
200 Function&& func) final {
201 auto data = data_.Lock();
202 auto& listeners = data->listeners;
203 auto task_name = impl::MakeAsyncChannelName(name_, name);
204 const auto [iterator, success] = listeners.emplace(
205 id, Listener{std::string{name}, std::move(func), std::move(task_name)});
206 if (!success) impl::ReportAlreadySubscribed(Name(), name);
207 return AsyncEventSubscriberScope(*this, id);
208 }
209
210 const std::string name_;
211 concurrent::Variable<ListenersData> data_;
212 mutable engine::Mutex event_mutex_;
213};
214
215} // namespace concurrent
216
217USERVER_NAMESPACE_END