// samples/tcp_full_duplex_service/tcp_full_duplex_service.cpp
#include <userver/concurrent/queue.hpp>
#include <userver/utils/statistics/metric_tag.hpp>
#include <userver/utils/statistics/metrics_storage.hpp>
namespace samples::tcp::echo {
struct Stats;
class Echo final : public components::TcpAcceptorBase {
public:
static constexpr std::string_view kName = "tcp-echo";
// Component is valid after construction and is able to accept requests
Echo(const components::ComponentConfig& config,
void ProcessSocket(engine::io::Socket&& sock) override;
private:
Stats& stats_;
};
} // namespace samples::tcp::echo
// Specializes the components::kHasValidate variable template to `true` for
// the Echo component.
// NOTE(review): presumably this opts the component into static config
// validation — confirm against the userver components documentation.
template <>
inline constexpr bool components::kHasValidate<samples::tcp::echo::Echo> = true;
namespace samples::tcp::echo {
// Counters maintained by the tcp-echo component. Each field is an
// independent atomic 64-bit unsigned integer that starts at zero.
struct Stats {
  using Counter = std::atomic<std::uint64_t>;

  // Incremented once at the start of every ProcessSocket() call.
  Counter opened_sockets{0};
  // Incremented by the scope guard created in ProcessSocket().
  Counter closed_sockets{0};
  // Sum of the byte counts returned by ReadSome() in DoRecv().
  Counter bytes_read{0};
};
// Metric tag for the Stats type, labelled "tcp-echo". The Echo constructor
// passes it to GetMetric() to obtain the Stats instance it keeps a
// reference to.
const utils::statistics::MetricTag<Stats> kTcpEchoTag{"tcp-echo"};
// Serializes the counters into JSON of the following shape:
//
//   {"sockets": {"opened": N, "closed": N}, "bytes": {"read": N}}
//
// Every counter is read with its own atomic load, so the three numbers are
// not guaranteed to belong to one consistent snapshot.
//
// `stats` - counters to dump.
// Returns a builder constructed from the extracted JSON value.
formats::json::ValueBuilder DumpMetric(const Stats& stats) {
  // Local builder that accumulates the JSON tree filled in below.
  formats::json::ValueBuilder value;
  value["sockets"]["opened"] = stats.opened_sockets.load();
  value["sockets"]["closed"] = stats.closed_sockets.load();
  value["bytes"]["read"] = stats.bytes_read.load();
  return value.ExtractValue();
}
// Puts every counter of `stats` back to zero. The counters are reset one
// after another, each with a separate atomic store.
void ResetMetric(Stats& stats) {
  stats.opened_sockets.store(0);
  stats.closed_sockets.store(0);
  stats.bytes_read.store(0);
}
// Constructs the acceptor.
//
// `config` and `context` are handed to the TcpAcceptorBase constructor.
// `context` is also used to find the StatisticsStorage component, whose
// metrics storage supplies the Stats instance registered under kTcpEchoTag;
// stats_ is bound to that instance.
Echo::Echo(const components::ComponentConfig& config,
           const components::ComponentContext& context)
    : TcpAcceptorBase(config, context),
      stats_(context.FindComponent<components::StatisticsStorage>()
                 .GetMetricsStorage()
                 ->GetMetric(kTcpEchoTag)) {}
namespace {
// Sending half of the connection: takes chunks from `consumer` one at a
// time and writes each of them to `sock` in full.
//
// Returns when Pop() reports that there is nothing more to take, or right
// after a SendAll() call that reports fewer bytes than the chunk holds.
void DoSend(engine::io::Socket& sock, Queue::Consumer consumer) {
  for (std::string chunk; consumer.Pop(chunk);) {
    const auto written = sock.SendAll(chunk.data(), chunk.size(), {});
    if (written == chunk.size()) {
      continue;
    }

    // Partial write: give up on this connection's sending side.
    LOG_INFO() << "Failed to send all the data";
    return;
  }
}
// Receiving half of the connection: reads up to 1024 bytes at a time from
// `sock`, adds the amount to `stats.bytes_read` and pushes the bytes into
// `producer`.
//
// Returns when the current task should be cancelled, when ReadSome() yields
// zero bytes, or when Push() reports failure.
void DoRecv(engine::io::Socket& sock, Queue::Producer producer, Stats& stats) {
  // Left uninitialized on purpose: only the prefix filled by ReadSome() is
  // ever used.
  std::array<char, 1024> chunk;  // NOLINT(cppcoreguidelines-pro-type-member-init)

  for (;;) {
    if (engine::current_task::ShouldCancel()) {
      return;
    }

    const auto received = sock.ReadSome(chunk.data(), chunk.size(), {});
    if (!received) {
      LOG_INFO() << "Failed to read data";
      return;
    }

    stats.bytes_read += received;

    if (producer.Push({chunk.data(), received})) {
      continue;
    }
    return;
  }
}
} // anonymous namespace
// Serves one accepted connection.
//
// Counts the socket as opened, creates a tracing span named after the
// running socket number, starts a separate "send" task that runs DoSend()
// on the consumer side of a fresh queue, and then runs DoRecv() with the
// producer side in the calling context.
//
// The order of the local declarations matters: locals are destroyed in
// reverse order, i.e. send_task first, then queue, guard and span.
void Echo::ProcessSocket(engine::io::Socket&& sock) {
// The pre-incremented counter value doubles as the socket's ordinal number.
const auto sock_num = ++stats_.opened_sockets;
tracing::Span span{fmt::format("sock_{}", sock_num)};
span.AddTag("fd", std::to_string(sock.Fd()));
// The callback logs and bumps closed_sockets.
// NOTE(review): presumably FastScopeGuard invokes it when `guard` is
// destroyed, i.e. on every exit path of this function — confirm.
utils::FastScopeGuard guard{[this]() noexcept {
LOG_INFO() << "Closing socket";
++stats_.closed_sockets;
}};
auto queue = Queue::Create();
// `sock` is passed by reference (std::ref), so the task uses the very same
// socket object as DoRecv() below.
// NOTE(review): this presumably relies on send_task's destructor stopping
// the task before `sock` goes away — confirm against utils::Async docs.
auto send_task =
utils::Async("send", DoSend, std::ref(sock), queue->GetConsumer());
// Runs the receive loop until DoRecv() returns.
DoRecv(sock, queue->GetProducer(), stats_);
}
} // namespace samples::tcp::echo
int main(int argc, const char* const argv[]) {
const auto component_list = components::MinimalServerComponentList()
.Append<samples::tcp::echo::Echo>()
// Testuite components:
.Append<components::TestsuiteSupport>()
return utils::DaemonMain(argc, argv, component_list);
}