userver: userver/storages/postgres/transaction.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
transaction.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/storages/postgres/transaction.hpp
4/// @brief Transactions
5
6#include <memory>
7#include <string>
8
9#include <userver/storages/postgres/detail/connection_ptr.hpp>
10#include <userver/storages/postgres/detail/query_parameters.hpp>
11#include <userver/storages/postgres/detail/time_types.hpp>
12#include <userver/storages/postgres/options.hpp>
13#include <userver/storages/postgres/parameter_store.hpp>
14#include <userver/storages/postgres/portal.hpp>
15#include <userver/storages/postgres/postgres_fwd.hpp>
16#include <userver/storages/postgres/query.hpp>
17#include <userver/storages/postgres/result_set.hpp>
18
19USERVER_NAMESPACE_BEGIN
20
21namespace storages::postgres {
22
23/// @page pg_transactions uPg: Transactions
24///
25/// All queries that are run on a PostgreSQL cluster are executed inside
26/// a transaction, even if a single-query interface is used.
27///
28/// A uPg transaction can be started using all isolation levels and modes
29/// supported by PostgreSQL server as specified in documentation here
30/// https://www.postgresql.org/docs/current/static/sql-set-transaction.html.
31/// When starting a transaction, the options are specified using
32/// TransactionOptions structure.
33///
34/// For convenience and improvement of readability there are constants
35/// defined: Transaction::RW, Transaction::RO and Transaction::Deferrable.
36///
37/// @see TransactionOptions
38///
39/// Transaction object ensures that a transaction started in a PostgreSQL
40/// connection will be either committed or rolled back and the connection
41/// will returned back to a connection pool.
42///
43/// @todo Code snippet with transaction starting and committing
44///
45/// Next: @ref pg_run_queries
46///
47/// See also: @ref pg_process_results
48///
49/// ----------
50///
51/// @htmlonly <div class="bottom-nav"> @endhtmlonly
52/// ⇦ @ref pg_driver | @ref pg_run_queries ⇨
53/// @htmlonly </div> @endhtmlonly
54
55/// @page pg_run_queries uPg: Running queries
56///
57/// All queries are executed through a transaction object, event when being
58/// executed through singe-query interface, so here only executing queries
59/// with transaction will be covered. Single-query interface is basically
60/// the same except for additional options.
61///
62/// uPg provides means to execute text queries only. There is no query
63/// generation, but can be used by other tools to execute SQL queries.
64///
65/// @warning A query must contain a single query, multiple statements delimited
66/// by ';' are not supported.
67///
68/// All queries are parsed and prepared during the first invocation and are
69/// executed as prepared statements afterwards.
70///
71/// Any query execution can throw an exception. Please see @ref pg_errors for
72/// more information on possible errors.
73///
74/// @par Queries without parameters
75///
76/// Executing a query without any parameters is rather straightforward.
77/// @code
78/// auto trx = cluster->Begin(/* transaction options */);
79/// auto res = trx.Execute("select foo, bar from foobar");
80/// trx.Commit();
81/// @endcode
82///
83/// The cluster also provides interface for single queries
84/// @code
85/// auto res = cluster->Execute(/* transaction options */, /* the statement */);
86/// @endcode
87///
88/// @par Queries with parameters
89///
90/// uPg supports SQL dollar notation for parameter placeholders. The statement
91/// is prepared at first execution and then only arguments for a query is sent
92/// to the server.
93///
94/// A parameter can be of any type that is supported by the driver. See @ref
95/// pg_types for more information.
96///
97/// @code
98/// auto trx = cluster->Begin(/* transaction options */);
99/// auto res = trx.Execute(
100/// "select foo, bar from foobar where foo > $1 and bar = $2", 42, "baz");
101/// trx.Commit();
102/// @endcode
103///
104/// @see Transaction
105/// @see ResultSet
106///
107/// ----------
108///
109/// @htmlonly <div class="bottom-nav"> @endhtmlonly
110/// ⇦ @ref pg_transactions | @ref pg_process_results ⇨
111/// @htmlonly </div> @endhtmlonly
112
113// clang-format off
114/// @brief PostgreSQL transaction.
115///
116/// RAII wrapper for running transactions on PostgreSQL connections. Should be
117/// retrieved by calling storages::postgres::Cluster::Begin().
118///
119/// Non-copyable.
120///
121/// If the transaction is not explicitly finished (either committed or rolled back)
122/// it will roll itself back in the destructor.
123///
124/// @par Usage synopsis
125/// @code
126/// auto trx = someCluster.Begin(/* transaction options */);
127/// auto res = trx.Execute("select col1, col2 from schema.table");
128/// DoSomething(res);
129/// res = trx.Execute("update schema.table set col1 = $1 where col2 = $2", v1, v2);
130/// // If in the above lines an exception is thrown, then the transaction is
131/// // rolled back in the destructor of trx.
132/// trx.Commit();
133/// @endcode
134// clang-format on
135
137 public:
138 //@{
139 /** @name Shortcut transaction options constants */
140 /// Read-write read committed transaction
141 static constexpr TransactionOptions RW{};
142 /// Read-only read committed transaction
143 static constexpr TransactionOptions RO{TransactionOptions::kReadOnly};
144 /// Read-only serializable deferrable transaction
147 //@}
148
149 static constexpr std::size_t kDefaultRowsInChunk = 1024;
150
151 /// @cond
152 explicit Transaction(detail::ConnectionPtr&& conn,
153 const TransactionOptions& = RW,
154 OptionalCommandControl trx_cmd_ctl = {},
155 detail::SteadyClock::time_point trx_start_time =
156 detail::SteadyClock::now());
157 /// @endcond
158
159 Transaction(Transaction&&) noexcept;
160 Transaction& operator=(Transaction&&) noexcept;
161
162 Transaction(const Transaction&) = delete;
163 Transaction& operator=(const Transaction&) = delete;
164
165 ~Transaction();
166 /// @name Query execution
167 /// @{
168 /// Execute statement with arbitrary parameters.
169 ///
170 /// Suspends coroutine for execution.
171 ///
172 /// @snippet storages/postgres/tests/landing_test.cpp TransacExec
173 template <typename... Args>
174 ResultSet Execute(const Query& query, const Args&... args) {
175 return Execute(OptionalCommandControl{}, query, args...);
176 }
177
178 /// Execute statement with arbitrary parameters and per-statement command
179 /// control.
180 ///
181 /// Suspends coroutine for execution.
182 ///
183 /// @warning Do NOT create a query string manually by embedding arguments!
184 /// It leads to vulnerabilities and bad performance. Either pass arguments
185 /// separately, or use storages::postgres::ParameterScope.
186 template <typename... Args>
187 ResultSet Execute(OptionalCommandControl statement_cmd_ctl,
188 const Query& query, const Args&... args) {
189 detail::StaticQueryParameters<sizeof...(args)> params;
190 params.Write(GetConnectionUserTypes(), args...);
191 return DoExecute(query, detail::QueryParameters{params}, statement_cmd_ctl);
192 }
193
194 /// Execute statement with stored parameters.
195 ///
196 /// Suspends coroutine for execution.
197 ///
198 /// @warning Do NOT create a query string manually by embedding arguments!
199 /// It leads to vulnerabilities and bad performance. Either pass arguments
200 /// separately, or use storages::postgres::ParameterScope.
201 ResultSet Execute(const Query& query, const ParameterStore& store) {
202 return Execute(OptionalCommandControl{}, query, store);
203 }
204
205 /// Execute statement with stored parameters and per-statement command
206 /// control.
207 ///
208 /// Suspends coroutine for execution.
209 ResultSet Execute(OptionalCommandControl statement_cmd_ctl,
210 const Query& query, const ParameterStore& store);
211
212 /// Execute statement that uses an array of arguments splitting that array in
213 /// chunks and executing the statement with a chunk of arguments.
214 ///
215 /// Useful for statements that unnest their arguments to avoid the need to
216 /// increase timeouts due to data amount growth.
217 template <typename Container>
218 void ExecuteBulk(const Query& query, const Container& args,
219 std::size_t chunk_rows = kDefaultRowsInChunk);
220
221 /// Execute statement that uses an array of arguments splitting that array in
222 /// chunks and executing the statement with a chunk of arguments.
223 ///
224 /// Useful for statements that unnest their arguments to avoid the need to
225 /// increase timeouts due to data amount growth.
226 template <typename Container>
227 void ExecuteBulk(OptionalCommandControl statement_cmd_ctl, const Query& query,
228 const Container& args,
229 std::size_t chunk_rows = kDefaultRowsInChunk);
230
231 /// Execute statement that uses an array of arguments transforming that array
232 /// into N arrays of corresponding fields and executing the statement
233 /// with a chunk of each of these arrays values.
234 /// Basically, a column-wise ExecuteBulk.
235 ///
236 /// Useful for statements that unnest their arguments to avoid the need to
237 /// increase timeouts due to data amount growth, but providing an explicit
238 /// mapping from `Container::value_type` to PG type is infeasible for some
239 /// reason (otherwise, use ExecuteBulk).
240 ///
241 /// @snippet storages/postgres/tests/arrays_pgtest.cpp ExecuteDecomposeBulk
242 template <typename Container>
243 void ExecuteDecomposeBulk(const Query& query, const Container& args,
244 std::size_t chunk_rows = kDefaultRowsInChunk);
245
246 /// Execute statement that uses an array of arguments transforming that array
247 /// into N arrays of corresponding fields and executing the statement
248 /// with a chunk of each of these arrays values.
249 /// Basically, a column-wise ExecuteBulk.
250 ///
251 /// Useful for statements that unnest their arguments to avoid the need to
252 /// increase timeouts due to data amount growth, but providing an explicit
253 /// mapping from `Container::value_type` to PG type is infeasible for some
254 /// reason (otherwise, use ExecuteBulk).
255 ///
256 /// @snippet storages/postgres/tests/arrays_pgtest.cpp ExecuteDecomposeBulk
257 template <typename Container>
258 void ExecuteDecomposeBulk(OptionalCommandControl statement_cmd_ctl,
259 const Query& query, const Container& args,
260 std::size_t chunk_rows = kDefaultRowsInChunk);
261
262 /// Create a portal for fetching results of a statement with arbitrary
263 /// parameters.
264 template <typename... Args>
265 Portal MakePortal(const Query& query, const Args&... args) {
266 return MakePortal(OptionalCommandControl{}, query, args...);
267 }
268
269 /// Create a portal for fetching results of a statement with arbitrary
270 /// parameters and per-statement command control.
271 template <typename... Args>
272 Portal MakePortal(OptionalCommandControl statement_cmd_ctl,
273 const Query& query, const Args&... args) {
274 detail::StaticQueryParameters<sizeof...(args)> params;
275 params.Write(GetConnectionUserTypes(), args...);
276 return MakePortal(PortalName{}, query, detail::QueryParameters{params},
277 statement_cmd_ctl);
278 }
279
280 /// Create a portal for fetching results of a statement with stored
281 /// parameters.
282 Portal MakePortal(const Query& query, const ParameterStore& store) {
283 return MakePortal(OptionalCommandControl{}, query, store);
284 }
285
286 /// Create a portal for fetching results of a statement with stored parameters
287 /// and per-statement command control.
288 Portal MakePortal(OptionalCommandControl statement_cmd_ctl,
289 const Query& query, const ParameterStore& store);
290
291 /// Set a connection parameter
292 /// https://www.postgresql.org/docs/current/sql-set.html
293 /// The parameter is set for this transaction only
294 void SetParameter(const std::string& param_name, const std::string& value);
295 //@}
296
297 /// Commit the transaction
298 /// Suspends coroutine until command complete.
299 /// After Commit or Rollback is called, the transaction is not usable any
300 /// more.
301 void Commit();
302 /// Rollback the transaction
303 /// Suspends coroutine until command complete.
304 /// After Commit or Rollback is called, the transaction is not usable any
305 /// more.
306 void Rollback();
307
308 /// Used in tests
309 OptionalCommandControl GetConnTransactionCommandControlDebug() const;
310 TimeoutDuration GetConnStatementTimeoutDebug() const;
311
312 private:
313 ResultSet DoExecute(const Query& query, const detail::QueryParameters& params,
314 OptionalCommandControl statement_cmd_ctl);
315 Portal MakePortal(const PortalName&, const Query& query,
316 const detail::QueryParameters& params,
317 OptionalCommandControl statement_cmd_ctl);
318
319 const UserTypes& GetConnectionUserTypes() const;
320
321 detail::ConnectionPtr conn_;
322};
323
324template <typename Container>
325void Transaction::ExecuteBulk(const Query& query, const Container& args,
326 std::size_t chunk_rows) {
327 auto split = io::SplitContainer(args, chunk_rows);
328 for (auto&& chunk : split) {
329 Execute(query, chunk);
330 }
331}
332
333template <typename Container>
334void Transaction::ExecuteBulk(OptionalCommandControl statement_cmd_ctl,
335 const Query& query, const Container& args,
336 std::size_t chunk_rows) {
337 auto split = io::SplitContainer(args, chunk_rows);
338 for (auto&& chunk : split) {
339 Execute(statement_cmd_ctl, query, chunk);
340 }
341}
342
343template <typename Container>
345 const Container& args,
346 std::size_t chunk_rows) {
347 io::SplitContainerByColumns(args, chunk_rows)
348 .Perform([&query, this](const auto&... args) {
349 this->Execute(query, args...);
350 });
351}
352
353template <typename Container>
354void Transaction::ExecuteDecomposeBulk(OptionalCommandControl statement_cmd_ctl,
355 const Query& query,
356 const Container& args,
357 std::size_t chunk_rows) {
358 io::SplitContainerByColumns(args, chunk_rows)
359 .Perform([&query, &statement_cmd_ctl, this](const auto&... args) {
360 this->Execute(statement_cmd_ctl, query, args...);
361 });
362}
363
364} // namespace storages::postgres
365
366USERVER_NAMESPACE_END