
Detailed steps to install and use a single-node Kafka with Docker

Updated: 2025-06-11 10:07:18   Author: 小咖張
This article walks through the detailed steps of installing a single-node Kafka with Docker and using it. The sample code is explained step by step, so interested readers can follow along.

1. docker-compose.yml

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper-data:/var/lib/zookeeper/data
      - ./zookeeper-log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://your-host-ip:9092
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

      # The KAFKA_CFG_ prefix below is read by the bitnami/kafka image, not by
      # confluentinc/cp-kafka; keep whichever set matches the image you use.

      KAFKA_CFG_PROCESS_ROLES: broker
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://your-host-ip:9092
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CFG_NUM_PARTITIONS: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - ./kafka-data:/var/lib/kafka/data

  # Kafka web UI; note this block must be nested under services, at the same level as kafka
  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret-key"
      KAFKA_MANAGER_LOG_LEVEL: "INFO"
    depends_on:
      - zookeeper
      - kafka

2. Grant directory permissions

# Create the data directories first, then hand them to UID 1000,
# the user the Confluent images run as:
sudo mkdir -p ./zookeeper-data ./zookeeper-log ./kafka-data
sudo chown -R 1000:1000 ./zookeeper-data
sudo chown -R 1000:1000 ./zookeeper-log
sudo chown -R 1000:1000 ./kafka-data

3. Start the containers

docker-compose up -d

4. Test the setup

1. Enter the Kafka container:

docker exec -it kafka bash

2. Create a test topic:

kafka-topics --create --topic order1-topic --bootstrap-server your-host-ip:9092 --replication-factor 1 --partitions 1

3. Start a console producer:

kafka-console-producer --topic order1-topic --bootstrap-server your-host-ip:9092

4. In another terminal window, start a console consumer:

docker exec -it kafka kafka-console-consumer --topic order1-topic --bootstrap-server your-host-ip:9092 --from-beginning

5. The test succeeds: lines typed into the producer appear in the consumer.

Producer: (screenshot)

Consumer: (screenshot)
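
Beyond the console tools, reachability of the advertised listener can also be verified programmatically. Below is a minimal sketch using the Kafka AdminClient; it assumes the kafka-clients dependency is on the classpath, and your-host-ip stands in for the address configured in docker-compose.yml:

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Clients bootstrap against this address and are then redirected to
        // KAFKA_ADVERTISED_LISTENERS, so that value must be reachable too.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "your-host-ip:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");

        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Broker reachable, topics: " + topics);
        }
    }
}

If this prints the order1-topic created above, the listener configuration is correct; a timeout usually means KAFKA_ADVERTISED_LISTENERS points at an address the client cannot reach.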

5. Using the Kafka web UI

Open the UI at http://localhost:9000 and add a Cluster. A successfully added cluster looks like this: (screenshot)

Click into the cluster to manage Topics and Brokers.

6. Configuring and using Kafka in a project

1. POM setup. Since Flink will be introduced later in this project, the Kafka client comes in via flink-connector-kafka:

<!-- Real-time order processing -->
<!-- Flink core dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Kafka Source Connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Redis sink client -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.9.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- End: real-time order processing -->
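
Since flink-connector-kafka is what pulls the Kafka client in, here is a minimal sketch of how the topic would later be read from Flink. The job and group names are invented, your-host-ip stands in for the broker address, and the connector version must match your Flink release:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OrderStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume the order topic with the new KafkaSource API
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("your-host-ip:9092")
                .setTopics("orderCreate-topic")
                .setGroupId("flink-order-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> orders =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        orders.print(); // replace with real processing / a Redis sink

        env.execute("order-stream-job");
    }
}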

application-dev.yml: once this is set, consumers can listen for messages directly with @KafkaListener:

spring:
  kafka:
    listener:
      missing-topics-fatal: false
    bootstrap-servers: your-host-ip:9092
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      session.timeout.ms: 15000
      heartbeat.interval.ms: 5000
      max.poll.interval.ms: 300000
      metadata.max.age.ms: 3000
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
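
As a quick check of that claim, here is a minimal listener sketch; the class and group names are invented, and it consumes the order1-topic created during testing. Note that because the yml above sets no consumer group-id, the annotation must supply one:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class QuickStartConsumer {

    // With the yml above in place, this is all that is needed to receive messages.
    @KafkaListener(topics = "order1-topic", groupId = "quickstart-group")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}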

2. Implementing multiple topics and multiple groups:

Feature                    Implementation
Multiple topics            multiple @KafkaListener methods
Multiple groups            a different groupId per listener
Load balancing             multiple instances sharing the same groupId
Broadcast                  multiple instances using different groupIds
Dynamic configuration      application.yml + @Value injection (see the sketch below)
Custom container factory   ConcurrentKafkaListenerContainerFactory
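
For example, the dynamic-configuration row can be realized with placeholders resolved inside the @KafkaListener annotation. A minimal sketch; the app.kafka.* property names are invented for illustration:

package com.zbkj.front.config.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class DynamicTopicConsumer {

    // Hypothetical entries in application-dev.yml:
    //   app:
    //     kafka:
    //       order-topics: orderCreate-topic,orderPayment-topic
    //       group-id: audit-group
    // The SpEL expression splits the comma-separated property into a topic
    // array, so one listener covers several topics within one group.
    @KafkaListener(topics = "#{'${app.kafka.order-topics}'.split(',')}",
                   groupId = "${app.kafka.group-id}")
    public void consume(String message) {
        log.info("[audit] received {}", message);
    }
}

Because this listener uses its own groupId, it receives the same records as the order-group and payment-group consumers do, which is exactly the broadcast row in the table.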

3. Sample code

KafkaConfig:

package com.zbkj.front.config.kafka;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
@Configuration
@EnableKafka
public class KafkaConfig {
 
    // ========== Shared helpers ==========
    public Map<String, Object> commonConsumerProps(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-host-ip:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }
 
    // ========== Consumer group 1 - order-group ==========
    @Bean
    public ConsumerFactory<String, String> orderConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(commonConsumerProps("order-group"));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setConcurrency(1); // tune concurrency to the topic's partition count
        return factory;
    }
 
    // ========== Consumer group 2 - payment-group ==========
    @Bean
    public ConsumerFactory<String, String> paymentConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(commonConsumerProps("payment-group"));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> paymentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(paymentConsumerFactory());
        factory.setConcurrency(1);
        return factory;
    }
 
    // ========== Kafka Producer Bean ==========
    @Bean
    public KafkaProducer<String, String> orderKafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-host-ip:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
 
        return new KafkaProducer<>(props);
    }
}
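
A note on the producer bean above: since spring-kafka is already on the classpath and bootstrap-servers plus serializers are set in application-dev.yml, Spring Boot's auto-configured KafkaTemplate is a simpler alternative to managing a raw KafkaProducer. A sketch, assuming spring-kafka 2.x where send() returns a ListenableFuture (3.x returns a CompletableFuture); the class name is invented:

package com.zbkj.front.config.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderEventPublisher {

    // Auto-configured by Spring Boot from the spring.kafka.* properties.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String topic, String orderNo, String json) {
        kafkaTemplate.send(topic, orderNo, json)
                .addCallback(
                        result -> log.info("sent {} to {}", orderNo, topic),
                        ex -> log.error("send failed for {}", orderNo, ex));
    }
}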

OrderConsumer:

package com.zbkj.front.config.kafka;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
@Service
@Slf4j
public class OrderConsumer {
 
    @KafkaListener(topics = "orderCreate-topic", containerFactory = "orderKafkaListenerContainerFactory")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        log.info("[訂單服務(wù)] 收到消息 topic={}, offset={}, key={}, value={}",
                record.topic(), record.offset(), record.key(), record.value());
    }
}

PaymentConsumer:

package com.zbkj.front.config.kafka;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
@Service
@Slf4j
public class PaymentConsumer {
 
    @KafkaListener(topics = "orderPayment-topic", containerFactory = "paymentKafkaListenerContainerFactory")
    public void consumePayment(ConsumerRecord<String, String> record) {
        log.info("[支付服務(wù)] 收到消息 topic={}, offset={}, key={}, value={}",
                record.topic(), record.offset(), record.key(), record.value());
    }
}
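
One thing worth knowing about these consumers: enable-auto-commit is false, so offsets are committed by the listener container, which defaults to BATCH ack mode. If explicit per-record commits are preferred, the factory can be switched to manual acknowledgment. A sketch, where the manualAck factory name is invented:

package com.zbkj.front.config.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ManualAckConsumer {

    // Requires a factory configured in KafkaConfig with:
    //   factory.getContainerProperties()
    //          .setAckMode(ContainerProperties.AckMode.MANUAL);
    @KafkaListener(topics = "orderCreate-topic",
                   containerFactory = "manualAckKafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("received offset={} value={}", record.offset(), record.value());
        ack.acknowledge(); // explicitly commit this record's offset
    }
}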

testController:

package com.zbkj.front.controller;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zbkj.common.model.order.Order;
import com.zbkj.common.result.CommonResult;
import com.zbkj.front.event.OrderEvent;
import com.zbkj.front.event.OrderStatusEum;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import java.math.BigDecimal;
import java.time.LocalDateTime;
 
@Slf4j
@RestController
@RequestMapping("api/front/test")
@Api(tags = "Test")
public class testController {
 
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Autowired
    @Qualifier("orderKafkaProducer")
    private KafkaProducer<String, String> orderKafkaProducer;
 
 
    @ApiOperation(value = "Test: order creation")
    @RequestMapping(value = "/test/orderCreate", method = RequestMethod.GET)
    public CommonResult<String> orderTopic(@Validated String orderNo) {
        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setPayPrice(new BigDecimal(100));
        // call this wherever the order is actually created
        sendOrderCreatedEvent(order, "orderCreate-topic");
        return CommonResult.success();
    }
 
    @ApiOperation(value = "Test: order payment")
    @RequestMapping(value = "/test/orderPayment", method = RequestMethod.GET)
    public CommonResult<String> orderPayment(@Validated String orderNo) {
        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setPayPrice(new BigDecimal(100));
        // call this wherever the payment completes
        sendOrderCreatedEvent(order, "orderPayment-topic");
        return CommonResult.success();
    }
 
    public void sendOrderCreatedEvent(Order order, String topic) {
        try {
            // build the order event
            OrderEvent event = new OrderEvent(
                    String.valueOf(order.getOrderNo()),
                    order.getPayPrice(),
                    LocalDateTime.now().toString(),
                    OrderStatusEum.CREATED
            );

            // serialize to JSON
            String json = objectMapper.writeValueAsString(event);

            // build the Kafka record, using the order number as the key
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, order.getOrderNo(), json);

            // send asynchronously; the callback reports success or failure
            orderKafkaProducer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("Kafka send failed topic={}, key={}, error={}",
                            record.topic(), record.key(), exception.getMessage());
                } else {
                    log.info("Kafka send succeeded topic={}, key={}, partition={}, offset={}",
                            metadata.topic(), record.key(), metadata.partition(), metadata.offset());
                }
            });

        } catch (Exception e) {
            log.error("Exception sending Kafka order-created event orderNo={}", order.getOrderNo(), e);
        }
    }
 
}

After invoking the endpoints, the logs show:

2025-05-28 15:56:53.768 [kafka-producer-network-thread | producer-1] INFO  com.zbkj.front.controller.testController - Kafka send succeeded topic=orderCreate-topic, key=PT672174822506216634598, partition=0, offset=0
2025-05-28 15:56:53.772 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  com.zbkj.front.config.kafka.OrderConsumer - [order service] received message topic=orderCreate-topic, offset=0, key=PT672174822506216634598, value={"orderId":"PT672174822506216634598","merId":null,"amount":100,"timestamp":"2025-05-28T15:56:49.726","status":"CREATED"}

2025-05-28 15:57:22.687 [kafka-producer-network-thread | producer-1] INFO  com.zbkj.front.controller.testController - Kafka send succeeded topic=orderPayment-topic, key=SH377174799246359695171, partition=0, offset=0
2025-05-28 15:57:22.688 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  com.zbkj.front.config.kafka.PaymentConsumer - [payment service] received message topic=orderPayment-topic, offset=0, key=SH377174799246359695171, value={"orderId":"SH377174799246359695171","merId":null,"amount":100,"timestamp":"2025-05-28T15:57:20.754","status":"CREATED"}

That concludes the detailed steps for installing and using a single-node Kafka with Docker. For more on installing Kafka with Docker, see the other related articles on 腳本之家!
