Изоляция в тестах с Kafka
Опыт запуска Kafka в тестовых сценариях достиг высокого уровня удобства благодаря использованию Testcontainers и улучшенной поддержке в Spring Boot 3.1 с помощью аннотации @ServiceConnection
. Однако написание и поддержка интеграционных тестов с Kafka по-прежнему представляют собой вызов. В этой статье описывается подход, который значительно упрощает процесс тестирования, обеспечивая изоляцию тестов и предоставляя набор инструментов для достижения этой цели. При успешной реализации изоляции, тесты с Kafka могут быть организованы так, что на этапе проверки результатов обеспечивается полный доступ ко всем сообщениям, возникшим в ходе теста, избегая при этом необходимости в использовании методов принудительного ожидания, таких как Thread.sleep()
.
Этот метод подходит как для использования с Testcontainers, так и для Embedded Kafka или других способов запуска сервиса Kafka (например, локального инстанса).
Изоляция в тестах
Как подробно описано в статье Eradicating Non-Determinism in Tests, для надежного выполнения тестов критически важен четкий контроль над тестовым окружением. Это гарантирует, что каждый тест начинается с заранее известного состояния. Примером может служить ситуация, когда один тест создает данные в базе данных и не очищает их после себя, что может негативно сказаться на выполнении последующих тестов, ожидающих другое состояние базы данных.
Для достижения изоляции тестов можно применять различные методы, включая:
- Восстановление исходного состояния системы перед запуском каждого теста.
- Обязательная очистка данных после выполнения каждого теста, чтобы избежать влияния на следующие тесты.
Изоляция в тестах с Kafka
Восстановление начального состояния среды Kafka с нуля для каждого тестового сценария может быть реализованно путем перезапуска Kafka. Этот вариант достаточно простой в плане реализации, но дорогой по времени запуска. Существуют методы для ускорения этого процесса (подробнее можно почитать в статье о запуске Kafka на GraalVM), но в данной статье предлагается рассмотреть вариант, когда в рамках всех тестовых сценариев мы работаем с общим экземпляром Kafka.
Такой подход ставит перед нами определенные вызовы: если один тест отправляет сообщения в топик и оставляет без внимания факт их получения и обработки, это может повредить выполнению другого теста, который может полагаться на иное состояние топика.
Для обеспечения изоляции необходимо, чтобы в случае отправки сообщения в топик, который прослушивается тем же приложением, до завершения тестового сценария были выполнены все процессы, инициированные этими сообщениями.
Реализация
Рассмотрим на примере условного телеграм-бота, который перенаправляет запросы к OpenAI API и отправляет ответы пользователям.
Контракты взаимодействия с сервисами описаны в упрощенном виде, чтобы подчеркнуть основную логику работы. Ниже приведена диаграмма последовательностей, демонстрирующая архитектуру приложения. Понимаю, что дизайн может вызвать вопросы с точки зрения системной архитектуры, но прошу отнестись к этому с пониманием — главная цель здесь продемонстрировать подход к изоляции в тестах.
Ниже представлена схема, иллюстрирующая подход к тестированию:
Основной особенностью предложенного решения является строгое разделение кода теста на фазы, соответствующие паттерну Arrange-Act-Assert. Подробнее о данном подходе вы можете прочитать в статье Разносим по полочкам этапы тестирования http запросов в Spring.
Для достижения изоляции критически важно соблюдать следующие взаимосвязи между ключевыми элементами схемы (цифры соответствуют указанным на схеме):
- (1) Подготовка сценария (Arrange) происходит до Выполнения сценария (Act).
- (2) Сообщение отправлено и (3) подтверждено лидером партиции до того как обработка запроса сервисом ServiceA считается завершенной.
- (4) Ручное управление смещением (offset) c фиксацией (commit) происходит только после того, как вся обработка сервисами ServiceB или ServiceA будет полностью завершена.
- (5) Выполнение сценария (Act) происходит до Проверки результата (Assert).
Такой подход позволяет к моменту проверки результатов гарантировать выполнение всех процессов в рамках тестового сценария, отправку и прием всех сообщений, обеспечивая нахождение тестового окружения в известном и финальном состоянии.
Подготовка сценария (Arrange) происходит до Выполнения сценария (Act)
Цель этого этапа — подготовить все необходимое для тестового сценария. В контексте нашего примера основные компоненты тестовой среды включают контекст приложения, моки HTTP-запросов, Kafka и Record Captor.
Что касается интеграции с Kafka, критически важно убедиться, что все консумеры готовы к приему сообщений. Данная проверка реализована в методе KafkaSupport#waitForPartitionAssignment. Решение основано на оригинальном ContainerTestUtils
из библиотеки org.springframework.kafka:spring-kafka-test
с доработкой согласно описываемому сценарию использования. Этот метод гарантирует, что каждому консумеру Kafka будет назначена хотя бы одна партиция. Это предполагает ограничение: в тестовой среде должна быть только одна партиция на топик, хотя данное ограничение является следствием текущей реализации метода и может быть изменено.
Использование общего экземпляра Kafka требует настройки параметра auto.offset.reset = latest
для консумеров, для Spring приложения это делается следующим образом:
spring.kafka.consumer.auto-offset-reset=latest
Record Captor является ключевым элементом этого решения. Его задача - “ловить” сообщения из указанных в конфигурации топиков и предоставить к ним доступ для шага проверки результатов тестового сценария. Технически это простой консумер для Kafka топика с механизмом хранения сообщений и интерфейсом доступа к ним. Код Record Captor доступен в репозитории проекта.
Текущая реализация Record Captor предлагает использование ключа сообщения для идентификации сообщений, связанных с конкретным тестовым случаем. Это будет полезно в системах, где присутствуют уникальные идентификаторы, такие как ID клиента или идентификатор процесса в доменной модели. Использование таких идентификаторов в качестве ключа позволяет эффективно группировать и отслеживать все сообщения, относящиеся к одному и тому же тестовому сценарию, даже если они распределены по различным топикам или обрабатываются разными компонентами системы.
Синхронная отправка сообщения с подтверждением
Цель состоит в реализации синхронной отправки сообщений в Kafka с получением подтверждения от лидера партиции. Для достижения этого необходимо установить параметр acks = 1
для продюсера. В контексте Spring приложения это настройка задается следующим образом:
spring.kafka.producer.acks=1
При использовании KafkaTemplate
для отправки сообщений важно обеспечить синхронность отправки, поскольку данный компонент предоставляет только асинхронный интерфейс org.springframework.kafka.core.KafkaTemplate#send(org.springframework.messaging.Message<?>) return CompletableFuture<SendResult<K, V>>
. Для синхронной отправки можно использовать следующий подход:
Это гарантирует, что отправка сообщения будет завершена синхронно, с ожиданием подтверждения от Kafka перед продолжением выполнения программы.
Ручное управление смещением
Ручное управление смещением c фиксацией означает, что консумер сообщения будет фиксировать обработку сообщений только после их полной обработки. В данном контексте фиксация смещения для топика topicA
произойдет только после того, как сообщение будет успешно отправлено в topicB
и получено соответствующее подтверждение.
Для реализации такой логики, необходимо отключить автоматическую фиксацию смещений для консумеров, установив параметр enable.auto.commit = false
. В контексте Spring-приложения это настраивается следующим образом:
Кроме того, следует настроить механизм фиксации так, чтобы смещение фиксировалось после обработки каждого отдельного сообщения, что достигается установкой параметра
Выполнение сценария (Act) происходит до Проверки результата (Assert)
До начала этапа проверки результатов необходимо убедиться, что все процессы, связанные со сценарием, завершены, все сообщения отправлены и получены, обеспечивая переход тестового окружения в “известное” финальное состояние. Благодаря предшествующим этапам, мы обеспечили соблюдение принципа happens-before между действиями продюсеров и консумеров, а также между всеми обработками внутри приложения. На этом этапе требуется выполнить проверку фиксации смещений для каждой партиции для каждой группы консумеров.
Для выполнения данной проверки можно воспользоваться решением, представленном в методе pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit.
Проверки результата (Assert)
На заключительном этапе происходит анализ результатов тестового сценария. Этот процесс включает проверку состояния моков и анализ сообщений, пойманных в RecordCaptor.
Основные элементы решения вкратце
Вот краткое изложение ключевых компонентов предложенного решения для обеспечения эффективного тестирования с Kafka:
- Одна партиция на топик.
- Политика начала чтения сообщений для консумеров -
spring.kafka.consumer.auto-offset-reset=latest
. - Политика подтверждений для продюсеров
spring.kafka.producer.acks=1
. - Синхронная отправка сообщений
kafkaTemplate.send(message).get()
. - Ручной контроль за смещением
spring.kafka.consumer.enable-auto-commit=false
,spring.kafka.listener.ack-mode=record
. - Ожидание назначения партиций до начала тестового сценария
pw.avvero.emk.KafkaSupport#waitForPartitionAssignment
. - Ожидание подтверждения смещения перед проверкой результатов теста
pw.avvero.emk.KafkaSupport#waitForPartitionOffsetCommit
.
Результат
Код приложения доступен в модуле примеров. Код теста выглядит следующим образом.
Вот ключевые шаги, описанные в тестовом сценарии:
- Ожидание назначения партиций до начала тестового сценария
- Мокирование запросов к OpenAI
- Мокирование запросов к Telegram
- Выполнение тестового сценария
- Ожидание подтверждения смещения
- Проверки результата
Дополнительные тесты, включая сценарии с интенсивной отправкой сообщений и использованием механизма @RetryableTopic
для повторных попыток, также доступны в репозитории проекта, предоставляя возможности для изучения и адаптации под собственные нужды разработки.
Заключение
Успешное тестирование взаимодействия с Kafka требует внимательного подхода к изоляции тестов и контролю окружения. Использование Testcontainers и возможностей Spring Boot 3.1 значительно упрощает этот процесс, а применение предложенных методик и инструментов позволяет разработчикам сфокусироваться на логике приложения, делая разработку более эффективной и менее подверженной ошибкам.
Спасибо за внимание к статье, и удачи в вашем стремлении к написанию удобных и надежных тестов!