userver: TCP full-duplex server with metrics and Spans
Loading...
Searching...
No Matches
TCP full-duplex server with metrics and Spans

Before you start

Make sure that you can compile and run core tests as described at Configure and Build and took a look at the TCP half-duplex server with static configs validation.

Step by step guide

Let's write a TCP echo server. It should accept incoming connections, read the data from socket and send the received data back concurrently with read. The read/write operation continues as long as the socket is open.

We would also need production quality metrics and logs for the service.

TCP server

Derive from components::TcpAcceptorBase and override the ProcessSocket function to get the new sockets:

#include <userver/concurrent/queue.hpp>
#include <userver/utils/statistics/metric_tag.hpp>
#include <userver/utils/statistics/metrics_storage.hpp>
namespace samples::tcp::echo {
struct Stats;
class Echo final : public components::TcpAcceptorBase {
public:
static constexpr std::string_view kName = "tcp-echo";
// Component is valid after construction and is able to accept requests
Echo(const components::ComponentConfig& config,
void ProcessSocket(engine::io::Socket&& sock) override;
private:
Stats& stats_;
};
} // namespace samples::tcp::echo
Warning
ProcessSocket functions are invoked concurrently on the same instance of the class. Use synchronization primitives or do not modify shared data in ProcessSocket.

struct Stats holds the statistics for the component and is defined as:

struct Stats {
std::atomic<std::uint64_t> opened_sockets{0};
std::atomic<std::uint64_t> closed_sockets{0};
std::atomic<std::uint64_t> bytes_read{0};
};

Statistics registration

To automatically deliver the metrics they should be registered via utils::statistics::MetricTag and DumpMetric+ResetMetric functions should be defined:

const utils::statistics::MetricTag<Stats> kTcpEchoTag{"tcp-echo"};
void DumpMetric(utils::statistics::Writer& writer, const Stats& stats) {
writer["sockets"]["opened"] = stats.opened_sockets;
writer["sockets"]["closed"] = stats.closed_sockets;
writer["bytes"]["read"] = stats.bytes_read;
}
void ResetMetric(Stats& stats) {
stats.opened_sockets = 0;
stats.closed_sockets = 0;
stats.bytes_read = 0;
}

Now the tag could be used in component constructor to get a reference to the struct Stats:

Echo::Echo(const components::ComponentConfig& config,
: TcpAcceptorBase(config, context),
stats_(context.FindComponent<components::StatisticsStorage>()
.GetMetricsStorage()
->GetMetric(kTcpEchoTag)) {}

Static config

Lets configure our component in the components section:

# yaml
tcp-echo:
task_processor: main-task-processor # Run socket accepts on CPU bound task processor
sockets_task_processor: main-task-processor # Run ProcessSocket() for each new socket on CPU bound task processor
port: 8181

We also need to configure the HTTP server and the handle that responds with statistics:

# yaml
server:
listener:
port: 8182 # ...to listen on this port and...
task_processor: monitor-task-processor # ...process incoming requests on this task processor.
handler-server-monitor:
path: /service/monitor
method: GET
task_processor: monitor-task-processor
monitor-handler: false

Dynamic config

We are not planning to get updates for dynamic config values in this sample. Because of that we just write the defaults to the fallback file of the components::DynamicConfigFallbacks component.

All the values are described in a separate section Dynamic configs .

{
"BAGGAGE_SETTINGS": {
"allowed_keys": []
},
"HTTP_CLIENT_CONNECTION_POOL_SIZE": 1000,
"HTTP_CLIENT_CONNECT_THROTTLE": {
"max-size": 100,
"token-update-interval-ms": 0
},
"USERVER_BAGGAGE_ENABLED": false,
"USERVER_CACHES": {},
"USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE": false,
"USERVER_CHECK_AUTH_IN_HANDLERS": true,
"USERVER_DEADLINE_PROPAGATION_ENABLED": true,
"USERVER_DUMPS": {},
"USERVER_FILES_CONTENT_TYPE_MAP": {
".css": "text/css",
".gif": "image/gif",
".htm": "text/html",
".html": "text/html",
".jpeg": "image/jpeg",
".js": "application/javascript",
".json": "application/json",
".md": "text/markdown",
".png": "image/png",
".svg": "image/svg+xml",
"__default__": "text/plain"
},
"USERVER_HANDLER_STREAM_API_ENABLED": false,
"USERVER_HTTP_PROXY": "",
"USERVER_LOG_DYNAMIC_DEBUG": {
"force-disabled": [],
"force-enabled": []
},
"USERVER_LOG_REQUEST": true,
"USERVER_LOG_REQUEST_HEADERS": false,
"USERVER_LRU_CACHES": {},
"USERVER_NO_LOG_SPANS": {
"names": [],
"prefixes": []
},
"USERVER_RPS_CCONTROL": {
"down-level": 1,
"down-rate-percent": 2,
"min-limit": 10,
"no-limit-seconds": 1000,
"overload-off-seconds": 3,
"overload-on-seconds": 3,
"up-level": 2,
"up-rate-percent": 2
},
"USERVER_RPS_CCONTROL_ACTIVATED_FACTOR_METRIC": 5,
"USERVER_RPS_CCONTROL_CUSTOM_STATUS": {},
"USERVER_RPS_CCONTROL_ENABLED": false,
"USERVER_TASK_PROCESSOR_PROFILER_DEBUG": {
"fs-task-processor": {
"enabled": false,
"execution-slice-threshold-us": 1000000
},
"main-task-processor": {
"enabled": false,
"execution-slice-threshold-us": 2000
}
},
"USERVER_TASK_PROCESSOR_QOS": {
"default-service": {
"default-task-processor": {
"wait_queue_overload": {
"action": "ignore",
"length_limit": 5000,
"time_limit_us": 3000
}
}
}
}
}

A production ready service would dynamically retrieve the above options at runtime from a configuration service. See Writing your own configs server for insights on how to change the above options on the fly, without restarting the service.

ProcessSocket

The full-duplex communication means that the same engine::io::Socket is concurrently used for sending and receiving data. It is safe to concurrently read and write into socket. We would need two functions:

  • function that reads data from socket and puts it into a queue
  • function that pops data from queue and sends it

Those two functions could be implemented in the following way:

namespace {
void DoSend(engine::io::Socket& sock, Queue::Consumer consumer) {
std::string data;
while (consumer.Pop(data)) {
const auto sent_bytes = sock.SendAll(data.data(), data.size(), {});
if (sent_bytes != data.size()) {
LOG_INFO() << "Failed to send all the data";
return;
}
}
}
void DoRecv(engine::io::Socket& sock, Queue::Producer producer, Stats& stats) {
std::array<char, 1024> buf; // NOLINT(cppcoreguidelines-pro-type-member-init)
const auto read_bytes = sock.ReadSome(buf.data(), buf.size(), {});
if (!read_bytes) {
LOG_INFO() << "Failed to read data";
return;
}
stats.bytes_read += read_bytes;
if (!producer.Push({buf.data(), read_bytes})) {
return;
}
}
}
} // anonymous namespace

Now it's time to handle new sockets. In the ProcessSocket function consists of the following steps:

