Embedded kafka container

File EmbeddedKafkaApplication

package pw.avvero.embeddedkafka;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
 
@Slf4j
@SpringBootApplication
public class EmbeddedKafkaApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(EmbeddedKafkaApplication.class, args);
    }
 
    public static final int KAFKA_PORT = 9093;
    public static final int ZK_PORT = 2181;
 
    public static class It {
 
    }
 
    @Bean
    public It embeddedKafkaBroker(@Value("${app.kafka.advertised.listeners}") String advertisedListeners) {
        long start = System.currentTimeMillis();
        log.info("[KT] Kafka from testcontainers is going to start");
        String[] topics = new String[]{"topic1"};
 
        EmbeddedKafkaBroker broker = new EmbeddedKafkaBroker(1, true, 1, topics)
                .zkPort(ZK_PORT)
                .kafkaPorts(KAFKA_PORT)
                .brokerProperty("listeners", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092")
                .brokerProperty("listener.security.protocol.map", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
                .brokerProperty("inter.broker.listener.name", "BROKER")
                .brokerProperty("advertised.listeners", advertisedListeners)
                .zkConnectionTimeout(EmbeddedKafkaBroker.DEFAULT_ZK_CONNECTION_TIMEOUT)
                .zkSessionTimeout(EmbeddedKafkaBroker.DEFAULT_ZK_SESSION_TIMEOUT);
 
        broker.afterPropertiesSet();
//        System.setProperty("spring.kafka.bootstrap-servers", broker.getBrokersAsString());
        long finish = System.currentTimeMillis() - start;
        log.info("[KT] Kafka from testcontainers is started on: {} (zookeeper: {}, advertised.listeners: {}) in {} millis",
                broker.getBrokersAsString(), broker.getZookeeperConnectionString(), advertisedListeners, finish);
        return new It();
    }
 
}
app.kafka.advertised.listeners=PLAINTEXT://localhost:9093,BROKER://localhost:9092

Dockerfile

FROM openjdk:17
 
COPY build/install/embedded-kafka-boot embedded-kafka-boot
 
RUN ls -al
 
EXPOSE 55900
 
ENTRYPOINT ["./embedded-kafka-boot/bin/embedded-kafka"]

KafkaEmbeddedContainer

package com.fxclub.test.spock;
 
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
 
/**
 * This container wraps Confluent Kafka and Zookeeper (optionally)
 */
public class KafkaEmbeddedContainer extends GenericContainer<KafkaEmbeddedContainer> {
 
    public static final int KAFKA_PORT = 9093;
    public static final int ZOOKEEPER_PORT = 2181;
 
    public KafkaEmbeddedContainer() {
        super(DockerImageName.parse("embedded-kafka_emk:latest"));
        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
        addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
        String host = getNetwork() != null ? getNetworkAliases().get(0) : "localhost";
        withEnv("app.kafka.advertised.listeners", "PLAINTEXT://" + host + ":" + KAFKA_PORT + ",BROKER://localhost:9092");
    }
 
    public String getBootstrapServers() {
        return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT));
    }
}

test-containerskafkatest