userver: Concurrency
Concurrency

Detailed Description

Task construction and synchronization primitives for coroutines.


Files

file  atomic.hpp
 Helper algorithms to work with atomics.
 

Classes

class  utils::TokenBucket
 
class  concurrent::AsyncEventChannel< Args >
 
class  concurrent::AsyncEventSource< Args >
 The read-only side of an event channel. Events are delivered to listeners in strict FIFO order: a new event is dispatched for processing only after the previous one has been processed, and the same listener is never called concurrently. More...
 
class  concurrent::BackgroundTaskStorageCore
 
class  concurrent::BackgroundTaskStorage
 
class  concurrent::ConflatedEventChannel
 A non-blocking version of 'AsyncEventChannel'. More...
 
class  concurrent::MpscQueue< T >
 
class  concurrent::MutexSet< Key, Hash, Equal >
 A dynamic set of mutexes. More...
 
class  concurrent::StripedCounter
 A contention-free sharded atomic counter, with memory consumption and read performance traded for write performance. Intended to be used for write-heavy counters, mostly in metrics. More...
 
class  concurrent::Variable< Data, Mutex >
 
class  dist_lock::DistLockStrategyBase
 Interface for distributed lock strategies. More...
 
class  dist_lock::DistLockedTask
 A task that tries to acquire a distributed lock and runs user callback once while the lock is held. More...
 
class  drivers::SubscribableFutureWrapper< SubscribableFuture >
 An adaptor for working with certain external futures. More...
 
class  engine::ConditionVariable
 std::condition_variable replacement for asynchronous tasks More...
 
class  engine::Promise< T >
 std::promise replacement for asynchronous tasks that works in pair with engine::Future More...
 
class  engine::Future< T >
 std::future replacement for asynchronous tasks that works in pair with engine::Promise More...
 
class  engine::Mutex
 std::mutex replacement for asynchronous tasks. More...
 
class  engine::CancellableSemaphore
 Class that allows up to max_simultaneous_locks concurrent accesses to the critical section. It honours task cancellation, unlike Semaphore. More...
 
class  engine::Semaphore
 Class that allows up to max_simultaneous_locks concurrent accesses to the critical section. It ignores task cancellation, unlike CancellableSemaphore. More...
 
class  engine::SharedMutex
 std::shared_mutex replacement for asynchronous tasks. More...
 
class  engine::SingleConsumerEvent
 A multiple-producers, single-consumer event. More...
 
class  engine::SingleUseEvent
 A single-producer, single-consumer event. More...
 
class  engine::SingleWaitingTaskMutex
 Lighter version of Mutex with not more than 1 waiting task. More...
 
class  engine::TaskInheritedVariable< T >
 TaskInheritedVariable is a per-coroutine variable of arbitrary type. More...
 
class  engine::TaskLocalVariable< T >
 TaskLocalVariable is a per-coroutine variable of arbitrary type. More...
 
class  rcu::Variable< T, RcuTraits >
 Read-Copy-Update variable. More...
 
class  rcu::RcuMap< Key, Value, RcuMapTraits >
 Map-like structure allowing RCU keyset updates. More...
 
class  utils::PeriodicTask
 Task that periodically runs a user callback. Callback is started after the previous callback execution is finished every period + A - B, where: More...
 

Typedefs

template<typename T >
using concurrent::NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>
 Non FIFO multiple producers multiple consumers queue.
 
template<typename T >
using concurrent::NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>
 Non FIFO multiple producers single consumer queue.
 
template<typename T >
using concurrent::SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>
 Single producer multiple consumers queue.
 
template<typename T >
using concurrent::SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>
 Single producer single consumer queue.
 
using concurrent::StringStreamQueue
 Single producer single consumer queue of std::string, bounded by the total number of bytes stored inside.
 

Functions

template<typename T , typename Func >
utils::AtomicUpdate (std::atomic< T > &atomic, Func updater)
 Atomically applies updater to atomic.
 
template<typename T >
utils::AtomicMin (std::atomic< T > &atomic, T value)
 Sets atomic to value in a concurrency-safe way if value is less than the current value.
 
template<typename T >
utils::AtomicMax (std::atomic< T > &atomic, T value)
 Sets atomic to value in a concurrency-safe way if value is greater than the current value.
 
template<typename SubscribableFuture >
void drivers::WaitForSubscribableFuture (SubscribableFuture &&future)
 Waits on the given future as described on drivers::SubscribableFutureWrapper.
 
template<typename SubscribableFuture >
engine::FutureStatus drivers::TryWaitForSubscribableFuture (SubscribableFuture &&future, engine::Deadline deadline)
 
template<typename... Tasks>
auto engine::GetAll (Tasks &... tasks)
 Waits for the successful completion of all of the specified tasks or the cancellation of the caller.
 
template<typename... Tasks>
void engine::WaitAllChecked (Tasks &... tasks)
 Waits for the successful completion of all of the specified tasks or for the cancellation of the caller.
 
template<typename... Tasks>
std::optional< std::size_t > engine::WaitAny (Tasks &... tasks)
 Waits for the completion of any of the specified tasks or the cancellation of the caller.
 
template<typename Function , typename... Args>
auto utils::Async (std::string name, Function &&f, Args &&... args)
 Starts an asynchronous task.
 
template<typename Function , typename... Args>
auto utils::Async (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::CriticalAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedCriticalAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedAsync (engine::TaskProcessor &task_processor, std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::Async (engine::TaskProcessor &task_processor, std::string name, engine::Deadline deadline, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedAsync (engine::TaskProcessor &task_processor, std::string name, engine::Deadline deadline, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::CriticalAsync (std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedCriticalAsync (std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedAsync (std::string name, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::Async (std::string name, engine::Deadline deadline, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::SharedAsync (std::string name, engine::Deadline deadline, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::AsyncBackground (std::string name, engine::TaskProcessor &task_processor, Function &&f, Args &&... args)
 
template<typename Function , typename... Args>
auto utils::CriticalAsyncBackground (std::string name, engine::TaskProcessor &task_processor, Function &&f, Args &&... args)
 

Typedef Documentation

◆ NonFifoMpmcQueue

template<typename T >
using concurrent::NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>

Non FIFO multiple producers multiple consumers queue.

Items from the same producer are always delivered in the production order. Items from different producers (or when using a MultiProducer token) are delivered in an unspecified order. In other words, FIFO order is maintained only within producers, but not between them. This may lead to increased peak latency of item processing.

In exchange for this, the queue has lower contention and increased throughput compared to a conventional lock-free queue.
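The ordering guarantee can be pictured with a toy, single-threaded model. This is only a sketch under stated assumptions: `PerProducerQueueModel`, its lanes and its round-robin `Pop` are hypothetical names invented for illustration and say nothing about how `GenericQueue` is actually implemented. The model keeps one FIFO lane per producer, so order holds within a producer but not across producers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Toy single-threaded model of "FIFO within a producer, unspecified between
// producers". Hypothetical illustration, NOT the userver implementation.
class PerProducerQueueModel {
 public:
  explicit PerProducerQueueModel(std::size_t producers) : lanes_(producers) {}

  // Each producer appends to its own lane, so its items stay in order.
  void Push(std::size_t producer, int item) {
    assert(producer < lanes_.size());
    lanes_[producer].push_back(item);
  }

  // Takes the front item of the next non-empty lane, scanning round-robin.
  // Returns std::nullopt when every lane is empty.
  std::optional<int> Pop() {
    for (std::size_t i = 0; i < lanes_.size(); ++i) {
      const std::size_t index = (next_ + i) % lanes_.size();
      auto& lane = lanes_[index];
      if (!lane.empty()) {
        const int item = lane.front();
        lane.pop_front();
        next_ = (index + 1) % lanes_.size();
        return item;
      }
    }
    return std::nullopt;
  }

 private:
  std::vector<std::deque<int>> lanes_;
  std::size_t next_ = 0;
};
```

In this model, pushing 1 and 2 from producer 0 and then 10 from producer 1 yields 1, 10, 2 on the consumer side: item 2 is overtaken by a later item from another producer, yet 1 still precedes 2.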

See also
Synchronization Primitives

Definition at line 623 of file queue.hpp.

◆ NonFifoMpscQueue

template<typename T >
using concurrent::NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>

Non FIFO multiple producers single consumer queue.

See also
concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
Synchronization Primitives

Definition at line 632 of file queue.hpp.

◆ SpmcQueue

template<typename T >
using concurrent::SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>

Single producer multiple consumers queue.

See also
Synchronization Primitives

Definition at line 640 of file queue.hpp.

◆ SpscQueue

template<typename T >
using concurrent::SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>

Single producer single consumer queue.

See also
Synchronization Primitives

Definition at line 648 of file queue.hpp.

◆ StringStreamQueue

Initial value:
GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>

Single producer single consumer queue of std::string, bounded by the total number of bytes stored inside.

See also
Synchronization Primitives

Definition at line 656 of file queue.hpp.

Function Documentation

◆ Async() [1/4]

template<typename Function , typename... Args>
auto utils::Async ( engine::TaskProcessor & task_processor,
std::string name,
engine::Deadline deadline,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
task_processor: Task processor to run on
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 319 of file async.hpp.

◆ Async() [2/4]

template<typename Function , typename... Args>
auto utils::Async ( engine::TaskProcessor & task_processor,
std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
task_processor: Task processor to run on
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 241 of file async.hpp.

◆ Async() [3/4]

template<typename Function , typename... Args>
auto utils::Async ( std::string name,
engine::Deadline deadline,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 411 of file async.hpp.

◆ Async() [4/4]

template<typename Function , typename... Args>
auto utils::Async ( std::string name,
Function && f,
Args &&... args )

Starts an asynchronous task.

By default, arguments are copied or moved inside the resulting TaskWithResult, like std::thread does. To pass an argument by reference, wrap it in std::ref / std::cref or capture the arguments using a lambda.
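Since the text compares argument handling to `std::thread`, the two by-reference options can be sketched with `std::thread` itself. This is an analogy only, assuming nothing about the internals of `utils::Async`; `AppendSquare`, `PassByRefWrapper` and `PassByLambdaCapture` are hypothetical names made up for the example.

```cpp
#include <functional>
#include <thread>
#include <vector>

// Writes into a caller-owned vector, so it needs a real reference.
void AppendSquare(std::vector<int>& out, int value) {
  out.push_back(value * value);
}

// Option 1: wrap the argument in std::ref so that a reference, not a copy,
// is stored inside the launched callable.
std::vector<int> PassByRefWrapper() {
  std::vector<int> results;
  std::thread worker(AppendSquare, std::ref(results), 7);
  worker.join();  // `results` outlives the worker, so the reference is valid
  return results;
}

// Option 2: capture by reference in a lambda and pass no extra arguments.
std::vector<int> PassByLambdaCapture() {
  std::vector<int> results;
  const int value = 7;
  std::thread worker([&results, value] { AppendSquare(results, value); });
  worker.join();
  return results;
}
```

Both functions return a vector holding 49. Without `std::ref`, the first form would not compile here, because the stored copy cannot bind to the non-const reference parameter.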

Flavors of Async

There are multiple orthogonal parameters of the task being started. Use this specific overload by default (utils::Async).

By engine::TaskProcessor:

  • By default, task processor of the current task is used.
  • Custom task processor can be passed as the first or second function parameter (see function signatures).

By shared-ness:

  • By default, functions return engine::TaskWithResult, which can be awaited from 1 task at once. This is a reasonable choice for most cases.
  • Functions from utils::Shared*Async* and engine::Shared*AsyncNoSpan families return engine::SharedTaskWithResult, which can be awaited from multiple tasks at the same time, at the cost of some overhead.

By engine::TaskBase::Importance ("critical-ness"):

  • By default, functions can be cancelled due to engine::TaskProcessor overload. Also, if the task is cancelled before being started, it will not run at all.
  • If the whole service's health (not just one request) depends on the task being run, then functions from utils::*CriticalAsync* and engine::*CriticalAsyncNoSpan* families can be used. There, execution of the function is guaranteed to start regardless of engine::TaskProcessor load limits.

By tracing::Span:

  • Functions from utils::*Async* family (which you should use by default) create tracing::Span with inherited trace_id and link, a new span_id and the specified stopwatch_name, which ensures that logs from the task are categorized correctly and will not get lost.
  • Functions from engine::*AsyncNoSpan* family create span-less tasks:
    • A possible usage scenario is to create a task that will mostly wait in the background and do various unrelated work every now and then. In this case it might make sense to trace execution of work items, but not of the task itself.
    • Its usage can (albeit very rarely) be justified to squeeze some nanoseconds of performance where no logging is expected. But beware! Using tracing::Span::CurrentSpan() will trigger asserts and lead to UB in production.

By the propagation of engine::TaskInheritedVariable instances:

  • Functions from utils::*Async* family (which you should use by default) inherit all task-inherited variables from the parent task.
  • Functions from engine::*AsyncNoSpan* family do not inherit any task-inherited variables.

By deadline: some utils::*Async* functions accept an engine::Deadline parameter. If the deadline expires, the task is cancelled. See *Async* function signatures for details.

Lifetime of captures

Note
To launch background tasks, which are not awaited in the local scope, use concurrent::BackgroundTaskStorage.

When launching a task, it's important to ensure that it will not access its lambda captures after they are destroyed. Plain data captured by value (including by move) is always safe. By-reference captures and objects that store references inside are always something to be aware of. Of course, copying the world will degrade performance, so let's see how to ensure lifetime safety with captured references.

Task objects are automatically cancelled and awaited on destruction, if not already finished. The lifetime of the task object is what determines when the task may be running and accessing its captures. The task can only safely capture by reference objects that outlive the task object.

When the task is just stored in a new local variable and is not moved or returned from a function, capturing anything is safe:

int x{};
int y{};
// It's recommended to write out captures explicitly when launching tasks.
auto task = utils::Async("frobnicate", [&x, &y] {
  // Capturing anything defined before the `task` variable is safe.
  Use(x, y);
});
// ...
task.Get();

A more complicated example, where the task is moved into a container:

// Variables are destroyed in the reverse definition order: y, tasks, x.
int x{};
std::vector<engine::TaskWithResult<void>> tasks;
int y{};
tasks.push_back(utils::Async("frobnicate", [&x, &y] {
  // Capturing x is safe, because `tasks` is destroyed before `x`.
  Use(x);
  // BUG! The task may keep running for some time after `y` is destroyed.
  Use(y);
}));

The bug above can be fixed by placing the declaration of tasks after y.

In the case above people often think that calling .Get() in appropriate places solves the problem. It does not! If an exception is thrown somewhere before .Get(), then the variables' definition order is the source of truth.
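The point about exceptions can be checked with plain C++ and no tasks at all. The sketch below is an illustration under stated assumptions: `Traced`, `LaunchAndThrow` and `UnwindOrder` are hypothetical names, and `Traced` merely records when it is destroyed, standing in for the variables and the task container from the example above.

```cpp
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Appends its label to a log when destroyed.
class Traced {
 public:
  Traced(std::string label, std::vector<std::string>& log)
      : label_(std::move(label)), log_(log) {}
  ~Traced() { log_.push_back(label_); }

 private:
  std::string label_;
  std::vector<std::string>& log_;
};

// Mirrors the layout of the buggy example: x, tasks, y.
void LaunchAndThrow(std::vector<std::string>& log) {
  Traced x("x", log);
  Traced tasks("tasks", log);  // stands in for the container of tasks
  Traced y("y", log);
  throw std::runtime_error("failed before the tasks were awaited");
  // An explicit wait placed here would never be reached.
}

// Returns the order in which the locals were destroyed during unwinding.
std::vector<std::string> UnwindOrder() {
  std::vector<std::string> log;
  try {
    LaunchAndThrow(log);
  } catch (const std::runtime_error&) {
    // Swallowed on purpose: only the destruction order matters here.
  }
  return log;
}
```

`UnwindOrder()` reports `y`, `tasks`, `x`: during stack unwinding `y` is gone before `tasks` is destroyed, which is exactly the window in which a still-running task could touch a dead `y`.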

Same guidelines apply when tasks are stored in classes or structs: the task object must be defined below everything that it accesses:

private:
// Can access foo_ but not bar_.

Generally, it's a good idea to put task objects as low as possible in the list of class members.
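The member-ordering guideline follows from the C++ rule that non-static data members are destroyed in reverse declaration order. A minimal sketch, assuming hypothetical names throughout (`Noisy`, `Holder`, `DestructionOrder`); the `task_` member is an ordinary object standing in for a task, not a real `engine::TaskWithResult`:

```cpp
#include <string>
#include <vector>

// Records its name into a shared log when destroyed.
struct Noisy {
  std::string name;
  std::vector<std::string>* log;
  ~Noisy() { log->push_back(name); }
};

// Hypothetical class layout used only to show destruction order.
struct Holder {
  Noisy foo_;   // declared above task_: still alive while task_ is destroyed
  Noisy task_;  // stands in for a task object
  Noisy bar_;   // declared below task_: already destroyed by then
};

// Returns member names in the order their destructors ran.
std::vector<std::string> DestructionOrder() {
  std::vector<std::string> log;
  {
    Holder holder{{"foo_", &log}, {"task_", &log}, {"bar_", &log}};
  }  // holder is destroyed here
  return log;
}
```

`DestructionOrder()` yields `bar_`, `task_`, `foo_`. A task that is cancelled and awaited in the destructor of `task_` can therefore still rely on `foo_`, but not on `bar_`.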

In practice, though, tasks are rarely stored directly in classes; concurrent::BackgroundTaskStorage is typically used for that purpose.

Components and their clients can always be safely captured by reference.

See also
Component system

About this specific overload

This is the overload that should be used by default.

  • The task will be launched on the current TaskProcessor.
  • Only 1 task may call Wait or Get on this task.
  • The task may be cancelled before the function starts execution in case of TaskProcessor overload. Also, if the task is cancelled for any reason before the function starts execution, it will not run at all.
  • The task will create a child tracing::Span with the specified name.
  • The task will inherit all engine::TaskInheritedVariable instances from the current task.

For details on the various other overloads:

See also
Flavors of Async
Parameters
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult
Examples
components/component_sample_test.cpp, and samples/tcp_full_duplex_service/tcp_full_duplex_service.cpp.

Definition at line 222 of file async.hpp.

◆ AsyncBackground()

template<typename Function , typename... Args>
auto utils::AsyncBackground ( std::string name,
engine::TaskProcessor & task_processor,
Function && f,
Args &&... args )

Starts an asynchronous task without propagating engine::TaskInheritedVariable. tracing::Span and baggage::Baggage are inherited. Task execution may be cancelled before the function starts execution in case of engine::TaskProcessor overload.

Typically used from a request handler to launch tasks that outlive the request and do not affect its completion.

Usage example

Suppose you have some component that runs asynchronous tasks:

class AsyncRequestProcessor final {
 public:
  AsyncRequestProcessor();
  void FooAsync(Request&& request);
  Response WaitAndGetAggregate();

 private:
  static Response Foo(Request&& request);
  engine::TaskProcessor& task_processor_;
  // Assumed declaration, inferred from the tasks_.Lock() call in FooAsync.
  concurrent::Variable<std::vector<engine::TaskWithResult<Response>>> tasks_;
};

auto handler = [&](Request&& request) {
  async_request_processor.FooAsync(std::move(request));
  return "Please wait, your request is being processed.";
};

If the tasks logically belong to the component itself (not to the method caller), then they should be launched using utils::AsyncBackground instead of the regular utils::Async.

void AsyncRequestProcessor::FooAsync(Request&& request) {
  auto tasks = tasks_.Lock();
  tasks->push_back(
      utils::AsyncBackground("foo", task_processor_, &Foo, std::move(request)));
}

Arguments

By default, arguments are copied or moved inside the resulting TaskWithResult, like std::thread does. To pass an argument by reference, wrap it in std::ref / std::cref or capture the arguments using a lambda.

Parameters
name: Name of the task to show in logs
task_processor: Task processor to run on
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 467 of file async.hpp.

◆ AtomicMax()

template<typename T >
T utils::AtomicMax ( std::atomic< T > & atomic,
T value )

Sets atomic to value in a concurrency-safe way if value is greater than the current value.

Note
Uses std::memory_order_relaxed

Definition at line 55 of file atomic.hpp.

◆ AtomicMin()

template<typename T >
T utils::AtomicMin ( std::atomic< T > & atomic,
T value )

Sets atomic to value in a concurrency-safe way if value is less than the current value.

Note
Uses std::memory_order_relaxed

Definition at line 43 of file atomic.hpp.

◆ AtomicUpdate()

template<typename T , typename Func >
T utils::AtomicUpdate ( std::atomic< T > & atomic,
Func updater )

Atomically applies updater to atomic.

updater may be called multiple times per one call of AtomicUpdate, so it must be idempotent. To ensure that the function does not spin for a long time, updater must be fairly simple and fast.

Parameters
atomic: the variable to update
updater: a lambda that takes the old value and produces the new value
Returns
The updated value
Note
Uses std::memory_order_relaxed

Definition at line 24 of file atomic.hpp.
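The reason updater may run several times is easiest to see in a compare-and-swap retry loop. The following is a minimal sketch built only on `std::atomic`, assuming such a loop is the underlying technique; `AtomicUpdateSketch` and `AtomicMaxSketch` are hypothetical names and are not the userver source.

```cpp
#include <atomic>

// Hypothetical stand-in for an AtomicUpdate-like helper.
// Retries until no other thread changed `atomic` between the load and the
// swap, so `updater` may be invoked more than once per call.
template <typename T, typename Func>
T AtomicUpdateSketch(std::atomic<T>& atomic, Func updater) {
  T old_value = atomic.load(std::memory_order_relaxed);
  while (true) {
    const T new_value = updater(old_value);
    // On failure compare_exchange_weak reloads `old_value` with the current
    // contents of `atomic`, and the loop recomputes `new_value` from it.
    if (atomic.compare_exchange_weak(old_value, new_value,
                                     std::memory_order_relaxed)) {
      return new_value;
    }
  }
}

// A "max" operation expressed through the generic update loop.
template <typename T>
T AtomicMaxSketch(std::atomic<T>& atomic, T value) {
  return AtomicUpdateSketch(atomic, [value](T old_value) {
    return old_value < value ? value : old_value;
  });
}
```

With an atomic holding 5, `AtomicMaxSketch(a, 3)` leaves it at 5 and `AtomicMaxSketch(a, 9)` raises it to 9. Because every step uses relaxed ordering, the sketch synchronizes nothing except the atomic itself.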

◆ CriticalAsync() [1/2]

template<typename Function , typename... Args>
auto utils::CriticalAsync ( engine::TaskProcessor & task_processor,
std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Execution of function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::Async by default.

Parameters
task_processor: Task processor to run on
name: Name for the tracing::Span to use with this task
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 260 of file async.hpp.

◆ CriticalAsync() [2/2]

template<typename Function , typename... Args>
auto utils::CriticalAsync ( std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Execution of function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::Async by default.

Parameters
name: Name for the tracing::Span to use with this task
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 358 of file async.hpp.

◆ CriticalAsyncBackground()

template<typename Function , typename... Args>
auto utils::CriticalAsyncBackground ( std::string name,
engine::TaskProcessor & task_processor,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Execution of function is guaranteed to start regardless of engine::TaskProcessor load limits. Use for background tasks for which failing to start not just breaks handling of a single request, but harms the whole service instance.

Parameters
name: Name of the task to show in logs
task_processor: Task processor to run on
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::TaskWithResult

Definition at line 492 of file async.hpp.

◆ GetAll()

template<typename... Tasks>
auto engine::GetAll ( Tasks &... tasks)

Waits for the successful completion of all of the specified tasks or the cancellation of the caller.

Effectively performs for (auto& task : tasks) task.Get(); with a twist: task.Get() is called in tasks completion order rather than in provided order, thus exceptions are rethrown ASAP.

After a successful return from this function the tasks are invalid. If an exception is thrown, some of the tasks might be invalid.

Parameters
tasks: either a single container, or a pack of future-like elements.
Returns
std::vector<Result> or void, depending on the tasks result type (which must be the same for all tasks).
Exceptions
WaitInterruptedException: when current_task::IsCancelRequested() and no TaskCancellationBlockers are present.
std::exception: rethrows the exception of one of the specified tasks, if any, in no particular order.
Note
Has overall computational complexity of O(N^2), where N is the number of tasks.
Prefer engine::WaitAllChecked for tasks with a result, unless you specifically need the results stored in a std::vector or when storing the results long-term.

Definition at line 84 of file get_all.hpp.

◆ SharedAsync() [1/4]

template<typename Function , typename... Args>
auto utils::SharedAsync ( engine::TaskProcessor & task_processor,
std::string name,
engine::Deadline deadline,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
task_processor: Task processor to run on
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 339 of file async.hpp.

◆ SharedAsync() [2/4]

template<typename Function , typename... Args>
auto utils::SharedAsync ( engine::TaskProcessor & task_processor,
std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
task_processor: Task processor to run on
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 300 of file async.hpp.

◆ SharedAsync() [3/4]

template<typename Function , typename... Args>
auto utils::SharedAsync ( std::string name,
engine::Deadline deadline,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 429 of file async.hpp.

◆ SharedAsync() [4/4]

template<typename Function , typename... Args>
auto utils::SharedAsync ( std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Task execution may be cancelled before the function starts execution in case of TaskProcessor overload.

Parameters
name: Name of the task to show in logs
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 394 of file async.hpp.

◆ SharedCriticalAsync() [1/2]

template<typename Function , typename... Args>
auto utils::SharedCriticalAsync ( engine::TaskProcessor & task_processor,
std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Execution of function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::SharedAsync by default.

Parameters
task_processor: Task processor to run on
name: Name for the tracing::Span to use with this task
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 280 of file async.hpp.

◆ SharedCriticalAsync() [2/2]

template<typename Function , typename... Args>
auto utils::SharedCriticalAsync ( std::string name,
Function && f,
Args &&... args )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Execution of function is guaranteed to start regardless of engine::TaskProcessor load limits. Prefer utils::SharedAsync by default.

Parameters
name: Name for the tracing::Span to use with this task
f: Function to execute asynchronously
args: Arguments to pass to the function
Returns
engine::SharedTaskWithResult

Definition at line 376 of file async.hpp.

◆ TryWaitForSubscribableFuture()

template<typename SubscribableFuture >
engine::FutureStatus drivers::TryWaitForSubscribableFuture ( SubscribableFuture && future,
engine::Deadline deadline )

This is an overloaded member function, provided for convenience. It differs from the above function only in what argument(s) it accepts.

Returns
an error code if deadline is exceeded or task is cancelled
Warning
Repeatedly waiting again after deadline expiration leads to a memory leak, use drivers::SubscribableFutureWrapper instead.

Definition at line 100 of file subscribable_futures.hpp.

◆ WaitAllChecked()

template<typename... Tasks>
void engine::WaitAllChecked ( Tasks &... tasks)

Waits for the successful completion of all of the specified tasks or for the cancellation of the caller.

Effectively performs for (auto& task : tasks) task.Wait(); with a twist: if any task completes with an exception, it gets rethrown ASAP.

Invalid tasks are skipped.

Tasks are not invalidated by WaitAllChecked; the result can be retrieved after the call.

Parameters
tasks: either a single container, or a pack of future-like elements.
Exceptions
WaitInterruptedException: when current_task::ShouldCancel() (for WaitAllChecked versions without a deadline)
std::exception: the exception of one of the specified tasks, if any, in no particular order.
Note
Has overall computational complexity of O(N^2), where N is the number of tasks.
Keeping the tasks valid may have a small extra memory impact. Make sure to drop the tasks after reading the results.
Prefer engine::GetAll for tasks without a result, unless you specifically need to keep the tasks alive for some reason.

Definition at line 101 of file wait_all_checked.hpp.

◆ WaitAny()

template<typename... Tasks>
std::optional< std::size_t > engine::WaitAny ( Tasks &... tasks)

Waits for the completion of any of the specified tasks or the cancellation of the caller.

Could be used to get the ready HTTP requests ASAP:

std::size_t ProcessReadyRequests(
    std::vector<clients::http::ResponseFuture>& requests,
    engine::Deadline deadline) {
  std::size_t processed_requests = 0;
  while (auto indx = engine::WaitAnyUntil(deadline, requests)) {
    ++processed_requests;
    std::shared_ptr<clients::http::Response> response = requests[*indx].Get();
    EXPECT_TRUE(response->IsOk());
  }
  return processed_requests;
}

Works with different types of tasks and futures:

auto task0 = engine::AsyncNoSpan([] { return 1; });
auto task1 = utils::Async("long_task", [] {
  return std::string{"abc"};
});
auto task_idx_opt = engine::WaitAny(task0, task1);
ASSERT_TRUE(task_idx_opt);
EXPECT_EQ(*task_idx_opt, 0);

Parameters
tasks: either a single container, or a pack of future-like elements.
Returns
the index of the completed task, or std::nullopt if there are no completed tasks (possible if current task was cancelled).

Definition at line 56 of file wait_any.hpp.

◆ WaitForSubscribableFuture()

template<typename SubscribableFuture >
void drivers::WaitForSubscribableFuture ( SubscribableFuture && future)

Waits on the given future as described on drivers::SubscribableFutureWrapper.

The result can be retrieved from the original future once ready.

Exceptions
engine::WaitInterruptedException: on task cancellation

Definition at line 89 of file subscribable_futures.hpp.