10#include <unordered_map>
14#include <userver/concurrent/async_event_source.hpp>
15#include <userver/concurrent/variable.hpp>
16#include <userver/engine/mutex.hpp>
17#include <userver/engine/task/cancel.hpp>
18#include <userver/engine/task/task_with_result.hpp>
19#include <userver/utils/assert.hpp>
20#include <userver/utils/async.hpp>
22USERVER_NAMESPACE_BEGIN
28void WaitForTask(std::string_view name, engine::TaskWithResult<
void>& task);
30[[noreturn]]
void ReportAlreadySubscribed(std::string_view channel_name, std::string_view listener_name);
32void ReportNotSubscribed(std::string_view channel_name)
noexcept;
34void ReportUnsubscribingAutomatically(std::string_view channel_name, std::string_view listener_name)
noexcept;
36void ReportErrorWhileUnsubscribing(
37 std::string_view channel_name,
38 std::string_view listener_name,
39 std::string_view error
42std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
44inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
51template <
typename Func>
52void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
53 std::function<
void(Func&)>& on_listener_removal,
55 std::string_view channel_name,
56 std::string_view listener_name
58 if (!on_listener_removal)
return;
60 on_listener_removal(listener_func);
61 }
catch (
const std::exception& e) {
62 ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
76template <
typename... Args>
79 using Function =
typename AsyncEventSource<Args...>::Function;
80 using OnRemoveCallback = std::function<
void(Function&)>;
84 explicit AsyncEventChannel(std::string_view name) : name_(name), data_(ListenersData{{}, {}}) {}
105 : name_(name), data_(ListenersData{{}, std::move(on_listener_removal)}) {}
118 template <
typename UpdaterFunc>
119 AsyncEventSubscriberScope
120 DoUpdateAndListen(FunctionId id, std::string_view name, Function&& func, UpdaterFunc&& updater) {
121 const std::lock_guard lock(event_mutex_);
122 std::forward<UpdaterFunc>(updater)();
123 return DoAddListener(id, name, std::move(func));
127 template <
typename Class,
typename UpdaterFunc>
128 AsyncEventSubscriberScope
129 DoUpdateAndListen(Class* obj, std::string_view name,
void (Class::*func)(Args...), UpdaterFunc&& updater) {
130 return DoUpdateAndListen(
133 [obj, func](Args... args) { (obj->*func)(args...); },
134 std::forward<UpdaterFunc>(updater)
144 const std::lock_guard lock(event_mutex_);
145 auto data = data_.Lock();
146 auto& listeners = data->listeners;
148 std::vector<engine::TaskWithResult<
void>> tasks;
149 tasks.reserve(listeners.size());
151 for (
const auto& [_, listener] : listeners) {
152 tasks.push_back(utils::Async(listener.task_name, [&, &callback = listener.callback] { callback(args...); })
157 for (
const auto& [_, listener] : listeners) {
158 impl::WaitForTask(listener.name, tasks[i++]);
163 const std::string&
Name()
const noexcept {
return name_; }
166 struct Listener
final {
169 std::string task_name;
172 struct ListenersData
final {
173 std::unordered_map<FunctionId, Listener, FunctionId::Hash> listeners;
174 OnRemoveCallback on_listener_removal;
177 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
178 const engine::TaskCancellationBlocker blocker;
179 auto data = data_.Lock();
180 auto& listeners = data->listeners;
181 const auto iter = listeners.find(id);
183 if (iter == listeners.end()) {
184 impl::ReportNotSubscribed(
Name());
188 if (kind == UnsubscribingKind::kAutomatic) {
189 if (!data->on_listener_removal) {
190 impl::ReportUnsubscribingAutomatically(name_, iter->second.name);
193 if constexpr (impl::kCheckSubscriptionUB) {
195 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
196 data->on_listener_removal, iter->second.callback, name_, iter->second.name
200 listeners.erase(iter);
203 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
204 auto data = data_.Lock();
205 auto& listeners = data->listeners;
206 auto task_name = impl::MakeAsyncChannelName(name_, name);
207 const auto [iterator, success] =
208 listeners.emplace(id, Listener{std::string{name}, std::move(func), std::move(task_name)});
209 if (!success) impl::ReportAlreadySubscribed(
Name(), name);
210 return AsyncEventSubscriberScope(utils::impl::InternalTag{}, *
this, id);
213 const std::string name_;
214 concurrent::Variable<ListenersData> data_;
215 mutable engine::Mutex event_mutex_;