  • increment the stats for opened sockets
  • create a span with a "fd" tag to trace the sockets
  • make a guard to increment the closed sockets statistics on scope exit
  • create a queue for messages
  • create a new task that sends the messages from the queue
  • run the receiving function
void Echo::ProcessSocket(engine::io::Socket&& sock) {
const auto sock_num = ++stats_.opened_sockets;
tracing::Span span{fmt::format("sock_{}", sock_num)};
span.AddTag("fd", std::to_string(sock.Fd()));
utils::FastScopeGuard guard{[this]() noexcept {
LOG_INFO() << "Closing socket";
++stats_.closed_sockets;
}};
auto queue = Queue::Create();
auto send_task =
utils::Async("send", DoSend, std::ref(sock), queue->GetConsumer());
DoRecv(sock, queue->GetProducer(), stats_);
}

The tracing::Span and utils::Async work together to produce nice logs that allow you to trace particular file descriptor:

tskv timestamp=2022-08-22T16:31:34.855853 text=Failed to read data fd=108 link=5bc8829cc3dc425d8d5c5d560f815fa2 trace_id=63eb16f2165d45669c23df725530572c span_id=17b35cd05db1c11e

On scope exit (for example because of the exception or return) the destructors would work in the following order:

  • destructor of the producer - it unblocks the consumer Pop operation
  • destructor of send_task - it cancels the coroutine and waits for it finish
  • destructor of consumer
  • destructor of queue
  • destructor of scope guard - it increments the closed sockets count
  • destructor of span - it writes the execution time of the scope
  • destructor of socket is called after leaving the ProcessSocket - it closes the OS socket.

int main()

Finally, add the component to the components::MinimalServerComponentList(), and start the server with static configuration file passed from command line.

int main(int argc, const char* const argv[]) {
const auto component_list = components::MinimalServerComponentList()
.Append<samples::tcp::echo::Echo>()
// Testuite components:
.Append<components::TestsuiteSupport>()
.Append<components::HttpClient>();
return utils::DaemonMain(argc, argv, component_list);
}

Build and Run

To build the sample, execute the following build steps at the userver root directory:

mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-tcp_full_duplex_service

The sample could be started by running make start-userver-samples-tcp_full_duplex_service. The command would invoke testsuite start target that sets proper paths in the configuration files and starts the service.

To start the service manually run ./samples/tcp_full_duplex_service/userver-samples-tcp_full_duplex_service -c </path/to/static_config.yaml> (do not forget to prepare the configuration files!).

Now you can send a request to your server from another terminal:

bash
$ nc localhost 8181
hello
hello
test test test
test test test

Functional testing

Functional tests for the service and its metrics could be implemented using the testsuite in the following way:

import asyncio
import socket
import pytest
from pytest_userver import chaos
DATA = (
b'Once upon a midnight dreary, while I pondered, weak and weary, ',
b'Over many a quaint and curious volume of forgotten lore - ',
b'While I nodded, nearly napping, suddenly there came a tapping, ',
b'As of some one gently rapping, rapping at my chamber door - ',
b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
b'Only this and nothing more."',
)
DATA_LENGTH = sum(len(x) for x in DATA)
# Another way to say that monitor handlers listen for the main service port
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
return service_port
async def send_all_data(s, loop):
for data in DATA:
await loop.sock_sendall(s, data)
async def recv_all_data(s, loop):
answer = b''
while len(answer) < DATA_LENGTH:
answer += await loop.sock_recv(s, DATA_LENGTH - len(answer))
assert answer == b''.join(DATA)
async def test_basic(service_client, loop, monitor_client, tcp_service_port):
await service_client.reset_metrics()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
await loop.sock_connect(sock, ('localhost', tcp_service_port))
send_task = asyncio.create_task(send_all_data(sock, loop))
await recv_all_data(sock, loop)
await send_task
metrics = await monitor_client.metrics(prefix='tcp-echo.')
assert metrics.value_at('tcp-echo.sockets.opened') == 1
assert metrics.value_at('tcp-echo.sockets.closed') == 0
assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH

Note that in this case testsuite requires some help to detect that the service is ready to accept requests. To do that, override the service_non_http_health_checks:

import pytest
from pytest_userver.utils import net
pytest_plugins = ['pytest_userver.plugins.core']
@pytest.fixture(scope='session')
def tcp_service_port(service_config_yaml) -> int:
components = service_config_yaml['components_manager']['components']
tcp_hello = components.get('tcp-echo')
assert tcp_hello, 'No "tcp-echo" component found'
return int(tcp_hello['port'])
@pytest.fixture(scope='session')
def service_non_http_health_checks(
service_config_yaml, tcp_service_port,
) -> net.HealthChecks:
checks = net.get_health_checks_info(service_config_yaml)
checks.tcp.append(net.HostPort(host='localhost', port=tcp_service_port))
return checks

Full sources

See the full example at: