Task construction and synchronization primitives for coroutines.
Files | |
file | atomic.hpp |
Helper algorithms to work with atomics. | |
Classes | |
class | concurrent::Variable< Data, Mutex > |
class | engine::Promise< T > |
std::promise replacement for asynchronous tasks that works in pair with engine::Future. | |
class | engine::Future< T > |
std::future replacement for asynchronous tasks that works in pair with engine::Promise. | |
class | engine::CancellableSemaphore |
Class that allows up to max_simultaneous_locks concurrent accesses to the critical section. It honours task cancellation, unlike Semaphore. | |
class | engine::Semaphore |
Class that allows up to max_simultaneous_locks concurrent accesses to the critical section. It ignores task cancellation, unlike CancellableSemaphore. | |
class | engine::SingleConsumerEvent |
A multiple-producers, single-consumer event. | |
class | utils::TokenBucket |
class | concurrent::AsyncEventChannel< Args > |
class | concurrent::AsyncEventSource< Args > |
The read-only side of an event channel. Events are delivered to listeners in a strict FIFO order, i.e. a new event is delivered only after the previous one has been processed; the same listener is never called concurrently. | |
class | concurrent::BackgroundTaskStorageCore |
class | concurrent::BackgroundTaskStorage |
class | concurrent::ConflatedEventChannel |
A non-blocking version of 'AsyncEventChannel'. | |
class | concurrent::MpscQueue< T > |
class | concurrent::MutexSet< Key, Hash, Equal > |
A dynamic set of mutexes. | |
class | concurrent::StripedCounter |
A contention-free sharded atomic counter, with memory consumption and read performance traded for write performance. Intended to be used for write-heavy counters, mostly in metrics. | |
class | dist_lock::DistLockStrategyBase |
Interface for distributed lock strategies. | |
class | dist_lock::DistLockedTask |
A task that tries to acquire a distributed lock and runs user callback once while the lock is held. | |
class | drivers::SubscribableFutureWrapper< SubscribableFuture > |
An adaptor for working with certain external futures. | |
class | engine::ConditionVariable |
std::condition_variable replacement for asynchronous tasks. | |
class | engine::Mutex |
std::mutex replacement for asynchronous tasks. | |
class | engine::SharedMutex |
std::shared_mutex replacement for asynchronous tasks. | |
class | engine::SingleUseEvent |
A single-producer, single-consumer event. | |
class | engine::SingleWaitingTaskMutex |
Lighter version of Mutex with at most one waiting task. | |
class | engine::TaskInheritedVariable< T > |
TaskInheritedVariable is a per-coroutine variable of arbitrary type. | |
class | engine::TaskLocalVariable< T > |
TaskLocalVariable is a per-coroutine variable of arbitrary type. | |
class | rcu::Variable< T, RcuTraits > |
Read-Copy-Update variable. | |
class | rcu::RcuMap< Key, Value, RcuMapTraits > |
Map-like structure allowing RCU keyset updates. | |
class | utils::PeriodicTask |
Task that periodically runs a user callback. Callback is started after the previous callback execution is finished every period + A - B , where: | |
Typedefs | |
template<typename T > | |
using | concurrent::NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>> |
Non FIFO multiple producers multiple consumers queue. | |
template<typename T > | |
using | concurrent::NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>> |
Non FIFO multiple producers single consumer queue. | |
template<typename T > | |
using | concurrent::SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>> |
Single producer multiple consumers queue. | |
template<typename T > | |
using | concurrent::SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>> |
Single producer single consumer queue. | |
template<typename T > | |
using | concurrent::UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, false>> |
Like concurrent::NonFifoMpscQueue, but without a maximum size. | |
template<typename T > | |
using | concurrent::UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, true>> |
Like concurrent::SpmcQueue, but without a maximum size. | |
template<typename T > | |
using | concurrent::UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, false>> |
Like concurrent::SpscQueue, but without a maximum size. | |
using | concurrent::StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>> |
Single producer single consumer queue of std::string which is bounded by the total bytes inside the strings. | |
Functions | |
template<typename T , typename Func > | |
T | utils::AtomicUpdate (std::atomic< T > &atomic, Func updater) |
Atomically performs the operation of updater on atomic | |
template<typename T > | |
T | utils::AtomicMin (std::atomic< T > &atomic, T value) |
Sets atomic to value in a concurrency-safe way if value is less. | |
template<typename T > | |
T | utils::AtomicMax (std::atomic< T > &atomic, T value) |
Sets atomic to value in a concurrency-safe way if value is greater. | |
template<typename SubscribableFuture > | |
void | drivers::WaitForSubscribableFuture (SubscribableFuture &&future) |
Waits on the given future as described on drivers::SubscribableFutureWrapper. | |
template<typename SubscribableFuture > | |
engine::FutureStatus | drivers::TryWaitForSubscribableFuture (SubscribableFuture &&future, engine::Deadline deadline) |
template<typename... Tasks> | |
auto | engine::GetAll (Tasks &... tasks) |
Waits for the successful completion of all of the specified tasks or the cancellation of the caller. | |
template<typename... Tasks> | |
void | engine::WaitAllChecked (Tasks &... tasks) |
Waits for the successful completion of all of the specified tasks or for the cancellation of the caller. | |
template<typename... Tasks> | |
std::optional< std::size_t > | engine::WaitAny (Tasks &... tasks) |
Waits for the completion of any of the specified tasks or the cancellation of the caller. | |
template<typename Function , typename... Args> | |
auto | utils::Async (std::string name, Function &&f, Args &&... args) |
Starts an asynchronous task. | |
template<typename Function , typename... Args> | |
auto | utils::Async (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::CriticalAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedCriticalAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::Async (engine::TaskProcessor &task_processor, std::string name, engine::Deadline deadline, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedAsync (engine::TaskProcessor &task_processor, std::string name, engine::Deadline deadline, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::CriticalAsync (std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedCriticalAsync (std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedAsync (std::string name, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::Async (std::string name, engine::Deadline deadline, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::SharedAsync (std::string name, engine::Deadline deadline, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::AsyncBackground (std::string name, engine::TaskProcessor &task_processor, Function &&f, Args &&... args) |
template<typename Function , typename... Args> | |
auto | utils::CriticalAsyncBackground (std::string name, engine::TaskProcessor &task_processor, Function &&f, Args &&... args) |
using concurrent::NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>> |
Non FIFO multiple producers multiple consumers queue.
Items from the same producer are always delivered in the production order. Items from different producers (or when using a MultiProducer token) are delivered in an unspecified order. In other words, FIFO order is maintained only within producers, but not between them. This may lead to increased peak latency of item processing.
In exchange for this, the queue has lower contention and increased throughput compared to a conventional lock-free queue.
using concurrent::NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>> |
Non FIFO multiple producers single consumer queue.
using concurrent::SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>> |
Single producer multiple consumers queue.
using concurrent::SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>> |
Single producer single consumer queue.
using concurrent::StringStreamQueue = GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>> |
Single producer single consumer queue of std::string which is bounded by the total bytes inside the strings.
using concurrent::UnboundedNonFifoMpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<true, false>> |
Like concurrent::NonFifoMpscQueue, but without a maximum size.
using concurrent::UnboundedSpmcQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, true>> |
Like concurrent::SpmcQueue, but without a maximum size.
using concurrent::UnboundedSpscQueue = GenericQueue<T, impl::NoMaxSizeQueuePolicy<false, false>> |
Like concurrent::SpscQueue, but without a maximum size.
auto utils::Async | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
engine::Deadline | deadline, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
task_processor | Task processor to run on |
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::Async | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
task_processor | Task processor to run on |
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::Async | ( | std::string | name, |
engine::Deadline | deadline, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::Async | ( | std::string | name, |
Function && | f, | ||
Args &&... | args ) |
Starts an asynchronous task.
By default, arguments are copied or moved inside the resulting TaskWithResult, like std::thread does. To pass an argument by reference, wrap it in std::ref / std::cref or capture the arguments using a lambda.
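The copy-versus-reference rule can be seen with std::thread itself, which the paragraph above names as the model. The following is a minimal sketch using only the standard library; CopyVersusRef is a hypothetical name introduced for illustration and is not part of utils::Async.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical helper: returns {copied, referenced} as seen by the caller
// after both threads have finished.
inline std::pair<int, int> CopyVersusRef() {
    int copied = 0;
    int referenced = 0;

    // The argument is copied into the thread; the caller's variable is untouched.
    std::thread by_value([](int value) { ++value; }, copied);
    // std::ref wraps the variable, so the thread increments the caller's object.
    std::thread by_ref([](int& value) { ++value; }, std::ref(referenced));

    by_value.join();
    by_ref.join();
    return {copied, referenced};
}
```

Calling CopyVersusRef() yields 0 for the copied variable and 1 for the one passed through std::ref.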
Async
There are multiple orthogonal parameters of the task being started. Use this specific overload by default (utils::Async).
By engine::TaskProcessor: overloads that take an engine::TaskProcessor& run the task on that task processor (see function signatures).
By shared-ness: the utils::Shared*Async* and engine::Shared*AsyncNoSpan families return engine::SharedTaskWithResult, which can be awaited from multiple tasks at the same time, at the cost of some overhead.
By engine::TaskBase::Importance ("critical-ness"): the utils::*CriticalAsync* and engine::*CriticalAsyncNoSpan* families can be used. There, execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits.
By tracing::Span: the utils::*Async* family (which you should use by default) creates a tracing::Span with inherited trace_id and link, a new span_id and the specified stopwatch_name, which ensures that logs from the task are categorized correctly and will not get lost. The engine::*AsyncNoSpan* family creates span-less tasks.
By the propagation of engine::TaskInheritedVariable instances: the utils::*Async* family (which you should use by default) inherits all task-inherited variables from the parent task. The engine::*AsyncNoSpan* family does not inherit any task-inherited variables.
By deadline: some utils::*Async* functions accept an engine::Deadline parameter. If the deadline expires, the task is cancelled. See *Async* function signatures for details.
When launching a task, it's important to ensure that it will not access its lambda captures after they are destroyed. Plain data captured by value (including by move) is always safe. By-reference captures and objects that store references inside are always something to be aware of. Of course, copying the world will degrade performance, so let's see how to ensure lifetime safety with captured references.
Task objects are automatically cancelled and awaited on destruction, if not already finished. The lifetime of the task object is what determines when the task may be running and accessing its captures. The task can only safely capture by reference objects that outlive the task object.
When the task is just stored in a new local variable and is not moved or returned from a function, capturing anything is safe:
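A minimal standard-library sketch of the local-variable case follows. It assumes the future returned by std::async(std::launch::async, ...) as a stand-in for a task object: its destructor waits for the work to finish, although, unlike the task objects described here, it does not cancel it. CountLocally is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>

// Hypothetical helper: the task captures two locals by reference.
inline std::size_t CountLocally() {
    std::string text = "hello";
    std::size_t length = 0;

    // `task` is declared after `text` and `length`, so it is destroyed (and
    // therefore awaited) before either of them goes out of scope.
    std::future<void> task =
        std::async(std::launch::async, [&text, &length] { length = text.size(); });

    task.get();  // wait for the result before reading `length`
    return length;
}
```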
A more complicated example, where the task is moved into a container:
The bug above can be fixed by placing the declaration of tasks after y.
In the case above people often think that calling .Get() in appropriate places solves the problem. It does not! If an exception is thrown somewhere before .Get(), then the variables' definition order is the source of truth.
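The corrected ordering can be sketched with the standard library alone, again assuming futures from std::async(std::launch::async, ...) as stand-ins for task objects (they wait on destruction but do not cancel). The names SumWithTasks, tasks and y are illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <vector>

// Hypothetical helper: several tasks in a container update a shared local.
inline int SumWithTasks() {
    std::atomic<int> y{0};                 // declared first, destroyed last
    std::vector<std::future<void>> tasks;  // declared after y, destroyed first

    for (int i = 1; i <= 3; ++i) {
        tasks.push_back(std::async(std::launch::async, [&y, i] { y += i; }));
    }

    // If an exception were thrown at this point, stack unwinding would destroy
    // `tasks` (waiting for the work) before `y`, so no task could outlive `y`.
    // With the two declarations swapped, `y` would be destroyed while tasks
    // might still be running.
    for (auto& task : tasks) task.get();
    return y.load();
}
```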
Same guidelines apply when tasks are stored in classes or structs: the task object must be defined below everything that it accesses:
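A minimal sketch of the member-order rule, assuming a std::async future as a stand-in for a task member (it waits on destruction but does not cancel). Worker and its members are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <future>

// Hypothetical class: the task member is declared below the data it accesses.
class Worker {
public:
    Worker() : task_(std::async(std::launch::async, [this] { counter_ += 42; })) {}

    int Result() {
        task_.wait();  // make sure the task has finished before reading
        return counter_.load();
    }

private:
    std::atomic<int> counter_{0};  // initialized before task_, destroyed after it
    std::future<void> task_;       // last member: initialized last, destroyed first
};
```

Members are destroyed in reverse declaration order, so the task member is awaited while counter_ is still alive.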
Generally, it's a good idea to put task objects as low as possible in the list of class members.
In practice, though, tasks are rarely stored in classes directly; concurrent::BackgroundTaskStorage is typically used for that purpose.
Components and their clients can always be safely captured by reference:
This is the overload that should be used by default.
Only one task may call Wait or Get on this task.
For details on the various other overloads, see the task parameters listed above.
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::AsyncBackground | ( | std::string | name, |
engine::TaskProcessor & | task_processor, | ||
Function && | f, | ||
Args &&... | args ) |
Starts an asynchronous task without propagating engine::TaskInheritedVariable. tracing::Span and baggage::Baggage are inherited. Task execution may be cancelled before the function starts execution in case of engine::TaskProcessor overload.
Typically used from a request handler to launch tasks that outlive the request and do not affect its completion.
Suppose you have some component that runs asynchronous tasks:
If the tasks logically belong to the component itself (not to the method caller), then they should be launched using utils::AsyncBackground instead of the regular utils::Async.
By default, arguments are copied or moved inside the resulting TaskWithResult, like std::thread does. To pass an argument by reference, wrap it in std::ref / std::cref or capture the arguments using a lambda.
name | Name of the task to show in logs |
task_processor | Task processor to run on |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
T utils::AtomicMax | ( | std::atomic< T > & | atomic, |
T | value ) |
Sets atomic to value in a concurrency-safe way if value is greater.
Definition at line 53 of file atomic.hpp.
T utils::AtomicMin | ( | std::atomic< T > & | atomic, |
T | value ) |
Sets atomic to value in a concurrency-safe way if value is less.
Definition at line 43 of file atomic.hpp.
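One common way to implement such min/max updates is a compare-and-swap loop. The sketch below is an assumption about the general technique, not the code in atomic.hpp; SketchAtomicMax and SketchAtomicMin are hypothetical names, and returning the value stored after the call is a convention chosen for this example.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical sketch: store `value` only while it is greater than the
// currently stored one; returns the value stored after the call.
template <typename T>
T SketchAtomicMax(std::atomic<T>& atomic, T value) {
    T current = atomic.load();
    // On failure compare_exchange_weak reloads `current`, so the condition
    // is re-checked against the latest stored value.
    while (current < value && !atomic.compare_exchange_weak(current, value)) {
    }
    return current < value ? value : current;
}

// The symmetric variant for the minimum.
template <typename T>
T SketchAtomicMin(std::atomic<T>& atomic, T value) {
    T current = atomic.load();
    while (value < current && !atomic.compare_exchange_weak(current, value)) {
    }
    return value < current ? value : current;
}
```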
T utils::AtomicUpdate | ( | std::atomic< T > & | atomic, |
Func | updater ) |
Atomically performs the operation of updater on atomic.
updater may be called multiple times per one call of AtomicUpdate, so it must be idempotent. To ensure that the function does not spin for a long time, updater must be fairly simple and fast.
atomic | the variable to update |
updater | a lambda that takes the old value and produces the new value |
Definition at line 24 of file atomic.hpp.
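The reason updater may run more than once becomes visible in a compare-and-swap retry loop. The following is a hedged sketch of that general pattern, not the code in atomic.hpp; SketchAtomicUpdate is a hypothetical name, and returning the newly stored value is an assumption made for this example.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical sketch of an update loop built on compare_exchange_weak.
template <typename T, typename Func>
T SketchAtomicUpdate(std::atomic<T>& atomic, Func updater) {
    T old_value = atomic.load();
    while (true) {
        const T new_value = updater(old_value);
        // If another thread changed the variable in the meantime (or the weak
        // exchange failed spuriously), old_value is refreshed and the updater
        // is called again with the fresh value.
        if (atomic.compare_exchange_weak(old_value, new_value)) return new_value;
    }
}
```

For example, an updater such as `[](int v) { return v < 10 ? v + 1 : v; }` gives an increment that saturates at 10.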
auto utils::CriticalAsync | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::Async by default.
task_processor | Task processor to run on |
name | Name for the tracing::Span to use with this task |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::CriticalAsync | ( | std::string | name, |
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::Async by default.
name | Name for the tracing::Span to use with this task |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::CriticalAsyncBackground | ( | std::string | name, |
engine::TaskProcessor & | task_processor, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits. Use for background tasks for which failing to start does not just break the handling of a single request, but harms the whole service instance.
name | Name of the task to show in logs |
task_processor | Task processor to run on |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto engine::GetAll | ( | Tasks &... | tasks | ) |
Waits for the successful completion of all of the specified tasks or the cancellation of the caller.
Effectively performs for (auto& task : tasks) task.Get(); with a twist: task.Get() is called in task completion order rather than in the provided order, so exceptions are rethrown as soon as possible.
After a successful return from this function the tasks are invalid. If an exception is thrown, some of the tasks might be invalid.
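The completion-order behaviour can be approximated with std::future for illustration. This is a rough sketch under stated assumptions, not how engine::GetAll is implemented: it polls instead of sleeping until a task completes, it ignores cancellation of the caller, it requires T to be default-constructible, and it assumes every future is valid and was created with std::launch::async. SketchGetAll is a hypothetical name.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

// Hypothetical sketch: collect results in completion order, so the first
// failure is rethrown without waiting for slower tasks that come earlier.
template <typename T>
std::vector<T> SketchGetAll(std::vector<std::future<T>>& tasks) {
    std::vector<T> results(tasks.size());
    std::vector<bool> done(tasks.size(), false);
    std::size_t remaining = tasks.size();

    while (remaining > 0) {
        for (std::size_t i = 0; i < tasks.size(); ++i) {
            if (done[i]) continue;
            if (tasks[i].wait_for(std::chrono::seconds(0)) != std::future_status::ready) continue;
            results[i] = tasks[i].get();  // rethrows the task's exception, if any
            done[i] = true;
            --remaining;
        }
        // Polling keeps the sketch short; a real implementation would block.
        if (remaining > 0) std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return results;
}
```

As in the description above, every future is invalid after a successful return, and only some of them are invalid if an exception escapes.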
tasks | either a single container, or a pack of future-like elements. |
Returns std::vector<Result> or void, depending on the tasks' result type (which must be the same for all tasks).
WaitInterruptedException | when current_task::IsCancelRequested() and no TaskCancellationBlockers are present. |
std::exception | rethrows one of the specified tasks' exceptions, if any, in no particular order. |
std::vector or when storing the results long-term.
Definition at line 84 of file get_all.hpp.
auto utils::SharedAsync | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
engine::Deadline | deadline, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
task_processor | Task processor to run on |
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::SharedAsync | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
task_processor | Task processor to run on |
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::SharedAsync | ( | std::string | name, |
engine::Deadline | deadline, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::SharedAsync | ( | std::string | name, |
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.
name | Name of the task to show in logs |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::SharedCriticalAsync | ( | engine::TaskProcessor & | task_processor, |
std::string | name, | ||
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::SharedAsync by default.
task_processor | Task processor to run on |
name | Name for the tracing::Span to use with this task |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
auto utils::SharedCriticalAsync | ( | std::string | name, |
Function && | f, | ||
Args &&... | args ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
Execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::SharedAsync by default.
name | Name for the tracing::Span to use with this task |
f | Function to execute asynchronously |
args | Arguments to pass to the function |
engine::FutureStatus drivers::TryWaitForSubscribableFuture | ( | SubscribableFuture && | future, |
engine::Deadline | deadline ) |
This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.
deadline expiration leads to a memory leak, use drivers::SubscribableFutureWrapper instead.
Definition at line 97 of file subscribable_futures.hpp.
void engine::WaitAllChecked | ( | Tasks &... | tasks | ) |
Waits for the successful completion of all of the specified tasks or for the cancellation of the caller.
Effectively performs for (auto& task : tasks) task.Wait(); with a twist: if any task completes with an exception, it gets rethrown as soon as possible.
Invalid tasks are skipped.
Tasks are not invalidated by WaitAllChecked; the result can be retrieved after the call.
tasks | either a single container, or a pack of future-like elements. |
WaitInterruptedException | when current_task::ShouldCancel() (for WaitAllChecked versions without a deadline) |
std::exception | one of the specified tasks' exceptions, if any, in no particular order. |
Definition at line 95 of file wait_all_checked.hpp.
std::optional< std::size_t > engine::WaitAny | ( | Tasks &... | tasks | ) |
Waits for the completion of any of the specified tasks or the cancellation of the caller.
Could be used to get the ready HTTP requests ASAP:
Works with different types of tasks and futures:
tasks | either a single container, or a pack of future-like elements. |
Returns std::nullopt if there are no completed tasks (possible if the current task was cancelled).
Definition at line 54 of file wait_any.hpp.
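The return convention (an index on success, std::nullopt otherwise) can be illustrated with std::future. This is a sketch under assumptions, not the implementation of engine::WaitAny: it polls, and a timeout parameter stands in for cancellation of the caller, which the standard library cannot express. SketchWaitAny and its timeout parameter are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical sketch: returns the index of the first ready future found,
// or std::nullopt if nothing became ready before the timeout.
template <typename T>
std::optional<std::size_t> SketchWaitAny(std::vector<std::future<T>>& tasks,
                                         std::chrono::milliseconds timeout) {
    const auto give_up_at = std::chrono::steady_clock::now() + timeout;
    do {
        for (std::size_t i = 0; i < tasks.size(); ++i) {
            // Futures whose result was already taken are skipped.
            if (tasks[i].valid() &&
                tasks[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
                return i;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } while (std::chrono::steady_clock::now() < give_up_at);
    return std::nullopt;
}
```

The caller can then fetch the result with tasks[*index].get() and call the function again to wait for the next ready element.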
void drivers::WaitForSubscribableFuture | ( | SubscribableFuture && | future | ) |
Waits on the given future as described on drivers::SubscribableFutureWrapper.
The result can be retrieved from the original future once ready.
engine::WaitInterruptedException | on task cancellation |
Definition at line 85 of file subscribable_futures.hpp.