Synchronization Primitives

This page describes the synchronization mechanisms available in userver, with their advantages and weaknesses. It is assumed that the reader is familiar with concurrent programming and with concepts like "race", "critical section" and "mutex".

Constraint

⚠️🐙❗ Use of the C++ standard library and libc synchronization primitives in coroutines is forbidden.

Cancellations, Cancellation Blockers and Synchronization Primitives

Different synchronization primitives treat task cancellations differently. Some may ignore cancellations, others return early without acquiring the resource.

Many synchronization primitives ignore cancellation requests, as such a default tends to provoke fewer issues in the code that uses the primitive. For example, engine::Mutex::lock() ignores cancellation requests because it has no way to report failure other than by throwing an exception, and throwing an exception may not be expected by users and could lead to std::terminate().

Read the documentation of a particular primitive to learn its behavior on task cancellation. Cancellation can be blocked by engine::TaskCancellationBlocker, so the latter can be used to force a primitive to ignore a cancellation request.
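
For example, a minimal sketch of blocking cancellation around a wait that must not be interrupted (DoCriticalCleanup() is a hypothetical function used only for illustration):

{
    // While the blocker is alive, the current task does not react to cancellation
    // requests, so cancellation-aware primitives will not return early here.
    const engine::TaskCancellationBlocker block_cancel;
    DoCriticalCleanup();  // e.g. waits on a mutex or an event without interruption
}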

See also: task_cancellation_intro

Synchronization mechanisms and primitives

This section describes the major available synchronization mechanisms with use cases. All the primitives are listed at the Concurrency API Group.

Note
There is no universally "faster" synchronization mechanism. Different primitives are suitable for different situations. For one load profile, primitive A could be 1000 times faster than primitive B; for another, exactly the opposite. Before choosing a primitive, determine the load profile (how often reads and writes occur, how many concurrent writers/readers there are, etc.). For a practical assessment of the effectiveness of a particular tool for your case, use google benchmark and stress testing.

engine::TaskWithResult

For parallel independent calculations, the simplest and most reliable way to transmit data is through the result of the engine::TaskWithResult execution.

std::vector<engine::TaskWithResult<int>> tasks;
tasks.reserve(container.size());
for (auto value : container) {
    // Creating tasks that will be executed in parallel
    tasks.push_back(utils::Async("some_task", [value = std::move(value)] {
        engine::InterruptibleSleepFor(std::chrono::milliseconds(100));
        return value;
    }));
}

// Wait for the tasks to complete and collect the results
std::vector<int> results;
results.reserve(tasks.size());
for (auto& task : tasks) {
    // If the task completed with an exception,
    // then Get() will rethrow that exception
    results.push_back(task.Get());
}

A less convenient and more complicated way to solve the same problem is to create a data structure shared between tasks, where the tasks themselves will record the result. This requires protecting the data through atomic variables or engine::Mutex, as well as passing this data structure to the subtasks. In this case, engine::Future may be useful (see below).

Note that when programming tasks, you need to take the lifetime of objects into account. If you pass a closure with a reference to a variable into a task, then you must ensure that the lifetime of the task is strictly shorter than the lifetime of the variable, including the case when an exception is thrown from any of the functions used. If this cannot be guaranteed, then either pass the data to the closure via shared_ptr, or pass it by copy.
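
A minimal sketch of the shared_ptr variant (LoadData() and Process() are hypothetical functions used only for illustration):

auto data = std::make_shared<std::vector<int>>(LoadData());

auto task = utils::Async("process", [data] {
    // The closure shares ownership of the vector, so it stays alive
    // even if the original `data` variable goes out of scope first.
    Process(*data);
});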

engine::Future

Sometimes calculations cannot be decomposed easily, and a single engine::Task should return many results. At the same time, it is not efficient to collect them into one structure and return them all at once (for example, the results are used as inputs for several other tasks). For such cases, you can use engine::Promise and engine::Future. They provide a synchronized channel for transmitting a value between tasks. The interface and contracts of these classes are as close as possible to the similar types from the standard library. The main differences are related to the support for the cancellation mechanism.

engine::Promise<int> int_promise;
engine::Promise<std::string> string_promise;
auto deadline = GetDeadline();

constexpr auto kBadValue = -1;
constexpr auto kFallbackString = "Bad string";

constexpr auto kTestValue = 777;
constexpr auto kTestString = "Test string value";

auto int_future = int_promise.get_future();
auto string_future = string_promise.get_future();

auto calc_task = utils::Async(
    "calc",
    [int_promise = std::move(int_promise), string_promise = std::move(string_promise), kTestValue]() mutable {
        // Emulating long calculation of x and s...
        int_promise.set_value(kTestValue);
        string_promise.set_value(kTestString);
        // Other calculations.
    }
);

auto int_consumer =
    utils::Async("int_consumer", [deadline, int_future = std::move(int_future), kTestValue]() mutable {
        auto status = int_future.wait_until(deadline);
        auto x = kBadValue;

        switch (status) {
            case engine::FutureStatus::kReady:
                x = int_future.get();
                ASSERT_EQ(x, kTestValue);
                // ...
                break;
            case engine::FutureStatus::kTimeout:
                // Timeout Handling
                break;
            case engine::FutureStatus::kCancelled:
                // Handling cancellation of calculations
                // (example, return to queue)
                return;
        }
    });

auto string_consumer =
    utils::Async("string_consumer", [string_future = std::move(string_future), kTestString]() mutable {
        std::string s;
        try {
            s = string_future.get();
            ASSERT_EQ(s, kTestString);
        } catch (const std::exception& ex) {
            // Exception Handling
            s = kFallbackString;
        }
        // ...
    });

calc_task.Get();
int_consumer.Get();
string_consumer.Get();

Even in such scenarios, the main mechanism for transmitting data should remain returning values from TaskWithResult.

engine::WaitAny and friends

The most effective way to wait for one of the asynchronous operations is to use engine::WaitAny, engine::WaitAnyFor or engine::WaitAnyUntil:

auto task0 = engine::AsyncNoSpan([] { return 1; });
auto task1 = utils::Async("long_task", [] {
    // Emulating a long calculation so that task0 finishes first
    engine::InterruptibleSleepFor(std::chrono::seconds(10));
    return std::string{"abc"};
});

auto task_idx_opt = engine::WaitAny(task0, task1);
ASSERT_TRUE(task_idx_opt);
EXPECT_EQ(*task_idx_opt, 0);

It works with different types of tasks and futures. For example, it could be used to process HTTP responses as soon as they become ready:

std::size_t ProcessReadyRequests(std::vector<clients::http::ResponseFuture>& requests, engine::Deadline deadline) {
    std::size_t processed_requests = 0;

    while (auto indx = engine::WaitAnyUntil(deadline, requests)) {
        ++processed_requests;

        std::shared_ptr<clients::http::Response> response = requests[*indx].Get();
        EXPECT_TRUE(response->IsOk());
    }

    return processed_requests;
}

See also engine::WaitAllChecked and engine::GetAll for a way to wait for all of the asynchronous operations, rethrowing exceptions immediately.
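
A minimal sketch of such a wait (the task bodies are placeholders):

auto task_a = utils::Async("task_a", [] { /* ... */ });
auto task_b = utils::Async("task_b", [] { /* ... */ });

// Waits for both tasks; if any of them throws,
// the exception is rethrown here as soon as possible.
engine::WaitAllChecked(task_a, task_b);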

concurrent::MpscQueue and friends

userver provides coroutine-friendly concurrent queues. Basic usage example:

static constexpr std::chrono::milliseconds kTimeout{10};

auto queue = concurrent::MpscQueue<int>::Create();
auto producer = queue->GetProducer();
auto consumer = queue->GetConsumer();

auto producer_task = utils::Async("producer", [&] {
    // ...
    if (!producer.Push(1, engine::Deadline::FromDuration(kTimeout))) {
        // The reader is dead
    }
});

auto consumer_task = utils::Async("consumer", [&] {
    for (;;) {
        // ...
        int item{};
        if (consumer.Pop(item, engine::Deadline::FromDuration(kTimeout))) {
            // processing the queue element
            ASSERT_EQ(item, 1);
        } else {
            // the queue is empty and there are no more live producers
            return;
        }
    }
});

producer_task.Get();
consumer_task.Get();

Consumers wait for elements in Pop. If you set a max size for the queue (at creation or dynamically), then producers will also wait in Push for the queue to become non-full. There are also PushNoblock and PopNoblock, which can be called outside of coroutines and used for communication between coroutine and non-coroutine (typically, driver) threads.
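
A minimal sketch of pushing from a non-coroutine thread (OnDriverEvent() is a hypothetical callback used only for illustration):

void OnDriverEvent(concurrent::MpscQueue<int>::Producer& producer, int event) {
    // PushNoblock never waits, so it is safe to call outside of coroutines.
    if (!producer.PushNoblock(std::move(event))) {
        // The queue is full or the consumer is dead; the event is dropped.
    }
}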

Warning
For GetProducer and GetConsumer, each individual Producer and Consumer can only be used from 1 thread! Typical use cases involve an unlimited number of producer threads (e.g. when pushing from an HTTP handler). Use GetMultiProducer and GetMultiConsumer (if needed) for those cases instead of creating producers and consumers on the fly.

Choosing the right type of concurrent queue

Use concurrent::MpscQueue by default: it guarantees FIFO order and allows multiple producers.

If there is only a single producing task, the single-producer queue variants can be used instead for higher performance (see the Concurrency API Group for the specific types).

If reordering of the elements is acceptable, the NonFifo queue variants can be used instead for higher performance (again, see the Concurrency API Group for the specific types).

Warning
NonFifo queue variants can lead to high latencies for some elements. These queues are suitable for long-running background operations, as well as for various kinds of logs, metrics and monitoring, but not for batching requests on which clients are actively waiting.

Consider setting a max size for the queue (at creation or dynamically) to start dropping elements in case of overload and avoid OOM issues.
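
A minimal sketch of bounding a queue at creation (Job is a placeholder element type):

// Producers will wait in Push() (or fail by deadline) once 1000 elements are queued,
// which provides backpressure instead of unbounded memory growth.
auto queue = concurrent::MpscQueue<Job>::Create(1000);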

On the other hand, if you really mean it, you can use the Unbounded queue variants, which are slightly faster (see the Concurrency API Group for the specific types).

Using queue closing mechanic to process all the remaining items during shutdown

When all producers of a queue are destroyed, and all the remaining elements are consumed, the queue becomes "closed for reads", and Pop starts returning false.

This mechanic can be used to process all the remaining items during shutdown. Here is an example of how you can organize the queue processing correctly:

using FooItem = std::string;

// The specific queue type is an assumption for this sketch; any queue that
// supports GetMultiProducer() (e.g. concurrent::MpscQueue) will do.
using Queue = concurrent::MpscQueue<FooItem>;

class FooProcessor final {
public:
    explicit FooProcessor(std::size_t max_size)
        : queue_(Queue::Create(max_size)), queue_producer_(queue_->GetMultiProducer()) {
        // There is no use for a Span in a task that lives until the service stops.
        // The task should be Critical, because the whole service (not just a single request) depends on it.
        consumer_task_ =
            engine::CriticalAsyncNoSpan([&, consumer = queue_->GetConsumer()] { ConsumerTaskLoop(consumer); });
    }

    ~FooProcessor() {
        // When the last producer is dropped, and the remaining items are consumed,
        // the queue becomes "closed for pop", and `Pop` starts returning `false`.
        std::move(queue_producer_).Reset();
        // Allow the task to process the remaining elements.
        // If the current task is cancelled, we will honor the cancellation and stop waiting.
        // (In this case, the task will be automatically cancelled and awaited in the destructor.)
        // You can also provide a deadline until which we will try to process the items.
        [[maybe_unused]] const bool success = consumer_task_.WaitNothrow();
    }

    void ProcessAsync(FooItem&& item) {
        // If the queue has a max size limit, pushing will block upon reaching it.
        // This provides backpressure, which, when utilized effectively, can improve reliability
        // of processing pipelines.
        const bool success = queue_producer_.Push(std::move(item));
        if (!success) {
            // If the task is waiting for free capacity and is cancelled, Push will return `false`.
            // It will also return `false` if all the consumers are dropped, which should not be possible in our case.
            UINVARIANT(engine::current_task::ShouldCancel(), "Unexpected Push failure, is consumer dropped?");
        }
    }

private:
    void ConsumerTaskLoop(const Queue::Consumer& consumer) {
        FooItem item;
        // When the last producer is dropped, and the remaining items are consumed,
        // the queue becomes "closed for pop", and `Pop` starts returning `false`.
        // If the current task is cancelled, `consumer.Pop` will return `false` as well.
        while (consumer.Pop(item)) {
            // [optional] On task cancellation, don't keep processing the remaining elements,
            // just destroy them.
            try {
                // You might want to measure the time it takes to process an item.
                tracing::Span process_span{"foo-processor"};
                DoProcess(item);
            } catch (const std::exception& ex) {
                // Failure to handle a single item should typically not stop the whole queue
                // operation. Although backoff and fallback strategies may vary.
                LOG_ERROR() << "Processing of item '" << item << "' failed: " << ex;
            }
        }
    }

    void DoProcess(const FooItem& item);

    std::shared_ptr<Queue> queue_;
    Queue::MultiProducer queue_producer_;
    // `consumer_task_` MUST be the last field so that it is awaited before
    // the other fields (which are used in the task) are destroyed.
    engine::TaskWithResult<void> consumer_task_;
};

Using queue closing mechanic to stop the producers

Similarly, when all consumers of a queue are destroyed, the queue becomes "closed for writes", and Push starts returning false. This mechanic can be useful for temporary queues to propagate what is essentially a cancellation of an operation, e.g. when a connection is closed and it is useless to compute further responses.
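
A minimal sketch of this pattern (Chunk and FillNextChunk() are hypothetical and used only for illustration):

auto queue = concurrent::MpscQueue<Chunk>::Create();
auto producer = queue->GetProducer();
auto consumer = queue->GetConsumer();

auto writer_task = utils::Async("response_writer", [producer = std::move(producer)] {
    Chunk chunk;
    while (FillNextChunk(chunk)) {
        if (!producer.Push(std::move(chunk))) {
            // The consumer was destroyed (e.g. the connection was closed),
            // so there is no point in computing further chunks.
            return;
        }
    }
});

// Destroying `consumer` (e.g. when the connection is closed) makes
// Push() in the task above start returning false.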

std::atomic

If you need to access small trivial types (int, long, std::size_t, bool) in shared memory from different tasks, then atomic variables may help. Beware: for complex types the compiler generates code that implicitly uses synchronization primitives forbidden in userver. If you are using std::atomic with a non-trivial type or with a large type, be sure to write a test to check that accessing this variable does not involve a mutex.
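
A minimal compile-time check of that property (the Pair struct below is a placeholder):

struct Pair {
    std::int32_t a{0};
    std::int32_t b{0};
};

// Fails to compile if std::atomic<Pair> would silently fall back to a hidden lock.
static_assert(std::atomic<Pair>::is_always_lock_free);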

It is not recommended to use non-default memory orders (for example, acquire/release), because their use is fraught with great difficulties. In such code it is very easy to introduce a bug that will be extremely difficult to detect. Therefore, it is better to use the simpler and more reliable default, std::memory_order_seq_cst.

thread_local

For "handy thread-safe global storage", use engine::TaskLocalVariable instead of thread_local. Better still, avoid global state and pass data between functions explicitly.

For passing global data within a single request, e.g. from handlers to clients using middlewares, use engine::TaskInheritedVariable.

For fast insecure randomness (suitable e.g. for load balancing), use the non-cryptographic random utilities provided by userver instead of thread_local random engines.

For secure randomness (suitable e.g. for picking gifts for users), use the cryptographically secure utilities from the userver crypto module.

thread_local, when used with userver, suffers from issues described in compiler::ThreadLocal. So for all other needs, use compiler::ThreadLocal instead of thread_local.

engine::Mutex

A classic mutex. It allows you to work with standard std::unique_lock and std::lock_guard.

engine::Mutex mutex;
constexpr std::string_view kTestData = "Test Data";

{
    std::lock_guard<engine::Mutex> lock(mutex);
    // accessing data under a mutex
    const auto x = kTestData;
    ASSERT_EQ(kTestData, x);
}

Prefer using concurrent::Variable instead of an engine::Mutex.

engine::SharedMutex

A mutex that distinguishes readers and writers. It works with the standard std::unique_lock, std::lock_guard and std::shared_lock.

engine::SharedMutex mutex;
constexpr auto kTestString = "123";
std::string data;

{
    std::lock_guard<engine::SharedMutex> lock(mutex);
    // accessing the data under the mutex for writing
    data = kTestString;
}

{
    std::shared_lock<engine::SharedMutex> lock(mutex);
    // accessing the data under the mutex for reading,
    // the data cannot be changed
    const auto& x = data;
    ASSERT_EQ(x, kTestString);
}

engine::SharedMutex is a much more complex and heavier synchronization primitive than the regular engine::Mutex. The mere existence of readers and writers does not mean that engine::SharedMutex will be faster than engine::Mutex. For example, if the critical section is small (2-3 integer variables), the SharedMutex overhead can outweigh all the gains from concurrent reads. Therefore, do not mindlessly use SharedMutex without benchmarks. Also, the "frequent reads, rare writes" scenario is in most cases solved much more efficiently by RCU - rcu::Variable.

Read locking of the SharedMutex is slower than reading an RCU. However, SharedMutex does not require copying data on modification, unlike RCU. Therefore, in the case of expensive data copies that are protected by a critical section, it makes sense to use SharedMutex instead of RCU. If the cost of copying is low, then it is usually more profitable to use RCU.

To work with a mutex, we recommend using concurrent::Variable. This reduces the risk of taking a mutex in the wrong mode, the wrong mutex, and so on.

rcu::Variable

A synchronization primitive with readers and writers that allows readers to work with the old version of the data while the writer fills in the new version of the data. Multiple versions of the protected data can exist at any given time. The old version is deleted when the RCU realizes that no one else is working with it. This can happen when writing a new version is finished if there are no active readers. If at least one reader holds an old version of the data, it will not be deleted.

RCU should be the "default" synchronization primitive for the case of frequent reads and rare writes. It is very poorly suited for frequent updates, because a copy of the data is created on each update.

constexpr int kOldValue = 1;
constexpr auto kOldString = "Old string";
constexpr int kNewValue = 2;
constexpr auto kNewString = "New string";

struct Data {
    int x;
    std::string s;
};
rcu::Variable<Data> data{Data{kOldValue, kOldString}};

{
    auto ro_ptr = data.Read();
    // We can use ro_ptr to access the data.
    // At this time, neither the writer nor the other readers are blocked
    // => you can hold the smart pointer without fear of blocking other users.
    ASSERT_EQ(ro_ptr->x, kOldValue);
    ASSERT_EQ(ro_ptr->s, kOldString);
}

{
    auto ptr = data.StartWrite();
    // ptr stores a copy of the latest version of the data, now we can prepare
    // the new version. Readers continue to access the old version of the data
    // via .Read().
    ptr->x = kNewValue;
    ptr->s = kNewString;
    // After Commit(), the next reader in Read() gets the version of the data
    // we just wrote. Old readers who did Read() before Commit() continue to
    // work with the old version of the data.
    ptr.Commit();
}

Comparison with SharedMutex is described in the engine::SharedMutex section of this page.

rcu::RcuMap

A map based on rcu::Variable. This primitive is used when you need a concurrent dictionary. It is well suited for the case of rarely added keys and poorly suited for a frequently changing set of keys.

Note that RcuMap does not protect the value of the dictionary, it only protects the dictionary itself. If the values are non-atomic types, then they must be protected separately (for example, using concurrent::Variable).

struct Data {
    // Access to RcuMap content must be synchronized via std::atomic
    // or other synchronization primitives
    std::atomic<int> x{0};
    std::atomic<bool> flag{false};
};
rcu::RcuMap<std::string, Data> map;

// If the key is not in the dictionary,
// then a default object will be created
map["123"]->x++;
map["other_data"]->flag = true;
ASSERT_EQ(map["123"]->x.load(), 1);
ASSERT_EQ(map["123"]->flag.load(), false);
ASSERT_EQ(map["other_data"]->x.load(), 0);
ASSERT_EQ(map["other_data"]->flag.load(), true);

concurrent::Variable

A proxy class that combines user data and a synchronization primitive that protects that data. Its use can greatly reduce the number of bugs associated with incorrect use of the critical section - taking the wrong mutex, forgetting to take the mutex, taking SharedMutex in the wrong mode, etc.

constexpr auto kTestString = "Test string";

struct Data {
    int x = 0;
    std::string s{};
};
// If you do not specify the 2nd template parameter,
// then by default the Variable is protected by engine::Mutex.
// Here engine::SharedMutex is used, because SharedLock() is needed below.
concurrent::Variable<Data, engine::SharedMutex> data;

{
    // We get a smart pointer to the data,
    // the smart pointer stores std::lock_guard
    auto data_ptr = data.Lock();
    data_ptr->s = kTestString;
}

{
    // We get a smart pointer to the data,
    // the smart pointer stores std::shared_lock
    auto data_ptr = data.SharedLock();
    // we can read the data, but cannot write it
    ASSERT_EQ(data_ptr->s, kTestString);
}

engine::Semaphore

The semaphore is used to limit the number of users that run inside a critical section. For example, a semaphore can be used to limit the number of simultaneous concurrent attempts to connect to a resource.

constexpr auto kMaxSimultaneousLocks = 3;
engine::Semaphore sema(kMaxSimultaneousLocks);
// ...
{
    std::shared_lock lock(sema);
    // There may be no more than 3 users
    // in the critical section at the same time
}

You don't need a semaphore to limit the number of tasks that perform CPU-heavy operations. For that purpose, create a separate TaskProcessor and run those operations on it; it is cheaper in terms of synchronization.
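
A hedged sketch of this approach (the task processor name "cpu-bound-task-processor" is an assumption; it must be declared in the service's static config):

// `context` is the components::ComponentContext available in a component's constructor.
auto& cpu_tp = context.GetTaskProcessor("cpu-bound-task-processor");

auto heavy_task = utils::Async(cpu_tp, "heavy_calc", [] {
    // CPU-heavy work runs on the dedicated task processor
    // and does not starve the main task processor.
});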

If you need a counter, but do not need to wait for the counter to change, then use std::atomic instead of a semaphore.
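
For example, a plain counter does not need a semaphore:

std::atomic<std::uint64_t> requests_processed{0};
// ... in any task:
++requests_processed;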

engine::SingleUseEvent

A single-producer, single-consumer event without task cancellation support. Must not be awaited or signaled multiple times in the same waiting session.

For multiple producers and cancellation support, use engine::SingleConsumerEvent instead.

engine::TaskWithResult<void> sender;
{
    engine::SingleUseEvent event;
    sender = utils::Async("sender", [&event] { event.Send(); });
    event.WaitNonCancellable();  // will be woken up by 'Send()' above

    // 'event' is destroyed here. Note that 'Send' might continue executing, but
    // it will still complete safely.
}

utils::SwappingSmart

Don't use utils::SwappingSmart, use rcu::Variable instead. SwappingSmart's behavior contains undefined behavior (UB).

utils::SwappingSmart protects readers from rare writers, but in the case of very frequent writes the reader has no guarantee of getting a valid std::shared_ptr (which occasionally fired in production). Also, rcu::Variable is faster than utils::SwappingSmart in most cases.