userver: userver/concurrent/async_event_channel.hpp Source File
Loading...
Searching...
No Matches
async_event_channel.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/async_event_channel.hpp
4/// @brief @copybrief concurrent::AsyncEventChannel
5
6#include <functional>
7#include <string>
8#include <string_view>
9#include <typeindex>
10#include <unordered_map>
11#include <utility>
12#include <vector>
13
14#include <userver/concurrent/async_event_source.hpp>
15#include <userver/concurrent/variable.hpp>
16#include <userver/engine/semaphore.hpp>
17#include <userver/engine/shared_mutex.hpp>
18#include <userver/engine/task/cancel.hpp>
19#include <userver/engine/task/task_with_result.hpp>
20#include <userver/utils/assert.hpp>
21#include <userver/utils/async.hpp>
22
23USERVER_NAMESPACE_BEGIN
24
25namespace concurrent {
26
27namespace impl {
28
29void WaitForTask(std::string_view name, engine::TaskWithResult<void>& task);
30
31[[noreturn]] void ReportAlreadySubscribed(std::string_view channel_name, std::string_view listener_name);
32
33void ReportNotSubscribed(std::string_view channel_name) noexcept;
34
35void ReportUnsubscribingAutomatically(std::string_view channel_name, std::string_view listener_name) noexcept;
36
37void ReportErrorWhileUnsubscribing(
38 std::string_view channel_name,
39 std::string_view listener_name,
40 std::string_view error
41) noexcept;
42
43std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
44
45inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
46
47// During the `AsyncEventSubscriberScope::Unsubscribe` call or destruction of
48// `AsyncEventSubscriberScope`, all variables used by callback must be valid
49// (must not be destroyed). A common cause of crashes in this place: there is no
50// manual call to `Unsubscribe`. In this case check the declaration order of the
51// struct fields.
52template <typename Func>
53void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
54 std::function<void(const Func&)>& on_listener_removal,
55 const Func& listener_func,
56 std::string_view channel_name,
57 std::string_view listener_name
58) noexcept {
59 if (!on_listener_removal) {
60 return;
61 }
62 try {
63 on_listener_removal(listener_func);
64 } catch (const std::exception& e) {
65 ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
66 }
67}
68
69} // namespace impl
70
71/// @ingroup userver_concurrency
72///
73/// AsyncEventChannel is an in-process pub-sub with strict FIFO serialization,
74/// i.e. only after the event was processed a new event may appear for
75/// processing, same listener is never called concurrently.
76///
77/// Example usage:
78/// @snippet concurrent/async_event_channel_test.cpp AsyncEventChannel sample
79template <typename... Args>
80class AsyncEventChannel : public AsyncEventSource<Args...> {
81public:
82 using Function = typename AsyncEventSource<Args...>::Function;
83 using OnRemoveCallback = std::function<void(const Function&)>;
84
85 /// @brief The primary constructor
86 /// @param name used for diagnostic purposes and is also accessible with Name
87 explicit AsyncEventChannel(std::string_view name)
88 : name_(name),
89 data_(ListenersData{{}, {}})
90 {}
91
92 /// @brief The constructor with `AsyncEventSubscriberScope` usage checking.
93 ///
94 /// The constructor with a callback that is called on listener removal. The
95 /// callback takes a reference to `Function` as input. This is useful for
96 /// checking the lifetime of data captured by the listener update function.
97 ///
98 /// @note Works only in debug mode.
99 ///
100 /// @warning Data captured by `on_listener_removal` function must be valid
101 /// until the `AsyncEventChannel` object is completely destroyed.
102 ///
103 /// Example usage:
104 /// @snippet concurrent/async_event_channel_test.cpp OnListenerRemoval sample
105 ///
106 /// @param name used for diagnostic purposes and is also accessible with Name
107 /// @param on_listener_removal the callback used for check
108 ///
109 /// @see impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing
110 AsyncEventChannel(std::string_view name, OnRemoveCallback on_listener_removal)
111 : name_(name),
112 data_(ListenersData{{}, std::move(on_listener_removal)})
113 {}
114
115 /// @brief For use in `UpdateAndListen` of specific event channels
116 ///
117 /// Atomically calls `updater`, which should invoke `func` with the previously
118 /// sent event, and subscribes to new events as if using AddListener.
119 ///
120 /// @param id the subscriber class instance, see also a simpler `DoUpdateAndListen` overload below
121 /// @param name the name of the subscriber
122 /// @param func the callback that is called on each update
123 /// @param updater the initial `() -> void` callback that should call `func` with the current value
124 ///
125 /// @see AsyncEventSource::AddListener
126 template <typename UpdaterFunc>
127 AsyncEventSubscriberScope DoUpdateAndListen(
128 FunctionId id,
129 std::string_view name,
130 Function&& func,
131 UpdaterFunc&& updater
132 ) {
133 const std::shared_lock lock(event_mutex_);
134 std::forward<UpdaterFunc>(updater)();
135 return DoAddListener(id, name, std::move(func));
136 }
137
138 /// @overload
139 template <typename Class, typename UpdaterFunc>
140 AsyncEventSubscriberScope DoUpdateAndListen(
141 Class* obj,
142 std::string_view name,
143 void (Class::*func)(Args...),
144 UpdaterFunc&& updater
145 ) {
146 return DoUpdateAndListen(
147 FunctionId(obj),
148 name,
149 [obj, func](Args... args) { (obj->*func)(args...); },
150 std::forward<UpdaterFunc>(updater)
151 );
152 }
153
154 /// Send the next event and wait until all the listeners process it.
155 ///
156 /// Strict FIFO serialization is guaranteed, i.e. only after this event is
157 /// processed a new event may be delivered for the subscribers, same
158 /// listener/subscriber is never called concurrently.
159 void SendEvent(Args... args) const {
160 struct Task {
161 std::shared_ptr<const Listener> listener;
162 engine::TaskWithResult<void> task;
163 };
164 std::vector<Task> tasks;
165
166 {
167 // Try to obtain unique lock for event_mutex_ to serialize
168 // calls to SendEvent()
169 event_mutex_.lock();
170
171 // Now downgrade the lock to shared to allow new subscriptions
172 event_mutex_.unlock_and_lock_shared();
173
174 // Now we want to create N subtasks for callbacks,
175 // which must hold event_mutex_'s std::shared_lock.
176 // A naive implementation would create std::shared_lock{event_mutex_} for each subtask,
177 // however, it might deadlock if any parallel SendEvent() is called and is blocked on
178 // event_mutex_.lock(). It happens due to strict prioritization of writers above readers
179 // in SharedMutex: if there is any pending writer, nobody may lock the mutex for read.
180
181 // To avoid std::bad_alloc and leaked mutex shared lock, do the following:
182 // 1) catch the lock into RAII std::shared_lock...
183 std::shared_lock<engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
184 // 2) ...and move it into std::shared_ptr
185 auto lock = std::make_shared<std::shared_lock<engine::SharedMutex>>(std::move(tmp_lock));
186
187 auto data = data_.Lock();
188 auto& listeners = data->listeners;
189 tasks.reserve(listeners.size());
190
191 for (const auto& [_, listener] : listeners) {
192 tasks.push_back(Task{
193 listener, // an intentional copy
194 utils::Async(
195 listener->task_name,
196 [&, &callback = listener->callback, lock, sema_lock = std::shared_lock(listener->sema)] {
197 callback(args...);
198 }
199 ),
200 });
201 }
202 }
203 // Unlock data_ here because callbacks may subscribe to this
204
205 for (auto& task : tasks) {
206 impl::WaitForTask(task.listener->name, task.task);
207 }
208 }
209
210 /// @returns the name of this event channel
211 const std::string& Name() const noexcept { return name_; }
212
213private:
214 struct Listener final {
215 // 'sema' with data_.Lock() are used to synchronize removal 'Listener' from ListenersData::listeners
216 mutable engine::Semaphore sema;
217
218 std::string name;
219 Function callback;
220 std::string task_name;
221
222 Listener(std::string name, Function callback, std::string task_name)
223 : sema(1),
224 name(std::move(name)),
225 callback(std::move(callback)),
226 task_name(std::move(task_name))
227 {}
228 };
229
230 struct ListenersData final {
231 std::unordered_map<FunctionId, std::shared_ptr<const Listener>, FunctionId::Hash> listeners;
232 OnRemoveCallback on_listener_removal;
233 };
234
235 void RemoveListener(FunctionId id, UnsubscribingKind kind) noexcept final {
236 const engine::TaskCancellationBlocker blocker;
237 const std::shared_lock lock(event_mutex_);
238 std::shared_ptr<const Listener> listener;
239 OnRemoveCallback on_listener_removal;
240
241 {
242 auto data = data_.Lock();
243 auto& listeners = data->listeners;
244 const auto iter = listeners.find(id);
245
246 if (iter == listeners.end()) {
247 impl::ReportNotSubscribed(Name());
248 return;
249 }
250
251 listener = iter->second;
252
253 on_listener_removal = data->on_listener_removal;
254
255 listeners.erase(iter);
256
257 // Lock and unlock sema under data_.Lock(),
258 // now we're sure that SendEvent() will not trigger listener->callback()
259 (void)std::shared_lock(listener->sema);
260 }
261 // Unlock data_ here to be able to (un)subscribe to *this in listener->callback (in debug)
262 // without deadlock
263
264 if (kind == UnsubscribingKind::kAutomatic) {
265 if (!on_listener_removal) {
266 impl::ReportUnsubscribingAutomatically(name_, listener->name);
267 }
268
269 if constexpr (impl::kCheckSubscriptionUB) {
270 // Fake listener call to check
271 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
272 on_listener_removal,
273 listener->callback,
274 name_,
275 listener->name
276 );
277 }
278 }
279 }
280
281 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func) final {
282 UASSERT(id);
283
284 auto data = data_.Lock();
285 auto& listeners = data->listeners;
286 auto task_name = impl::MakeAsyncChannelName(name_, name);
287 const auto [iterator, success] = listeners.emplace(
288 id,
289 std::make_shared<const Listener>(std::string{name}, std::move(func), std::move(task_name))
290 );
291 if (!success) {
292 impl::ReportAlreadySubscribed(Name(), name);
293 }
294 return AsyncEventSubscriberScope(utils::impl::InternalTag{}, *this, id);
295 }
296
297 const std::string name_;
298 concurrent::Variable<ListenersData> data_;
299
300 // event_mutex_ is required only for event serialization,
301 // it doesn't protect any data. The mutex is unique locked
302 // for new event publishing, and is shared locked for calling callbacks.
303 // If any callback is working, no new event publishing is possible.
304 // It *is* possible to re-subscribe on async channel while another callback
305 // operates.
306 mutable engine::SharedMutex event_mutex_;
307};
308
309} // namespace concurrent
310
311USERVER_NAMESPACE_END