// Callback type used for listeners (see Listener::callback), taken from
// AsyncEventSource<Args...>.
82 using Function =
typename AsyncEventSource<Args...>::Function;
// Callable that receives a `const Function&`. An instance is stored as
// ListenersData::on_listener_removal.
83 using OnRemoveCallback = std::function<
void(
const Function&)>;
// Start with an empty listener map and an empty (unset) removal callback.
89 data_(ListenersData{{}, {}})
// Start with an empty listener map and the removal callback moved in from
// `on_listener_removal`.
112 data_(ListenersData{{}, std::move(on_listener_removal)})
// Invokes `updater` and then registers the listener through DoAddListener,
// with event_mutex_ held in shared mode across both steps.
// NOTE(review): the signature lines are not visible in this excerpt; `id` and
// `func` are presumably parameters declared on the lines not shown.
126 template <
typename UpdaterFunc>
129 std::string_view name,
131 UpdaterFunc&& updater
// Shared (not exclusive) lock; it is released when this function returns.
133 const std::shared_lock lock(event_mutex_);
// Call the updater, preserving the value category it was passed with.
134 std::forward<UpdaterFunc>(updater)();
135 return DoAddListener(id, name, std::move(func));
// Overload taking a pointer to a member function of `Class`: wraps the
// object/member pair into a callable and forwards to DoUpdateAndListen.
// NOTE(review): the declaration of `obj` and the first arguments of the
// forwarded call are on lines not visible in this excerpt.
139 template <
typename Class,
typename UpdaterFunc>
142 std::string_view name,
143 void (Class::*func)(Args...),
144 UpdaterFunc&& updater
146 return DoUpdateAndListen(
// `obj` and `func` are captured by value; the lambda calls the member
// function on `obj` with the event arguments.
149 [obj, func](Args... args) { (obj->*func)(args...); },
150 std::forward<UpdaterFunc>(updater)
// Event dispatch fragment: builds one task entry per registered listener and
// then waits for every task.
// NOTE(review): the enclosing function header, the Task struct header and the
// task-creation call are on lines not visible in this excerpt.
// Shared ownership of the listener; its name is read when waiting on the task
// further below.
161 std::shared_ptr<
const Listener> listener;
164 std::vector<Task> tasks;
// std::adopt_lock: no locking happens here. tmp_lock takes over a shared lock
// on event_mutex_ that must already be held at this point (the acquiring code
// is not shown) and releases it when tmp_lock is destroyed.
174 std::shared_lock<
engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
183 auto data = data_.Lock();
184 auto& listeners = data->listeners;
// One slot per registered listener.
185 tasks.reserve(listeners.size());
187 for (
const auto& [_, listener] : listeners) {
188 tasks.push_back(Task{
// The task body captures the listener's callback by reference and takes a
// shared lock on listener->sema at capture time; the lock lives inside the
// closure.
192 [&, &callback = listener->callback, sema_lock = std::shared_lock(listener->sema)] {
// Wait for each task, passing the listener name to impl::WaitForTask.
201 for (
auto& task : tasks) {
202 impl::WaitForTask(task.listener->name, task.task);
// Returns a reference to the name_ member. Never throws.
207 const std::string&
Name()
const noexcept {
return name_; }
// A single registered listener: its name, its callback and the name used for
// its task.
// NOTE(review): some member declarations and the constructor's initializer of
// `sema` are on lines not visible in this excerpt.
210 struct Listener
final {
// `mutable` because listeners are held as shared_ptr<const Listener> while
// shared locks are still taken on `sema`.
212 mutable engine::Semaphore sema;
// DoAddListener passes the result of impl::MakeAsyncChannelName(name_, name)
// for this field.
216 std::string task_name;
// Arguments are taken by value and moved into the members.
218 Listener(std::string name, Function callback, std::string task_name)
220 name(std::move(name)),
221 callback(std::move(callback)),
222 task_name(std::move(task_name))
// State stored in data_ and accessed through data_.Lock().
226 struct ListenersData
final {
// Registered listeners keyed by FunctionId (hashed with FunctionId::Hash).
// Values are shared so a listener can outlive its erasure from the map.
227 std::unordered_map<FunctionId, std::shared_ptr<
const Listener>, FunctionId::Hash> listeners;
// Removal callback; may be empty (RemoveListener tests it before reporting).
228 OnRemoveCallback on_listener_removal;
// Removes the listener registered under `id`. `kind` selects the extra
// handling applied for UnsubscribingKind::kAutomatic.
// NOTE(review): several lines of this function (scopes, early exits, the
// arguments of the final check) are not visible in this excerpt.
231 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
// NOTE(review): presumably keeps this function from being interrupted by task
// cancellation while it runs - confirm against engine::TaskCancellationBlocker.
232 const engine::TaskCancellationBlocker blocker;
// Shared (not exclusive) lock on event_mutex_ for the rest of the function.
233 const std::shared_lock lock(event_mutex_);
// Locals that receive copies of the listener and of the removal callback.
234 std::shared_ptr<
const Listener> listener;
235 OnRemoveCallback on_listener_removal;
238 auto data = data_.Lock();
239 auto& listeners = data->listeners;
240 const auto iter = listeners.find(id);
// Unknown id: report it. What follows the report is not visible here.
242 if (iter == listeners.end()) {
243 impl::ReportNotSubscribed(
Name());
// Copy the shared_ptr before erasing so the Listener stays alive; it is still
// used below.
247 listener = iter->second;
249 on_listener_removal = data->on_listener_removal;
251 listeners.erase(iter);
// Acquire a shared lock on the listener's semaphore and release it at once
// (the temporary dies at the end of the statement).
// NOTE(review): presumably this waits for a callback task that still holds
// the semaphore - confirm against how `sema` is initialized.
255 (
void)std::shared_lock(listener->sema);
260 if (kind == UnsubscribingKind::kAutomatic) {
// Only reported when no removal callback is set.
261 if (!on_listener_removal) {
262 impl::ReportUnsubscribingAutomatically(name_, listener->name);
// Compile-time switch: the check below is compiled in only when
// impl::kCheckSubscriptionUB is true.
265 if constexpr (impl::kCheckSubscriptionUB) {
267 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
// Registers `func` as a listener under `id` and returns a subscriber scope
// bound to this object and `id`.
// NOTE(review): the emplace key argument and the condition guarding the
// "already subscribed" report are on lines not visible in this excerpt.
277 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
280 auto data = data_.Lock();
281 auto& listeners = data->listeners;
// Task name built from the channel name and the listener name.
282 auto task_name = impl::MakeAsyncChannelName(name_, name);
// emplace returns {iterator, inserted-flag}.
283 const auto [iterator, success] = listeners.emplace(
// `name` is copied into an owning std::string; `func` and `task_name` are
// moved into the new Listener.
285 std::make_shared<
const Listener>(std::string{name}, std::move(func), std::move(task_name))
// NOTE(review): presumably reached only when `success` is false - the guard
// is not visible here.
288 impl::ReportAlreadySubscribed(
Name(), name);
290 return AsyncEventSubscriberScope(utils::
impl::InternalTag{}, *
this, id);
// Immutable name; returned by Name() and passed to the impl:: reporting and
// task-naming helpers.
293 const std::string name_;
// Shared mutex; `mutable` so it can be locked where `this` is const. The
// visible code takes it in shared mode or adopts an already-held shared lock.
302 mutable engine::SharedMutex event_mutex_;