#pragma once

/// @file userver/concurrent/async_event_channel.hpp
/// @brief @copybrief concurrent::AsyncEventChannel

#include <functional>
#include <string>
#include <string_view>
#include <typeindex>
#include <unordered_map>
#include <utility>
#include <vector>

#include <userver/concurrent/async_event_source.hpp>
#include <userver/concurrent/variable.hpp>
#include <userver/engine/shared_mutex.hpp>
#include <userver/engine/task/cancel.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/utils/assert.hpp>
#include <userver/utils/async.hpp>

USERVER_NAMESPACE_BEGIN

namespace concurrent {

namespace impl {

void WaitForTask(std::string_view name, engine::TaskWithResult<void>& task);

[[noreturn]] void ReportAlreadySubscribed(std::string_view channel_name, std::string_view listener_name);

void ReportNotSubscribed(std::string_view channel_name) noexcept;

void ReportUnsubscribingAutomatically(std::string_view channel_name, std::string_view listener_name) noexcept;

void ReportErrorWhileUnsubscribing(
    std::string_view channel_name,
    std::string_view listener_name,
    std::string_view error
) noexcept;

std::string MakeAsyncChannelName(std::string_view base, std::string_view name);

inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;

// During the `AsyncEventSubscriberScope::Unsubscribe` call or the destruction
// of `AsyncEventSubscriberScope`, all variables used by the callback must still
// be valid (must not have been destroyed). A common cause of crashes here is a
// missing manual `Unsubscribe` call; in that case, check the declaration order
// of the struct fields.
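//
// Illustrative sketch (the struct and field names below are made up): declare
// the subscriber scope *after* the data captured by the callback, so that, if
// `Unsubscribe` is not called manually, the scope is destroyed (and the
// listener removed) before the data it uses:
//
//   struct Subscriber final {
//       std::vector<int> values_;  // read/written by the listener callback
//       concurrent::AsyncEventSubscriberScope scope_;  // declared last => destroyed first
//   };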
template <typename Func>
void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
    std::function<void(const Func&)>& on_listener_removal,
    const Func& listener_func,
    std::string_view channel_name,
    std::string_view listener_name
) noexcept {
    if (!on_listener_removal) return;
    try {
        on_listener_removal(listener_func);
    } catch (const std::exception& e) {
        ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
    }
}

}  // namespace impl

/// @ingroup userver_concurrency
///
/// AsyncEventChannel is an in-process pub-sub mechanism with strict FIFO
/// serialization: a new event may only be processed after the previous event
/// has been fully processed, and the same listener is never called
/// concurrently.
///
/// Example usage:
/// @snippet concurrent/async_event_channel_test.cpp AsyncEventChannel sample
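///
/// A minimal usage sketch (illustrative only; the class, member and channel
/// names below are made up, the canonical example is the snippet above):
/// @code
/// class PriceWatcher final {
/// public:
///     explicit PriceWatcher(concurrent::AsyncEventChannel<int>& channel)
///         : subscription_(channel.AddListener(this, "price-watcher", &PriceWatcher::OnPrice)) {}
///
///     ~PriceWatcher() { subscription_.Unsubscribe(); }
///
/// private:
///     void OnPrice(int new_price) { last_price_ = new_price; }
///
///     int last_price_{0};
///     // Declared last, so it is destroyed (and the listener removed) first.
///     concurrent::AsyncEventSubscriberScope subscription_;
/// };
///
/// concurrent::AsyncEventChannel<int> channel("prices");
/// PriceWatcher watcher(channel);
/// channel.SendEvent(42);  // blocks until every listener has processed the event
/// @endcode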
template <typename... Args>
class AsyncEventChannel : public AsyncEventSource<Args...> {
public:
    using Function = typename AsyncEventSource<Args...>::Function;
    using OnRemoveCallback = std::function<void(const Function&)>;

    /// @brief The primary constructor
    /// @param name used for diagnostic purposes and is also accessible via Name()
    explicit AsyncEventChannel(std::string_view name) : name_(name), data_(ListenersData{{}, {}}) {}

    /// @brief The constructor with `AsyncEventSubscriberScope` usage checking.
    ///
    /// This constructor takes a callback that is invoked on listener removal.
    /// The callback receives a reference to the listener's `Function`. It is
    /// useful for checking the lifetime of the data captured by the listener
    /// update function.
    ///
    /// @note Works only in debug mode.
    ///
    /// @warning Data captured by the `on_listener_removal` function must stay
    /// valid until the `AsyncEventChannel` object is completely destroyed.
    ///
    /// Example usage:
    /// @snippet concurrent/async_event_channel_test.cpp OnListenerRemoval sample
    ///
    /// @param name used for diagnostic purposes and is also accessible via Name()
    /// @param on_listener_removal the callback used for the check
    ///
    /// @see impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing
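    ///
    /// A hedged sketch of such a checking callback (the channel name and the
    /// event value below are made up): the removal callback simply invokes the
    /// listener once more, so debug assertions and sanitizers can catch a
    /// use-after-free of the data it captures at unsubscription time:
    /// @code
    /// concurrent::AsyncEventChannel<int> channel(
    ///     "checked-channel",
    ///     [](const concurrent::AsyncEventChannel<int>::Function& listener) { listener(0); });
    /// @endcode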
    AsyncEventChannel(std::string_view name, OnRemoveCallback on_listener_removal)
        : name_(name), data_(ListenersData{{}, std::move(on_listener_removal)}) {}

    /// @brief For use in `UpdateAndListen` of specific event channels
    ///
    /// Atomically calls `updater`, which should invoke `func` with the previously
    /// sent event, and then subscribes to new events as if by AddListener.
    ///
    /// @param id the identifier of the subscriber (usually constructed from the subscriber class instance); see also the simpler `DoUpdateAndListen` overload below
    /// @param name the name of the subscriber
    /// @param func the callback that is called on each update
    /// @param updater the initial `() -> void` callback that should call `func` with the current value
    ///
    /// @see AsyncEventSource::AddListener
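    ///
    /// A hedged sketch of how a derived channel might expose `UpdateAndListen`
    /// on top of this method (the channel class and its `GetCurrentValue`
    /// accessor are hypothetical):
    /// @code
    /// class MyValueChannel final : public concurrent::AsyncEventChannel<int> {
    /// public:
    ///     MyValueChannel() : concurrent::AsyncEventChannel<int>("my-value-channel") {}
    ///
    ///     template <typename Class>
    ///     concurrent::AsyncEventSubscriberScope
    ///     UpdateAndListen(Class* obj, std::string_view name, void (Class::*func)(int)) {
    ///         return DoUpdateAndListen(obj, name, func, [&] { (obj->*func)(GetCurrentValue()); });
    ///     }
    ///
    /// private:
    ///     int GetCurrentValue() const;  // hypothetical accessor for the latest value
    /// };
    /// @endcode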
    template <typename UpdaterFunc>
    AsyncEventSubscriberScope
    DoUpdateAndListen(FunctionId id, std::string_view name, Function&& func, UpdaterFunc&& updater) {
        const std::shared_lock lock(event_mutex_);
        std::forward<UpdaterFunc>(updater)();
        return DoAddListener(id, name, std::move(func));
    }

    /// @overload
    template <typename Class, typename UpdaterFunc>
    AsyncEventSubscriberScope
    DoUpdateAndListen(Class* obj, std::string_view name, void (Class::*func)(Args...), UpdaterFunc&& updater) {
        return DoUpdateAndListen(
            FunctionId(obj),
            name,
            [obj, func](Args... args) { (obj->*func)(args...); },
            std::forward<UpdaterFunc>(updater)
        );
    }

    /// Send the next event and wait until all the listeners have processed it.
    ///
    /// Strict FIFO serialization is guaranteed: a new event may only be
    /// delivered to the subscribers after this event has been processed, and
    /// the same listener/subscriber is never called concurrently.
    void SendEvent(Args... args) const {
        struct Task {
            std::shared_ptr<const Listener> listener;
            engine::TaskWithResult<void> task;
        };
        std::vector<Task> tasks;

        {
            // Obtain a unique lock on event_mutex_ to serialize
            // calls to SendEvent().
            event_mutex_.lock();

            // Now downgrade the lock to shared to allow new subscriptions.
            event_mutex_.unlock_and_lock_shared();

            // We now want to create N subtasks for the callbacks, each of which must
            // hold a shared lock on event_mutex_. A naive implementation would create
            // std::shared_lock{event_mutex_} for each subtask; however, that might
            // deadlock if a parallel SendEvent() call is blocked on event_mutex_.lock().
            // This happens due to the strict prioritization of writers over readers in
            // SharedMutex: while there is a pending writer, nobody may lock the mutex
            // for reading.

            // To avoid leaking the mutex's shared lock if an allocation below throws
            // std::bad_alloc, do the following:
            // 1) adopt the lock into an RAII std::shared_lock...
            std::shared_lock<engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
            // 2) ...and move it into a std::shared_ptr
            auto lock = std::make_shared<std::shared_lock<engine::SharedMutex>>(std::move(tmp_lock));

            auto data = data_.Lock();
            auto& listeners = data->listeners;
            tasks.reserve(listeners.size());

            for (const auto& [_, listener] : listeners) {
                tasks.push_back(Task{
                    listener,  // an intentional copy
                    utils::Async(listener->task_name, [&, &callback = listener->callback, lock] { callback(args...); }),
                });
            }
        }
        // data_ is unlocked here, because callbacks may subscribe to *this

        for (auto& task : tasks) {
            impl::WaitForTask(task.listener->name, task.task);
        }
    }

    /// @returns the name of this event channel
    const std::string& Name() const noexcept { return name_; }

private:
    struct Listener final {
        std::string name;
        Function callback;
        std::string task_name;
    };

    struct ListenersData final {
        std::unordered_map<FunctionId, std::shared_ptr<const Listener>, FunctionId::Hash> listeners;
        OnRemoveCallback on_listener_removal;
    };

    void RemoveListener(FunctionId id, UnsubscribingKind kind) noexcept final {
        const engine::TaskCancellationBlocker blocker;
        const std::shared_lock lock(event_mutex_);
        std::shared_ptr<const Listener> listener;
        OnRemoveCallback on_listener_removal;

        {
            auto data = data_.Lock();
            auto& listeners = data->listeners;
            const auto iter = listeners.find(id);

            if (iter == listeners.end()) {
                impl::ReportNotSubscribed(Name());
                return;
            }

            listener = iter->second;
            on_listener_removal = data->on_listener_removal;

            listeners.erase(iter);
        }
        // data_ is unlocked here so that listener->callback can (un)subscribe to *this
        // (in debug) without a deadlock

        if (kind == UnsubscribingKind::kAutomatic) {
            if (!on_listener_removal) {
                impl::ReportUnsubscribingAutomatically(name_, listener->name);
            }

            if constexpr (impl::kCheckSubscriptionUB) {
                // A fake listener call to run the check
                impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
                    on_listener_removal, listener->callback, name_, listener->name
                );
            }
        }
    }

    AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func) final {
        UASSERT(id);

        auto data = data_.Lock();
        auto& listeners = data->listeners;
        auto task_name = impl::MakeAsyncChannelName(name_, name);
        const auto [iterator, success] = listeners.emplace(
            id, std::make_shared<const Listener>(Listener{std::string{name}, std::move(func), std::move(task_name)})
        );
        if (!success) impl::ReportAlreadySubscribed(Name(), name);
        return AsyncEventSubscriberScope(utils::impl::InternalTag{}, *this, id);
    }

    const std::string name_;
    concurrent::Variable<ListenersData> data_;

    // event_mutex_ is required only for event serialization; it does not protect
    // any data. The mutex is locked exclusively while a new event is being
    // published and is locked in shared mode while callbacks are running. While
    // any callback is working, no new event can be published. It *is* possible to
    // (re-)subscribe to the channel while another callback is running.
    mutable engine::SharedMutex event_mutex_;
};

}  // namespace concurrent

USERVER_NAMESPACE_END