82 using Function =
typename AsyncEventSource<Args...>::Function;
83 using OnRemoveCallback = std::function<
void(
const Function&)>;
89 data_(ListenersData{{}, {}})
112 data_(ListenersData{{}, std::move(on_listener_removal)})
126 template <
typename UpdaterFunc>
129 std::string_view name,
131 UpdaterFunc&& updater
133 const std::shared_lock lock(event_mutex_);
134 std::forward<UpdaterFunc>(updater)();
135 return DoAddListener(id, name, std::move(func));
139 template <
typename Class,
typename UpdaterFunc>
142 std::string_view name,
143 void (Class::*func)(Args...),
144 UpdaterFunc&& updater
146 return DoUpdateAndListen(
149 [obj, func](Args... args) { (obj->*func)(args...); },
150 std::forward<UpdaterFunc>(updater)
161 std::shared_ptr<
const Listener> listener;
164 std::vector<Task> tasks;
183 std::shared_lock<
engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
185 auto lock = std::make_shared<std::shared_lock<
engine::SharedMutex>>(std::move(tmp_lock));
187 auto data = data_.Lock();
188 auto& listeners = data->listeners;
189 tasks.reserve(listeners.size());
191 for (
const auto& [_, listener] : listeners) {
192 tasks.push_back(Task{
196 [&, &callback = listener->callback, lock, sema_lock = std::shared_lock(listener->sema)] {
205 for (
auto& task : tasks) {
206 impl::WaitForTask(task.listener->name, task.task);
211 const std::string&
Name()
const noexcept {
return name_; }
214 struct Listener
final {
216 mutable engine::Semaphore sema;
220 std::string task_name;
222 Listener(std::string name, Function callback, std::string task_name)
224 name(std::move(name)),
225 callback(std::move(callback)),
226 task_name(std::move(task_name))
230 struct ListenersData
final {
231 std::unordered_map<FunctionId, std::shared_ptr<
const Listener>, FunctionId::Hash> listeners;
232 OnRemoveCallback on_listener_removal;
235 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
236 const engine::TaskCancellationBlocker blocker;
237 const std::shared_lock lock(event_mutex_);
238 std::shared_ptr<
const Listener> listener;
239 OnRemoveCallback on_listener_removal;
242 auto data = data_.Lock();
243 auto& listeners = data->listeners;
244 const auto iter = listeners.find(id);
246 if (iter == listeners.end()) {
247 impl::ReportNotSubscribed(
Name());
251 listener = iter->second;
253 on_listener_removal = data->on_listener_removal;
255 listeners.erase(iter);
259 (
void)std::shared_lock(listener->sema);
264 if (kind == UnsubscribingKind::kAutomatic) {
265 if (!on_listener_removal) {
266 impl::ReportUnsubscribingAutomatically(name_, listener->name);
269 if constexpr (impl::kCheckSubscriptionUB) {
271 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
281 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
284 auto data = data_.Lock();
285 auto& listeners = data->listeners;
286 auto task_name = impl::MakeAsyncChannelName(name_, name);
287 const auto [iterator, success] = listeners.emplace(
289 std::make_shared<
const Listener>(std::string{name}, std::move(func), std::move(task_name))
292 impl::ReportAlreadySubscribed(
Name(), name);
294 return AsyncEventSubscriberScope(utils::
impl::InternalTag{}, *
this, id);
297 const std::string name_;
306 mutable engine::SharedMutex event_mutex_;