userver: userver/kafka/exceptions.hpp Source File
Loading...
Searching...
No Matches
exceptions.hpp
1#pragma once
2
3#include <cstdint>
4#include <stdexcept>
5#include <string_view>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace kafka {
10
/// @brief Base exception thrown by Producer::Send and Producer::SendAsync
/// on send or delivery errors.
///
/// Objects created through the inherited `std::runtime_error` constructors
/// keep the retry flag at its in-class default, `false`.
class SendException : public std::runtime_error {
    // Retry hint stored per exception object; `false` unless a constructor
    // initializes it differently.
    const bool is_retryable_ = false;

protected:
    /// @brief Constructor available to derived exceptions only.
    /// @param what error description
    /// @param is_retryable retry hint; presumably the value reported by
    /// IsRetryable() (the definition lives outside this header)
    SendException(const char* what, bool is_retryable);

public:
    using std::runtime_error::runtime_error;

    /// @brief Returns whether it makes sense to retry failed send.
    ///
    /// @see
    /// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md8
    bool IsRetryable() const noexcept;
};
29
30class DeliveryTimeoutException final : public SendException {
31 static constexpr const char* kWhat{
32 "Message is not delivered after `delivery_timeout` milliseconds. Hint: "
33 "Adjust `delivery_timeout` and `queue_buffering_*` options or manually "
34 "retry the send request."
35 };
36
37public:
38 DeliveryTimeoutException();
39};
40
41class QueueFullException final : public SendException {
42 static constexpr const char* kWhat{
43 "The sending queue is full - send request cannot be scheduled. Hint: "
44 "Manually retry the error or increase `queue_buffering_max_messages` "
45 "and/or `queue_buffering_max_kbytes` config option."
46 };
47
48public:
49 QueueFullException();
50};
51
52class MessageTooLargeException final : public SendException {
53 static constexpr const char* kWhat{
54 "Message size exceeds configured limit. Hint: increase "
55 "`message_max_bytes` config option."
56 };
57
58public:
59 MessageTooLargeException();
60};
61
62class UnknownTopicException final : public SendException {
63 static constexpr const char* kWhat{"Given topic does not exist in cluster."};
64
65public:
66 UnknownTopicException();
67};
68
69class UnknownPartitionException final : public SendException {
70 static constexpr const char* kWhat = "Topic does not have given partition.";
71
72public:
73 UnknownPartitionException();
74};
75
/// @brief Exception thrown when there is an error retrieving the offset range.
///
/// Besides the dedicated constructor, every `std::runtime_error` constructor
/// is inherited, so the exception can also be built from a plain message.
class OffsetRangeException : public std::runtime_error {
public:
    /// @param what error description
    /// @param topic topic name; presumably the topic whose offsets were requested
    /// @param partition partition number; presumably the partition whose
    /// offsets were requested
    OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);

    using std::runtime_error::runtime_error;
};
84
85class OffsetRangeTimeoutException final : public OffsetRangeException {
86 static constexpr const char* kWhat = "Timeout while fetching offsets.";
87
88public:
89 OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);
90};
91
/// @brief Exception reporting that a topic was not found (judging by its
/// name; the throwing code is outside this header).
///
/// Adds nothing to `std::runtime_error` except a distinct type: all base
/// constructors are inherited as-is.
class TopicNotFoundException final : public std::runtime_error {
public:
    using std::runtime_error::runtime_error;
};
96
/// @brief Exception thrown when fetching metadata.
///
/// Besides the dedicated constructor, every `std::runtime_error` constructor
/// is inherited, so the exception can also be built from a plain message.
class GetMetadataException : public std::runtime_error {
public:
    /// @param what error description
    /// @param topic topic name; presumably the topic whose metadata was requested
    GetMetadataException(std::string_view what, std::string_view topic);

    using std::runtime_error::runtime_error;
};
105
106class GetMetadataTimeoutException final : public GetMetadataException {
107 static constexpr const char* kWhat = "Timeout while getting metadata.";
108
109public:
110 GetMetadataTimeoutException(std::string_view topic);
111};
112
/// @brief Exception thrown when parsing consumed messages headers.
/// @ref Message::GetHeaders
///
/// Inherits from `std::runtime_error` publicly. With the `class` default
/// (private inheritance) the base would be inaccessible to callers, so
/// handlers such as `catch (const std::exception&)` or
/// `catch (const std::runtime_error&)` could not match this exception.
class ParseHeadersException final : public std::runtime_error {
    // Fixed description of the failure; presumably combined with `error` by
    // the constructor, which is defined outside this header.
    static constexpr const char* kWhat = "Failed to parse headers";

public:
    /// @param error details of the parsing failure
    ParseHeadersException(std::string_view error);
};
121
/// @brief Exception thrown when Seek* process failed.
///
/// Adds nothing to `std::runtime_error` except a distinct type: all base
/// constructors are inherited as-is.
///
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekException final : public std::runtime_error {
public:
    using std::runtime_error::runtime_error;
};
130
/// @brief Exception thrown when Seek* arguments are invalid.
///
/// Adds nothing to `std::invalid_argument` except a distinct type: all base
/// constructors are inherited as-is.
///
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekInvalidArgumentException final : public std::invalid_argument {
public:
    using std::invalid_argument::invalid_argument;
};
139
140} // namespace kafka
141
142USERVER_NAMESPACE_END