// Listener callback type, reused from AsyncEventSource<Args...> so the channel
// and its source interface share one signature.
79 using Function =
typename AsyncEventSource<Args...>::Function;
// Callable taking a listener's Function by const reference. An instance is
// stored in ListenersData::on_listener_removal and is consulted by
// RemoveListener when a listener is unsubscribed.
80 using OnRemoveCallback = std::function<
void(
const Function&)>;
// Creates a channel called `name`. Both ListenersData fields are
// value-initialized: the listener map starts empty and on_listener_removal
// holds no callable.
84 explicit AsyncEventChannel(std::string_view name) : name_(name), data_(ListenersData{{}, {}}) {}
// Member-initializer list of another constructor (its signature is on lines
// not visible in this excerpt). Like the name-only constructor, except that
// the supplied on_listener_removal is moved into ListenersData.
105 : name_(name), data_(ListenersData{{}, std::move(on_listener_removal)}) {}
// Invokes `updater` and then subscribes `func`, both while holding one shared
// lock on event_mutex_.
//
// id      - key the listener is stored under (see DoAddListener).
// name    - listener name; DoAddListener uses it for the listener record, the
//           task name and the "already subscribed" report.
// func    - callback; moved into the new listener.
// updater - nullary callable, called once before subscribing.
//
// Returns the AsyncEventSubscriberScope produced by DoAddListener.
118 template <
typename UpdaterFunc>
119 AsyncEventSubscriberScope
120 DoUpdateAndListen(FunctionId id, std::string_view name, Function&& func, UpdaterFunc&& updater) {
121 const std::shared_lock lock(event_mutex_);
// std::forward keeps the value category of the caller's callable for the call.
122 std::forward<UpdaterFunc>(updater)();
123 return DoAddListener(id, name, std::move(func));
// Convenience overload for member-function listeners: wraps `obj->*func` in a
// lambda and delegates to another DoUpdateAndListen overload, forwarding
// `updater` unchanged.
// The lambda stores the raw `obj` pointer.
// NOTE(review): presumably the caller must keep *obj alive for as long as the
// subscription exists - confirm against the class documentation.
// Some of the delegated call's arguments are on lines not visible here.
127 template <
typename Class,
typename UpdaterFunc>
128 AsyncEventSubscriberScope
129 DoUpdateAndListen(Class* obj, std::string_view name,
void (Class::*func)(Args...), UpdaterFunc&& updater) {
130 return DoUpdateAndListen(
// `args` are passed on to the member function as lvalues (not forwarded).
133 [obj, func](Args... args) { (obj->*func)(args...); },
134 std::forward<UpdaterFunc>(updater)
// Fragment of the event-dispatch routine (its signature and several lines are
// not visible in this excerpt). Visible behaviour: one task is started per
// registered listener, then every task is waited for.
//
// Each Task entry holds a shared_ptr to its listener; the wait loop below
// reads task.listener->name through it.
145 std::shared_ptr<
const Listener> listener;
148 std::vector<Task> tasks;
// NOTE(review): presumably downgrades an exclusive lock acquired on a line not
// shown here to shared ownership - confirm against engine::SharedMutex.
156 event_mutex_.unlock_and_lock_shared();
// std::adopt_lock: take over the shared ownership that is already held rather
// than locking a second time.
167 std::shared_lock<engine::SharedMutex> tmp_lock{event_mutex_, std::adopt_lock};
// The lock is moved into a shared_ptr so each task lambda below can capture a
// copy of the pointer; the single shared_lock is destroyed (and thereby
// released) only when the last copy goes away.
169 auto lock = std::make_shared<std::shared_lock<engine::SharedMutex>>(std::move(tmp_lock));
171 auto data = data_.Lock();
172 auto& listeners = data->listeners;
// One task per listener, so reserve exactly that many slots up front.
173 tasks.reserve(listeners.size());
175 for (
const auto& [_, listener] : listeners) {
176 tasks.push_back(Task{
// The lambda captures by reference by default (this includes `args`), binds
// `callback` as a reference to the listener's callback, and copies `lock`.
// NOTE(review): the by-reference captures rely on the tasks finishing before
// this routine returns; the wait loop below appears to provide that - confirm.
178 utils::Async(listener->task_name, [&, &callback = listener->callback, lock] { callback(args...); }),
// Wait for every started task, passing the listener name to impl::WaitForTask.
184 for (
auto& task : tasks) {
185 impl::WaitForTask(task.listener->name, task.task);
// Returns the channel name that was passed to the constructor. The reference
// points at the name_ member.
190 const std::string&
Name()
const noexcept {
return name_; }
// Record describing one subscribed listener. Only part of the definition is
// visible in this excerpt; elsewhere the code also reads `name` and `callback`
// members from it.
193 struct Listener
final {
// Name given to utils::Async when a task is started for this listener; built
// by impl::MakeAsyncChannelName from the channel name and the listener name.
196 std::string task_name;
// State accessed through data_.Lock(): the registered listeners plus the
// optional removal callback.
199 struct ListenersData
final {
// Listeners keyed by FunctionId. Values are shared_ptr<const Listener>, so a
// listener can be held by other owners after its map entry is erased.
200 std::unordered_map<FunctionId, std::shared_ptr<
const Listener>, FunctionId::Hash> listeners;
// May be empty (the name-only constructor leaves it unset).
201 OnRemoveCallback on_listener_removal;
// Unsubscribes the listener registered under `id`. `kind` tells whether the
// removal is automatic (UnsubscribingKind::kAutomatic), which enables the
// extra reporting and checking at the end. Several lines of this function are
// not visible in this excerpt.
204 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
// NOTE(review): presumably keeps task cancellation from interrupting the
// removal - TaskCancellationBlocker is declared elsewhere; confirm.
205 const engine::TaskCancellationBlocker blocker;
206 const std::shared_lock lock(event_mutex_);
// Both are filled in from the locked listener data below.
207 std::shared_ptr<
const Listener> listener;
208 OnRemoveCallback on_listener_removal;
211 auto data = data_.Lock();
212 auto& listeners = data->listeners;
213 const auto iter = listeners.find(id);
// Unknown id: report it (the remainder of this branch is not visible here).
215 if (iter == listeners.end()) {
216 impl::ReportNotSubscribed(
Name());
// Copy the shared_ptr and the removal callback out before erasing the map
// entry, so both remain usable once the entry is gone.
220 listener = iter->second;
221 on_listener_removal = data->on_listener_removal;
223 listeners.erase(iter);
// Automatic unsubscription: report it when no removal callback is set.
228 if (kind == UnsubscribingKind::kAutomatic) {
229 if (!on_listener_removal) {
230 impl::ReportUnsubscribingAutomatically(name_, listener->name);
// Compile-time switch: the check below is only instantiated when
// impl::kCheckSubscriptionUB is true.
233 if constexpr (impl::kCheckSubscriptionUB) {
235 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
236 on_listener_removal, listener->callback, name_, listener->name
// Registers `func` as a listener under `id` and returns a subscriber scope
// bound to this channel and that id. If `id` is already present, emplace
// leaves the existing entry untouched and impl::ReportAlreadySubscribed is
// called with the channel name and `name`.
242 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
245 auto data = data_.Lock();
246 auto& listeners = data->listeners;
// The task name is derived from the channel name and the listener name.
247 auto task_name = impl::MakeAsyncChannelName(name_, name);
248 const auto [iterator, success] = listeners.emplace(
// The listener is stored as shared_ptr<const Listener>: immutable after
// creation. It holds a copy of `name`, the moved-in callback and the task name.
249 id, std::make_shared<
const Listener>(Listener{std::string{name}, std::move(func), std::move(task_name)})
251 if (!success) impl::ReportAlreadySubscribed(
Name(), name);
252 return AsyncEventSubscriberScope(
utils::impl::InternalTag{}, *
this, id);
// Channel name; set once in the constructor and returned by Name().
255 const std::string name_;
// Held in shared mode by DoUpdateAndListen and RemoveListener. `mutable`
// lets it be locked through a const channel as well.
264 mutable engine::SharedMutex event_mutex_;