#include "topic_reader.hpp"
#include <fmt/format.h>
#include <userver/ydb/exceptions.hpp>
#include <userver/ydb/impl/cast.hpp>
namespace sample {
namespace {
/// @brief Assembles the settings used to open a topic read session.
/// @param consumer_name name set as the session's consumer
/// @param topics topic paths; each one is appended to the settings in the given order
/// @return settings object with the consumer name and all topics applied
NYdb::NTopic::TReadSessionSettings
ConstructReadSessionSettings(const std::string& consumer_name, const std::vector<std::string>& topics) {
    NYdb::NTopic::TReadSessionSettings settings;

    // Both the consumer name and the topic paths go through ydb::impl::ToString
    // before being handed to the settings object.
    settings.ConsumerName(ydb::impl::ToString(consumer_name));

    for (const std::string& path : topics) {
        settings.AppendTopics(ydb::impl::ToString(path));
    }

    return settings;
}
// Pulls events from one topic read session and dispatches each event to a
// handler selected by the event's type.
// NOTE(review): `read_session_` is used by the constructor, destructor and Run(),
// but its member declaration is not among the lines visible here — confirm
// against the full file.
class SessionReadTask {
public:
using TReadSessionEvent = NYdb::NTopic::TReadSessionEvent;
using TSessionClosedEvent = NYdb::NTopic::TSessionClosedEvent;
// Moves the given read session into `read_session_`.
explicit SessionReadTask(
ydb::TopicReadSession&& read_session) : read_session_(std::move(read_session)) {}
// Closes the read session (passing a 3000 ms duration to Close) unless a
// session-closed event has already been handled.
~SessionReadTask() {
if (!session_closed_) {
read_session_.Close(std::chrono::milliseconds{3000});
}
}
// Fetches events via `read_session_.GetEvents()` and passes each one to HandleEvent.
// NOTE(review): the `break` below is not inside any visible loop, the `try` has no
// visible handler, and the closing braces do not balance — the enclosing loop and
// catch lines appear to be missing from this view. Confirm against the full file.
void Run() {
try {
auto events = read_session_.GetEvents();
for (auto& event : events) {
HandleEvent(event);
}
break;
}
}
}
private:
// Dispatches `event` through std::visit to one lambda per event alternative.
// NOTE(review): the `},` that ends the lambda list has no visible opener — the
// lambdas are presumably wrapped in an overload-set helper whose opening line
// is not shown here; confirm.
void HandleEvent(TReadSessionEvent::TEvent& event) {
std::visit(
// Data batch: register the event with a deferred commit, process its messages,
// then commit. Commit() runs only after HandleDataReceivedEvent returns, so it
// is not reached if handling throws.
[this](TReadSessionEvent::TDataReceivedEvent& e) {
NYdb::NTopic::TDeferredCommit deferredCommit;
deferredCommit.Add(e);
HandleDataReceivedEvent(e);
deferredCommit.Commit();
},
// Commit acknowledgements are ignored.
[](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) {
},
// Partition session start: log at debug level and confirm the event.
[](TReadSessionEvent::TStartPartitionSessionEvent& e) {
LOG_DEBUG() <<
"Starting partition session [TopicPath=" << e.GetPartitionSession()->GetTopicPath()
<< ", PartitionId=" << e.GetPartitionSession()->GetPartitionId() << "]";
e.Confirm();
},
// Partition session stop: log at debug level and confirm the event.
[](TReadSessionEvent::TStopPartitionSessionEvent& e) {
LOG_DEBUG() <<
"Stopping partition session [TopicPath=" << e.GetPartitionSession()->GetTopicPath()
<< ", PartitionId=" << e.GetPartitionSession()->GetPartitionId() << "]";
e.Confirm();
},
// Partition session end: logged only, no confirmation is sent.
[](TReadSessionEvent::TEndPartitionSessionEvent& e) {
LOG_DEBUG() <<
"End partition session [TopicPath=" << e.GetPartitionSession()->GetTopicPath()
<< ", PartitionId=" << e.GetPartitionSession()->GetPartitionId() << "]";
},
// Partition session closed: warn for any reason other than StopConfirmedByUser.
[](TReadSessionEvent::TPartitionSessionClosedEvent& e) {
if (TReadSessionEvent::TPartitionSessionClosedEvent::EReason::StopConfirmedByUser !=
e.GetReason()) {
LOG_WARNING() <<
"Unexpected PartitionSessionClosedEvent [Reason="
<< utils::UnderlyingValue(e.GetReason()) << "]";
}
},
// Partition status events are ignored.
[](TReadSessionEvent::TPartitionSessionStatusEvent&) {
},
// Session closed: record it so the destructor skips Close(), then throw
// std::runtime_error if the close was not successful.
[this](NYdb::NTopic::TSessionClosedEvent& e) {
session_closed_ = true;
if (!e.IsSuccess()) {
throw std::runtime_error{"Session closed unsuccessfully: " + e.GetIssues().ToString()};
}
},
},
event
);
}
// Logs the number of messages in `event` at debug level and passes every
// message to HandleMessage.
void HandleDataReceivedEvent(TReadSessionEvent::TDataReceivedEvent& event) {
LOG_DEBUG() <<
"Handle DataReceivedEvent [MessagesCount=" <<
event.GetMessagesCount() <<
"]";
for (const auto& message : event.GetMessages()) {
HandleMessage(message);
}
}
// Logs the message's offset and data at debug level; no other processing is done.
void HandleMessage(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
LOG_DEBUG() <<
"Handle Message [Offset=" << message.GetOffset() <<
", Data: " << message.GetData() <<
"]";
}
private:
// Set once a TSessionClosedEvent has been handled; checked by the destructor.
bool session_closed_{false};
};
// Creates a topic read session from a topic client and stored settings, and
// runs a SessionReadTask over it.
class TopicReader {
public:
// Stores the topic client (moved in), a copy of the read session settings,
// and the restart delay.
TopicReader(
std::shared_ptr<ydb::TopicClient> topic_client,
const NYdb::NTopic::TReadSessionSettings& read_session_settings,
std::chrono::milliseconds restart_session_delay
)
: topic_client_{std::move(topic_client)},
read_session_settings_{read_session_settings},
restart_session_delay_{restart_session_delay} {}
// Creates a read session, hands it to a SessionReadTask and runs that task.
// Any std::exception raised along the way is logged at error level and not rethrown.
// NOTE(review): there is one more closing brace below than the visible code opens,
// and `restart_session_delay_` is never read in the visible lines — an enclosing
// retry loop (and a delay between attempts) appears to be missing from this view.
// Confirm against the full file.
void Run() {
try {
LOG_INFO() <<
"Creating read session...";
auto read_session = topic_client_->CreateReadSession(read_session_settings_);
LOG_INFO() <<
"Starting session read...";
SessionReadTask session_read_task{std::move(read_session)};
session_read_task.Run();
} catch (const std::exception& ex) {
LOG_ERROR() <<
"Session read failed: " << ex;
}
}
}
private:
// Client used to create read sessions.
const std::shared_ptr<ydb::TopicClient> topic_client_;
// Settings passed to every CreateReadSession call.
const NYdb::NTopic::TReadSessionSettings read_session_settings_;
// Stored restart delay; not referenced by the visible part of Run().
const std::chrono::milliseconds restart_session_delay_;
};
}
// Component constructor: reads the reader options from the static config,
// builds a TopicReader and starts it in a background task stored in `read_task_`.
// NOTE(review): the parameter list, any member/base initializers and the opening
// brace are not among the lines visible here; `config` is presumably a
// constructor parameter — confirm against the full file.
TopicReaderComponent::TopicReaderComponent(
)
// Static config options: "consumer-name", "topics", "restart-session-delay".
const auto consumer_name = config[
"consumer-name"].
As<std::string>();
const auto topics = config[
"topics"].
As<std::vector<std::string>>();
const auto restart_session_delay = config[
"restart-session-delay"].
As<std::chrono::milliseconds>();
// NOTE(review): only two arguments are visible here, while the TopicReader
// constructor declared in this file takes three (topic client first) — the
// topic-client argument line appears to be missing from this view.
auto topic_reader = std::make_unique<TopicReader>(
ConstructReadSessionSettings(consumer_name, topics),
restart_session_delay
);
// The task is named "<config.Name()>-read-task"; the reader is moved into the
// lambda, so the task's callable owns it.
read_task_ =
utils::CriticalAsync(config.
Name() +
"-read-task", [topic_reader = std::move(topic_reader)] {
topic_reader->Run();
});
}
type: object
description: sample topic reader
additionalProperties: false
properties:
consumer-name:
type: string
description: consumer name
topics:
type: array
description: topics
items:
type: string
description: topic path
restart-session-delay:
type: string
description: restart session delay
)");
}
}