10#include <unordered_map>
14#include <userver/concurrent/async_event_source.hpp>
15#include <userver/concurrent/variable.hpp>
16#include <userver/engine/semaphore.hpp>
17#include <userver/engine/shared_mutex.hpp>
18#include <userver/engine/task/cancel.hpp>
19#include <userver/engine/task/task_with_result.hpp>
20#include <userver/utils/assert.hpp>
21#include <userver/utils/async.hpp>
23USERVER_NAMESPACE_BEGIN
29void WaitForTask(std::string_view name, engine::TaskWithResult<
void>& task);
31[[noreturn]]
void ReportAlreadySubscribed(std::string_view channel_name, std::string_view listener_name);
33void ReportNotSubscribed(std::string_view channel_name)
noexcept;
35void ReportUnsubscribingAutomatically(std::string_view channel_name, std::string_view listener_name)
noexcept;
37void ReportErrorWhileUnsubscribing(
38 std::string_view channel_name,
39 std::string_view listener_name,
40 std::string_view error
43std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
45inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
52template <
typename Func>
53void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
54 std::function<
void(
const Func&)>& on_listener_removal,
55 const Func& listener_func,
56 std::string_view channel_name,
57 std::string_view listener_name
59 if (!on_listener_removal) {
63 on_listener_removal(listener_func);
64 }
catch (
const std::exception& e) {
65 ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
79template <
typename... Args>
82 using Function =
typename AsyncEventSource<Args...>::Function;
83 using OnRemoveCallback = std::function<
void(
const Function&)>;
89 data_(ListenersData{{}, {}})
112 data_(ListenersData{{}, std::move(on_listener_removal)})
126 template <
typename UpdaterFunc>
129 std::string_view name,
131 UpdaterFunc&& updater
133 const std::shared_lock lock(event_mutex_);
134 std::forward<UpdaterFunc>(updater)();
135 return DoAddListener(id, name, std::move(func));
139 template <
typename Class,
typename UpdaterFunc>
142 std::string_view name,
143 void (Class::*func)(Args...),
144 UpdaterFunc&& updater
146 return DoUpdateAndListen(
149 [obj, func](Args... args) { (obj->*func)(args...); },
150 std::forward<UpdaterFunc>(updater)
161 std::shared_ptr<
const Listener> listener;
162 engine::TaskWithResult<
void> task;
164 std::vector<Task> tasks;
172 event_mutex_.unlock_and_lock_shared();
183 std::shared_lock<engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
185 auto lock = std::make_shared<std::shared_lock<engine::SharedMutex>>(std::move(tmp_lock));
187 auto data = data_.Lock();
188 auto& listeners = data->listeners;
189 tasks.reserve(listeners.size());
191 for (
const auto& [_, listener] : listeners) {
192 tasks.push_back(Task{
196 [&, &callback = listener->callback, lock, sema_lock = std::shared_lock(listener->sema)] {
205 for (
auto& task : tasks) {
206 impl::WaitForTask(task.listener->name, task.task);
211 const std::string&
Name()
const noexcept {
return name_; }
214 struct Listener
final {
216 mutable engine::Semaphore sema;
220 std::string task_name;
222 Listener(std::string name, Function callback, std::string task_name)
224 name(std::move(name)),
225 callback(std::move(callback)),
226 task_name(std::move(task_name))
230 struct ListenersData
final {
231 std::unordered_map<FunctionId, std::shared_ptr<
const Listener>, FunctionId::Hash> listeners;
232 OnRemoveCallback on_listener_removal;
235 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
236 const engine::TaskCancellationBlocker blocker;
237 const std::shared_lock lock(event_mutex_);
238 std::shared_ptr<
const Listener> listener;
239 OnRemoveCallback on_listener_removal;
242 auto data = data_.Lock();
243 auto& listeners = data->listeners;
244 const auto iter = listeners.find(id);
246 if (iter == listeners.end()) {
247 impl::ReportNotSubscribed(
Name());
251 listener = iter->second;
253 on_listener_removal = data->on_listener_removal;
255 listeners.erase(iter);
259 (
void)std::shared_lock(listener->sema);
264 if (kind == UnsubscribingKind::kAutomatic) {
265 if (!on_listener_removal) {
266 impl::ReportUnsubscribingAutomatically(name_, listener->name);
269 if constexpr (impl::kCheckSubscriptionUB) {
271 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
281 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
284 auto data = data_.Lock();
285 auto& listeners = data->listeners;
286 auto task_name = impl::MakeAsyncChannelName(name_, name);
287 const auto [iterator, success] = listeners.emplace(
289 std::make_shared<
const Listener>(std::string{name}, std::move(func), std::move(task_name))
292 impl::ReportAlreadySubscribed(
Name(), name);
294 return AsyncEventSubscriberScope(utils::impl::InternalTag{}, *
this, id);
297 const std::string name_;
298 concurrent::Variable<ListenersData> data_;
306 mutable engine::SharedMutex event_mutex_;