userver: userver/urabbitmq/consumer_base.hpp Source File
Loading...
Searching...
No Matches
consumer_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/urabbitmq/consumer_base.hpp
4/// @brief Base class for your consumers.
5
6#include <memory>
7
8#include <userver/utils/periodic_task.hpp>
9
10#include <userver/urabbitmq/consumer_settings.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace urabbitmq {
15
16class Client;
17class ConsumerBaseImpl;
18
19/// @ingroup userver_base_classes
20///
21/// @brief Base class for your consumers.
22/// You should derive from it and override `Process` method, which gets called
23/// when a new message arrives from the broker.
24///
25/// If your configuration is known upfront and doesn't change at runtime
26/// consider using `ConsumerComponentBase` instead.
27///
28/// Library takes care of handling start failures and runtime failures
29/// (connection breakage/broker node downtime etc.) and will try its best to
30/// restart the consumer.
31///
32/// @note Since messages are delivered asynchronously in the background you
33/// must call `Stop` before derived class is destroyed, otherwise a race is
34/// possible, when `Process` is called concurrently with
35/// derived class destructor, which is UB.
36///
37/// @note Library guarantees `at least once` delivery, hence some deduplication
38/// might be needed on your side.
40public:
41 ConsumerBase(std::shared_ptr<Client> client, const ConsumerSettings& settings);
42 virtual ~ConsumerBase();
43
44 /// @brief Start consuming messages from the broker.
45 /// Calling this method on running consumer has no effect.
46 ///
47 /// Should not throw, in case of initial setup failure library will restart
48 /// the consumer in the background.
49 void Start();
50
51 /// @brief Stop consuming messages from the broker.
52 /// Calling this method on stopped consumer has no effect.
53 ///
54 /// @note You must call this method before your derived class is destroyed,
55 /// otherwise it's UB.
56 void Stop();
57
58protected:
59 /// @brief You may override this method in derived class and implement
60 /// message handling logic. By default it does nothing.
61 ///
62 /// If this method returns successfully, the message is acked (best effort)
63 /// to the broker; if this method throws, the message is requeued.
64 ///
65 /// Please keep in mind that it is possible for the message to be delivered
66 /// again even if `Process` returns successfully: sadly we can't guarantee
67 /// that `ack` ever reached the broker (network issues or unexpected shutdown,
68 /// for example).
69 /// It is, however, guaranteed that the message is requeued if `Process` fails.
70 virtual void Process(std::string) { /* intentionally empty: the default implementation ignores the message */
71 }
72
73 /// @brief You may override this method in derived class to implement
74 /// message handling logic with access to the whole `ConsumedMessage`.
75 /// By default it just forwards to `Process(std::string)`, moving the
76 /// message body (`msg.message`) into that call.
77 virtual void Process(ConsumedMessage msg) { Process(std::move(msg.message)); }
78
79private:
80 std::shared_ptr<Client> client_;
81 const ConsumerSettings settings_;
82
83 std::unique_ptr<ConsumerBaseImpl> impl_;
84 utils::PeriodicTask monitor_{};
85};
86
87} // namespace urabbitmq
88
89USERVER_NAMESPACE_END