userver: userver/concurrent/async_event_channel.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
async_event_channel.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/async_event_channel.hpp
4/// @brief @copybrief concurrent::AsyncEventChannel
5
6#include <functional>
7#include <string>
8#include <string_view>
9#include <typeindex>
10#include <unordered_map>
11#include <utility>
12#include <vector>
13
14#include <userver/concurrent/async_event_source.hpp>
15#include <userver/concurrent/variable.hpp>
16#include <userver/engine/mutex.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/engine/task/task_with_result.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/async.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24namespace concurrent {
25
26namespace impl {
27
28void WaitForTask(std::string_view name, engine::TaskWithResult<void>& task);
29
30[[noreturn]] void ReportAlreadySubscribed(std::string_view channel_name,
31 std::string_view listener_name);
32
33void ReportNotSubscribed(std::string_view channel_name) noexcept;
34
35void ReportUnsubscribingAutomatically(std::string_view channel_name,
36 std::string_view listener_name) noexcept;
37
38void ReportErrorWhileUnsubscribing(std::string_view channel_name,
39 std::string_view listener_name,
40 std::string_view error) noexcept;
41
42std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
43
44inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
45
46// During the `AsyncEventSubscriberScope::Unsubscribe` call or destruction of
47// `AsyncEventSubscriberScope`, all variables used by callback must be valid
48// (must not be destroyed). A common cause of crashes in this place: there is no
49// manual call to `Unsubscribe`. In this case check the declaration order of the
50// struct fields.
51template <typename Func>
52void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
53 std::function<void(Func&)>& on_listener_removal, Func& listener_func,
54 std::string_view channel_name, std::string_view listener_name) noexcept {
55 if (!on_listener_removal) return;
56 try {
57 on_listener_removal(listener_func);
58 } catch (const std::exception& e) {
59 ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
60 }
61}
62
63} // namespace impl
64
65/// @ingroup userver_concurrency
66///
67/// AsyncEventChannel is an in-process pub-sub with strict FIFO serialization,
68/// i.e. only after the event was processed a new event may appear for
69/// processing, same listener is never called concurrently.
70///
71/// Example usage:
72/// @snippet concurrent/async_event_channel_test.cpp AsyncEventChannel sample
73template <typename... Args>
74class AsyncEventChannel : public AsyncEventSource<Args...> {
75 public:
76 using Function = typename AsyncEventSource<Args...>::Function;
77 using OnRemoveCallback = std::function<void(Function&)>;
78
79 /// @brief The primary constructor
80 /// @param name used for diagnostic purposes and is also accessible with Name
81 explicit AsyncEventChannel(std::string name)
82 : name_(std::move(name)), data_(ListenersData{{}, {}}) {}
83
84 /// @brief The constructor with `AsyncEventSubscriberScope` usage checking.
85 ///
86 /// The constructor with a callback that is called on listener removal. The
87 /// callback takes a reference to `Function' as input. This is useful for
88 /// checking the lifetime of data captured by the listener update function.
89 ///
90 /// @note Works only in debug mode.
91 ///
92 /// @warning Data captured by `on_listener_removal` function must be valid
93 /// until the `AsyncEventChannel` object is completely destroyed.
94 ///
95 /// Example usage:
96 /// @snippet concurrent/async_event_channel_test.cpp OnListenerRemoval sample
97 ///
98 /// @param name used for diagnostic purposes and is also accessible with Name
99 /// @param on_listener_removal the callback used for check
100 ///
101 /// @see impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing
102 AsyncEventChannel(std::string name, OnRemoveCallback on_listener_removal)
103 : name_(std::move(name)),
105
106 /// @brief For use in `UpdateAndListen` of specific event channels
107 ///
108 /// Atomically calls `updater`, which should invoke `func` with the previously
109 /// sent event, and subscribes to new events as if using AddListener.
110 ///
111 /// @see AsyncEventSource::AddListener
112 template <typename UpdaterFunc>
113 AsyncEventSubscriberScope DoUpdateAndListen(FunctionId id,
114 std::string_view name,
115 Function&& func,
116 UpdaterFunc&& updater) {
117 std::lock_guard lock(event_mutex_);
118 std::forward<UpdaterFunc>(updater)();
119 return DoAddListener(id, name, std::move(func));
120 }
121
122 /// @overload
123 template <typename Class, typename UpdaterFunc>
124 AsyncEventSubscriberScope DoUpdateAndListen(Class* obj, std::string_view name,
125 void (Class::*func)(Args...),
126 UpdaterFunc&& updater) {
127 return DoUpdateAndListen(
128 FunctionId(obj), name,
129 [obj, func](Args... args) { (obj->*func)(args...); },
130 std::forward<UpdaterFunc>(updater));
131 }
132
133 /// Send the next event and wait until all the listeners process it.
134 ///
135 /// Strict FIFO serialization is guaranteed, i.e. only after this event is
136 /// processed a new event may be delivered for the subscribers, same
137 /// listener/subscriber is never called concurrently.
138 void SendEvent(Args... args) const {
139 std::lock_guard lock(event_mutex_);
140 auto data = data_.Lock();
141 auto& listeners = data->listeners;
142
143 std::vector<engine::TaskWithResult<void>> tasks;
144 tasks.reserve(listeners.size());
145
146 for (const auto& [_, listener] : listeners) {
147 tasks.push_back(utils::Async(
148 listener.task_name,
149 [&, &callback = listener.callback] { callback(args...); }));
150 }
151
152 std::size_t i = 0;
153 for (const auto& [_, listener] : listeners) {
154 impl::WaitForTask(listener.name, tasks[i++]);
155 }
156 }
157
158 /// @returns the name of this event channel
159 const std::string& Name() const noexcept { return name_; }
160
161 private:
162 struct Listener final {
163 std::string name;
164 Function callback;
165 std::string task_name;
166 };
167
168 struct ListenersData final {
169 std::unordered_map<FunctionId, Listener, FunctionId::Hash> listeners;
170 OnRemoveCallback on_listener_removal;
171 };
172
173 void RemoveListener(FunctionId id, UnsubscribingKind kind) noexcept final {
174 engine::TaskCancellationBlocker blocker;
175 auto data = data_.Lock();
176 auto& listeners = data->listeners;
177 const auto iter = listeners.find(id);
178
179 if (iter == listeners.end()) {
180 impl::ReportNotSubscribed(Name());
181 return;
182 }
183
184 if (kind == UnsubscribingKind::kAutomatic) {
185 if (!data->on_listener_removal) {
186 impl::ReportUnsubscribingAutomatically(name_, iter->second.name);
187 }
188
189 if constexpr (impl::kCheckSubscriptionUB) {
190 // Fake listener call to check
191 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
192 data->on_listener_removal, iter->second.callback, name_,
193 iter->second.name);
194 }
195 }
196 listeners.erase(iter);
197 }
198
199 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name,
200 Function&& func) final {
201 auto data = data_.Lock();
202 auto& listeners = data->listeners;
203 auto task_name = impl::MakeAsyncChannelName(name_, name);
204 const auto [iterator, success] = listeners.emplace(
205 id, Listener{std::string{name}, std::move(func), std::move(task_name)});
206 if (!success) impl::ReportAlreadySubscribed(Name(), name);
207 return AsyncEventSubscriberScope(*this, id);
208 }
209
210 const std::string name_;
211 concurrent::Variable<ListenersData> data_;
212 mutable engine::Mutex event_mutex_;
213};
214
215} // namespace concurrent
216
217USERVER_NAMESPACE_END