userver: userver/urabbitmq/consumer_component_base.hpp Source File
⚠️ This is the documentation for an old userver version. Click here to switch to the latest version.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
consumer_component_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/urabbitmq/consumer_component_base.hpp
4/// @brief Base component for your consumers.
5
6#include <memory>
7
8#include <userver/components/loggable_component_base.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace urabbitmq {
13
14// clang-format off
15/// @ingroup userver_base_classes
16///
17/// @brief Base component for your consumers.
18/// Basically a `ConsumerBase` but in a nice component-ish way
19///
20/// You should derive from it and override `Process` method, which gets called
21/// when a new message arrives from the broker.
22/// The consumer will be automatically started after all components are loaded
23/// and stopped before the components begin to stop.
24///
25/// Library takes care of handling start failures and runtime failures
26/// (connection breakage/broker node downtime etc.) and will try its best to
27/// restart the consumer.
28///
29/// @note Library guarantees `at least once` delivery, hence some deduplication
30/// might be needed on your side.
31///
32/// ## Static configuration example:
33///
34/// @snippet samples/rabbitmq_service/static_config.yaml RabbitMQ consumer sample - static config
35///
36/// ## Static options:
37/// Name | Description
38/// rabbit_name | Name of the RabbitMQ component to use for consumption
39/// queue | Name of the queue to consume from
40/// prefetch_count | prefetch_count for the consumer, limits the amount of in-flight messages
41///
42// clang-format on
44 public:
45 ConsumerComponentBase(const components::ComponentConfig& config,
46 const components::ComponentContext& context);
47 ~ConsumerComponentBase() override;
48
49 static yaml_config::Schema GetStaticConfigSchema();
50
51 protected:
52 void OnAllComponentsLoaded() final;
53
54 void OnAllComponentsAreStopping() final;
55
56 /// @brief Override this method in the derived class and implement
57 /// the message handling logic.
58 ///
59 /// If this method returns successfully, the message is acked (best effort)
60 /// to the broker; if this method throws, the message is requeued.
61 ///
62 /// Please keep in mind that it is possible for the message to be delivered
63 /// again even if `Process` returns successfully: sadly, we can't guarantee
64 /// that the `ack` ever reached the broker (network issues or unexpected
65 /// shutdown, for example).
66 /// It is, however, guaranteed that the message is requeued if `Process` fails.
67 virtual void Process(std::string message) = 0;
68
69 private:
70 // This is actually just a subclass of `ConsumerBase`
71 class Impl;
72 std::unique_ptr<Impl> impl_;
73};
74
75} // namespace urabbitmq
76
77namespace components {
78
// Explicit specialization that sets `kHasValidate` to `true` for
// `urabbitmq::ConsumerComponentBase`.
// NOTE(review): presumably this opts the component into validation of its
// static config against `GetStaticConfigSchema()` — confirm against the
// `components::kHasValidate` documentation.
79template <>
80inline constexpr bool kHasValidate<urabbitmq::ConsumerComponentBase> = true;
81
82}
83
84USERVER_NAMESPACE_END