userver: userver/kafka/exceptions.hpp Source File
Loading...
Searching...
No Matches
exceptions.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/kafka/exceptions.hpp
4/// @brief Kafka client exceptions
5
6#include <cstdint>
7#include <exception>
8#include <map>
9#include <stdexcept>
10#include <string_view>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace kafka {
15
/// @brief Common base of the exceptions that Producer::Send and
/// Producer::SendAsync throw when sending or delivering a message fails.
class SendException : public std::runtime_error {
public:
    // Re-export every std::runtime_error constructor. Objects created through
    // them keep the default value of `is_retryable_` (false).
    using std::runtime_error::runtime_error;

    /// @brief Tells whether retrying the failed send is reasonable.
    ///
    /// @see
    /// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md8
    bool IsRetryable() const noexcept;

protected:
    /// @brief Constructor reserved for derived exceptions.
    /// @param what error description
    /// @param is_retryable retryability flag of the error; presumably the
    /// value later reported by IsRetryable() (definition is not in this header)
    SendException(const char* what, bool is_retryable);

private:
    const bool is_retryable_ = false;
};
34
/// @brief Exception thrown by Producer::Send in bulk mode when one or more
/// messages of the batch could not be sent.
class BulkSendException : public std::runtime_error {
    static constexpr const char* kWhat = "Some messages was not delivered.";

public:
    /// @brief Container type of the per-message errors (see GetExceptions()).
    using ExceptionMap = std::map<std::size_t, std::exception_ptr>;

    /// @param nested_exceptions errors of the individual messages
    explicit BulkSendException(ExceptionMap nested_exceptions);

    /// @return the nested errors.
    /// Every nested exception is a subclass of SendException.
    const ExceptionMap& GetExceptions() const noexcept;

private:
    /// @brief Maps a message's position in the bulk send operation to the
    /// exception raised while delivering that message.
    /// @details Key: 0-based index of the element in the input batch.
    /// Value: pointer to the exception.
    /// @note Only indices whose delivery ended with an error are present.
    const ExceptionMap nested_exceptions_;
};
57
58class DeliveryTimeoutException final : public SendException {
59 static constexpr const char* kWhat{
60 "Message is not delivered after `delivery_timeout` milliseconds. Hint: "
61 "Adjust `delivery_timeout` and `queue_buffering_*` options or manually "
62 "retry the send request."
63 };
64
65public:
66 DeliveryTimeoutException();
67};
68
69class QueueFullException final : public SendException {
70 static constexpr const char* kWhat{
71 "The sending queue is full - send request cannot be scheduled. Hint: "
72 "Manually retry the error or increase `queue_buffering_max_messages` "
73 "and/or `queue_buffering_max_kbytes` config option."
74 };
75
76public:
77 QueueFullException();
78};
79
80class MessageTooLargeException final : public SendException {
81 static constexpr const char* kWhat{
82 "Message size exceeds configured limit. Hint: increase "
83 "`message_max_bytes` config option."
84 };
85
86public:
87 MessageTooLargeException();
88};
89
90class UnknownTopicException final : public SendException {
91 static constexpr const char* kWhat{"Given topic does not exist in cluster."};
92
93public:
94 UnknownTopicException();
95};
96
97class UnknownPartitionException final : public SendException {
98 static constexpr const char* kWhat = "Topic does not have given partition.";
99
100public:
101 UnknownPartitionException();
102};
103
/// @brief Exception signalling that the offset range could not be retrieved.
class OffsetRangeException : public std::runtime_error {
public:
    // All std::runtime_error constructors remain available.
    using std::runtime_error::runtime_error;

    /// @param what error description
    /// @param topic topic the error relates to
    /// @param partition partition the error relates to
    /// @note How the arguments are combined into the final message is decided
    /// by the out-of-line definition, which is not part of this header.
    OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);
};
111
112class OffsetRangeTimeoutException final : public OffsetRangeException {
113 static constexpr const char* kWhat = "Timeout while fetching offsets.";
114
115public:
116 OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);
117};
118
/// @brief Runtime error type that, judging by its name, reports a missing
/// topic; the throwing code is outside this header.
class TopicNotFoundException final : public std::runtime_error {
public:
    // Constructed exactly like std::runtime_error (message only).
    using std::runtime_error::runtime_error;
};
123
/// @brief Exception raised while fetching metadata.
class GetMetadataException : public std::runtime_error {
public:
    // All std::runtime_error constructors remain available.
    using std::runtime_error::runtime_error;

    /// @param what error description
    /// @param topic topic the error relates to
    /// @note How the arguments are combined into the final message is decided
    /// by the out-of-line definition, which is not part of this header.
    GetMetadataException(std::string_view what, std::string_view topic);
};
131
132class GetMetadataTimeoutException final : public GetMetadataException {
133 static constexpr const char* kWhat = "Timeout while getting metadata.";
134
135public:
136 GetMetadataTimeoutException(std::string_view topic);
137};
138
/// @brief Exception thrown when parsing consumed messages headers.
/// @ref Message::GetHeaders
///
/// The base class is inherited publicly: with the default (private)
/// inheritance of `class`, handlers such as `catch (const std::exception&)`
/// or `catch (const std::runtime_error&)` would not match this exception.
class ParseHeadersException final : public std::runtime_error {
    // Error description; presumably combined with the `error` argument by the
    // out-of-line constructor.
    static constexpr const char* kWhat = "Failed to parse headers";

public:
    /// @param error details of the parse failure
    ParseHeadersException(std::string_view error);
};
147
/// @brief Exception raised when a Seek* operation fails.
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekException final : public std::runtime_error {
public:
    // Constructed exactly like std::runtime_error (message only).
    using std::runtime_error::runtime_error;
};
156
/// @brief Exception raised when the arguments passed to Seek* are invalid.
/// @ref ConsumerScope::Seek
/// @ref ConsumerScope::SeekToBeginning
/// @ref ConsumerScope::SeekToEnd
class SeekInvalidArgumentException final : public std::invalid_argument {
public:
    // Constructed exactly like std::invalid_argument (message only).
    using std::invalid_argument::invalid_argument;
};
165
166} // namespace kafka
167
168USERVER_NAMESPACE_END