docker安裝單機版kafka并使用的詳細步驟
一、docker-compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:latest container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" volumes: - ./zookeeper-data:/var/lib/zookeeper/data - ./zookeeper-log:/var/lib/zookeeper/log kafka: image: confluentinc/cp-kafka:latest container_name: kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://自己的ip:9092 KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 #新版使用CFG KAFKA_CFG_PROCESS_ROLES: broker KAFKA_CFG_CONTROLLER_LISTENER_NAMES: PLAINTEXT KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://自己的ip:9092 KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1 KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_CFG_NUM_PARTITIONS: 1 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" volumes: - ./kafka-data:/var/lib/kafka/data #kafka可視化界面 kafka-manager: image: hlebalbau/kafka-manager:latest ports: - "9000:9000" environment: ZK_HOSTS: "zookeeper:2181" APPLICATION_SECRET: "random-secret-key" KAFKA_MANAGER_LOG_LEVEL: "INFO" depends_on: - zookeeper - kafka
二、創(chuàng)建權(quán)限
sudo chown -R 1000:1000 ./zookeeper-data sudo chown -R 1000:1000 ./zookeeper-log sudo chown -R 1000:1000 ./kafka-data
三、啟動容器
docker-compose up -d
四、測試功能
1. 進入Kafka容器:
docker exec -it kafka bash
2. 創(chuàng)建一個測試主題:
kafka-topics --create --topic order1-topic --bootstrap-server 上面配置的ip:9092 --replication-factor 1 --partitions 1
3: 啟動一個生產(chǎn)者:
kafka-console-producer --topic order1-topic --bootstrap-server 上面配置的ip:9092
4. 在另一個終端窗口,啟動一個消費者:
docker exec -it kafka kafka-console-consumer --topic order1-topic --bootstrap-server 上面配置的ip:9092 --from-beginning
5:測試成功
生產(chǎn)者:
消費者:
五、kafka 可視化界面使用
打開界面進行添加Cluster,添加成功如下圖所示:
點擊進入可以進行Topics和Brokers的管理
六、項目中進行配置和使用
1:pom設(shè)置,因為我后面要用Flink,所以kafka.就直接引用flink-connector-kafka
<!--實時訂單處理--> <!-- Flink 核心依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <!-- Kafka Source Connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <!-- Redis Sink 客戶端 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--實時訂單處理-->
application-dev.yml: 設(shè)置后,Consumer可以直接使用@KafkaListener監(jiān)聽消息
spring: kafka: listener: missing-topics-fatal: false bootstrap-servers: 自己的ip:9092 consumer: auto-offset-reset: earliest enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 heartbeat.interval.ms: 5000 max.poll.interval.ms: 300000 metadata.max.age.ms: 3000 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
2:多 Topic + 多 Group 實現(xiàn)方式:
功能 | 實現(xiàn)方式 |
多 Topic | 多個 @KafkaListener 方法 |
多 Group | 每個監(jiān)聽器指定不同的 groupId |
負載均衡 | 多實例使用相同 groupId |
廣播模式 | 多實例使用不同 groupId |
動態(tài)配置 | 使用 application.yml + @Value 注入 |
自定義容器工廠 | 使用 ConcurrentKafkaListenerContainerFactory |
3:示例代碼
KafkaConfig:
package com.zbkj.front.config.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; @Configuration @EnableKafka public class KafkaConfig { // ========== 公共方法 ========== public Map<String, Object> commonConsumerProps(String groupId) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "自己的ip:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return props; } // ========== 消費者組 1 - order-group ========== @Bean public ConsumerFactory<String, String> orderConsumerFactory() { return new DefaultKafkaConsumerFactory<>(commonConsumerProps("order-group")); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> orderKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(orderConsumerFactory()); factory.setConcurrency(1); // 可根據(jù)分區(qū)數(shù)設(shè)置并發(fā)度 return factory; } // ========== 消費者組 2 - payment-group ========== @Bean public ConsumerFactory<String, String> paymentConsumerFactory() { return new DefaultKafkaConsumerFactory<>(commonConsumerProps("payment-group")); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> paymentKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(paymentConsumerFactory()); factory.setConcurrency(1); return factory; } // ========== Kafka Producer Bean ========== @Bean public KafkaProducer<String, String> orderKafkaProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "自己的ip:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); return new KafkaProducer<>(props); } }
OrderConsumer
package com.zbkj.front.config.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service @Slf4j public class OrderConsumer { @KafkaListener(topics = "orderCreate-topic", containerFactory = "orderKafkaListenerContainerFactory") public void consumeOrder(ConsumerRecord<String, String> record) { log.info("[訂單服務(wù)] 收到消息 topic={}, offset={}, key={}, value={}", record.topic(), record.offset(), record.key(), record.value()); } }
PaymentConsumer
package com.zbkj.front.config.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service @Slf4j public class PaymentConsumer { @KafkaListener(topics = "orderPayment-topic", containerFactory = "paymentKafkaListenerContainerFactory") public void consumePayment(ConsumerRecord<String, String> record) { log.info("[支付服務(wù)] 收到消息 topic={}, offset={}, key={}, value={}", record.topic(), record.offset(), record.key(), record.value()); } }
testController
package com.zbkj.front.controller; import com.fasterxml.jackson.databind.ObjectMapper; import com.zbkj.common.model.order.Order; import com.zbkj.common.result.CommonResult; import com.zbkj.front.event.OrderEvent; import com.zbkj.front.event.OrderStatusEum; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.math.BigDecimal; import java.time.LocalDateTime; @Slf4j @RestController @RequestMapping("api/front/test") @Api(tags = "測試") public class testController { private final ObjectMapper objectMapper = new ObjectMapper(); @Autowired @Qualifier("orderKafkaProducer") private KafkaProducer<String, String> orderKafkaProducer; @ApiOperation(value = "測試") @RequestMapping(value = "/test/orderCreate", method = RequestMethod.GET) public CommonResult<String> orderTopic(@Validated String orderNo) { Order order= new Order(); order.setOrderNo(orderNo); order.setPayPrice(new BigDecimal(100)); // 添加到訂單創(chuàng)建的地方 sendOrderCreatedEvent(order,"orderCreate-topic"); return CommonResult.success(); } @ApiOperation(value = "測試") @RequestMapping(value = "/test/orderPayment", method = RequestMethod.GET) public CommonResult<String> orderPayment(@Validated String orderNo) { Order order= new Order(); order.setOrderNo(orderNo); order.setPayPrice(new BigDecimal(100)); // 添加到訂單創(chuàng)建的地方 sendOrderCreatedEvent(order,"orderPayment-topic"); return CommonResult.success(); } public void sendOrderCreatedEvent(Order order,String topic) { try { // 構(gòu)建訂單事件對象 OrderEvent event = new OrderEvent( String.valueOf(order.getOrderNo()), order.getPayPrice(), LocalDateTime.now().toString(), OrderStatusEum.CREATED ); // 序列化為 JSON String json = objectMapper.writeValueAsString(event); // 創(chuàng)建 Kafka 消息記錄(使用訂單號作為 key) ProducerRecord<String, String> record = new ProducerRecord<>(topic, order.getOrderNo(), json); // 發(fā)送消息 orderKafkaProducer.send(record, (metadata, exception) -> { if (exception != null) { log.error("Kafka消息發(fā)送失敗 topic={}, key={}, error={}", record.topic(), record.key(), exception.getMessage()); } else { log.info("Kafka消息發(fā)送成功 topic={}, key={}, partition={}, offset={}", metadata.topic(), record.key(), metadata.partition(), metadata.offset()); } }); } catch (Exception e) { log.error("發(fā)送Kafka訂單創(chuàng)建事件異常 orderNo={}", order.getOrderNo(), e); } } }
執(zhí)行完后:
2025-05-28 15:56:53.768 [kafka-producer-network-thread | producer-1] INFO com.zbkj.front.controller.testController - Kafka消息發(fā)送成功 topic=orderCreate-topic, key=PT672174822506216634598, partition=0, offset=0
2025-05-28 15:56:53.772 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO com.zbkj.front.config.kafka.OrderConsumer - [訂單服務(wù)] 收到消息 topic=orderCreate-topic, offset=0, key=PT672174822506216634598, value={"orderId":"PT672174822506216634598","merId":null,"amount":100,"timestamp":"2025-05-28T15:56:49.726","status":"CREATED"}
2025-05-28 15:57:22.687 [kafka-producer-network-thread | producer-1] INFO com.zbkj.front.controller.testController - Kafka消息發(fā)送成功 topic=orderPayment-topic, key=SH377174799246359695171, partition=0, offset=0
2025-05-28 15:57:22.688 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO com.zbkj.front.config.kafka.PaymentConsumer - [支付服務(wù)] 收到消息 topic=orderPayment-topic, offset=0, key=SH377174799246359695171, value={"orderId":"SH377174799246359695171","merId":null,"amount":100,"timestamp":"2025-05-28T15:57:20.754","status":"CREATED"}
以上就是docker安裝單機版kafka并使用的詳細步驟的詳細內(nèi)容,更多關(guān)于docker安裝kafka的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
不同系統(tǒng)下Docker?Desktop鏡像存儲路徑設(shè)置方法
這篇文章主要介紹了不同系統(tǒng)下Docker?Desktop鏡像存儲路徑設(shè)置方法的相關(guān)資料,不同操作系統(tǒng)下設(shè)置Docker鏡像存儲路徑的方法有所不同,分別適用于Windows、macOS和Linux系統(tǒng),需要的朋友可以參考下2025-04-04詳解利用nginx和docker實現(xiàn)一個簡易的負載均衡
本篇文章主要介紹了利用nginx和docker實現(xiàn)一個簡易的負載均衡 ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06通過Dockerfile構(gòu)建Docker鏡像的方法步驟
這篇文章主要介紹了通過Dockerfile構(gòu)建Docker鏡像的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02win10中docker部署和運行countly-server的流程
這篇文章主要記錄一下windows10中使用docker容器安裝和部署countly-server的整個流程,本文給大家講解的非常詳細,具有一定的參考借鑒價值,需要的朋友參考下吧2019-11-11Docker中conda環(huán)境的導(dǎo)出和導(dǎo)入
現(xiàn)在很多的應(yīng)用程序系統(tǒng)都會選擇使用docker容器進行部署,本文主要介紹了Docker中conda環(huán)境的導(dǎo)出和導(dǎo)入,具有一定的參考價值,感興趣的可以了解一下2024-02-02Docker快速部署國產(chǎn)達夢數(shù)據(jù)庫的實現(xiàn)示例
本文主要介紹了Docker快速部署國產(chǎn)達夢數(shù)據(jù)庫的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07