Before you start
Make sure that you can compile and run core tests as described at Configure and Build, and that you have taken a look at the TCP half-duplex server with static configs validation sample.
Step by step guide
Let's write a TCP echo server. It should accept incoming connections, read data from the socket, and send the received data back concurrently with the reads. The reads and writes continue for as long as the socket is open.
We also need production-quality metrics and logs for the service.
TCP server
Derive from components::TcpAcceptorBase and override the ProcessSocket function to get the new sockets:
#include <userver/components/tcp_acceptor_base.hpp>
#include <userver/concurrent/queue.hpp>
#include <userver/utils/statistics/metric_tag.hpp>
#include <userver/utils/statistics/metrics_storage.hpp>

namespace samples::tcp::echo {

struct Stats;

class Echo final : public components::TcpAcceptorBase {
public:
    static constexpr std::string_view kName = "tcp-echo";

    Echo(const components::ComponentConfig& config, const components::ComponentContext& context);

    void ProcessSocket(engine::io::Socket&& sock) override;

private:
    Stats& stats_;
};

}  // namespace samples::tcp::echo
- Warning: ProcessSocket functions are invoked concurrently on the same instance of the class. Use synchronization primitives or do not modify shared data in ProcessSocket.
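As an illustration only (not part of the sample; SharedState and Account are hypothetical names), non-atomic shared data touched from ProcessSocket could be guarded with the coroutine-aware engine::Mutex:

#include <cstdint>
#include <mutex>

#include <userver/engine/mutex.hpp>

class SharedState final {
public:
    void Account(std::uint64_t bytes) {
        std::lock_guard lock{mutex_};  // serializes concurrent ProcessSocket invocations
        total_bytes_ += bytes;
    }

private:
    engine::Mutex mutex_;  // engine::Mutex suspends the coroutine, not the OS thread
    std::uint64_t total_bytes_{0};
};

In this sample no locking is needed: the shared Stats counters below are std::atomic.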
struct Stats holds the statistics for the component and is defined as:
struct Stats {
std::atomic<std::uint64_t> opened_sockets{0};
std::atomic<std::uint64_t> closed_sockets{0};
std::atomic<std::uint64_t> bytes_read{0};
};
Statistics registration
To automatically deliver the metrics, they should be registered via utils::statistics::MetricTag, and the DumpMetric + ResetMetric functions should be defined:
writer["sockets"]["opened"] = stats.opened_sockets;
writer["sockets"]["closed"] = stats.closed_sockets;
writer["bytes"]["read"] = stats.bytes_read;
}
void ResetMetric(Stats& stats) {
stats.opened_sockets = 0;
stats.closed_sockets = 0;
stats.bytes_read = 0;
}
Now the tag can be used in the component constructor to get a reference to the Stats instance:
Echo::Echo(const components::ComponentConfig& config, const components::ComponentContext& context)
    : TcpAcceptorBase(config, context),
      stats_(context.FindComponent<components::StatisticsStorage>()
                 .GetMetricsStorage()
                 ->GetMetric(kTcpEchoTag)) {}
Static config
Let's configure our component in the components section:
# yaml
tcp-echo:
    task_processor: main-task-processor          # Run socket accepts on CPU bound task processor
    sockets_task_processor: main-task-processor  # Run ProcessSocket() for each new socket on CPU bound task processor
    port: 8181
We also need to configure the HTTP server and the handler that responds with statistics:
# yaml
server:
    listener:
        port: 8182                              # ...to listen on this port and...
        task_processor: monitor-task-processor  # ...process incoming requests on this task processor.
handler-server-monitor:
    path: /service/monitor
    method: GET
    task_processor: monitor-task-processor
    monitor-handler: false
Dynamic config
We are not planning to get updates for dynamic config values in this sample, so we just write the defaults to the fallback file of the components::DynamicConfigFallbacks component.
All the values are described in a separate section: Dynamic configs.
{
"BAGGAGE_SETTINGS": {
"allowed_keys": []
},
"HTTP_CLIENT_CONNECTION_POOL_SIZE": 1000,
"HTTP_CLIENT_CONNECT_THROTTLE": {
"max-size": 100,
"token-update-interval-ms": 0
},
"USERVER_BAGGAGE_ENABLED": false,
"USERVER_CACHES": {},
"USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE": false,
"USERVER_CHECK_AUTH_IN_HANDLERS": true,
"USERVER_DEADLINE_PROPAGATION_ENABLED": true,
"USERVER_DUMPS": {},
"USERVER_FILES_CONTENT_TYPE_MAP": {
".css": "text/css",
".gif": "image/gif",
".htm": "text/html",
".html": "text/html",
".jpeg": "image/jpeg",
".js": "application/javascript",
".json": "application/json",
".md": "text/markdown",
".png": "image/png",
".svg": "image/svg+xml",
"__default__": "text/plain"
},
"USERVER_HANDLER_STREAM_API_ENABLED": false,
"USERVER_HTTP_PROXY": "",
"USERVER_LOG_DYNAMIC_DEBUG": {
"force-disabled": [],
"force-enabled": []
},
"USERVER_LOG_REQUEST": true,
"USERVER_LOG_REQUEST_HEADERS": false,
"USERVER_LRU_CACHES": {},
"USERVER_NO_LOG_SPANS": {
"names": [],
"prefixes": []
},
"USERVER_RPS_CCONTROL": {
"down-level": 1,
"down-rate-percent": 2,
"min-limit": 10,
"no-limit-seconds": 1000,
"overload-off-seconds": 3,
"overload-on-seconds": 3,
"up-level": 2,
"up-rate-percent": 2
},
"USERVER_RPS_CCONTROL_ACTIVATED_FACTOR_METRIC": 5,
"USERVER_RPS_CCONTROL_CUSTOM_STATUS": {},
"USERVER_RPS_CCONTROL_ENABLED": false,
"USERVER_TASK_PROCESSOR_PROFILER_DEBUG": {
"fs-task-processor": {
"enabled": false,
"execution-slice-threshold-us": 1000000
},
"main-task-processor": {
"enabled": false,
"execution-slice-threshold-us": 2000
}
},
"USERVER_TASK_PROCESSOR_QOS": {
"default-service": {
"default-task-processor": {
"wait_queue_overload": {
"action": "ignore",
"length_limit": 5000,
"time_limit_us": 3000
}
}
}
}
}
A production ready service would dynamically retrieve the above options at runtime from a configuration service. See Writing your own configs server for insights on how to change the above options on the fly, without restarting the service.
ProcessSocket
Full-duplex communication means that the same engine::io::Socket is used for sending and receiving data at the same time. It is safe to concurrently read from and write to the socket. We need two functions:
- a function that reads data from the socket and puts it into a queue
- a function that pops data from the queue and sends it
Those two functions could be implemented in the following way:
namespace {

// The queue for passing data from the reading to the sending coroutine
using Queue = concurrent::SpscQueue<std::string>;

// Sends data from the queue until the producer side is dropped or a send fails
void DoSend(engine::io::Socket& sock, Queue::Consumer consumer) {
    std::string data;
    while (consumer.Pop(data)) {
        const auto sent_bytes = sock.SendAll(data.data(), data.size(), {});
        if (sent_bytes != data.size()) {
            LOG_INFO() << "Failed to send all the data";
            return;
        }
    }
}

// Reads data from the socket and pushes it into the queue
void DoRecv(engine::io::Socket& sock, Queue::Producer producer, Stats& stats) {
    std::array<char, 1024> buf;
    while (!engine::current_task::ShouldCancel()) {
        const auto read_bytes = sock.ReadSome(buf.data(), buf.size(), {});
        if (!read_bytes) {
            LOG_INFO() << "Failed to read data";
            return;
        }
        stats.bytes_read += read_bytes;
        if (!producer.Push({buf.data(), read_bytes})) {
            return;
        }
    }
}

}  // anonymous namespace
Now it's time to handle new sockets. The ProcessSocket function consists of the following steps:
- increment the stats for opened sockets
- create a span with a "fd" tag to trace the sockets
- make a guard to increment the closed sockets statistics on scope exit
- create a queue for messages
- create a new task that sends the messages from the queue
- run the receiving function
void Echo::ProcessSocket(engine::io::Socket&& sock) {
    const auto sock_num = ++stats_.opened_sockets;

    tracing::Span span{"sock_" + std::to_string(sock_num)};
    span.AddTag("fd", std::to_string(sock.Fd()));

    utils::FastScopeGuard guard{[this]() noexcept { ++stats_.closed_sockets; }};

    auto queue = Queue::Create();

    auto send_task = utils::Async("send", DoSend, std::ref(sock), queue->GetConsumer());

    DoRecv(sock, queue->GetProducer(), stats_);
}
The tracing::Span and utils::Async work together to produce nice logs that allow you to trace a particular file descriptor:
tskv timestamp=2022-08-22T16:31:34.855853 text=Failed to read data fd=108 link=5bc8829cc3dc425d8d5c5d560f815fa2 trace_id=63eb16f2165d45669c23df725530572c span_id=17b35cd05db1c11e
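A minimal sketch of those mechanics (not part of the sample; the span name and tag value are illustrative): log records written while a span is alive carry its tags and identifiers, and utils::Async runs the task in a child span that shares the trace_id and link:

#include <userver/logging/log.hpp>
#include <userver/tracing/span.hpp>
#include <userver/utils/async.hpp>

void TraceExample() {
    tracing::Span span{"sock_example"};
    span.AddTag("fd", "108");

    // This record carries fd=108 plus the span's link, trace_id and span_id,
    // just like the "Failed to read data" record above.
    LOG_INFO() << "Failed to read data";

    // The task below runs in a child span named "send" with the same
    // trace_id and link, so its records correlate with this socket too.
    auto send_task = utils::Async("send", [] { LOG_INFO() << "sending"; });
    send_task.Get();
}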
On scope exit (for example, because of an exception or a return) the destructors run in the following order:
- destructor of the producer - it unblocks the consumer's Pop operation (see the sketch below)
- destructor of send_task - it cancels the coroutine and waits for it to finish
- destructor of the consumer
- destructor of the queue
- destructor of the scope guard - it increments the closed sockets count
- destructor of the span - it writes the execution time of the scope
- destructor of the socket is called after leaving ProcessSocket - it closes the OS socket.
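A minimal sketch of the first of those steps, assuming the Queue alias from the sample (the task name "drain" and the payload are illustrative): destroying the last producer makes Pop return false, so the sending coroutine finishes on its own and its task destructor has nothing left to cancel:

void ShutdownSketch() {
    auto queue = Queue::Create();

    auto send_task = utils::Async("drain", [consumer = queue->GetConsumer()]() mutable {
        std::string data;
        while (consumer.Pop(data)) {
            // pretend to send `data`
        }
        // reached as soon as the last producer is destroyed (or the task is cancelled)
    });

    {
        auto producer = queue->GetProducer();
        [[maybe_unused]] const bool pushed = producer.Push(std::string{"payload"});
    }  // the producer is destroyed here, so Pop() above eventually returns false

    send_task.Get();  // completes without cancellation
}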
int main()
Finally, add the component to components::MinimalServerComponentList(), and start the server with the static configuration file passed from the command line.
int main(int argc, const char* const argv[]) {
    const auto component_list = components::MinimalServerComponentList()
                                    .Append<samples::tcp::echo::Echo>()
                                    .Append<components::TestsuiteSupport>()
                                    .Append<components::HttpClient>()
                                    .Append<server::handlers::ServerMonitor>();
    return utils::DaemonMain(argc, argv, component_list);
}
Build and Run
To build the sample, execute the following build steps at the userver root directory:
mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-tcp_full_duplex_service
The sample can be started by running make start-userver-samples-tcp_full_duplex_service. The command invokes a testsuite start target that sets proper paths in the configuration files and starts the service.
To start the service manually, run ./samples/tcp_full_duplex_service/userver-samples-tcp_full_duplex_service -c </path/to/static_config.yaml> (do not forget to prepare the configuration files!).
Now you can send a request to your server from another terminal:
bash
$ nc localhost 8181
hello
hello
test test test
test test test
Functional testing
Functional tests for the service and its metrics could be implemented using the testsuite in the following way:
import asyncio
import socket
import pytest
from pytest_userver import chaos
DATA = (
b'Once upon a midnight dreary, while I pondered, weak and weary, ',
b'Over many a quaint and curious volume of forgotten lore - ',
b'While I nodded, nearly napping, suddenly there came a tapping, ',
b'As of some one gently rapping, rapping at my chamber door - ',
b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
b'Only this and nothing more."',
)
DATA_LENGTH = sum(len(x) for x in DATA)
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
return service_port
async def send_all_data(s, loop):
for data in DATA:
await loop.sock_sendall(s, data)
async def recv_all_data(s, loop):
answer = b''
while len(answer) < DATA_LENGTH:
answer += await loop.sock_recv(s, DATA_LENGTH - len(answer))
assert answer == b''.join(DATA)
async def test_basic(service_client, loop, monitor_client, tcp_service_port):
await service_client.reset_metrics()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
await loop.sock_connect(sock, ('localhost', tcp_service_port))
send_task = asyncio.create_task(send_all_data(sock, loop))
await recv_all_data(sock, loop)
await send_task
metrics = await monitor_client.metrics(prefix='tcp-echo.')
assert metrics.value_at('tcp-echo.sockets.opened') == 1
assert metrics.value_at('tcp-echo.sockets.closed') == 0
assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
Note that in this case the testsuite requires some help to detect that the service is ready to accept requests. To do that, override the service_non_http_health_checks fixture:
import pytest

from pytest_userver.utils import net

pytest_plugins = ['pytest_userver.plugins.core']
@pytest.fixture(scope='session')
def tcp_service_port(service_config_yaml) -> int:
components = service_config_yaml['components_manager']['components']
tcp_hello = components.get('tcp-echo')
assert tcp_hello, 'No "tcp-echo" component found'
return int(tcp_hello['port'])
@pytest.fixture(scope='session')
def service_non_http_health_checks(
service_config_yaml, tcp_service_port,
) -> net.HealthChecks:
checks = net.get_health_checks_info(service_config_yaml)
checks.tcp.append(net.HostPort(host='localhost', port=tcp_service_port))
return checks
Full sources
See the full example at: