#include <userver/concurrent/queue.hpp>
#include <userver/utils/statistics/metric_tag.hpp>
#include <userver/utils/statistics/metrics_storage.hpp>
namespace samples::tcp::echo {
// Forward declaration: the full definition of Stats appears further down in
// this file, so only a reference to it can be held here.
struct Stats;
public:
// Compile-time name constant with the value "tcp-echo".
// NOTE(review): presumably the name the component is registered under —
// confirm against the component system's conventions.
static constexpr std::string_view kName = "tcp-echo";
private:
// Non-owning reference to the Stats counters (opened/closed sockets and
// bytes read) that this class updates.
Stats& stats_;
};
}
namespace samples::tcp::echo {
// Counters describing the socket activity of the echo service.
//
// Every field is an atomic unsigned 64-bit counter that starts at zero, so a
// single increment, addition or read of one counter is atomic on its own.
// The three counters are independent: nothing here updates them as a group.
struct Stats {
  // Shared type of all counters in this struct.
  using Counter = std::atomic<std::uint64_t>;

  // Incremented once for every socket that starts being processed.
  Counter opened_sockets{0};
  // Incremented once when processing of a socket finishes.
  Counter closed_sockets{0};
  // Running total of bytes received from sockets.
  Counter bytes_read{0};
};
// Assign each Stats counter to its own nested entry of `writer`:
// ["sockets"]["opened"], ["sockets"]["closed"] and ["bytes"]["read"].
writer["sockets"]["opened"] = stats.opened_sockets;
writer["sockets"]["closed"] = stats.closed_sockets;
writer["bytes"]["read"] = stats.bytes_read;
}
// Sets every counter in `stats` back to zero.
//
// Each counter is cleared by its own atomic store, in declaration order; the
// three counters are not reset together as one atomic step.
void ResetMetric(Stats& stats) {
  stats.opened_sockets.store(0);
  stats.closed_sockets.store(0);
  stats.bytes_read.store(0);
}
// Forward config and context to the TcpAcceptorBase base, then bind stats_ to
// the object returned by GetMetric(kTcpEchoTag) on the metrics storage of the
// StatisticsStorage component. The constructor body itself is empty.
: TcpAcceptorBase(config, context),
stats_(context.FindComponent<
components::StatisticsStorage>()
.GetMetricsStorage()
->GetMetric(kTcpEchoTag)) {}
namespace {
// Buffer reused for every element popped from the queue.
std::string data;
// Keep going for as long as Pop() reports success; every popped chunk is
// written back to the socket.
while (consumer.Pop(data)) {
// NOTE(review): the trailing `{}` is presumably a default-constructed
// deadline (no timeout) — confirm against the Socket::SendAll signature.
const auto sent_bytes = sock.
SendAll(data.data(), data.size(), {});
// Fewer bytes were sent than requested: log it and stop sending.
if (sent_bytes != data.size()) {
LOG_INFO() <<
"Failed to send all the data";
return;
}
}
}
// Fixed 1024-byte receive buffer. It is left uninitialized; only the first
// read_bytes bytes reported by ReadSome are passed on below.
std::array<char, 1024> buf;
// NOTE(review): the trailing `{}` is presumably a default-constructed
// deadline (no timeout) — confirm against the Socket::ReadSome signature.
const auto read_bytes = sock.
ReadSome(buf.data(), buf.size(), {});
// Nothing was read: stop receiving.
// NOTE(review): presumably this means the peer closed the connection — confirm.
if (!read_bytes) {
return;
}
// Account for the received bytes in the shared counters.
stats.bytes_read += read_bytes;
// Hand the received bytes to the queue as a new element built from
// (pointer, length); stop receiving if the push is rejected.
if (!producer.Push({buf.data(), read_bytes})) {
return;
}
}
}
}
// Count the newly accepted socket; sock_num holds the counter value after
// the increment.
const auto sock_num = ++stats_.opened_sockets;
// Attach the socket's file descriptor, converted to a string, to the span
// under the "fd" tag.
span.
AddTag(
"fd", std::to_string(sock.
Fd()));
// Count the socket as closed.
++stats_.closed_sockets;
}};
// Queue that carries received data from DoRecv (producer side) to DoSend
// (consumer side).
auto queue = Queue::Create();
// Launch DoSend through utils::Async under the name "send". The socket is
// passed via std::ref, so DoSend works on this same socket object.
// NOTE(review): presumably this runs concurrently with DoRecv below —
// confirm against the utils::Async documentation.
auto send_task =
utils::Async(
"send", DoSend, std::ref(sock), queue->GetConsumer());
// Receive in the current flow of execution, feeding the queue and updating
// the byte counter in stats_.
DoRecv(sock, queue->GetProducer(), stats_);
}
}
// Program entry point.
int main(int argc, const char* const argv[]) {
// Components appended to the component list: the Echo TCP component defined
// in this file, testsuite support, and the HTTP client.
.Append<samples::tcp::echo::Echo>()
.Append<components::TestsuiteSupport>()
.Append<components::HttpClient>();
}