userver: userver/storages/postgres/cluster.hpp Source File
Loading...
Searching...
No Matches
cluster.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/storages/postgres/cluster.hpp
4/// @brief @copybrief storages::postgres::Cluster
5
6#include <memory>
7
8#include <userver/clients/dns/resolver_fwd.hpp>
9#include <userver/dynamic_config/source.hpp>
10#include <userver/engine/task/task_processor_fwd.hpp>
11#include <userver/engine/task/task_with_result.hpp>
12#include <userver/error_injection/settings_fwd.hpp>
13#include <userver/testsuite/postgres_control.hpp>
14#include <userver/testsuite/tasks.hpp>
15
16#include <userver/storages/postgres/cluster_types.hpp>
17#include <userver/storages/postgres/database.hpp>
18#include <userver/storages/postgres/detail/non_transaction.hpp>
19#include <userver/storages/postgres/notify.hpp>
20#include <userver/storages/postgres/options.hpp>
21#include <userver/storages/postgres/query.hpp>
22#include <userver/storages/postgres/query_queue.hpp>
23#include <userver/storages/postgres/statistics.hpp>
24#include <userver/storages/postgres/transaction.hpp>
25
26USERVER_NAMESPACE_BEGIN
27
28namespace components {
29class Postgres;
30} // namespace components
31
32namespace storages::postgres {
33
34namespace detail {
35
36class ClusterImpl;
37using ClusterImplPtr = std::unique_ptr<ClusterImpl>;
38
39} // namespace detail
40
41/// @ingroup userver_clients
42///
43/// @brief Interface for executing queries on a cluster of PostgreSQL servers
44///
45/// See @ref scripts/docs/en/userver/pg/user_row_types.md "Typed PostgreSQL results" for usage examples of
46/// the storages::postgres::ResultSet.
47///
48/// Usually retrieved from components::Postgres component.
49///
50/// @todo Add information about topology discovery
51class Cluster {
52public:
53 /// Cluster constructor
54 /// @param dsns List of DSNs to connect to
55 /// @param resolver asynchronous DNS resolver
56 /// @param bg_task_processor task processor for blocking connection operations
57 /// @param cluster_settings struct with settings fields:
58 /// task_data_keys_settings - settings for per-handler command controls
59 /// topology_settings - settings for host discovery
60 /// pool_settings - settings for connection pools
61 /// conn_settings - settings for individual connections
62 /// @param default_cmd_ctls default command execution options
63 /// @param testsuite_pg_ctl command execution options customizer for testsuite
64 /// @param ei_settings error injection settings
65 /// @param testsuite_tasks see @ref testsuite::TestsuiteTasks
66 /// @param config_source see @ref dynamic_config::Source
67 /// @param metrics metrics storage for alerts
68 /// @param shard_number shard number
69 /// @note When `max_connection_pool_size` is reached, and no idle connections
70 /// available, `PoolError` is thrown for every new connection
71 /// request
73 DsnList dsns,
74 clients::dns::Resolver* resolver,
75 engine::TaskProcessor& bg_task_processor,
76 const ClusterSettings& cluster_settings,
77 DefaultCommandControls&& default_cmd_ctls,
78 const testsuite::PostgresControl& testsuite_pg_ctl,
79 const error_injection::Settings& ei_settings,
80 testsuite::TestsuiteTasks& testsuite_tasks,
81 dynamic_config::Source config_source,
82 USERVER_NAMESPACE::utils::statistics::MetricsStoragePtr metrics,
83 int shard_number
84 );
85 ~Cluster();
86
87 /// Get cluster statistics
88 ///
89 /// The statistics object is too big to fit on stack
90 ClusterStatisticsPtr GetStatistics() const;
91
92 /// @name Transaction start
93 /// @{
94
95 /// Start a transaction in any available connection depending on transaction
96 /// options.
97 ///
98 /// If the transaction is RW, will start transaction in a connection
99 /// to master. If the transaction is RO, will start trying connections
100 /// starting with slaves.
101 /// @throws ClusterUnavailable if no hosts are available
102 Transaction Begin(const TransactionOptions&, OptionalCommandControl = {});
103
104 /// Start a transaction in a connection with specified host selection rules.
105 ///
106 /// If the requested host role is not available, may fall back to another
107 /// host role, see ClusterHostType.
108 /// If the transaction is RW, only master connection can be used.
109 /// @throws ClusterUnavailable if no hosts are available
110 Transaction Begin(ClusterHostTypeFlags, const TransactionOptions&, OptionalCommandControl = {});
111
112 /// Start a named transaction in any available connection depending on
113 /// transaction options.
114 ///
115 /// If the transaction is RW, will start transaction in a connection
116 /// to master. If the transaction is RO, will start trying connections
117 /// starting with slaves.
118 /// `name` is used to set command control in config at runtime.
119 /// @throws ClusterUnavailable if no hosts are available
120 Transaction Begin(std::string name, const TransactionOptions&);
121
122 /// Start a named transaction in a connection with specified host selection
123 /// rules.
124 ///
125 /// If the requested host role is not available, may fall back to another
126 /// host role, see ClusterHostType.
127 /// If the transaction is RW, only master connection can be used.
128 /// `name` is used to set command control in config at runtime.
129 /// @throws ClusterUnavailable if no hosts are available
130 Transaction Begin(std::string name, ClusterHostTypeFlags, const TransactionOptions&);
131 /// @}
132
133 /// Start a query queue with specified host selection rules and timeout for
134 /// acquiring a connection.
135 [[nodiscard]] QueryQueue CreateQueryQueue(ClusterHostTypeFlags flags);
136
137 /// Start a query queue with specified host selection rules and timeout for
138 /// acquiring a connection.
139 [[nodiscard]] QueryQueue CreateQueryQueue(ClusterHostTypeFlags flags, TimeoutDuration acquire_timeout);
140
141 /// @name Single-statement query in an auto-commit transaction
142 /// @{
143
144 /// @brief Execute a statement at host of specified type.
145 /// @note You must specify at least one role from ClusterHostType here
146 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
147 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
148 ///
149 /// @snippet storages/postgres/tests/landing_test.cpp Exec sample
150 ///
151 /// @warning Do NOT create a query string manually by embedding arguments!
152 /// It leads to vulnerabilities and bad performance. Either pass arguments
153 /// separately, or use storages::postgres::ParameterScope.
154 template <typename... Args>
155 ResultSet Execute(ClusterHostTypeFlags, const Query& query, const Args&... args);
156
157 /// @brief Execute a statement with specified host selection rules and command
158 /// control settings.
159 /// @note You must specify at least one role from ClusterHostType here
160 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
161 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
162 ///
163 /// @warning Do NOT create a query string manually by embedding arguments!
164 /// It leads to vulnerabilities and bad performance. Either pass arguments
165 /// separately, or use storages::postgres::ParameterScope.
166 template <typename... Args>
167 ResultSet Execute(ClusterHostTypeFlags, OptionalCommandControl, const Query& query, const Args&... args);
168
169 /// @brief Execute a statement with stored arguments and specified host
170 /// selection rules.
171 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
172 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
173 ///
174 /// @warning Do NOT create a query string manually by embedding arguments!
175 /// It leads to vulnerabilities and bad performance. Either pass arguments
176 /// separately, or use storages::postgres::ParameterScope.
177 ResultSet Execute(ClusterHostTypeFlags flags, const Query& query, const ParameterStore& store);
178
179 /// @brief Execute a statement with stored arguments, specified host selection
180 /// rules and command control settings.
181 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
182 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
183 ///
184 /// @warning Do NOT create a query string manually by embedding arguments!
185 /// It leads to vulnerabilities and bad performance. Either pass arguments
186 /// separately, or use storages::postgres::ParameterScope.
188 ClusterHostTypeFlags flags,
189 OptionalCommandControl statement_cmd_ctl,
190 const Query& query,
191 const ParameterStore& store
192 );
193
194 /// @snippet storages/postgres/tests/landing_test.cpp ExecuteDecompose
195
196 /// @brief Execute statement, that uses an array of arguments transforming that array
197 /// into N arrays of corresponding fields and executing the statement
198 /// with these arrays values, at host of specified type.
199 /// Basically, a column-wise Execute.
200 ///
201 /// @note You must specify at least one role from ClusterHostType here
202 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
203 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
204 ///
205 /// @snippet storages/postgres/tests/landing_test.cpp ExecuteDecompose
206 ///
207 /// @warning Do NOT create a query string manually by embedding arguments!
208 /// It leads to vulnerabilities and bad performance. Either pass arguments
209 /// separately, or use storages::postgres::ParameterScope.
210 template <typename Container>
211 ResultSet ExecuteDecompose(ClusterHostTypeFlags flags, const Query& query, const Container& args);
212
213 /// @brief Execute statement, that uses an array of arguments transforming that array
214 /// into N arrays of corresponding fields and executing the statement
215 /// with these arrays values, with host selection rules and command
216 /// control settings.
217 /// Basically, a column-wise Execute.
218 ///
219 /// @note You must specify at least one role from ClusterHostType here
220 /// @note You may write a query in `.sql` file and generate a header file with Query from it.
221 /// See @ref scripts/docs/en/userver/sql_files.md for more information.
222 ///
223 /// @snippet storages/postgres/tests/arrays_pgtest.cpp ExecuteDecompose
224 ///
225 /// @warning Do NOT create a query string manually by embedding arguments!
226 /// It leads to vulnerabilities and bad performance. Either pass arguments
227 /// separately, or use storages::postgres::ParameterScope.
228 template <typename Container>
230 ClusterHostTypeFlags flags,
231 OptionalCommandControl statement_cmd_ctl,
232 const Query& query,
233 const Container& args
234 );
235 /// @}
236
237 /// @brief Listen for notifications on channel
238 /// @warning Each NotifyScope owns a single connection taken from the pool,
239 /// which effectively decreases the number of usable connections
240 NotifyScope Listen(std::string_view channel, OptionalCommandControl = {});
241
242 /// Replaces globally updated command control with a static user-provided one
244
245 /// Returns current default command control
247
248 void SetHandlersCommandControl(CommandControlByHandlerMap handlers_command_control);
249
250 void SetQueriesCommandControl(CommandControlByQueryMap queries_command_control);
251
252 /// @cond
253 /// Updates default command control from global config (if not set by user)
254 void ApplyGlobalCommandControlUpdate(CommandControl);
255 /// @endcond
256
257 /// Replaces cluster connection settings.
258 ///
259 /// Connections with an old settings will be dropped and reestablished.
261
262 void SetPoolSettings(const PoolSettings& settings);
263
264 void SetTopologySettings(const TopologySettings& settings);
265
266 void SetStatementMetricsSettings(const StatementMetricsSettings& settings);
267
268 void SetDsnList(const DsnList&);
269
270private:
271 detail::NonTransaction Start(ClusterHostTypeFlags, OptionalCommandControl);
272
273 OptionalCommandControl GetQueryCmdCtl(std::string_view query_name) const;
274 OptionalCommandControl GetHandlersCmdCtl(OptionalCommandControl cmd_ctl) const;
275
276 detail::ClusterImplPtr pimpl_;
277};
278
279template <typename... Args>
280ResultSet Cluster::Execute(ClusterHostTypeFlags flags, const Query& query, const Args&... args) {
281 return Execute(flags, OptionalCommandControl{}, query, args...);
282}
283
284template <typename... Args>
286 ClusterHostTypeFlags flags,
287 OptionalCommandControl statement_cmd_ctl,
288 const Query& query,
289 const Args&... args
290) {
291 if (!statement_cmd_ctl && query.GetOptionalNameView()) {
292 statement_cmd_ctl = GetQueryCmdCtl(*query.GetOptionalNameView());
293 }
294 statement_cmd_ctl = GetHandlersCmdCtl(statement_cmd_ctl);
295 auto ntrx = Start(flags, statement_cmd_ctl);
296 return ntrx.Execute(statement_cmd_ctl, query, args...);
297}
298
299template <typename Container>
300ResultSet Cluster::ExecuteDecompose(ClusterHostTypeFlags flags, const Query& query, const Container& args) {
301 return ExecuteDecompose(flags, OptionalCommandControl{}, query, args);
302}
303
304template <typename Container>
306 ClusterHostTypeFlags flags,
307 OptionalCommandControl statement_cmd_ctl,
308 const Query& query,
309 const Container& args
310) {
311 if (!statement_cmd_ctl && query.GetOptionalNameView()) {
312 statement_cmd_ctl = GetQueryCmdCtl(*query.GetOptionalNameView());
313 }
314 statement_cmd_ctl = GetHandlersCmdCtl(statement_cmd_ctl);
315 auto ntrx = Start(flags, statement_cmd_ctl);
316
317 return io::DecomposeContainerByColumns(args).Perform([&ntrx, &statement_cmd_ctl, &query](const auto&... args) {
318 return ntrx.Execute(statement_cmd_ctl, query, args...);
319 });
320}
321
322} // namespace storages::postgres
323
324USERVER_NAMESPACE_END