10#include <unordered_map> 
   14#include <userver/concurrent/async_event_source.hpp> 
   15#include <userver/concurrent/variable.hpp> 
   16#include <userver/engine/mutex.hpp> 
   17#include <userver/engine/task/cancel.hpp> 
   18#include <userver/engine/task/task_with_result.hpp> 
   19#include <userver/utils/assert.hpp> 
   20#include <userver/utils/async.hpp> 
   22USERVER_NAMESPACE_BEGIN
 
   28void WaitForTask(std::string_view name, engine::
TaskWithResult<
void>& task);
 
   30[[noreturn]] 
void ReportAlreadySubscribed(std::string_view channel_name,
 
   31                                          std::string_view listener_name);
 
   33void ReportNotSubscribed(std::string_view channel_name) 
noexcept;
 
   35void ReportUnsubscribingAutomatically(std::string_view channel_name,
 
   36                                      std::string_view listener_name) 
noexcept;
 
   38void ReportErrorWhileUnsubscribing(std::string_view channel_name,
 
   39                                   std::string_view listener_name,
 
   40                                   std::string_view error) 
noexcept;
 
   42std::string MakeAsyncChannelName(std::string_view base, std::string_view name);
 
   44inline constexpr bool kCheckSubscriptionUB = utils::impl::kEnableAssert;
 
   51template <
typename Func>
 
   52void CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
 
   53    std::function<
void(Func&)>& on_listener_removal, Func& listener_func,
 
   54    std::string_view channel_name, std::string_view listener_name) 
noexcept {
 
   55  if (!on_listener_removal) 
return;
 
   57    on_listener_removal(listener_func);
 
   58  } 
catch (
const std::exception& e) {
 
   59    ReportErrorWhileUnsubscribing(channel_name, listener_name, e.what());
 
   73template <
typename... Args>
 
   76  using Function = 
typename AsyncEventSource<Args...>::Function;
 
   77  using OnRemoveCallback = std::function<
void(Function&)>;
 
  103      : name_(std::move(name)),
 
  112  template <
typename UpdaterFunc>
 
  114                                              std::string_view name,
 
  116                                              UpdaterFunc&& updater) {
 
  117    std::lock_guard lock(event_mutex_);
 
  118    std::forward<UpdaterFunc>(updater)();
 
  119    return DoAddListener(id, name, std::move(func));
 
  123  template <
typename Class, 
typename UpdaterFunc>
 
  125                                              void (Class::*func)(Args...),
 
  126                                              UpdaterFunc&& updater) {
 
  127    return DoUpdateAndListen(
 
  128        FunctionId(obj), name,
 
  129        [obj, func](Args... args) { (obj->*func)(args...); },
 
  130        std::forward<UpdaterFunc>(updater));
 
  139    std::lock_guard lock(event_mutex_);
 
  140    auto data = data_.Lock();
 
  141    auto& listeners = data->listeners;
 
  143    std::vector<engine::TaskWithResult<
void>> tasks;
 
  144    tasks.reserve(listeners.size());
 
  146    for (
const auto& [_, listener] : listeners) {
 
  147      tasks.push_back(utils::Async(
 
  149          [&, &callback = listener.callback] { callback(args...); }));
 
  153    for (
const auto& [_, listener] : listeners) {
 
  154      impl::WaitForTask(listener.name, tasks[i++]);
 
  159  const std::string& 
Name() 
const noexcept { 
return name_; }
 
  162  struct Listener 
final {
 
  165    std::string task_name;
 
  168  struct ListenersData 
final {
 
  169    std::unordered_map<FunctionId, Listener, FunctionId::Hash> listeners;
 
  170    OnRemoveCallback on_listener_removal;
 
  173  void RemoveListener(FunctionId id, UnsubscribingKind kind) 
noexcept final {
 
  174    engine::TaskCancellationBlocker blocker;
 
  175    auto data = data_.Lock();
 
  176    auto& listeners = data->listeners;
 
  177    const auto iter = listeners.find(id);
 
  179    if (iter == listeners.end()) {
 
  180      impl::ReportNotSubscribed(Name());
 
  184    if (kind == UnsubscribingKind::kAutomatic) {
 
  185      if (!data->on_listener_removal) {
 
  186        impl::ReportUnsubscribingAutomatically(name_, iter->second.name);
 
  189      if constexpr (impl::kCheckSubscriptionUB) {
 
  191        impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
 
  192            data->on_listener_removal, iter->second.callback, name_,
 
  196    listeners.erase(iter);
 
  199  AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name,
 
  200                                          Function&& func) 
final {
 
  201    auto data = data_.Lock();
 
  202    auto& listeners = data->listeners;
 
  203    auto task_name = impl::MakeAsyncChannelName(name_, name);
 
  204    const auto [iterator, success] = listeners.emplace(
 
  205        id, Listener{std::string{name}, std::move(func), std::move(task_name)});
 
  206    if (!success) impl::ReportAlreadySubscribed(Name(), name);
 
  207    return AsyncEventSubscriberScope(*
this, id);
 
  210  const std::string name_;
 
  211  concurrent::Variable<ListenersData> data_;
 
  212  mutable engine::Mutex event_mutex_;