userver: userver/kafka/exceptions.hpp Source File
Loading...
Searching...
No Matches
exceptions.hpp
1#pragma once
2
3#include <cstdint>
4#include <exception>
5#include <map>
6#include <stdexcept>
7#include <string_view>
8
9USERVER_NAMESPACE_BEGIN
10
11namespace kafka {
12
13/// @brief Base exception thrown by Producer::Send and Producer::SendAsync
14/// on send or delivery errors.
15class SendException : public std::runtime_error {
16public:
17 using std::runtime_error::runtime_error;
18
19 /// @brief Returns whether it makes sense to retry failed send.
20 ///
21 /// @see
22 /// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html#autotoc_md8
23 bool IsRetryable() const noexcept;
24
25protected:
26 SendException(const char* what, bool is_retryable);
27
28private:
29 const bool is_retryable_{false};
30};
31
32/// @brief Base exception thrown by Producer::Send in bulk mode
33/// in case of one or more send errors.
34class BulkSendException : public std::runtime_error {
35 static constexpr const char* kWhat{"Some messages was not delivered."};
36
37public:
38 using ExceptionMap = std::map<std::size_t, std::exception_ptr>;
39
40 explicit BulkSendException(ExceptionMap nested_exceptions);
41
42 /// @return nested errors.
43 /// Nested exceptions are subclasses of SendException.
44 const ExceptionMap& GetExceptions() const noexcept;
45
46private:
47 /// @brief A mapping from the message's index in the bulk send operation
48 /// to the exception that occurred during its delivering.
49 /// @details Key: 0-based index of the element in the input batch.
50 /// Value: Pointer to the exception.
51 /// @note Contains only indices that resulted in an error.
52 const ExceptionMap nested_exceptions_;
53};
54
55class DeliveryTimeoutException final : public SendException {
56 static constexpr const char* kWhat{
57 "Message is not delivered after `delivery_timeout` milliseconds. Hint: "
58 "Adjust `delivery_timeout` and `queue_buffering_*` options or manually "
59 "retry the send request."
60 };
61
62public:
63 DeliveryTimeoutException();
64};
65
66class QueueFullException final : public SendException {
67 static constexpr const char* kWhat{
68 "The sending queue is full - send request cannot be scheduled. Hint: "
69 "Manually retry the error or increase `queue_buffering_max_messages` "
70 "and/or `queue_buffering_max_kbytes` config option."
71 };
72
73public:
74 QueueFullException();
75};
76
77class MessageTooLargeException final : public SendException {
78 static constexpr const char* kWhat{
79 "Message size exceeds configured limit. Hint: increase "
80 "`message_max_bytes` config option."
81 };
82
83public:
84 MessageTooLargeException();
85};
86
87class UnknownTopicException final : public SendException {
88 static constexpr const char* kWhat{"Given topic does not exist in cluster."};
89
90public:
91 UnknownTopicException();
92};
93
94class UnknownPartitionException final : public SendException {
95 static constexpr const char* kWhat = "Topic does not have given partition.";
96
97public:
98 UnknownPartitionException();
99};
100
101/// @brief Exception thrown when there is an error retrieving the offset range.
102class OffsetRangeException : public std::runtime_error {
103public:
104 using std::runtime_error::runtime_error;
105
106 OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);
107};
108
109class OffsetRangeTimeoutException final : public OffsetRangeException {
110 static constexpr const char* kWhat = "Timeout while fetching offsets.";
111
112public:
113 OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);
114};
115
116class TopicNotFoundException final : public std::runtime_error {
117public:
118 using std::runtime_error::runtime_error;
119};
120
121/// @brief Exception thrown when fetching metadata.
122class GetMetadataException : public std::runtime_error {
123public:
124 using std::runtime_error::runtime_error;
125
126 GetMetadataException(std::string_view what, std::string_view topic);
127};
128
129class GetMetadataTimeoutException final : public GetMetadataException {
130 static constexpr const char* kWhat = "Timeout while getting metadata.";
131
132public:
133 GetMetadataTimeoutException(std::string_view topic);
134};
135
136/// @brief Exception thrown when parsing consumed messages headers.
137/// @ref Message::GetHeaders
138class ParseHeadersException final : std::runtime_error {
139 static constexpr const char* kWhat = "Failed to parse headers";
140
141public:
142 ParseHeadersException(std::string_view error);
143};
144
145/// @brief Exception thrown when Seek* process failed.
146/// @ref ConsumerScope::Seek
147/// @ref ConsumerScope::SeekToBeginning
148/// @ref ConsumerScope::SeekToEnd
149class SeekException final : public std::runtime_error {
150public:
151 using std::runtime_error::runtime_error;
152};
153
154/// @brief Exception thrown when Seek* arguments are invalid.
155/// @ref ConsumerScope::Seek
156/// @ref ConsumerScope::SeekToBeginning
157/// @ref ConsumerScope::SeekToEnd
158class SeekInvalidArgumentException final : public std::invalid_argument {
159public:
160 using std::invalid_argument::invalid_argument;
161};
162
163} // namespace kafka
164
165USERVER_NAMESPACE_END