userver: TCP half-duplex server with static configs validation

Before you start

Make sure that you can compile and run core tests as described at Configure and Build.

Step by step guide

Let's write a simple TCP server that accepts incoming connections and, for as long as the client keeps sending "hi", responds with a greeting taken from the configuration file.

TCP server

Derive from components::TcpAcceptorBase and override the ProcessSocket function to get the new sockets:

namespace samples::tcp {

class Hello final : public components::TcpAcceptorBase {
 public:
  static constexpr std::string_view kName = "tcp-hello";

  // Component is valid after construction and is able to accept requests
  Hello(const components::ComponentConfig& config,
        const components::ComponentContext& context)
      : TcpAcceptorBase(config, context),
        greeting_(config["greeting"].As<std::string>("hi")) {}

  void ProcessSocket(engine::io::Socket&& sock) override;

  static yaml_config::Schema GetStaticConfigSchema();

 private:
  const std::string greeting_;
};

}  // namespace samples::tcp
Warning
ProcessSocket is invoked concurrently on the same instance of the class. Either protect shared data with synchronization primitives or do not modify it in ProcessSocket.

Static config

Our new "tcp-hello" component should support the options of components::TcpAcceptorBase plus the "greeting" option. To achieve that, implement the GetStaticConfigSchema function as follows:

yaml_config::Schema Hello::GetStaticConfigSchema() {
  return yaml_config::MergeSchemas<TcpAcceptorBase>(R"(
    type: object
    description: |
      Component for accepting incoming TCP connections and responding with some
      greeting as long as the client sends 'hi'.
    additionalProperties: false
    properties:
      greeting:
          type: string
          description: greeting to send to client
          defaultDescription: hi
  )");
}

Now let's configure our component in the components section:

# yaml
tcp-hello:
    task_processor: main-task-processor         # Run socket accepts on CPU bound task processor
    sockets_task_processor: main-task-processor # Run ProcessSocket() for each new socket on CPU bound task processor
    port: 8180
    greeting: hello
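
To illustrate what additionalProperties: false is meant to achieve, the sketch below mimics a strict option check in plain Python. It is not userver's validator: unknown_options, BASE_OPTIONS and COMPONENT_OPTIONS are hypothetical names, and the base option list is only the subset that appears in the sample config above, not the full set supported by components::TcpAcceptorBase.

```python
# Illustrative subset only: option names taken from the sample config above.
BASE_OPTIONS = {'task_processor', 'sockets_task_processor', 'port'}
# Options added by the "tcp-hello" component schema.
COMPONENT_OPTIONS = {'greeting'}


def unknown_options(config: dict) -> list:
    """Return, sorted, the option names a strict merged schema would reject."""
    allowed = BASE_OPTIONS | COMPONENT_OPTIONS
    return sorted(set(config) - allowed)


if __name__ == '__main__':
    good = {'port': 8180, 'greeting': 'hello'}
    typo = {'port': 8180, 'greting': 'hello'}  # misspelled "greeting"
    print(unknown_options(good))
    print(unknown_options(typo))
```

With a strict schema, a misspelled option such as greting is reported at startup instead of being silently ignored and replaced by the default.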

Dynamic config

We are not planning to get updates for dynamic config values in this sample, so we just write the defaults to the fallback file of the components::DynamicConfigFallbacks component.

All the values are described in a separate section, Dynamic configs.

{
  "BAGGAGE_SETTINGS": {
    "allowed_keys": []
  },
  "HTTP_CLIENT_CONNECTION_POOL_SIZE": 1000,
  "HTTP_CLIENT_CONNECT_THROTTLE": {
    "max-size": 100,
    "token-update-interval-ms": 0
  },
  "USERVER_BAGGAGE_ENABLED": false,
  "USERVER_CACHES": {},
  "USERVER_CANCEL_HANDLE_REQUEST_BY_DEADLINE": false,
  "USERVER_CHECK_AUTH_IN_HANDLERS": true,
  "USERVER_DEADLINE_PROPAGATION_ENABLED": true,
  "USERVER_DUMPS": {},
  "USERVER_FILES_CONTENT_TYPE_MAP": {
    ".css": "text/css",
    ".gif": "image/gif",
    ".htm": "text/html",
    ".html": "text/html",
    ".jpeg": "image/jpeg",
    ".js": "application/javascript",
    ".json": "application/json",
    ".md": "text/markdown",
    ".png": "image/png",
    ".svg": "image/svg+xml",
    "__default__": "text/plain"
  },
  "USERVER_HANDLER_STREAM_API_ENABLED": false,
  "USERVER_HTTP_PROXY": "",
  "USERVER_LOG_DYNAMIC_DEBUG": {
    "force-disabled": [],
    "force-enabled": []
  },
  "USERVER_LOG_REQUEST": true,
  "USERVER_LOG_REQUEST_HEADERS": false,
  "USERVER_LRU_CACHES": {},
  "USERVER_NO_LOG_SPANS": {
    "names": [],
    "prefixes": []
  },
  "USERVER_RPS_CCONTROL": {
    "down-level": 1,
    "down-rate-percent": 2,
    "min-limit": 10,
    "no-limit-seconds": 1000,
    "overload-off-seconds": 3,
    "overload-on-seconds": 3,
    "up-level": 2,
    "up-rate-percent": 2
  },
  "USERVER_RPS_CCONTROL_ACTIVATED_FACTOR_METRIC": 5,
  "USERVER_RPS_CCONTROL_CUSTOM_STATUS": {},
  "USERVER_RPS_CCONTROL_ENABLED": false,
  "USERVER_TASK_PROCESSOR_PROFILER_DEBUG": {
    "fs-task-processor": {
      "enabled": false,
      "execution-slice-threshold-us": 1000000
    },
    "main-task-processor": {
      "enabled": false,
      "execution-slice-threshold-us": 2000
    }
  },
  "USERVER_TASK_PROCESSOR_QOS": {
    "default-service": {
      "default-task-processor": {
        "wait_queue_overload": {
          "action": "ignore",
          "length_limit": 5000,
          "time_limit_us": 3000
        }
      }
    }
  }
}

A production-ready service would retrieve the above options dynamically at runtime from a configuration service. See Writing your own configs server for insights on how to change the above options on the fly, without restarting the service.

ProcessSocket

It's time to deal with new sockets. The code is quite straightforward:

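void Hello::ProcessSocket(engine::io::Socket&& sock) {
  std::string data;
  data.resize(2);

  // Keep serving the same connection until the task is cancelled
  while (!engine::current_task::ShouldCancel()) {
    // Wait for exactly two bytes; anything other than "hi" ends the session
    const auto read_bytes = sock.ReadAll(data.data(), 2, {});
    if (read_bytes != 2 || data != "hi") {
      sock.Close();
      return;
    }

    // Reply with the greeting from the static config
    const auto sent_bytes =
        sock.SendAll(greeting_.data(), greeting_.size(), {});
    if (sent_bytes != greeting_.size()) {
      return;
    }
  }
}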
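
The exchange implemented by ProcessSocket can be modeled outside of userver to see it end to end. The following is a minimal Python sketch, not the sample's code: recv_exact, serve and demo are hypothetical helpers, and a local socket.socketpair() stands in for a real TCP connection.

```python
import socket
import threading


def recv_exact(conn: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes, or fewer if the peer closes first."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:  # the peer closed the connection
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def serve(conn: socket.socket, greeting: bytes) -> None:
    """Answer every b'hi' with `greeting`; close on anything else."""
    with conn:
        while recv_exact(conn, 2) == b'hi':
            conn.sendall(greeting)


def demo() -> list:
    """Run one session over a socket pair and collect what the client sees."""
    server_side, client_side = socket.socketpair()
    worker = threading.Thread(target=serve, args=(server_side, b'hello'))
    worker.start()
    seen = []
    with client_side:
        for request in (b'hi', b'hi', b'yo'):
            client_side.sendall(request)
            seen.append(recv_exact(client_side, 5))
    worker.join()
    return seen


if __name__ == '__main__':
    print(demo())
```

Here demo() returns [b'hello', b'hello', b'']: two greetings, then an empty read because the model closed the connection after the unexpected b'yo'. The session is half-duplex in the sense that each side waits for the other before sending again.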

int main()

Finally, append the component to components::MinimalComponentList() and start the server with the static configuration file passed on the command line.

int main(int argc, const char* const argv[]) {
  const auto component_list =
      components::MinimalComponentList().Append<samples::tcp::Hello>();

  return utils::DaemonMain(argc, argv, component_list);
}

Build and Run

To build the sample, execute the following build steps at the userver root directory:

mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-tcp_service

The sample can be started by running make start-userver-samples-tcp_service. The command invokes the testsuite start target, which sets proper paths in the configuration files and starts the service.

To start the service manually, run ./samples/tcp_service/userver-samples-tcp_service -c </path/to/static_config.yaml> (do not forget to prepare the configuration files!).

Now you can send a request to your server from another terminal:

$ nc localhost 8180
hi
hello
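
The same exchange can be scripted. Below is a minimal Python sketch that assumes the service listens on localhost:8180 as configured above. request_greeting is a hypothetical helper; its size argument has to match the length of the configured greeting, because the protocol has no message framing.

```python
import socket


def request_greeting(host: str, port: int, size: int = 5,
                     timeout: float = 5.0) -> bytes:
    """Send b'hi' once and return up to `size` bytes of the reply."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(b'hi')
        reply = b''
        while len(reply) < size:
            chunk = sock.recv(size - len(reply))
            if not chunk:  # the server closed the connection early
                break
            reply += chunk
        return reply


if __name__ == '__main__':
    # Assumes the sample service is already running with the config above.
    print(request_greeting('localhost', 8180))
```

With the sample configuration, the call should print b'hello'.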

Functional testing

Functional tests for the service can be implemented using the testsuite in the following way:

import socket


async def test_basic(service_client, loop, tcp_service_port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', tcp_service_port))

    await loop.sock_sendall(sock, b'hi')
    hello = await loop.sock_recv(sock, 5)
    assert hello == b'hello'

    await loop.sock_sendall(sock, b'whats up?')
    try:
        await loop.sock_recv(sock, 1)
        assert False
    except ConnectionResetError:
        pass

Note that in this case the testsuite requires some help to detect that the service is ready to accept requests. To do that, override the pytest_userver.plugins.service.service_non_http_health_checks fixture:

import pytest
from pytest_userver.utils import net

pytest_plugins = ['pytest_userver.plugins.core']


@pytest.fixture(scope='session')
def tcp_service_port(service_config_yaml) -> int:
    components = service_config_yaml['components_manager']['components']
    tcp_hello = components.get('tcp-hello')
    assert tcp_hello, 'No "tcp-hello" component found'
    return int(tcp_hello['port'])


@pytest.fixture(scope='session')
def service_non_http_health_checks(
        service_config_yaml, tcp_service_port,
) -> net.HealthChecks:
    checks = net.get_health_checks_info(service_config_yaml)
    checks.tcp.append(net.HostPort(host='localhost', port=tcp_service_port))
    return checks
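
Conceptually, a TCP health check boils down to "keep trying to connect until the port accepts a connection or a deadline passes". The sketch below shows that idea in plain Python. It is an assumption about the general technique, not the pytest_userver implementation, and wait_for_tcp is a hypothetical helper name.

```python
import socket
import time


def wait_for_tcp(host: str, port: int, timeout: float = 5.0,
                 interval: float = 0.1) -> bool:
    """Return True once host:port accepts a connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect counts as "ready"; the probe socket is
            # closed immediately when the `with` block exits.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Such a probe only shows that the port is open. It does not send "hi", so it does not exercise the greeting logic itself; that is what test_basic is for.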

Full sources

See the full example at: