Your opinion will help to improve our service
Leave a feedback >Make sure that you can compile and run core tests and read a basic example Writing your first HTTP server.
Also, it is expected that you are familiar with basic Kafka notions and ideas (brokers, topics, partitions, producers, consumers, etc.).
In this tutorial we will write a service that consists of two components:
/produce
;Like in Writing your first HTTP server we create a component for handling HTTP requests:
Note that the component holds a reference to kafka::Producer - a client to the Kafka (its writer). That client is thread safe, you can use it concurrently from different threads and tasks.
To use producer in the program, include it with
To send messages to topics, let us declare and implement function kafka_sample::Produce
that accepts a producer reference and the kafka_sample::RequestMessage
and uses kafka::Producer::Send
method to schedule its send to the given topic and asynchronously wait its delivery:
Note that kafka::Producer::Send may throw kafka::SendException if message is not correct or delivery timedout.
kafka::SendException::IsRetryable method says whether it makes sense to retry the request.
Also see kafka::Producer::SendAsync for more flexible message delivery scheduling.
At first, we should find a producer instance and save it in our component's field:
Then, implement the server::handlers::HttpHandlerJsonBase method to define each request handler:
Here we:
kafka_sample::RequestMessage
;kafka_sample::Produce
Great! Currently, we understand how to write messages to broker. Next, let's understand how to read them.
Here we create a base component that just starts and does its job:
Note that component holds a reference to kafka::ConsumerScope - a client to the Kafka (its reader). Consumer should be launched on the component start to asynchronously handle consumed messages.
To use consumer in the program, include it with
We should implement a method or function that handles the messages consumed from topics. Here, we are reading the message batch and call the testpoint message_consumed
on each message key. But in real service, the process may take a long time and do worthy things.
At first, we should find a consumer instance and save it in our component's field. After that we start the asynchronous message processing process, passing a callback to ConsumerScope::Start that calls the kafka_sample::Consume
and commits the messages offsets on success (no exception thrown in Consume
).
Excellent! We understand how to read messages from broker, too. Finally, start the service.
Add all created and required components and run utils::DaemonRun to start the server:
Static configuration of service is quite close to the configuration from Writing your first HTTP server except for the handlers and producer and consumer settings.
Consumer settings are:
kafka-consumer: # Kafka consumer component name must start with kafka-consumer
group_id: test-group # Consumers group current consumer will join on start
topics: # List of topics consumer starts to listen
- test-topic-1
- test-topic-2
auto_offset_reset: smallest # Strategy for reading topics without committed offsets
max_batch_size: 5 # Number of messages consumers waits until user-callback is called
security_protocol: PLAINTEXT # Broker connection settings
And basic producer setting is:
kafka-producer: # Kafka producer component name must start with kafka-producer
delivery_timeout: 10s # Timeout for message delivery
queue_buffering_max: 100ms # Time, producer waits before sending a message
enable_idempotence: true # Whether to enable idempotence mode
security_protocol: PLAINTEXT # Broker connection settings
Kafka clients need to know the Kafka brokers endpoints and, optionally, the connection username and password.
Producer and Consumer read the aforementioned settings from Secdist in format:
To build the sample, execute the following build steps at the userver root directory:
The sample could be started by running make start-userver-samples-kafka_service
. The command would invoke testsuite start target that sets proper paths in the configuration files, prepares and starts the Kafka broker, and starts the service.
Make sure to place secdist settings into /etc/kafka_service/secure_data.json
and define the KAFKA_HOME
environment variable with the path to Kafka binaries.
Also set the TESTSUITE_KAFKA_CUSTOM_TOPICS
environment to create topics on server start.
Read about Kafka service and environment settings in https://yandex.github.io/yandex-taxi-testsuite/kafka/.
Final start command may look as follows:
To start the service manually start the Kafka broker and run ./samples/kafka_service/userver-samples-kafka_service -c </path/to/static_config.yaml>
.
Now you can send a request to your service from another terminal:
To write unittest with Kafka environment, link your project with userver::kafka-utest
target and call userver_add_utest
with adjusted environment:
For unit testing your service logic, it is useful to add functions that accepts the producer or consumer and call them both from service code and from tests.
To create producer and consumer instances in userver unit tests you can use kafka::utest::KafkaCluster fixture. Inherit your testing class from it and use UTEST_F
macros.
To test kafka_sample::Produce
we can use such technique:
To test kafka_sample::Consume
use kafka::utest::KafkaCluster::SendMessages and kafka::utest::KafkaCluster::ReceiveMessages:
Yandex Taxi Testsuite framework has Kafka plugin that helps set environment for functional service testing. Also, userver has some custom fixtures in pytest_userver.plugins.kafka
Fixtures and environment settings settings may be included in your conftest.py
with:
Also, you can redefine a service_env fixture and pass kafka_secdist fixture to generate secdists settings for all yours consumers and producers:
For sample service it generates:
Add functional tests using:
Basic test that send 10 messages to 2 topics and waits until each message is delivered:
Also, testsuite exposes kafka_producer and kafka_consumer fixtures that also can be used in the functional tests.
See the full example at: