欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

云服務(wù)器(Linux)安裝部署Kafka的詳細(xì)過(guò)程

 更新時(shí)間:2022年11月15日 08:52:46   作者:shadc  
這篇文章主要介紹了云服務(wù)器(Linux)安裝部署Kafka的詳細(xì)過(guò)程,kafka的安裝需要依賴于jdk,需要在服務(wù)器上提前安裝好該環(huán)境,這里使用用jdk1.8,本文給大家詳細(xì)介紹感興趣的朋友跟隨小編一起看看吧

云服務(wù)器(Linux)安裝部署Kafka

前期準(zhǔn)備

kafka的安裝需要依賴于jdk,需要在服務(wù)器上提前安裝好該環(huán)境,這里使用用jdk1.8。

下載安裝包

官網(wǎng)地址:

較新的版本已自帶Zookeeper,無(wú)需額外下載。這里使用3.2.0做演示。

注意要下載Binary downloads標(biāo)簽下的tgz包,Source download標(biāo)簽下的包為源碼。無(wú)法直接運(yùn)行,需要編譯。

上載安裝包到云服務(wù)器

使用ssh連接工具將kafka_2.12-3.2.0.tgz這個(gè)包上傳到云服務(wù)器上的一個(gè)目錄。

打開(kāi)命令行,進(jìn)入到放有壓縮包的目錄,執(zhí)行

tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然后使用cd命令進(jìn)入到/kafka_2.12-3.2.0/config/下,使用

vi server.properties

編輯配置文件。

刪除listeners和advertised前方的#號(hào),改成如下配置:

listeners=PLAINTEXT://云服務(wù)器內(nèi)網(wǎng)ip:9092(本地訪問(wèn)用本地ip)
# 如果要提供外網(wǎng)訪問(wèn)則必須配置此項(xiàng)
advertised.listeners=PLAINTEXT://云服務(wù)器公網(wǎng)ip:9092(若要遠(yuǎn)程訪問(wèn)需配置此項(xiàng)為云服務(wù)器的公網(wǎng)ip)
# zookeeper連接地址,集群配置格式為ip:port,ip:port,ip:port
zookeeper.connect=云服務(wù)器公網(wǎng)ip:2181

開(kāi)放云服務(wù)器端口

在云服務(wù)器控制臺(tái)內(nèi)進(jìn)入安全組頁(yè)面,添加兩條新的入站規(guī)則,tcp/9092和tcp/2181

開(kāi)放linux防火墻端口

先查看使用的防火墻類(lèi)型iptables/firewalld

iptables操作命令

1.打開(kāi)/關(guān)閉/重啟防火墻

開(kāi)啟防火墻(重啟后永久生效):chkconfig iptables on

關(guān)閉防火墻(重啟后永久生效):chkconfig iptables off

開(kāi)啟防火墻(即時(shí)生效,重啟后失效):service iptables start

關(guān)閉防火墻(即時(shí)生效,重啟后失效):service iptables stop

重啟防火墻:service iptables restartd

2.查看打開(kāi)的端口

/etc/init.d/iptables status
3.開(kāi)啟端口

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存并重啟防火墻
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

Centos7默認(rèn)安裝了firewalld,如果沒(méi)有安裝的話,可以使用 yum install firewalld firewalld-config進(jìn)行安裝。

操作指令如下:

1.啟動(dòng)防火墻

systemctl start firewalld
2.禁用防火墻

systemctl stop firewalld
3.設(shè)置開(kāi)機(jī)啟動(dòng)

systemctl enable firewalld
4.停止并禁用開(kāi)機(jī)啟動(dòng)

sytemctl disable firewalld
5.重啟防火墻

firewall-cmd --reload

6.查看狀態(tài)

systemctl status firewalld或者 firewall-cmd --state
7.在指定區(qū)域打開(kāi)端口(記得重啟防火墻)

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打開(kāi)tcp/9092和tcp/2181這兩個(gè)端口后,重啟防火墻,并查看開(kāi)放的端口確實(shí)生效。

啟動(dòng)kafka服務(wù)

cd命令進(jìn)入kafka_2.12-3.2.0目錄下,執(zhí)行

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動(dòng)zookeeper,不加-daemon方便排除啟動(dòng)錯(cuò)誤,新建一個(gè)shell窗口,進(jìn)入該目錄再執(zhí)行

bin/kafka-server-start.sh config/server.properties

啟動(dòng)kafka,若打印日志未報(bào)錯(cuò),若未出現(xiàn)error日志,說(shuō)明啟動(dòng)成功。

測(cè)試單機(jī)連通性

查詢kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因?yàn)閗afka使用zookeeper作為配置中心,一些topic信息需要查詢?cè)搆afka對(duì)應(yīng)的zookeeper
創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
開(kāi)啟生產(chǎn)者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
開(kāi)啟消費(fèi)者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

Springboot連接kafak

在pom.xml文件中引入kafka依賴

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

在application.yml配置文件中配置kafka

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 云服務(wù)器外網(wǎng)ip地址:9092
    producer: # 生產(chǎn)者
      retries: 3 # 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
      # RECORD
      # 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
      # BATCH
      # 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
      # TIME
      # 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
      # COUNT
      # TIME | COUNT 有一個(gè)條件滿足時(shí)提交
      # COUNT_TIME
      # 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生產(chǎn)者

@RestController
public class KafkaController {
    private final static String TOPIC_NAME = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("消息 %s 發(fā)送成功!", msg);
    }
}

消費(fèi)者

@Component
public class DemoConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同組下的消費(fèi)者個(gè)數(shù),就是并發(fā)消費(fèi)數(shù),必須小于等于分區(qū)總數(shù)
     */
    @KafkaListener(topics = "test-topic", groupId = "testGroup1")
    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手動(dòng)提交offset,一般是提交一個(gè)banch,冪等性防止重復(fù)消息
        // === 每條消費(fèi)完確認(rèn)性能不好!
        ack.acknowledge();
    }

    //配置多個(gè)消費(fèi)組
    @KafkaListener(topics = "test--topic", groupId = "testGroup2")
    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手動(dòng)提交offset
        ack.acknowledge();
    }
}

使用swagger測(cè)試發(fā)送消息

控制臺(tái)打印消息

到此這篇關(guān)于云服務(wù)器(Linux)安裝部署Kafka的詳細(xì)過(guò)程的文章就介紹到這了,更多相關(guān)Linux安裝Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mac 下阿里云服務(wù)器的配置方法

    Mac 下阿里云服務(wù)器的配置方法

    這篇文章主要介紹了Mac 下阿里云服務(wù)器的配置方法,需要的朋友可以參考下
    2017-08-08
  • 關(guān)于HTTPS端口443的技術(shù)講解(什么是443端口)

    關(guān)于HTTPS端口443的技術(shù)講解(什么是443端口)

    本文將重點(diǎn)介紹HTTPS 443端口,它是如何工作的,它保護(hù)什么,以及為什么我們需要它,需要的朋友可以參考下
    2022-10-10
  • Mac下開(kāi)啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法

    Mac下開(kāi)啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法

    這篇文章主要介紹了Mac下開(kāi)啟與關(guān)閉端口轉(zhuǎn)發(fā)的腳本配置方法,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2018-04-04
  • 聯(lián)想服務(wù)器RD450 配置RAID5陣列圖文方法

    聯(lián)想服務(wù)器RD450 配置RAID5陣列圖文方法

    聯(lián)想RD450是一款服務(wù)器,cpu為英特爾 至強(qiáng) 處理器六核E5-2609 v3 1.9GHz,這里為大家分享一下聯(lián)想服務(wù)器RD450 配置RAID5陣列圖文方法,需要的朋友可以參考下
    2018-05-05
  • 運(yùn)維管理器Fabric使用方法

    運(yùn)維管理器Fabric使用方法

    Fabric是基于Python2.5版本以上實(shí)現(xiàn)的SSH命令行工具,簡(jiǎn)化了SSH的應(yīng)用程序部署及系統(tǒng)管理任務(wù),它提供了系統(tǒng)基礎(chǔ)的操作組件,可以實(shí)現(xiàn)本地或遠(yuǎn)程shell命令,包括命令執(zhí)行,文件上傳,下載及完整執(zhí)行日志輸出等功能
    2016-08-08
  • 教你搭建dns服務(wù)器(圖文教程)

    教你搭建dns服務(wù)器(圖文教程)

    這篇文章主要介紹了搭建dns服務(wù)器的圖文教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-03-03
  • 刀片服務(wù)器五大誤區(qū)解讀

    刀片服務(wù)器五大誤區(qū)解讀

    人總是愿意用挑剔的目光來(lái)看到新生事物,在對(duì)待刀片效勞器的問(wèn)題就是如此,有些人對(duì)于一些反復(fù)介紹的技術(shù)視而不見(jiàn),仍然強(qiáng)加給刀片效勞器一些莫須有的罪名,這些錯(cuò)誤正影響著刀片服務(wù)器的推廣和應(yīng)用。
    2009-09-09
  • 阿里云服務(wù)器?jdk1.8?安裝配置教程

    阿里云服務(wù)器?jdk1.8?安裝配置教程

    這篇文章主要介紹了阿里云服務(wù)器?jdk1.8?安裝配置,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-12-12
  • 詳解HTTP狀態(tài)碼

    詳解HTTP狀態(tài)碼

    這篇文章主要介紹了詳解HTTP狀態(tài)碼的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • ETag使用效果對(duì)比及ETag配置圖文教程

    ETag使用效果對(duì)比及ETag配置圖文教程

    強(qiáng)烈建議大家設(shè)置sitemap的ETag,簡(jiǎn)簡(jiǎn)單單的一個(gè)小動(dòng)作就能看到相對(duì)明顯的效果還是很不錯(cuò)的,需要的朋友可以參考下
    2016-05-05

最新評(píng)論