Before you start
Make sure that you can compile and run core tests as described at Configure, Build and Install and took a look at the TCP half-duplex server with static configs validation.
Step by step guide
Let's write a TCP echo server. It should accept incoming connections, read the data from socket and send the received data back concurrently with read. The read/write operation continues as long as the socket is open.
We would also need production quality metrics and logs for the service.
TCP server
Derive from components::TcpAcceptorBase and override the ProcessSocket
function to get the new sockets:
#include <userver/concurrent/queue.hpp>
#include <userver/utils/statistics/metric_tag.hpp>
#include <userver/utils/statistics/metrics_storage.hpp>
namespace samples::tcp::echo {
struct Stats;
public:
static constexpr std::string_view kName = "tcp-echo";
private:
Stats& stats_;
};
}
- Warning
ProcessSocket
functions are invoked concurrently on the same instance of the class. Use synchronization primitives or do not modify shared data in ProcessSocket
.
struct Stats
holds the statistics for the component and is defined as:
struct Stats {
std::atomic<std::uint64_t> opened_sockets{0};
std::atomic<std::uint64_t> closed_sockets{0};
std::atomic<std::uint64_t> bytes_read{0};
};
Statistics registration
To automatically deliver the metrics they should be registered via utils::statistics::MetricTag and DumpMetric+ResetMetric functions should be defined:
writer["sockets"]["opened"] = stats.opened_sockets;
writer["sockets"]["closed"] = stats.closed_sockets;
writer["bytes"]["read"] = stats.bytes_read;
}
void ResetMetric(Stats& stats) {
stats.opened_sockets = 0;
stats.closed_sockets = 0;
stats.bytes_read = 0;
}
Now the tag could be used in component constructor to get a reference to the struct Stats
:
: TcpAcceptorBase(config, context),
stats_(context.FindComponent<
components::StatisticsStorage>()
.GetMetricsStorage()
->GetMetric(kTcpEchoTag)) {}
Static config
Lets configure our component in the components
section:
# yaml
tcp-echo:
task_processor: main-task-processor # Run socket accepts on CPU bound task processor
sockets_task_processor: main-task-processor # Run ProcessSocket() for each new socket on CPU bound task processor
port: 8181
We also need to configure the HTTP server and the handle that responds with statistics:
# yaml
server:
listener:
port: 8182 # ...to listen on this port and...
task_processor: monitor-task-processor # ...process incoming requests on this task processor.
handler-server-monitor:
path: /service/monitor
method: GET
task_processor: monitor-task-processor
monitor-handler: false
ProcessSocket
The full-duplex communication means that the same engine::io::Socket is concurrently used for sending and receiving data. It is safe to concurrently read and write into socket. We would need two functions:
- function that reads data from socket and puts it into a queue
- function that pops data from queue and sends it
Those two functions could be implemented in the following way:
namespace {
std::string data;
while (consumer.Pop(data)) {
const auto sent_bytes = sock.
SendAll(data.data(), data.size(), {});
if (sent_bytes != data.size()) {
LOG_INFO() <<
"Failed to send all the data";
return;
}
}
}
std::array<char, 1024> buf;
const auto read_bytes = sock.
ReadSome(buf.data(), buf.size(), {});
if (!read_bytes) {
return;
}
stats.bytes_read += read_bytes;
if (!producer.Push({buf.data(), read_bytes})) {
return;
}
}
}
}
Now it's time to handle new sockets. In the ProcessSocket function consists of the following steps:
- increment the stats for opened sockets
- create a span with a "fd" tag to trace the sockets
- make a guard to increment the closed sockets statistics on scope exit
- create a queue for messages
- create a new task that sends the messages from the queue
- run the receiving function
const auto sock_num = ++stats_.opened_sockets;
span.
AddTag(
"fd", std::to_string(sock.
Fd()));
++stats_.closed_sockets;
}};
auto queue = Queue::Create();
auto send_task =
utils::Async(
"send", DoSend, std::ref(sock), queue->GetConsumer());
DoRecv(sock, queue->GetProducer(), stats_);
}
The tracing::Span and utils::Async work together to produce nice logs that allow you to trace particular file descriptor:
tskv timestamp=2022-08-22T16:31:34.855853 text=Failed to read data fd=108 link=5bc8829cc3dc425d8d5c5d560f815fa2 trace_id=63eb16f2165d45669c23df725530572c span_id=17b35cd05db1c11e
On scope exit (for example because of the exception or return) the destructors would work in the following order:
- destructor of the producer - it unblocks the consumer Pop operation
- destructor of
send_task
- it cancels the coroutine and waits for it finish
- destructor of consumer
- destructor of queue
- destructor of scope guard - it increments the closed sockets count
- destructor of span - it writes the execution time of the scope
- destructor of socket is called after leaving the ProcessSocket - it closes the OS socket.
int main()
Finally, add the component to the components::MinimalServerComponentList()
, and start the server with static configuration file passed from command line.
int main(int argc, const char* const argv[]) {
.Append<samples::tcp::echo::Echo>()
.Append<components::TestsuiteSupport>()
.Append<components::HttpClient>();
}
Build and Run
To build the sample, execute the following build steps at the userver root directory:
mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-tcp_full_duplex_service
The sample could be started by running make start-userver-samples-tcp_full_duplex_service
. The command would invoke testsuite start target that sets proper paths in the configuration files and starts the service.
To start the service manually run ./samples/tcp_full_duplex_service/userver-samples-tcp_full_duplex_service -c </path/to/static_config.yaml>
.
Now you can send a request to your server from another terminal:
bash
$ nc localhost 8181
hello
hello
test test test
test test test
Functional testing
Functional tests for the service and its metrics could be implemented using the testsuite in the following way:
import asyncio
import socket
import pytest
from pytest_userver import chaos
DATA = (
b'Once upon a midnight dreary, while I pondered, weak and weary, ',
b'Over many a quaint and curious volume of forgotten lore - ',
b'While I nodded, nearly napping, suddenly there came a tapping, ',
b'As of some one gently rapping, rapping at my chamber door - ',
b'"Tis some visitor," I muttered, "tapping at my chamber door - ',
b'Only this and nothing more."',
)
DATA_LENGTH = sum(len(x) for x in DATA)
@pytest.fixture(scope='session')
def monitor_port(service_port) -> int:
return service_port
async def send_all_data(sock, loop):
for data in DATA:
await loop.sock_sendall(sock, data)
async def recv_all_data(sock, loop):
answer = b''
while len(answer) < DATA_LENGTH:
answer += await loop.sock_recv(sock, DATA_LENGTH - len(answer))
assert answer == b''.join(DATA)
async def test_basic(service_client, loop, monitor_client, tcp_service_port):
await service_client.reset_metrics()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
await loop.sock_connect(sock, ('localhost', tcp_service_port))
send_task = asyncio.create_task(send_all_data(sock, loop))
await recv_all_data(sock, loop)
await send_task
metrics = await monitor_client.metrics(prefix='tcp-echo.')
assert metrics.value_at('tcp-echo.sockets.opened') == 1
assert metrics.value_at('tcp-echo.sockets.closed') == 0
assert metrics.value_at('tcp-echo.bytes.read') == DATA_LENGTH
Note that in this case testsuite requires some help to detect that the service is ready to accept requests. To do that, override the service_non_http_health_checks:
import pytest
pytest_plugins = ['pytest_userver.plugins.core']
@pytest.fixture(name='tcp_service_port', scope='session')
def _tcp_service_port(service_config) -> int:
components = service_config['components_manager']['components']
tcp_hello = components.get('tcp-echo')
assert tcp_hello, 'No "tcp-echo" component found'
return int(tcp_hello['port'])
@pytest.fixture(scope='session')
def service_non_http_health_checks(
service_config, tcp_service_port,
) -> net.HealthChecks:
checks = net.get_health_checks_info(service_config)
checks.tcp.append(net.HostPort(host='localhost', port=tcp_service_port))
return checks
Full sources
See the full example at: