userver: userver/concurrent/async_event_source.hpp Source File
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
async_event_source.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/concurrent/async_event_source.hpp
4/// @brief @copybrief concurrent::AsyncEventSource
5
6#include <cstddef>
7#include <functional>
8#include <string_view>
9#include <typeinfo>
10#include <utility>
11
12#include <userver/utils/impl/internal_tag.hpp>
13
14USERVER_NAMESPACE_BEGIN
15
16namespace concurrent {
17
18class FunctionId;
19enum class UnsubscribingKind;
20
21template <typename... Args>
22class AsyncEventSource;
23
24namespace impl {
25
26class AsyncEventSourceBase {
27public:
28 virtual void RemoveListener(FunctionId, UnsubscribingKind) noexcept = 0;
29
30protected:
31 // disallow deletion via pointer to base
32 ~AsyncEventSourceBase();
33};
34
35} // namespace impl
36
37/// Uniquely identifies a subscription (usually the callback method owner) for
38/// AsyncEventSource
39class FunctionId final {
40public:
41 constexpr FunctionId() = default;
42
43 template <typename Class>
44 explicit FunctionId(Class* obj) : FunctionId(obj, typeid(Class)) {}
45
46 explicit operator bool() const;
47
48 bool operator==(const FunctionId& other) const;
49
50 struct Hash final {
51 std::size_t operator()(FunctionId id) const noexcept;
52 };
53
54private:
55 FunctionId(void* ptr, const std::type_info& type);
56
57 void* ptr_{nullptr};
58 const std::type_info* type_{nullptr};
59};
60
61enum class UnsubscribingKind { kManual, kAutomatic };
62
63/// @brief Manages the subscription to events from an AsyncEventSource
64///
65/// Removes the associated listener automatically on destruction.
66///
67/// The Scope is usually placed as a member in the subscribing object.
68/// `Unsubscribe` should be called manually in the objects destructor, before
69/// anything that the callback needs is destroyed.
70class [[nodiscard]] AsyncEventSubscriberScope final {
71public:
72 AsyncEventSubscriberScope() = default;
73
74 AsyncEventSubscriberScope(AsyncEventSubscriberScope&& scope) noexcept;
75 AsyncEventSubscriberScope& operator=(AsyncEventSubscriberScope&& other) noexcept;
76 ~AsyncEventSubscriberScope();
77
78 /// Unsubscribes manually. The subscription should be cancelled before
79 /// anything that the callback needs is destroyed.
80 void Unsubscribe() noexcept;
81
82 /// @cond
83 // For internal use only.
84 template <typename... Args>
85 AsyncEventSubscriberScope(utils::impl::InternalTag, AsyncEventSource<Args...>& channel, FunctionId id)
86 : AsyncEventSubscriberScope(static_cast<impl::AsyncEventSourceBase&>(channel), id) {}
87 /// @endcond
88
89private:
90 AsyncEventSubscriberScope(impl::AsyncEventSourceBase& channel, FunctionId id);
91
92 impl::AsyncEventSourceBase* channel_{nullptr};
93 FunctionId id_;
94};
95
96/// @ingroup userver_concurrency
97///
98/// @brief The read-only side of an event channel. Events are delivered to
99/// listeners in a strict FIFO order, i.e. only after the event was processed
100/// a new event may appear for processing, same listener is never called
101/// concurrently.
102template <typename... Args>
103class AsyncEventSource : public impl::AsyncEventSourceBase {
104public:
105 using Function = std::function<void(Args... args)>;
106
107 virtual ~AsyncEventSource() = default;
108
109 /// @brief Subscribes to updates from this event source
110 ///
111 /// The listener won't be called immediately. To process the current value and
112 /// then listen to updates, use `UpdateAndListen` of specific event channels.
113 ///
114 /// @warning Listeners should not be added or removed while processing the
115 /// event inside another listener.
116 ///
117 /// Example usage:
118 /// @snippet concurrent/async_event_channel_test.cpp AddListener sample
119 ///
120 /// @param obj the subscriber, which is the owner of the listener method, and
121 /// is also used as the unique identifier of the subscription for this
122 /// AsyncEventSource
123 /// @param name the name of the subscriber, for diagnostic purposes
124 /// @param func the listener method, usually called `On<DataName>Update`, e.g.
125 /// `OnConfigUpdate` or `OnCacheUpdate`
126 /// @returns a AsyncEventSubscriberScope controlling the subscription, which
127 /// should be stored as a member in the subscriber; `Unsubscribe` should be
128 /// called explicitly
129 template <class Class>
130 AsyncEventSubscriberScope AddListener(Class* obj, std::string_view name, void (Class::*func)(Args...)) {
131 return AddListener(FunctionId(obj), name, [obj, func](Args... args) { (obj->*func)(args...); });
132 }
133
134 /// @brief A type-erased version of AddListener
135 ///
136 /// @param id the unique identifier of the subscription
137 /// @param name the name of the subscriber, for diagnostic purposes
138 /// @param func the callback used for event notifications
139 AsyncEventSubscriberScope AddListener(FunctionId id, std::string_view name, Function&& func) {
140 return DoAddListener(id, name, std::move(func));
141 }
142
143 /// Used by AsyncEventSubscriberScope to cancel an event subscription
144 void RemoveListener(FunctionId, UnsubscribingKind) noexcept override = 0;
145
146protected:
147 virtual AsyncEventSubscriberScope DoAddListener(FunctionId id, std::string_view name, Function&& func) = 0;
148};
149
150} // namespace concurrent
151
152USERVER_NAMESPACE_END