/// Alias for the `Function` type declared by `AsyncEventSource<Args...>`;
/// this is the callback type stored for every listener of the channel.
79 using Function =
typename AsyncEventSource<Args...>::Function;
/// Callable that receives a mutable reference to a listener's `Function`.
/// An instance is kept in `ListenersData::on_listener_removal` and is
/// consulted by `RemoveListener`; it may be empty.
80 using OnRemoveCallback = std::function<
void(Function&)>;
/// Creates a channel called `name` with an empty listener map and an empty
/// (default-constructed) `on_listener_removal` callback.
84 explicit AsyncEventChannel(std::string_view name) : name_(name), data_(ListenersData{{}, {}}) {}
105 : name_(name), data_(ListenersData{{}, std::move(on_listener_removal)}) {}
/// Invokes `updater` and then registers a listener, both while holding
/// `event_mutex_`.
///
/// @param id       key under which the listener is registered
/// @param name     listener name forwarded to `DoAddListener`
/// @param func     listener callback; moved into `DoAddListener`
/// @param updater  callable invoked with no arguments before the listener is added
/// @return the subscription scope produced by `DoAddListener`
118 template <
typename UpdaterFunc>
119 AsyncEventSubscriberScope
120 DoUpdateAndListen(FunctionId id, std::string_view name, Function&& func, UpdaterFunc&& updater) {
// The lock covers both the updater call and the registration below.
121 std::lock_guard lock(event_mutex_);
122 std::forward<UpdaterFunc>(updater)();
123 return DoAddListener(id, name, std::move(func));
/// Overload that subscribes a member function: wraps `(obj->*func)(args...)`
/// in a lambda and delegates to another `DoUpdateAndListen` overload.
///
/// @param obj      object whose member function is called; captured by pointer
/// @param name     listener name
/// @param func     pointer to the member function taking `Args...`
/// @param updater  callable perfectly forwarded to the delegated call
/// @return the subscription scope returned by the delegated call
127 template <
typename Class,
typename UpdaterFunc>
128 AsyncEventSubscriberScope
129 DoUpdateAndListen(Class* obj, std::string_view name,
void (Class::*func)(Args...), UpdaterFunc&& updater) {
// NOTE(review): the leading arguments of this call are not visible in this
// excerpt - confirm against the full file.
130 return DoUpdateAndListen(
// `obj` and `func` are captured by value; `obj` is a raw pointer.
133 [obj, func](Args... args) { (obj->*func)(args...); },
134 std::forward<UpdaterFunc>(updater)
129 DoUpdateAndListen(Class* obj, std::string_view name,
void (Class::*func)(Args...), UpdaterFunc&& updater) {
…}
144 std::lock_guard lock(event_mutex_);
145 auto data = data_.Lock();
146 auto& listeners = data->listeners;
149 tasks.reserve(listeners.size());
151 for (
const auto& [_, listener] : listeners) {
152 tasks.push_back(
utils::Async(listener.task_name, [&, &callback = listener.callback] { callback(args...); })
157 for (
const auto& [_, listener] : listeners) {
158 impl::WaitForTask(listener.name, tasks[i++]);
/// Returns a reference to the channel name (`name_`) that was set by the
/// constructor. Never throws.
163 const std::string&
Name()
const noexcept {
return name_; }
/// Per-listener record stored in `ListenersData::listeners`.
/// Only `task_name` is visible in this excerpt; other code in the file also
/// reads `name` and `callback` members of this struct.
166 struct Listener
final {
// Name given to `utils::Async` when a task is started for this listener;
// produced by `impl::MakeAsyncChannelName(name_, name)` in `DoAddListener`.
169 std::string task_name;
/// Mutable channel state that the member functions reach through
/// `data_.Lock()`.
172 struct ListenersData
final {
// Registered listeners keyed by their `FunctionId`.
173 std::unordered_map<FunctionId, Listener, FunctionId::Hash> listeners;
// Optional callback; `RemoveListener` checks whether it is set.
174 OnRemoveCallback on_listener_removal;
/// Removes the listener registered under `id` from the channel.
///
/// @param id    key of the listener to remove
/// @param kind  how the unsubscription was triggered; `kAutomatic` enables
///              the extra reporting and checking below
177 void RemoveListener(FunctionId id, UnsubscribingKind kind)
noexcept final {
// NOTE(review): presumably prevents task cancellation from interrupting the
// code below - confirm against engine::TaskCancellationBlocker docs.
178 engine::TaskCancellationBlocker blocker;
179 auto data = data_.Lock();
180 auto& listeners = data->listeners;
181 const auto iter = listeners.find(id);
// No listener is registered under `id`: report it.
183 if (iter == listeners.end()) {
184 impl::ReportNotSubscribed(
Name());
188 if (kind == UnsubscribingKind::kAutomatic) {
// Automatic unsubscription with no removal callback set is reported.
189 if (!data->on_listener_removal) {
190 impl::ReportUnsubscribingAutomatically(name_, iter->second.name);
// Compiled in only when impl::kCheckSubscriptionUB is true.
193 if constexpr (impl::kCheckSubscriptionUB) {
195 impl::CheckDataUsedByCallbackHasNotBeenDestroyedBeforeUnsubscribing(
196 data->on_listener_removal, iter->second.callback, name_, iter->second.name
200 listeners.erase(iter);
/// Registers `func` as the listener for `id` and returns a subscription scope
/// bound to this channel and `id`.
///
/// @param id    key for the listener map
/// @param name  listener name; copied into the stored `Listener` and used to
///              build its task name
/// @param func  listener callback; moved into the stored `Listener`
/// @return `AsyncEventSubscriberScope` constructed from `*this` and `id`
203 AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func)
final {
204 auto data = data_.Lock();
205 auto& listeners = data->listeners;
206 auto task_name = impl::MakeAsyncChannelName(name_, name);
// `emplace` does not replace an existing entry; `success` is false when `id`
// is already present.
207 const auto [iterator, success] =
208 listeners.emplace(id, Listener{std::string{name}, std::move(func), std::move(task_name)});
// A duplicate subscription is reported; the scope is still returned below.
209 if (!success) impl::ReportAlreadySubscribed(
Name(), name);
210 return AsyncEventSubscriberScope(
utils::impl::InternalTag{}, *
this, id);
// Channel name; set once in the constructor and returned by `Name()`.
213 const std::string name_;
// Mutex taken by `DoUpdateAndListen` and by the event-dispatch code;
// `mutable` so that it can also be locked from const member functions.
215 mutable engine::Mutex event_mutex_;