userver: userver/kafka/exceptions.hpp Source File
Loading...
Searching...
No Matches
exceptions.hpp
1#pragma once
2
3#include <cstdint>
4#include <stdexcept>
5#include <string_view>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace kafka {
10
/// @brief Common base of the errors reported by Producer::Send and
/// Producer::SendAsync when a message cannot be sent or delivered.
class SendException : public std::runtime_error {
public:
    /// Inherits the `std::runtime_error` constructors. Objects built through
    /// them keep `is_retryable_` at its default value of `false`.
    using std::runtime_error::runtime_error;

    /// @brief Tells whether it makes sense to retry the failed send.
    ///
    /// @see
    /// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md8
    bool IsRetryable() const noexcept;

protected:
    /// @brief Constructor reserved for derived exception types.
    /// @param what description of the error
    /// @param is_retryable retry hint; presumably the value later returned by
    /// IsRetryable() (the definition is not part of this header)
    SendException(const char* what, bool is_retryable);

private:
    const bool is_retryable_ = false;
};
29
30class DeliveryTimeoutException final : public SendException {
31 static constexpr const char* kWhat{
32 "Message is not delivered after `delivery_timeout` milliseconds. Hint: "
33 "Adjust `delivery_timeout` and `queue_buffering_*` options or manually "
34 "retry the send request."
35 };
36
37public:
38 DeliveryTimeoutException();
39};
40
41class QueueFullException final : public SendException {
42 static constexpr const char* kWhat{
43 "The sending queue is full - send request cannot be scheduled. Hint: "
44 "Manually retry the error or increase `queue_buffering_max_messages` "
45 "and/or `queue_buffering_max_kbytes` config option."
46 };
47
48public:
49 QueueFullException();
50};
51
52class MessageTooLargeException final : public SendException {
53 static constexpr const char* kWhat{
54 "Message size exceeds configured limit. Hint: increase "
55 "`message_max_bytes` config option."
56 };
57
58public:
59 MessageTooLargeException();
60};
61
62class UnknownTopicException final : public SendException {
63 static constexpr const char* kWhat{"Given topic does not exist in cluster."};
64
65public:
66 UnknownTopicException();
67};
68
69class UnknownPartitionException final : public SendException {
70 static constexpr const char* kWhat = "Topic does not have given partition.";
71
72public:
73 UnknownPartitionException();
74};
75
/// @brief Exception thrown when retrieving the offset range fails.
class OffsetRangeException : public std::runtime_error {
public:
    /// @brief Builds the error from a description plus the topic and
    /// partition involved.
    /// @param what description of the failure
    /// @param topic name of the topic
    /// @param partition partition number
    /// @note How the arguments are combined into `what()` is decided by the
    /// out-of-line definition, which is not part of this header.
    OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);

    /// The plain `std::runtime_error` constructors are available as well.
    using std::runtime_error::runtime_error;
};
83
84class OffsetRangeTimeoutException final : public OffsetRangeException {
85 static constexpr const char* kWhat = "Timeout while fetching offsets.";
86
87public:
88 OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);
89};
90
/// @brief Runtime error type with the constructors of `std::runtime_error`.
///
/// NOTE(review): judging by the name, it reports a topic that could not be
/// found; the throw sites are not part of this header - confirm.
class TopicNotFoundException final : public std::runtime_error {
public:
    // Same constructor set as the base, named through the injected class name.
    using runtime_error::runtime_error;
};
95
/// @brief Exception thrown when fetching metadata fails.
class GetMetadataException : public std::runtime_error {
public:
    /// @brief Builds the error from a description plus the topic involved.
    /// @param what description of the failure
    /// @param topic name of the topic
    /// @note How the arguments are combined into `what()` is decided by the
    /// out-of-line definition, which is not part of this header.
    GetMetadataException(std::string_view what, std::string_view topic);

    /// The plain `std::runtime_error` constructors are available as well.
    using std::runtime_error::runtime_error;
};
103
104class GetMetadataTimeoutException final : public GetMetadataException {
105 static constexpr const char* kWhat = "Timeout while getting metadata.";
106
107public:
108 GetMetadataTimeoutException(std::string_view topic);
109};
110
/// @brief Exception thrown when parsing consumed messages headers.
/// @ref Message::GetHeaders
///
/// Derives *publicly* from `std::runtime_error`. A `class` inherits privately
/// by default, and with a private base the exception would not match
/// `catch (const std::runtime_error&)` or `catch (const std::exception&)`
/// handlers.
class ParseHeadersException final : public std::runtime_error {
    /// Fixed message text of this error.
    static constexpr const char* kWhat = "Failed to parse headers";

public:
    /// @param error details of the parse failure; presumably combined with
    /// `kWhat` by the out-of-line definition (not part of this header)
    ParseHeadersException(std::string_view error);
};
119
/// @brief Exception thrown when a Seek* operation fails.
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekException final : public std::runtime_error {
public:
    // Same constructor set as the base, named through the injected class name.
    using runtime_error::runtime_error;
};
128
/// @brief Exception thrown when the arguments of a Seek* call are invalid.
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekInvalidArgumentException final : public std::invalid_argument {
public:
    // Same constructor set as the base, named through the injected class name.
    using invalid_argument::invalid_argument;
};
137
138} // namespace kafka
139
140USERVER_NAMESPACE_END