Before you start
Make sure that you can compile and run the core tests, and have read the basic example Writing your first HTTP server.
Also, it is expected that you are familiar with basic Kafka notions and ideas (brokers, topics, partitions, producers, consumers, etc.).
Step by step guide
In this tutorial we will write a service that consists of two components:
- an HTTP handler component that sends a given message to the Kafka broker via HTTP POST /produce;
- a basic component that reads all messages produced to the configured topics.
HTTP handler component (producer) declaration
Like in Writing your first HTTP server, we create a component for handling HTTP requests:

class ProducerHandler final : public server::handlers::HttpHandlerJsonBase {
public:
    static constexpr std::string_view kName{"producer-handler"};

    ProducerHandler(const components::ComponentConfig& config,
                    const components::ComponentContext& context);

    formats::json::Value HandleRequestJsonThrow(
        const server::http::HttpRequest& request,
        const formats::json::Value& request_json,
        server::request::RequestContext& context) const override;

private:
    const kafka::Producer& producer_;
};
Note that the component holds a reference to kafka::Producer - a client to Kafka (its writer). The client is thread-safe: you can use it concurrently from different threads and tasks.
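For instance, nothing prevents scheduling sends from several concurrent tasks. A minimal sketch, assuming a producer reference is in scope (task names, topics, and payloads here are illustrative, not part of the sample):

// kafka::Producer::Send may be called from concurrent tasks;
// names, topics, and payloads below are made up for the illustration.
auto first = utils::Async("produce-first", [&producer] {
    producer.Send("test-topic-1", "key-1", "payload-1");
});
auto second = utils::Async("produce-second", [&producer] {
    producer.Send("test-topic-2", "key-2", "payload-2");
});
first.Get();   // Get() rethrows kafka::SendException on delivery failure
second.Get();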
To use the producer in the program, include it with
#include <userver/kafka/producer.hpp>
Produce messages to topic
To send messages to topics, let us declare and implement the function kafka_sample::Produce that accepts a producer reference and a kafka_sample::RequestMessage, and uses the kafka::Producer::Send method to schedule the send to the given topic and asynchronously wait for its delivery:
SendStatus Produce(const kafka::Producer& producer, const RequestMessage& message) {
    try {
        producer.Send(message.topic, message.key, message.payload);
        return SendStatus::kSuccess;
    } catch (const kafka::SendException& ex) {
        return ex.IsRetryable() ? SendStatus::kErrorRetryable : SendStatus::kErrorNonRetryable;
    }
}
Note that kafka::Producer::Send may throw kafka::SendException if the message is malformed or the delivery timed out. The kafka::SendException::IsRetryable method tells whether it makes sense to retry the request. Also see kafka::Producer::SendAsync for more flexible message delivery scheduling.
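For example, a simple retry loop over kafka_sample::Produce could look as follows; ProduceWithRetries is a hypothetical helper sketched for this tutorial, not part of the sample:

// Hypothetical helper: retries only the failures reported as retryable.
// A production service would likely add a backoff between attempts.
SendStatus ProduceWithRetries(const kafka::Producer& producer,
                              const RequestMessage& message,
                              std::size_t max_attempts) {
    auto status = SendStatus::kErrorNonRetryable;
    for (std::size_t attempt = 0; attempt < max_attempts; ++attempt) {
        status = Produce(producer, message);
        if (status != SendStatus::kErrorRetryable) {
            break;
        }
    }
    return status;
}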
Produce message on HTTP request
First, we find the producer instance and save it in our component's field:
ProducerHandler::ProducerHandler(const components::ComponentConfig& config,
                                 const components::ComponentContext& context)
    : server::handlers::HttpHandlerJsonBase{config, context},
      producer_{context.FindComponent<kafka::ProducerComponent>().GetProducer()} {}
Then, implement the server::handlers::HttpHandlerJsonBase method that handles each request:
formats::json::Value ProducerHandler::HandleRequestJsonThrow(
    const server::http::HttpRequest& request,
    const formats::json::Value& request_json,
    server::request::RequestContext& /*context*/) const {
    if (!IsCorrectRequest(request_json)) {
        request.SetResponseStatus(server::http::HttpStatus::kBadRequest);
        return formats::json::MakeObject("error", "Invalid request");
    }
    const auto message = request_json.As<RequestMessage>();
    // Map the produce result to an HTTP response (error bodies are illustrative)
    switch (Produce(producer_, message)) {
        case SendStatus::kSuccess:
            return formats::json::MakeObject("message", "Message send successfully");
        case SendStatus::kErrorRetryable:
            request.SetResponseStatus(server::http::HttpStatus::kTooManyRequests);
            return formats::json::MakeObject("error", "Retry later");
        case SendStatus::kErrorNonRetryable:
            request.SetResponseStatus(server::http::HttpStatus::kBadRequest);
            return formats::json::MakeObject("error", "Bad request");
    }
    UINVARIANT(false, "Unexpected SendStatus value");
}
Here we:
- Check incoming message correctness;
- Parse the request into kafka_sample::RequestMessage (a sketch of the struct and its parser follows this list);
- Send the message with kafka_sample::Produce;
- Dispatch the returned result.
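The request_json.As<RequestMessage>() call requires a JSON parser for the struct. A minimal sketch of how kafka_sample::RequestMessage could be declared and parsed, assuming the request format used throughout this tutorial:

struct RequestMessage {
    std::string topic;
    std::string key;
    std::string payload;
};

// Hooks the struct into userver's formats library, so that
// request_json.As<RequestMessage>() compiles.
RequestMessage Parse(const formats::json::Value& doc, formats::parse::To<RequestMessage>) {
    RequestMessage message;
    message.topic = doc["topic"].As<std::string>();
    message.key = doc["key"].As<std::string>();
    message.payload = doc["payload"].As<std::string>();
    return message;
}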
Great! Now we understand how to write messages to the broker. Next, let's see how to read them.
Base component (consumer) declaration
Here we create a basic component that just starts and does its job:

class ConsumerHandler final : public components::ComponentBase {
public:
    static constexpr std::string_view kName{"consumer-handler"};

    ConsumerHandler(const components::ComponentConfig& config,
                    const components::ComponentContext& context);

private:
    kafka::ConsumerScope consumer_;
};
Note that the component holds a kafka::ConsumerScope - a client to Kafka (its reader). The consumer should be launched on component start to asynchronously handle consumed messages.
To use the consumer in the program, include it with
#include <userver/kafka/consumer_scope.hpp>
Consume messages from topics
We should implement a method or function that handles the messages consumed from topics. Here we read the message batch and call the testpoint message_consumed with each message key. In a real service, the processing may take a long time and do useful work.
void Consume(kafka::MessageBatchView messages) {
    for (const auto& message : messages) {
        if (!message.GetTimestamp().has_value()) {
            continue;
        }

        TESTPOINT("message_consumed", [&message] {
            formats::json::ValueBuilder builder;
            builder["key"] = message.GetKey();
            return builder.ExtractValue();
        }());
    }
}
Consume messages in component
First, we find the consumer instance and save it in our component's field. After that, we start the asynchronous message processing by passing a callback to ConsumerScope::Start; the callback calls kafka_sample::Consume and commits the message offsets on success (i.e. when Consume does not throw):
ConsumerHandler::ConsumerHandler(const components::ComponentConfig& config,
                                 const components::ComponentContext& context)
    : components::ComponentBase{config, context},
      consumer_{context.FindComponent<kafka::ConsumerComponent>().GetConsumer()} {
    consumer_.Start([this](kafka::MessageBatchView messages) {
        Consume(messages);
        consumer_.AsyncCommit();
    });
}
Excellent! We understand how to read messages from the broker, too. Finally, let's start the service.
int main()
Add all created and required components and run utils::DaemonRun to start the server:
int main(int argc, char* argv[]) {
    const auto component_list =
        components::MinimalServerComponentList()
            .Append<kafka_sample::ConsumerHandler>()
            .Append<kafka_sample::ProducerHandler>()
            .Append<kafka::ConsumerComponent>("kafka-consumer")
            .Append<kafka::ProducerComponent>("kafka-producer")
            .Append<components::Secdist>()
            .Append<components::HttpClient>()
            .Append<server::handlers::TestsControl>();
    return utils::DaemonRun(argc, argv, component_list);
}
Static config
The static configuration of the service is quite close to the configuration from Writing your first HTTP server, except for the handlers and the producer and consumer settings.
Consumer settings are:

# yaml
kafka-consumer:                  # Kafka consumer component name must start with kafka-consumer
    group_id: test-group         # Consumer group the current consumer joins on start
    topics:                      # List of topics the consumer listens to
        - test-topic-1
        - test-topic-2
    auto_offset_reset: smallest  # Strategy for reading topics without committed offsets
    max_batch_size: 5            # Maximum number of messages batched before the user callback is called
    security_protocol: PLAINTEXT # Broker connection settings
And the basic producer settings are:

# yaml
kafka-producer:                  # Kafka producer component name must start with kafka-producer
    delivery_timeout: 10s        # Timeout for message delivery
    queue_buffering_max: 100ms   # Time the producer waits before sending a message
    enable_idempotence: true     # Whether to enable the idempotence mode
    security_protocol: PLAINTEXT # Broker connection settings
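The HTTP handler itself is configured the same way as in Writing your first HTTP server; a minimal sketch (the task processor name is an assumption, adjust it to your static config):

# yaml
producer-handler:                # Must match ProducerHandler::kName
    path: /produce               # The handler is registered at URL '/produce'
    method: POST                 # Only POST requests are processed
    task_processor: main-task-processor # Assumed task processor name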
Secdist
Kafka clients need to know the Kafka broker endpoints and, optionally, the username and password for the connection. The producer and consumer read these settings from Secdist in the following format:
"kafka_settings": {
"<kafka-component-name>": {
"brokers": "<brokers comma-separated endpoint list>",
"username": "SASL2 username (may be empty if use PLAINTEXT)",
"password": "SASL2 password (may be empty if use PLAINTEXT)"
}
}
Build and run
To build the sample, execute the following build steps at the userver root directory:
mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-kafka_service
The sample can be started by running make start-userver-samples-kafka_service. The command invokes the testsuite start target that sets proper paths in the configuration files, prepares and starts the Kafka broker, and starts the service.
Make sure to place the secdist settings into /etc/kafka_service/secure_data.json and define the KAFKA_HOME environment variable with the path to the Kafka binaries. Also set the TESTSUITE_KAFKA_CUSTOM_TOPICS environment variable to create topics on server start. Read about the Kafka service and environment settings at https://yandex.github.io/yandex-taxi-testsuite/kafka/.
The final start command may look as follows:
$ KAFKA_HOME=/etc/kafka TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1 make start-userver-samples-kafka_service
To start the service manually, start the Kafka broker and run ./samples/kafka_service/userver-samples-kafka_service -c </path/to/static_config.yaml>.
Now you can send a request to your service from another terminal:
$ curl -X POST -i --data '{"topic": "test-topic-1", "key": "key", "payload": "msg"}' localhost:8187/produce
HTTP/1.1 200 OK
Date: Thu, 03 Oct 2024 12:54:27 UTC
Server: userver/2.5-rc (20241003123116; rv:unknown)
Content-Type: application/json; charset=utf-8
Accept-Encoding: gzip, zstd, identity
traceparent: 00-93a06ecf62ff45828985bae8c5e6e31b-6a0abcac1b7e835b-01
X-YaRequestId: bd1a8240fad04a259dab26d0aa8603a4
X-YaTraceId: 93a06ecf62ff45828985bae8c5e6e31b
X-YaSpanId: 6a0abcac1b7e835b
Connection: keep-alive
Content-Length: 39
{"message":"Message send successfully"}
Unit tests
- Warning
- There may be issues with environment settings, so read the kafka::utest::KafkaCluster and testsuite documentation before writing your own tests.
To write unit tests with the Kafka environment, link your project with the userver::kafka-utest target and call userver_add_utest with the adjusted environment:
add_executable(${PROJECT_NAME}-unittest "unittest/kafka_test.cpp")
target_link_libraries(${PROJECT_NAME}-unittest PRIVATE ${PROJECT_NAME}_objs userver::kafka-utest)

userver_add_utest(
    NAME ${PROJECT_NAME}-unittest
    DATABASES kafka
    TEST_ENV
        "TESTSUITE_KAFKA_SERVER_START_TIMEOUT=120.0"
        "TESTSUITE_KAFKA_SERVER_HOST=[::1]"
        "TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)
For unit testing your service logic, it is useful to add functions that accept a producer or consumer and to call them both from the service code and from tests.
To create producer and consumer instances in userver unit tests, use the kafka::utest::KafkaCluster fixture: inherit your test class from it and use the UTEST_F macro. To get the fixture, include:

#include <userver/kafka/utest/kafka_fixture.hpp>
To test kafka_sample::Produce we can use the following technique:
// Test class inherited from the kafka::utest::KafkaCluster fixture
class KafkaServiceTest : public kafka::utest::KafkaCluster {};

UTEST_F(KafkaServiceTest, Produce) {
    auto producer = MakeProducer("kafka-producer");
    EXPECT_EQ(
        kafka_sample::Produce(producer, kafka_sample::RequestMessage{"test-topic", "test-key", "test-message"}),
        kafka_sample::SendStatus::kSuccess
    );
}
To test kafka_sample::Consume, use kafka::utest::KafkaCluster::SendMessages and kafka::utest::KafkaCluster::ReceiveMessages:
UTEST_F(KafkaServiceTest, Consume) {
    const std::string kTestTopic1{"test-topic-1"};
    const std::string kTestTopic2{"test-topic-2"};

    const std::array kTestMessages{
        kafka::utest::Message{kTestTopic1, "test-key-1", "test-msg-1", /*partition=*/0},
        kafka::utest::Message{kTestTopic2, "test-key-2", "test-msg-2", /*partition=*/0}};
    SendMessages(kTestMessages);

    auto consumer = MakeConsumer("kafka-consumer", /*topics=*/{kTestTopic1, kTestTopic2});
    const auto received_messages = ReceiveMessages(consumer, kTestMessages.size());
    ASSERT_EQ(received_messages.size(), kTestMessages.size());

    EXPECT_THAT(received_messages, ::testing::UnorderedElementsAreArray(kTestMessages));
}
Functional tests
Yandex Taxi Testsuite framework has Kafka plugin that helps set environment for functional service testing. Also, userver has some custom fixtures in pytest_userver.plugins.kafka
Fixtures and environment settings settings may be included in your conftest.py
with:
pytest_plugins = ['pytest_userver.plugins.kafka']
Also, you can redefine the service_env fixture and use the kafka_secdist fixture to generate secdist settings for all your consumers and producers:
@pytest.fixture(scope='session')
def service_env(kafka_secdist) -> dict:
    """
    Note: the kafka_secdist fixture generates the secdist config

    Expected secdist format is:
    "kafka_settings": {
        "<kafka-component-name>": {
            "brokers": "<brokers comma-separated endpoint list>",
            "username": "SASL2 username (may be empty if use PLAINTEXT)",
            "password": "SASL2 password (may be empty if use PLAINTEXT)"
        }
    }
    """
    return {'SECDIST_CONFIG': kafka_secdist}
For the sample service it generates:
{
    "kafka_settings": {
        "kafka-producer": {
            "brokers": "[::1]:9099",
            "username": "",
            "password": ""
        },
        "kafka-consumer": {
            "brokers": "[::1]:9099",
            "username": "",
            "password": ""
        }
    }
}
Add functional tests using:
userver_testsuite_add_simple(
    TEST_ENV
        "TESTSUITE_KAFKA_SERVER_START_TIMEOUT=120.0"
        "TESTSUITE_KAFKA_SERVER_HOST=[::1]"
        "TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)
A basic test that sends 10 messages to 2 topics and waits until each message is delivered:
async def test_kafka_basic(service_client, testpoint):
    received_key = ''

    @testpoint('message_consumed')
    def message_consumed(data):
        nonlocal received_key
        received_key = data['key']

    await service_client.update_server_state()

    TOPIC1 = 'test-topic-1'
    TOPIC2 = 'test-topic-2'
    MESSAGE_COUNT = 10

    for send in range(MESSAGE_COUNT):
        topic = TOPIC1 if send % 2 == 0 else TOPIC2
        send_key = f'test-key-{send}'
        response = await service_client.post(
            '/produce',
            json={
                'topic': topic,
                'key': send_key,
                'payload': f'test-msg-{send}',
            },
        )
        assert response.status_code == 200

        await message_consumed.wait_call()
        assert received_key == send_key
Testsuite also exposes the kafka_producer and kafka_consumer fixtures that can be used in functional tests.
- See also
- https://yandex.github.io/yandex-taxi-testsuite/kafka/#kafka-consumer
Full sources
See the full example at: