欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot 2.x集成kafka 2.2.0的示例代碼

 更新時(shí)間:2022年04月28日 10:22:08   作者:g-Jack  
kafka近幾年更新非常快,也可以看出kafka在企業(yè)中是用的頻率越來越高。本文主要為大家介紹了Springboot 2.x集成kafka 2.2.0的示例代碼,需要的可以參考一下

引言

kafka近幾年更新非???,也可以看出kafka在企業(yè)中是用的頻率越來越高,在springboot中集成kafka還是比較簡單的,但是應(yīng)該注意使用的版本和kafka中基本配置,這個(gè)地方需要信心,防止進(jìn)入坑中。

版本對應(yīng)地址:https://spring.io/projects/spring-kafka

基本環(huán)境

springboot版本2.1.4

kafka版本2.2.0

jdk 1.8

代碼編寫

1、基本引用pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafkademo</name>
	<description>Demo project for Spring Boot</description>
 
	<properties>
		<java.version>1.8</java.version>
	</properties>
 
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
 
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.2.0.RELEASE</version>
		</dependency>
 
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.7</version>
		</dependency>
 
	</dependencies>
 
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
 
</project>

2、基本配置

spring.kafka.bootstrap-servers=2.1.1.1:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#logging.level.root=debug

3、實(shí)體類

package com.example.demo.model;
 
import java.util.Date;
 
public class Messages {
 
    private Long id;
 
    private String msg;
 
    private Date sendTime;
 
    public Long getId() {
        return id;
    }
 
    public void setId(Long id) {
        this.id = id;
    }
 
    public String getMsg() {
        return msg;
    }
 
    public void setMsg(String msg) {
        this.msg = msg;
    }
 
    public Date getSendTime() {
        return sendTime;
    }
 
    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

4、生產(chǎn)者端

package com.example.demo.service;
 
import com.example.demo.model.Messages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
 
import java.util.Date;
import java.util.UUID;
 
@Service
public class KafkaSender {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private Gson gson = new GsonBuilder().create();
 
    public void send() {
        Messages message = new Messages();
        message.setId(System.currentTimeMillis());
        message.setMsg("123");
        message.setSendTime(new Date());
        ListenableFuture<SendResult<String, String>> test0 = kafkaTemplate.send("newtopic", gson.toJson(message));
    }
}

5、消費(fèi)者

package com.example.demo.service;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
import java.util.Optional;
 
@Service
public class KafkaReceiver {
 
    @KafkaListener(topics = {"newtopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("record =" + record);
            System.out.println("message =" + message);
 
        }
    }
    
}

6、測試

在啟動方法中模擬消息生產(chǎn)者,向kafka中發(fā)送消息

package com.example.demo;
 
import com.example.demo.service.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
 
@SpringBootApplication
public class KafkademoApplication {
 
	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
 
		KafkaSender sender = context.getBean(KafkaSender.class);
		for (int i = 0; i <1000; i++) {
			sender.send();
 
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
 
 
	}
 
}

效果展示

命令行直接消費(fèi)消息

遇到的問題

生產(chǎn)端連接kafka超時(shí)

at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

解決方案:

修改kafka中的server.properties中的下面配置,將原來的默認(rèn)配置替換成下面ip+端口的形式,重啟kafka

到此這篇關(guān)于Springboot 2.x集成kafka 2.2.0的示例代碼的文章就介紹到這了,更多相關(guān)Springboot集成kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring6整合JUnit的詳細(xì)步驟

    Spring6整合JUnit的詳細(xì)步驟

    這篇文章主要介紹了Spring6整合JUnit的詳細(xì)步驟,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-05-05
  • log4j如何根據(jù)變量動態(tài)生成文件名

    log4j如何根據(jù)變量動態(tài)生成文件名

    這篇文章主要介紹了log4j如何根據(jù)變量動態(tài)生成文件名方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringMVC+ZTree實(shí)現(xiàn)樹形菜單權(quán)限配置的方法

    SpringMVC+ZTree實(shí)現(xiàn)樹形菜單權(quán)限配置的方法

    本篇文章主要介紹了SpringMVC+ZTree實(shí)現(xiàn)樹形菜單權(quán)限配置的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-12-12
  • Java實(shí)現(xiàn)24點(diǎn)小游戲

    Java實(shí)現(xiàn)24點(diǎn)小游戲

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)24點(diǎn)小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • 9個(gè)小技巧讓你的Java if else看起來更優(yōu)雅

    9個(gè)小技巧讓你的Java if else看起來更優(yōu)雅

    這篇文章主要給大家介紹了9個(gè)小技巧,通過這幾個(gè)小技巧可以讓你的Java if else看起來更優(yōu)雅,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • Spring實(shí)戰(zhàn)之使用ClassPathResource加載xml資源示例

    Spring實(shí)戰(zhàn)之使用ClassPathResource加載xml資源示例

    這篇文章主要介紹了Spring實(shí)戰(zhàn)之使用ClassPathResource加載xml資源,結(jié)合實(shí)例形式分析了Spring使用ClassPathResource加載xml資源的具體實(shí)現(xiàn)步驟與相關(guān)操作技巧,需要的朋友可以參考下
    2019-12-12
  • SpringBoot實(shí)現(xiàn)緩存預(yù)熱的幾種常用方案

    SpringBoot實(shí)現(xiàn)緩存預(yù)熱的幾種常用方案

    緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動時(shí),預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,本文給大家介紹了SpringBoot實(shí)現(xiàn)緩存預(yù)熱的幾種常用方案,并通過代碼示例講解的非常詳細(xì),需要的朋友可以參考下
    2024-02-02
  • MyBatis 動態(tài)SQL之where標(biāo)簽的使用

    MyBatis 動態(tài)SQL之where標(biāo)簽的使用

    本文主要介紹了MyBatis 動態(tài)SQL之where標(biāo)簽,where 標(biāo)簽主要用來簡化 SQL 語句中的條件判斷,可以自動處理 AND/OR 條件,下面就來具體介紹一下
    2024-01-01
  • MyBatis-Plus通用枚舉自動關(guān)聯(lián)注入的實(shí)現(xiàn)

    MyBatis-Plus通用枚舉自動關(guān)聯(lián)注入的實(shí)現(xiàn)

    本文主要介紹了MyBatis-Plus通用枚舉自動關(guān)聯(lián)注入的實(shí)現(xiàn),解決了繁瑣的配置,讓 mybatis 優(yōu)雅的使用枚舉屬性,感興趣的可以一起來了解一下
    2021-06-06
  • spring boot中多線程開發(fā)的注意事項(xiàng)總結(jié)

    spring boot中多線程開發(fā)的注意事項(xiàng)總結(jié)

    spring boot 通過任務(wù)執(zhí)行器 taskexecutor 來實(shí)現(xiàn)多線程和并發(fā)編程。下面這篇文章主要給大家介紹了關(guān)于spring boot中多線程開發(fā)的注意事項(xiàng),文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2018-09-09

最新評論