userver: userver/urabbitmq/consumer_component_base.hpp Source File
Loading...
Searching...
No Matches
consumer_component_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/urabbitmq/consumer_component_base.hpp
4/// @brief Base component for your consumers.
5
6#include <memory>
7#include <userver/components/component_base.hpp>
8#include <userver/urabbitmq/typedefs.hpp>
9
10USERVER_NAMESPACE_BEGIN
11
12namespace urabbitmq {
13
14/// @ingroup userver_base_classes
15///
16/// @brief Base component for your consumers.
17/// Basically a `ConsumerBase` but in a nice component-ish way
18///
19/// You should derive from it and override `Process` method, which gets called
20/// when a new message arrives from the broker.
21/// The consumer will be automatically started after all components are loaded
22/// and stopped before all components are beginning to stop.
23///
24/// Library takes care of handling start failures and runtime failures
25/// (connection breakage/broker node downtime etc.) and will try it's best to
26/// restart the consumer.
27///
28/// @note Library guarantees `at least once` delivery, hence some deduplication
29/// might be needed ou your side.
30///
31/// ## Static configuration example:
32///
33/// @snippet samples/rabbitmq_service/static_config.yaml RabbitMQ consumer sample - static config
34///
35/// ## Static options @ref urabbitmq::ConsumerComponentBase :
36/// @include{doc} scripts/docs/en/components_schema/rabbitmq/src/urabbitmq/consumer_component_base.md
37///
38/// Options inherited from @ref components::ComponentBase :
39/// @include{doc} scripts/docs/en/components_schema/core/src/components/impl/component_base.md
41public:
42 ConsumerComponentBase(const components::ComponentConfig& config, const components::ComponentContext& context);
43 ~ConsumerComponentBase() override;
44
45 static yaml_config::Schema GetStaticConfigSchema();
46
47protected:
49
51
52 /// @brief You may override this method in derived class and implement
53 /// message handling logic. By default it does nothing.
54 ///
55 /// If this method returns successfully message would be acked (best effort)
56 /// to the broker, if this method throws the message would be requeued.
57 ///
58 /// Please keep in mind that it is possible for the message to be delivered
59 /// again even if `Process` returns successfully: sadly we can't guarantee
60 /// that `ack` ever reached the broker (network issues or unexpected shutdown,
61 /// for example).
62 /// It is however guaranteed for message to be requeued if `Process` fails.
63 virtual void Process(std::string) { /* do nothing */ }
64
65 /// @brief You may override this method in derived class and implement
66 /// message handling logic. By default it just calls `Process` with message
67 /// body.
68 ///
69 virtual void Process(ConsumedMessage msg) { Process(std::move(msg.message)); }
70
71private:
72 // This is actually just a subclass of `ConsumerBase`
73 class Impl;
74 std::unique_ptr<Impl> impl_;
75};
76
77} // namespace urabbitmq
78
79namespace components {
80
81template <>
82inline constexpr bool kHasValidate<urabbitmq::ConsumerComponentBase> = true;
83
84}
85
86USERVER_NAMESPACE_END