userver: userver/urabbitmq/consumer_base.hpp Source File
Loading...
Searching...
No Matches
consumer_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/urabbitmq/consumer_base.hpp
4/// @brief Base class for your consumers.
5
6#include <memory>
7
8#include <userver/utils/periodic_task.hpp>
9
10#include <userver/urabbitmq/consumer_settings.hpp>
11
12USERVER_NAMESPACE_BEGIN
13
14namespace urabbitmq {
15
16class Client;
17class ConsumerBaseImpl;
18
19/// @ingroup userver_base_classes
20///
21/// @brief Base class for your consumers.
22/// You should derive from it and override `Process` method, which gets called
23/// when a new message arrives from the broker.
24///
25/// If your configuration is known upfront and doesn't change at runtime
26/// consider using `ConsumerComponentBase` instead.
27///
28/// Library takes care of handling start failures and runtime failures
29/// (connection breakage/broker node downtime etc.) and will try its best to
30/// restart the consumer.
31///
32/// @note Since messages are delivered asynchronously in the background you
33/// must call `Stop` before derived class is destroyed, otherwise a race is
34/// possible, when `Process` is called concurrently with
35/// derived class destructor, which is UB.
36///
37/// @note Library guarantees `at least once` delivery, hence some deduplication
38/// might be needed on your side.
40 public:
41 ConsumerBase(std::shared_ptr<Client> client,
42 const ConsumerSettings& settings);
43 virtual ~ConsumerBase();
44
45 /// @brief Start consuming messages from the broker.
46 /// Calling this method on running consumer has no effect.
47 ///
48 /// Should not throw: in case of an initial setup failure the library will
49 /// restart the consumer in the background.
50 void Start();
51
52 /// @brief Stop consuming messages from the broker.
53 /// Calling this method on stopped consumer has no effect.
54 ///
55 /// @note You must call this method before your derived class is destroyed,
56 /// otherwise it's UB.
57 void Stop();
58
59 protected:
60 /// @brief You may override this method in a derived class to implement
61 /// message handling logic. The default implementation does nothing.
62 ///
63 /// If this method returns successfully, the message is acked (best effort)
64 /// to the broker; if this method throws, the message is requeued.
65 ///
66 /// Please keep in mind that the message may be delivered again even if
67 /// `Process` returns successfully: sadly we can't guarantee that the `ack`
68 /// ever reached the broker (network issues or unexpected shutdown, for
69 /// example).
70 /// It is, however, guaranteed that the message is requeued if `Process` fails.
71 virtual void Process(std::string) { /* do nothing */
72 }
73
74 /// @brief You may override this method in a derived class to implement
75 /// message handling logic. By default it just calls `Process(std::string)`,
76 /// moving `msg.message` (the message body) into it; the default
77 /// implementation reads no other member of `msg`.
78 virtual void Process(ConsumedMessage msg) { Process(std::move(msg.message)); }
79
80 private:
81 std::shared_ptr<Client> client_;
82 const ConsumerSettings settings_;
83
84 std::unique_ptr<ConsumerBaseImpl> impl_;
85 utils::PeriodicTask monitor_{};
86};
87
88} // namespace urabbitmq
89
90USERVER_NAMESPACE_END