Kafka service

Before you start

Make sure that you can compile and run the core tests, and that you have read the basic example Writing your first HTTP server.

Also, it is expected that you are familiar with basic Kafka notions and ideas (brokers, topics, partitions, producers, consumers, etc.).

Step by step guide

In this tutorial we will write a service that consists of two components:

  1. An HTTP handler component that sends a given message to the Kafka broker. The method is HTTP POST /produce;
  2. A basic component that reads all messages produced to the configured topics.

HTTP handler component (producer) declaration

Like in Writing your first HTTP server we create a component for handling HTTP requests:

class ProducerHandler final : public server::handlers::HttpHandlerJsonBase {
public:
    static constexpr std::string_view kName{"producer-handler"};

    ProducerHandler(const components::ComponentConfig& config, const components::ComponentContext& context);

    formats::json::Value HandleRequestJsonThrow(
        const server::http::HttpRequest& request,
        const formats::json::Value& request_json,
        server::request::RequestContext& context
    ) const override;

private:
    const kafka::Producer& producer_;
};

Note that the component holds a reference to kafka::Producer - a client to Kafka (its writer). That client is thread-safe: you can use it concurrently from different threads and tasks.

To use the producer in the program, include it with

#include <userver/kafka/producer.hpp>
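
Because the producer is thread-safe, the same instance can serve several tasks at once. Below is a minimal illustrative sketch (not part of the sample) that fans out two sends from parallel coroutine tasks; the topic names and payloads are made up, and utils::Async comes from <userver/utils/async.hpp>:

#include <userver/kafka/producer.hpp>
#include <userver/utils/async.hpp>

// Hedged sketch: send two messages concurrently through one shared producer.
void SendTwoConcurrently(const kafka::Producer& producer) {
    // Schedule one send in a separate coroutine-based task...
    auto task = utils::Async("parallel-send", [&producer] {
        producer.Send("test-topic-1", "key-1", "payload-1");
    });
    // ...while sending another message from the current task.
    producer.Send("test-topic-2", "key-2", "payload-2");
    task.Get();  // rethrows kafka::SendException if the first delivery failed
}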

Produce messages to topic

To send messages to topics, let us declare and implement the function kafka_sample::Produce that accepts a producer reference and a kafka_sample::RequestMessage, and uses the kafka::Producer::Send method to schedule the send to the given topic and asynchronously wait for its delivery:

SendStatus Produce(const kafka::Producer& producer, const RequestMessage& message) {
    try {
        producer.Send(message.topic, message.key, message.payload);
        return SendStatus::kSuccess;
    } catch (const kafka::SendException& ex) {
        return ex.IsRetryable() ? SendStatus::kErrorRetryable : SendStatus::kErrorNonRetryable;
    }
}

Note that kafka::Producer::Send may throw kafka::SendException if the message is malformed or its delivery timed out.

The kafka::SendException::IsRetryable method tells whether it makes sense to retry the request.
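
For example, a caller could wrap kafka_sample::Produce into a bounded retry loop that repeats only retryable failures. A hypothetical helper (not part of the sample) built on the sample's own names:

// Hypothetical helper: retry Produce() up to max_attempts times,
// but only while the failure is marked retryable.
SendStatus ProduceWithRetries(const kafka::Producer& producer, const RequestMessage& message,
                              std::size_t max_attempts) {
    auto status = SendStatus::kErrorRetryable;
    for (std::size_t attempt = 0; attempt < max_attempts && status == SendStatus::kErrorRetryable; ++attempt) {
        status = Produce(producer, message);
    }
    return status;
}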

Also see kafka::Producer::SendAsync for more flexible message delivery scheduling.
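
With SendAsync, several deliveries can be scheduled first and awaited together. A minimal sketch, assuming SendAsync returns an engine::TaskWithResult<void> to wait on, as in current userver:

#include <vector>

#include <userver/engine/task/task_with_result.hpp>
#include <userver/kafka/producer.hpp>

// Hedged sketch: schedule two deliveries, then wait for both.
void SendBatch(const kafka::Producer& producer) {
    std::vector<engine::TaskWithResult<void>> delivery_tasks;
    delivery_tasks.push_back(producer.SendAsync("test-topic-1", "key-1", "payload-1"));
    delivery_tasks.push_back(producer.SendAsync("test-topic-2", "key-2", "payload-2"));

    for (auto& task : delivery_tasks) {
        task.Get();  // throws kafka::SendException if that delivery failed
    }
}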

Produce message on HTTP request

First, we find the producer instance and save it in our component's field:

ProducerHandler::ProducerHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
    : server::handlers::HttpHandlerJsonBase{config, context},
      producer_{context.FindComponent<kafka::ProducerComponent>().GetProducer()} {}

Then, implement the server::handlers::HttpHandlerJsonBase method that handles each request:

formats::json::Value ProducerHandler::HandleRequestJsonThrow(
    const server::http::HttpRequest& request,
    const formats::json::Value& request_json,
    server::request::RequestContext& /*context*/
) const {
    if (!IsCorrectRequest(request_json)) {
        request.SetResponseStatus(server::http::HttpStatus::kBadRequest);
        return formats::json::FromString(kErrorMembersNotSet);
    }

    const auto message = request_json.As<RequestMessage>();

    switch (Produce(producer_, message)) {
        case SendStatus::kSuccess:
            return formats::json::MakeObject("message", "Message send successfully");
        case SendStatus::kErrorRetryable:
            request.SetResponseStatus(server::http::HttpStatus::kTooManyRequests);
            return formats::json::MakeObject("error", "Retry later");
        case SendStatus::kErrorNonRetryable:
            request.SetResponseStatus(server::http::HttpStatus::kBadRequest);
            return formats::json::MakeObject("error", "Bad request");
    }

    UINVARIANT(false, "Unknown produce status");
}

Here we:

  • Check the incoming message for correctness;
  • Parse the request into kafka_sample::RequestMessage (a parsing sketch is given below);
  • Send the message with kafka_sample::Produce;
  • Dispatch the returned result.
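
For completeness, here is a sketch of what the request parsing might look like. The sample's actual RequestMessage, IsCorrectRequest, and kErrorMembersNotSet definitions may differ, but request_json.As<RequestMessage>() requires a Parse function in the usual userver style:

#include <string>

#include <userver/formats/json/value.hpp>
#include <userver/formats/parse/to.hpp>

struct RequestMessage {
    std::string topic;
    std::string key;
    std::string payload;
};

// Enables request_json.As<RequestMessage>() via userver's parsing customization point.
RequestMessage Parse(const formats::json::Value& doc, formats::parse::To<RequestMessage>) {
    return RequestMessage{
        doc["topic"].As<std::string>(),
        doc["key"].As<std::string>(),
        doc["payload"].As<std::string>(),
    };
}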

Great! Now we understand how to write messages to the broker. Next, let's see how to read them.

Base component (consumer) declaration

Here we create a base component that just starts and does its job:

class ConsumerHandler final : public components::ComponentBase {
public:
    static constexpr std::string_view kName{"consumer-handler"};

    ConsumerHandler(const components::ComponentConfig& config, const components::ComponentContext& context);

private:
    // Subscriptions must be the last fields! Add new fields above this comment.
    kafka::ConsumerScope consumer_;
};

Note that the component holds a kafka::ConsumerScope - a client to Kafka (its reader). The consumer should be launched on component start to handle consumed messages asynchronously.

To use the consumer in the program, include it with

#include <userver/kafka/consumer_scope.hpp>

Consume messages from topics

We should implement a method or function that handles the messages consumed from the topics. Here we read the message batch and call the message_consumed testpoint with each message's key. In a real service, the processing may take a long time and do more useful work.

void Consume(kafka::MessageBatchView messages) {
    for (const auto& message : messages) {
        if (!message.GetTimestamp().has_value()) {
            continue;
        }

        TESTPOINT("message_consumed", [&message] {
            formats::json::ValueBuilder builder;
            builder["key"] = message.GetKey();
            return builder.ExtractValue();
        }());
    }
}
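
In a more realistic service, the callback body could, for instance, inspect each message's coordinates and hand the payload over to business logic. A hypothetical sketch (ProcessPayload is made up; GetTopic, GetPartition, GetKey, and GetPayload are assumed to be accessors of the consumed message):

// Hedged sketch of a more realistic batch handler.
void HandleBatch(kafka::MessageBatchView messages) {
    for (const auto& message : messages) {
        // Log where the message came from.
        LOG_INFO() << "Consumed message from topic '" << message.GetTopic() << "', partition "
                   << message.GetPartition();
        // Hand the key and payload over to (hypothetical) business logic.
        ProcessPayload(message.GetKey(), message.GetPayload());
    }
}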

Consume messages in component

First, we find the consumer instance and save it in our component's field. After that, we start the asynchronous message processing by passing to kafka::ConsumerScope::Start a callback that calls kafka_sample::Consume and commits the message offsets on success (i.e. when Consume does not throw):

ConsumerHandler::ConsumerHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
    : components::ComponentBase{config, context},
      consumer_{context.FindComponent<kafka::ConsumerComponent>().GetConsumer()} {
    consumer_.Start([this](kafka::MessageBatchView messages) {
        Consume(messages);
        consumer_.AsyncCommit();
    });
}

Excellent! We understand how to read messages from the broker, too. Finally, let's start the service.

int main()

Add all created and required components and run utils::DaemonMain to start the server:

int main(int argc, char* argv[]) {
    const auto components_list = components::MinimalServerComponentList()
                                     .Append<kafka_sample::ConsumerHandler>()
                                     .Append<kafka_sample::ProducerHandler>()
                                     .Append<kafka::ConsumerComponent>("kafka-consumer")
                                     .Append<kafka::ProducerComponent>("kafka-producer")
                                     .Append<components::Secdist>()
                                     .Append<components::HttpClient>()
                                     .Append<server::handlers::TestsControl>();
    return utils::DaemonMain(argc, argv, components_list);
}

Static config

The static configuration of the service is quite close to the configuration from Writing your first HTTP server, except for the handlers and the producer and consumer settings.

Consumer settings are:

# yaml
kafka-consumer:                  # Kafka consumer component name must start with "kafka-consumer"
    group_id: test-group         # Consumer group the current consumer joins on start
    topics:                      # List of topics the consumer listens to
        - test-topic-1
        - test-topic-2
    auto_offset_reset: smallest  # Strategy for reading topics without committed offsets
    max_batch_size: 5            # Number of messages the consumer waits for until the user callback is invoked
    security_protocol: PLAINTEXT # Broker connection settings

And the basic producer settings are:

# yaml
kafka-producer:                    # Kafka producer component name must start with "kafka-producer"
    delivery_timeout: 10s          # Timeout for message delivery
    queue_buffering_max: 100ms     # Time the producer waits before sending a message
    enable_idempotence: true       # Whether to enable idempotence mode
    security_protocol: PLAINTEXT   # Broker connection settings

Secdist

Kafka clients need to know the Kafka brokers endpoints and, optionally, the connection username and password.

The producer and consumer read the aforementioned settings from Secdist in the following format:

"kafka_settings": {
"<kafka-component-name>": {
"brokers": "<brokers comma-separated endpoint list>",
"username": "SASL2 username (may be empty if use PLAINTEXT)",
"password": "SASL2 password (may be empty if use PLAINTEXT)"
}
}

Build and run

To build the sample, execute the following build steps at the userver root directory:

mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=Release ..
make userver-samples-kafka_service

The sample can be started by running make start-userver-samples-kafka_service. The command invokes the testsuite start target, which sets proper paths in the configuration files, prepares and starts the Kafka broker, and starts the service.

Make sure to place secdist settings into /etc/kafka_service/secure_data.json and define the KAFKA_HOME environment variable with the path to Kafka binaries.

Also, set the TESTSUITE_KAFKA_CUSTOM_TOPICS environment variable to create topics on server start.

Read about the testsuite Kafka service and environment settings at https://yandex.github.io/yandex-taxi-testsuite/kafka/.

The final start command may look as follows:

$ KAFKA_HOME=/etc/kafka TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1 make start-userver-samples-kafka_service

To start the service manually, start the Kafka broker and run ./samples/kafka_service/userver-samples-kafka_service -c </path/to/static_config.yaml>.

Now you can send a request to your service from another terminal:

$ curl -X POST -i --data '{"topic": "test-topic-1", "key": "key", "payload": "msg"}' localhost:8187/produce
HTTP/1.1 200 OK
Date: Thu, 03 Oct 2024 12:54:27 UTC
Server: userver/2.5-rc (20241003123116; rv:unknown)
Content-Type: application/json; charset=utf-8
Accept-Encoding: gzip, zstd, identity
traceparent: 00-93a06ecf62ff45828985bae8c5e6e31b-6a0abcac1b7e835b-01
X-YaRequestId: bd1a8240fad04a259dab26d0aa8603a4
X-YaTraceId: 93a06ecf62ff45828985bae8c5e6e31b
X-YaSpanId: 6a0abcac1b7e835b
Connection: keep-alive
Content-Length: 39
{"message":"Message send successfully"}

Unit tests

Warning
There may be issues with the environment settings, so read the kafka::utest::KafkaCluster and testsuite documentation before writing your own tests.

To write unit tests with the Kafka environment, link your project with the userver::kafka-utest target and call userver_add_utest with the adjusted environment:

add_executable(${PROJECT_NAME}-unittest "unittest/kafka_test.cpp")
target_link_libraries(${PROJECT_NAME}-unittest
    PRIVATE ${PROJECT_NAME}_objs userver::kafka-utest)

userver_add_utest(
    NAME ${PROJECT_NAME}-unittest
    DATABASES kafka
    TEST_ENV
        "TESTSUITE_KAFKA_SERVER_START_TIMEOUT=120.0"
        "TESTSUITE_KAFKA_SERVER_HOST=[::1]"
        "TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)

For unit testing your service logic, it is useful to add functions that accept the producer or consumer and call them both from the service code and from the tests.

To create producer and consumer instances in userver unit tests, use the kafka::utest::KafkaCluster fixture: inherit your test class from it and use the UTEST_F macro.

#include <userver/kafka/utest/kafka_fixture.hpp>

class KafkaServiceTest : public kafka::utest::KafkaCluster {};

To test kafka_sample::Produce we can use the following technique:

UTEST_F(KafkaServiceTest, Produce) {
    auto producer = MakeProducer("kafka-producer");
    EXPECT_EQ(
        kafka_sample::Produce(producer, kafka_sample::RequestMessage{"test-topic", "test-key", "test-message"}),
        kafka_sample::SendStatus::kSuccess
    );
}

To test kafka_sample::Consume, use kafka::utest::KafkaCluster::SendMessages and kafka::utest::KafkaCluster::ReceiveMessages:

UTEST_F(KafkaServiceTest, Consume) {
    const std::string kTestTopic1{"test-topic-1"};
    const std::string kTestTopic2{"test-topic-2"};

    const std::array kTestMessages{
        kafka::utest::Message{kTestTopic1, "test-key-1", "test-msg-1", /*partition=*/0},
        kafka::utest::Message{kTestTopic2, "test-key-2", "test-msg-2", /*partition=*/0}};
    SendMessages(kTestMessages);

    auto consumer = MakeConsumer("kafka-consumer", /*topics=*/{kTestTopic1, kTestTopic2});

    auto user_callback = [](kafka::MessageBatchView messages) { kafka_sample::Consume(messages); };
    const auto received_messages = ReceiveMessages(consumer, kTestMessages.size(), std::move(user_callback));

    ASSERT_EQ(received_messages.size(), kTestMessages.size());
    EXPECT_THAT(received_messages, ::testing::UnorderedElementsAreArray(kTestMessages));
}

Functional tests

The Yandex Taxi Testsuite framework has a Kafka plugin that helps to set up the environment for functional service testing. Also, userver has some custom fixtures in pytest_userver.plugins.kafka.

Fixtures and environment settings may be included in your conftest.py with:

pytest_plugins = ['pytest_userver.plugins.kafka']

Also, you can redefine the service_env fixture and use the kafka_secdist fixture to generate secdist settings for all your consumers and producers:

@pytest.fixture(scope='session')
def service_env(kafka_secdist) -> dict:
    """
    Note: the kafka_secdist fixture generates the secdist config

    Expected secdist format is:
    "kafka_settings": {
        "<kafka-component-name>": {
            "brokers": "<comma-separated list of broker endpoints>",
            "username": "SASL2 username (may be empty if PLAINTEXT is used)",
            "password": "SASL2 password (may be empty if PLAINTEXT is used)"
        }
    }
    """
    return {'SECDIST_CONFIG': kafka_secdist}

For the sample service it generates:

{
    "kafka_settings": {
        "kafka-producer": {
            "brokers": "[::1]:9099",
            "username": "",
            "password": ""
        },
        "kafka-consumer": {
            "brokers": "[::1]:9099",
            "username": "",
            "password": ""
        }
    }
}

Add functional tests using:

userver_testsuite_add_simple(
    TEST_ENV
        "TESTSUITE_KAFKA_SERVER_START_TIMEOUT=120.0"
        "TESTSUITE_KAFKA_SERVER_HOST=[::1]"
        "TESTSUITE_KAFKA_CUSTOM_TOPICS=test-topic-1:1,test-topic-2:1"
)

A basic test that sends 10 messages to 2 topics and waits until each message is delivered:

async def test_kafka_basic(service_client, testpoint):
    received_key = ''

    # Register the testpoint, which the consumer calls
    # after each message is consumed
    @testpoint('message_consumed')
    def message_consumed(data):
        nonlocal received_key
        received_key = data['key']

    await service_client.update_server_state()

    TOPIC1 = 'test-topic-1'
    TOPIC2 = 'test-topic-2'
    MESSAGE_COUNT = 10

    for send in range(MESSAGE_COUNT):
        topic = TOPIC1 if send % 2 == 0 else TOPIC2
        send_key = f'test-key-{send}'

        # Send the message and wait for its delivery
        response = await service_client.post(
            '/produce',
            json={
                'topic': topic,
                'key': send_key,
                'payload': f'test-msg-{send}',
            },
        )
        assert response.status_code == 200

        # Wait until the consumer reads the message and calls the testpoint
        await message_consumed.wait_call()

        # Check the message key
        assert received_key == send_key

Also, testsuite exposes the kafka_producer and kafka_consumer fixtures, which can also be used in functional tests.

See also
https://yandex.github.io/yandex-taxi-testsuite/kafka/#kafka-consumer

Full sources

See the full example at: