userver: userver/urabbitmq/consumer_component_base.hpp Source File
Loading...
Searching...
No Matches
consumer_component_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/urabbitmq/consumer_component_base.hpp
4/// @brief Base component for your consumers.
5
6#include <memory>
7#include <userver/components/component_base.hpp>
8#include <userver/urabbitmq/typedefs.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace urabbitmq {
13
14// clang-format off
15/// @ingroup userver_base_classes
16///
17/// @brief Base component for your consumers.
18/// Basically a `ConsumerBase` but in a nice component-ish way
19///
20/// You should derive from it and override `Process` method, which gets called
21/// when a new message arrives from the broker.
22/// The consumer will be automatically started after all components are loaded
23/// and stopped before all components are beginning to stop.
24///
25/// Library takes care of handling start failures and runtime failures
26/// (connection breakage/broker node downtime etc.) and will try it's best to
27/// restart the consumer.
28///
29/// @note Library guarantees `at least once` delivery, hence some deduplication
30/// might be needed ou your side.
31///
32/// ## Static configuration example:
33///
34/// @snippet samples/rabbitmq_service/static_config.yaml RabbitMQ consumer sample - static config
35///
36/// ## Static options:
37/// Name | Description
38/// rabbit_name | Name of the RabbitMQ component to use for consumption
39/// queue | Name of the queue to consume from
40/// prefetch_count | prefetch_count for the consumer, limits the amount of in-flight messages
41///
42// clang-format on
44 public:
45 ConsumerComponentBase(const components::ComponentConfig& config,
46 const components::ComponentContext& context);
47 ~ConsumerComponentBase() override;
48
49 static yaml_config::Schema GetStaticConfigSchema();
50
51 protected:
53
55
56 /// @brief You may override this method in derived class and implement
57 /// message handling logic. By default it does nothing.
58 ///
59 /// If this method returns successfully message would be acked (best effort)
60 /// to the broker, if this method throws the message would be requeued.
61 ///
62 /// Please keep in mind that it is possible for the message to be delivered
63 /// again even if `Process` returns successfully: sadly we can't guarantee
64 /// that `ack` ever reached the broker (network issues or unexpected shutdown,
65 /// for example).
66 /// It is however guaranteed for message to be requeued if `Process` fails.
67 virtual void Process(std::string) { /* do nothing */
68 }
69
70 /// @brief You may override this method in derived class and implement
71 /// message handling logic. By default it just calls `Process` with message
72 /// body.
73 ///
74 virtual void Process(ConsumedMessage msg) { Process(std::move(msg.message)); }
75
76 private:
77 // This is actually just a subclass of `ConsumerBase`
78 class Impl;
79 std::unique_ptr<Impl> impl_;
80};
81
82} // namespace urabbitmq
83
84namespace components {
85
86template <>
87inline constexpr bool kHasValidate<urabbitmq::ConsumerComponentBase> = true;
88
89}
90
91USERVER_NAMESPACE_END