Изоляция в тестах с Kafka
Написание и поддержка интеграционных тестов с Kafka представляет собой вызов. В этой статье описывается подход, который упрощает процесс тестирования, обеспечивая изоляцию тестов и предоставляя набор инструментов для достижения этой цели. При использовании изоляции, тесты с Kafka могут быть организованы так, что на этапе проверки результатов обеспечивается доступ ко всем сообщениям, возникшим в ходе теста, избегая при этом необходимости в использовании методов принудительного ожидания, таких как Thread.sleep()
и Awaitility.
Этот метод подходит как для использования с Testcontainers, так и для Embedded Kafka или других способов запуска сервиса Kafka (например, локального инстанса).
Изоляция в тестах
Для надежного выполнения тестов важен контроль над тестовым окружением. Это гарантирует, что каждый тест начинается с заранее известного состояния. Примером может служить ситуация, когда один тест создает данные в базе и не удаляет их после себя, что может негативно сказаться на выполнении последующих тестов, ожидающих другое состояние базы данных.
Для достижения изоляции можно применять различные методы, включая:
- Восстановление исходного состояния системы перед запуском каждого теста.
- Обязательная очистка данных в базе после выполнения каждого теста, чтобы избежать влияния на следующие тесты.
Изоляция в тестах с Kafka
Восстановление начального состояния среды Kafka с нуля для каждого тестового сценария может быть реализованно путем перезапуска Kafka. Этот вариант достаточно простой в плане реализации, но дорогой по времени запуска. Существуют методы для ускорения этого процесса, но в данной статье предлагается рассмотреть вариант, когда в рамках всех тестовых сценариев мы работаем с общим экземпляром Kafka.
Такой подход ставит перед нами определенные вызовы: если один тест отправляет сообщения в топик и оставляет без внимания факт их получения и обработки, это может повредить выполнению другого теста, который может полагаться на иное состояние топика.
Для обеспечения изоляции необходимо, чтобы в случае отправки сообщения в топик, который прослушивается тем же приложением, до завершения тестового сценария были выполнены все процессы, инициированные этими сообщениями.
Реализация
Рассмотрим на примере условного телеграм-бота, который перенаправляет запросы к OpenAI API и отправляет ответы пользователям.
Контракты взаимодействия с сервисами описаны в упрощенном виде, чтобы подчеркнуть основную логику работы. Ниже приведена диаграмма последовательностей, демонстрирующая архитектуру приложения. Понимаю, что дизайн может вызвать вопросы с точки зрения системной архитектуры, но прошу отнестись к этому с пониманием — главная цель здесь продемонстрировать подход к изоляции в тестах.
@startuml
autonumber
participant "Telegram" as tg
box "Application" #lightgreen
participant "ServiceA" as ServiceA
participant "ServiceB" as ServiceB
end box
box "Kafka" #lightblue
participant "topicA" as topicA
participant "topicB" as topicB
end box
participant "OpenAI" as openai
tg -> ServiceA++: request
ServiceA -> topicA: message
ServiceA -> tg--: ok
...
topicA -> ServiceB++: message
ServiceB -> openai++: request
openai -> ServiceB--: response
ServiceB -> topicB--: message
...
topicB -> ServiceA++: message
ServiceA -> tg--: request
@enduml
Ниже представлена схема, иллюстрирующая подход к тестированию:
Основной особенностью предложенного решения является строгое разделение кода теста на фазы, соответствующие паттерну Arrange-Act-Assert. Подробнее о данном подходе вы можете прочитать в статье Разносим по полочкам этапы тестирования http запросов в Spring.
Для достижения изоляции критически важно соблюдать следующие взаимосвязи между ключевыми элементами схемы (цифры соответствуют указанным на схеме):
- (1) Подготовка сценария (Arrange) происходит до Выполнения сценария (Act).
- (2) Сообщение отправлено и (3) подтверждено лидером партиции до того как обработка запроса сервисом ServiceA считается завершенной.
- (4) Ручное управление смещением (offset) c фиксацией (commit) происходит только после того, как вся обработка сервисами ServiceB или ServiceA будет полностью завершена.
- (5) Выполнение сценария (Act) происходит до Проверки результата (Assert).
Такой подход позволяет к моменту проверки результатов гарантировать выполнение всех процессов в рамках тестового сценария, отправку и прием всех сообщений, обеспечивая нахождение тестового окружения в известном и финальном состоянии.
Подготовка сценария (Arrange) происходит до Выполнения сценария (Act)
Цель этого этапа — подготовить все необходимое для тестового сценария. В контексте нашего примера основные компоненты тестовой среды включают контекст приложения, моки HTTP-запросов, Kafka и Record Captor.
Что касается интеграции с Kafka, критически важно убедиться, что все консумеры готовы к приему сообщений. Данная проверка реализована в методе KafkaSupport#waitForPartitionAssignment. Решение основано на оригинальном ContainerTestUtils
из библиотеки org.springframework.kafka:spring-kafka-test
с доработкой согласно описываемому сценарию использования. Этот метод гарантирует, что каждому консумеру Kafka будет назначена хотя бы одна партиция. Это предполагает ограничение: в тестовой среде должна быть только одна партиция на топик, хотя данное ограничение является следствием текущей реализации метода и может быть изменено.
Использование общего экземпляра Kafka требует настройки параметра auto.offset.reset = latest
для консумеров, для Spring приложения это делается следующим образом:
spring.kafka.consumer.auto-offset-reset=latest
Record Captor является ключевым элементом этого решения. Его задача - “ловить” сообщения из указанных в конфигурации топиков и предоставить к ним доступ для шага проверки результатов тестового сценария. Технически это простой консумер для Kafka топика с механизмом хранения сообщений и интерфейсом доступа к ним. Код Record Captor доступен в репозитории проекта.
Текущая реализация Record Captor предлагает использование ключа сообщения для идентификации сообщений, связанных с конкретным тестовым случаем. Это будет полезно в системах, где присутствуют уникальные идентификаторы, такие как ID клиента или идентификатор процесса в доменной модели. Использование таких идентификаторов в качестве ключа позволяет эффективно группировать и отслеживать все сообщения, относящиеся к одному и тому же тестовому сценарию, даже если они распределены по различным топикам или обрабатываются разными компонентами системы.
Синхронная отправка сообщения с подтверждением
Цель состоит в реализации синхронной отправки сообщений в Kafka с получением подтверждения от лидера партиции. Для достижения этого необходимо установить параметр acks = 1
для продюсера. В контексте Spring приложения это настройка задается следующим образом:
spring.kafka.producer.acks=1
При использовании KafkaTemplate
для отправки сообщений важно обеспечить синхронность отправки, поскольку данный компонент предоставляет только асинхронный интерфейс org.springframework.kafka.core.KafkaTemplate#send(org.springframework.messaging.Message<?>) return CompletableFuture<SendResult<K, V>>
. Для синхронной отправки можно использовать следующий подход:
kafkaTemplate.send(message).get()
Это гарантирует, что отправка сообщения будет завершена синхронно, с ожиданием подтверждения от Kafka перед продолжением выполнения программы.
Ручное управление смещением
Ручное управление смещением c фиксацией означает, что консумер сообщения будет фиксировать обработку сообщений только после их полной обработки. В данном контексте фиксация смещения для топика topicA
произойдет только после того, как сообщение будет успешно отправлено в topicB
и получено соответствующее подтверждение.
Для реализации такой логики, необходимо отключить автоматическую фиксацию смещений для консумеров, установив параметр enable.auto.commit = false
. В контексте Spring-приложения это настраивается следующим образом:
spring.kafka.consumer.enable-auto-commit=false
Кроме того, следует настроить механизм фиксации так, чтобы смещение фиксировалось после обработки каждого отдельного сообщения, что достигается установкой параметра
spring.kafka.listener.ack-mode=record
Выполнение сценария (Act) происходит до Проверки результата (Assert)
До начала этапа проверки результатов необходимо убедиться, что все процессы, связанные со сценарием, завершены, все сообщения отправлены и получены, обеспечивая переход тестового окружения в “известное” финальное состояние. Благодаря предшествующим этапам, мы обеспечили соблюдение принципа happens-before между действиями продюсеров и консумеров, а также между всеми обработками внутри приложения. На этом этапе требуется выполнить проверку фиксации смещений для каждой партиции для каждой группы консумеров.
Для выполнения данной проверки можно воспользоваться решением, представленном в методе pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit.
Проверки результата (Assert)
На заключительном этапе происходит анализ результатов тестового сценария. Этот процесс включает проверку состояния моков и анализ сообщений, пойманных в RecordCaptor.
Основные элементы решения вкратце
Вот краткое изложение ключевых компонентов предложенного решения для обеспечения эффективного тестирования с Kafka:
- Одна партиция на топик.
- Политика начала чтения сообщений для консумеров -
spring.kafka.consumer.auto-offset-reset=latest
. - Политика подтверждений для продюсеров
spring.kafka.producer.acks=1
. - Синхронная отправка сообщений
kafkaTemplate.send(message).get()
. - Ручной контроль за смещением
spring.kafka.consumer.enable-auto-commit=false
,spring.kafka.listener.ack-mode=record
. - Ожидание назначения партиций до начала тестового сценария
pw.avvero.emk.KafkaSupport#waitForPartitionAssignment
. - Ожидание подтверждения смещения перед проверкой результатов теста
pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit
.
Результат
Код приложения доступен в модуле примеров. Код теста выглядит следующим образом.
def "User Message Processing with OpenAI"() {
setup:
(1) KafkaSupport.waitForPartitionAssignment(applicationContext)
and:
def openaiRequestCaptor = new RequestCaptor()
(2) restMock.expect(manyTimes(), requestTo("https://api.openai.com/v1/chat/completions"))
.andExpect(method(HttpMethod.POST))
.andExpect(openaiRequestCaptor)
.andRespond(withSuccess('{"content": "Hi, how can i help you?"}', MediaType.APPLICATION_JSON))
and:
def telegramRequestCaptor = new RequestCaptor()
(3) restMock.expect(manyTimes(), requestTo("https://api.telegram.org/sendMessage"))
.andExpect(method(HttpMethod.POST))
.andExpect(telegramRequestCaptor)
.andRespond(withSuccess('{}', MediaType.APPLICATION_JSON))
when:
(4) mockMvc.perform(post("/telegram/webhook")
.contentType(APPLICATION_JSON_VALUE)
.content("""{
"message": {
"from": {
"id": 10000000
},
"chat": {
"id": 20000000
},
"text": "Hello!"
}
}""".toString())
.accept(APPLICATION_JSON_VALUE))
.andExpect(status().isOk())
(5) KafkaSupport.waitForPartitionOffsetCommit(applicationContext)
then:
(6) openaiRequestCaptor.times == 1
JSONAssert.assertEquals("""{
"content": "Hello!"
}""", openaiRequestCaptor.bodyString, false)
and:
telegramRequestCaptor.times == 1
JSONAssert.assertEquals("""{
"chatId": "20000000",
"text": "Hi, how can i help you?"
}""", telegramRequestCaptor.bodyString, false)
}
Вот ключевые шаги, описанные в тестовом сценарии:
- Ожидание назначения партиций до начала тестового сценария
- Мокирование запросов к OpenAI
- Мокирование запросов к Telegram
- Выполнение тестового сценария
- Ожидание подтверждения смещения
- Проверки результата
Дополнительные тесты, включая сценарии с интенсивной отправкой сообщений и использованием механизма @RetryableTopic
для повторных попыток, также доступны в репозитории проекта, предоставляя возможности для изучения и адаптации под собственные нужды разработки.
Заключение
Успешное тестирование взаимодействия с Kafka требует внимательного подхода к изоляции тестов и контролю окружения. Использование Testcontainers и возможностей Spring Boot 3.1 значительно упрощает этот процесс, а применение предложенных методик и инструментов позволяет разработчикам сфокусироваться на логике приложения, делая разработку более эффективной и менее подверженной ошибкам.
Спасибо за внимание к статье, и удачи в вашем стремлении к написанию удобных и надежных тестов!