Let's write a TCP echo server. It should accept incoming connections, read data from the socket, and send the received data back concurrently with reading. The read/write operations continue as long as the socket is open.
We also need production-quality metrics and logs for the service.
ProcessSocket is invoked concurrently on the same instance of the class. Either use synchronization primitives or do not modify shared data in ProcessSocket.
struct Stats holds the statistics for the component and is defined as:
struct Stats {
  std::atomic<std::uint64_t> opened_sockets{0};
  std::atomic<std::uint64_t> closed_sockets{0};
  std::atomic<std::uint64_t> bytes_read{0};
};
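Because every field of Stats is a std::atomic, concurrent handlers can update the counters without a mutex. The following is a minimal sketch of that property using plain std::thread instead of userver coroutines; SimulateConnection and RunConcurrently are hypothetical helpers introduced only for this illustration and are not part of the sample.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

struct Stats {
  std::atomic<std::uint64_t> opened_sockets{0};
  std::atomic<std::uint64_t> closed_sockets{0};
  std::atomic<std::uint64_t> bytes_read{0};
};

// Hypothetical stand-in for one ProcessSocket() call: it touches the shared
// Stats only through atomic operations, so no extra locking is required.
void SimulateConnection(Stats& stats, std::uint64_t bytes) {
  ++stats.opened_sockets;
  stats.bytes_read += bytes;
  ++stats.closed_sockets;
}

// Runs `connections` simulated handlers concurrently and waits for all of them.
void RunConcurrently(Stats& stats, unsigned connections,
                     std::uint64_t bytes_each) {
  std::vector<std::thread> workers;
  workers.reserve(connections);
  for (unsigned i = 0; i < connections; ++i) {
    workers.emplace_back(SimulateConnection, std::ref(stats), bytes_each);
  }
  for (auto& worker : workers) worker.join();
}
```

After RunConcurrently returns, the counters hold exact totals regardless of how the threads interleaved, which is the guarantee the component relies on when ProcessSocket runs for many sockets at once.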
Statistics registration
To have the metrics delivered automatically, register them via utils::statistics::MetricTag and define the DumpMetric and ResetMetric functions:
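To show the division of labour between the two functions, here is a self-contained sketch that does not use userver at all. It assumes a std::map as a stand-in for the framework's metrics writer (the MetricsSnapshot alias is hypothetical), and the metric names are illustrative; the real functions take the framework's own writer type and are registered through utils::statistics::MetricTag.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

struct Stats {
  std::atomic<std::uint64_t> opened_sockets{0};
  std::atomic<std::uint64_t> closed_sockets{0};
  std::atomic<std::uint64_t> bytes_read{0};
};

// Hypothetical stand-in for the framework's metrics writer: name -> value.
using MetricsSnapshot = std::map<std::string, std::uint64_t>;

// Copies the current counter values into the snapshot. The metric names are
// assumptions made for this sketch.
void DumpMetric(MetricsSnapshot& out, const Stats& stats) {
  out["sockets.opened"] = stats.opened_sockets.load();
  out["sockets.closed"] = stats.closed_sockets.load();
  out["bytes.read"] = stats.bytes_read.load();
}

// Returns all counters to zero, for example between test cases.
void ResetMetric(Stats& stats) {
  stats.opened_sockets = 0;
  stats.closed_sockets = 0;
  stats.bytes_read = 0;
}
```

The dump function only reads the counters and the reset function only writes them, so both remain safe to call while ProcessSocket is updating the same Stats instance.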
Let's configure our component in the components section:
tcp-echo:
    task_processor: main-task-processor  # Run socket accepts on CPU bound task processor
    sockets_task_processor: main-task-processor  # Run ProcessSocket() for each new socket on CPU bound task processor
    port: 8181
We also need to configure the HTTP server and the handle that responds with statistics:
server:
    listener:
        port: 8182  # ...to listen on this port and...
        task_processor: monitor-task-processor  # ...process incoming requests on this task processor.
handler-server-monitor:
    path: /service/monitor
    method: GET
    task_processor: monitor-task-processor
    monitor-handler: false
Dynamic config
We are not planning to get updates for dynamic config values in this sample. Because of that, we just write the defaults to the fallback file of the components::DynamicConfigFallbacks component.
All the values are described in a separate section, Dynamic configs.
A production-ready service would retrieve the above options dynamically at runtime from a configuration service. See Writing your own configs server for insights on how to change the above options on the fly, without restarting the service.
ProcessSocket
Full-duplex communication means that the same engine::io::Socket is used concurrently for sending and receiving data. It is safe to read from and write to the socket concurrently. We need two functions:
a function that reads data from the socket and puts it into a queue
a function that pops data from the queue and sends it
Those two functions could be implemented in the following way:
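As a rough illustration of the reader/sender split, the sketch below reproduces the pattern with standard C++ only. It is not the userver implementation: ClosableQueue, DoRead, DoSend and Echo are hypothetical names, a std::thread stands in for the task started with utils::Async, and strings stand in for socket I/O.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal blocking queue. Close() wakes up a consumer blocked in Pop().
class ClosableQueue {
 public:
  void Push(std::string chunk) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push(std::move(chunk));
    }
    cv_.notify_one();
  }

  void Close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until data arrives; returns std::nullopt once closed and drained.
  std::optional<std::string> Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return std::nullopt;
    std::string chunk = std::move(items_.front());
    items_.pop();
    return chunk;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::string> items_;
  bool closed_ = false;
};

// Reader side: forwards each "received" chunk to the queue, then closes it.
void DoRead(const std::vector<std::string>& incoming, ClosableQueue& queue) {
  for (const auto& chunk : incoming) queue.Push(chunk);
  queue.Close();
}

// Sender side: pops chunks until the queue is closed and "sends" them.
void DoSend(ClosableQueue& queue, std::string& sent) {
  while (auto chunk = queue.Pop()) sent += *chunk;
}

// Runs both sides concurrently and returns everything that was echoed back.
std::string Echo(const std::vector<std::string>& incoming) {
  ClosableQueue queue;
  std::string sent;
  std::thread sender(DoSend, std::ref(queue), std::ref(sent));
  DoRead(incoming, queue);
  sender.join();
  return sent;
}
```

Closing the queue from the reader side is what lets the sender leave its loop, which mirrors how destroying the producer unblocks the consumer's Pop operation in the sample.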
The tracing::Span and utils::Async work together to produce nice logs that allow you to trace a particular file descriptor:
tskv timestamp=2022-08-22T16:31:34.855853 text=Failed to read data fd=108 link=5bc8829cc3dc425d8d5c5d560f815fa2 trace_id=63eb16f2165d45669c23df725530572c span_id=17b35cd05db1c11e
On scope exit (for example, because of an exception or a return) the destructors run in the following order:
destructor of the producer - it unblocks the consumer Pop operation
destructor of send_task - it cancels the coroutine and waits for it to finish
destructor of consumer
destructor of queue
destructor of scope guard - it increments the closed sockets count
destructor of span - it writes the execution time of the scope
destructor of socket is called after leaving ProcessSocket - it closes the OS socket.
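The order above follows from the C++ rule that local objects are destroyed in reverse order of declaration. The sketch below demonstrates the rule with a hypothetical Tracer class; the declaration order inside Scope is an assumption derived from the list, not a copy of the sample's code.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records its own name into a shared log when destroyed.
class Tracer {
 public:
  Tracer(std::string name, std::vector<std::string>& log)
      : name_(std::move(name)), log_(log) {}
  ~Tracer() { log_.push_back(name_); }

  Tracer(const Tracer&) = delete;
  Tracer& operator=(const Tracer&) = delete;

 private:
  std::string name_;
  std::vector<std::string>& log_;
};

// Locals are declared first-to-last, so they are destroyed last-to-first.
void Scope(std::vector<std::string>& log) {
  Tracer span("span", log);
  Tracer guard("scope guard", log);
  Tracer queue("queue", log);
  Tracer consumer("consumer", log);
  Tracer send_task("send_task", log);
  Tracer producer("producer", log);
}

// Returns the names in the order in which the destructors actually ran.
std::vector<std::string> DestructionOrder() {
  std::vector<std::string> log;
  Scope(log);
  return log;
}
```

Declaring the producer last is what makes it the first object destroyed, so the consumer is unblocked before the sending task is cancelled and awaited.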
To build the sample, execute the following build steps at the userver root directory:
mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-tcp_full_duplex_service
The sample can be started by running make start-userver-samples-tcp_full_duplex_service. The command invokes the testsuite start target, which sets proper paths in the configuration files and starts the service.
To start the service manually, run ./samples/tcp_full_duplex_service/userver-samples-tcp_full_duplex_service -c </path/to/static_config.yaml> (do not forget to prepare the configuration files!).
Now you can send a request to your server from another terminal:
Note that in this case testsuite requires some help to detect that the service is ready to accept requests. To do that, override the service_non_http_health_checks: