欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot整合kafka的示例代碼

 更新時間:2022年02月11日 15:48:51   作者:fFee-ops  
這篇文章主要介紹了Springboot整合kafka的示例代碼,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

1. 整合kafka

1、引入依賴

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、設(shè)置yml文件

spring:
  application:
    name: demo

  kafka:
    bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
    producer: # producer 生產(chǎn)者
      retries: 0 # 重試次數(shù)
      acks: 1 # 應(yīng)答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(可選0、1、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer消費者
      group-id: javagroup # 默認的消費組ID
      enable-auto-commit: true # 是否自動提交offset
      auto-commit-interval: 100  # 提交offset延時(接收到消息后多久提交offset)
      # earliest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      # latest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      # none:topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、啟動項目

2. 消息發(fā)送

2.1 發(fā)送類型

KafkaTemplate調(diào)用send時默認采用異步發(fā)送,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法

異步發(fā)送生產(chǎn)者:

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

同步發(fā)送生產(chǎn)者:

//測試同步發(fā)送與監(jiān)聽
@RestController
public class AsyncProducer {
    private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //同步發(fā)送
    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
        //注意,可以設(shè)置等待時間,超出后,不再等候結(jié)果
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());
    }
}

消費者:

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默認取yml里配置的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

那么我們怎么看出來同步發(fā)送和異步發(fā)送的區(qū)別呢?

①首先在服務(wù)器上,將kafka暫停服務(wù)。
②在swagger發(fā)送消息

調(diào)同步發(fā)送:請求被阻斷,一直等待,超時后返回錯誤

而調(diào)異步發(fā)送的(默認發(fā)送接口),請求立刻返回。

那么,異步發(fā)送的消息怎么確認發(fā)送情況呢?
我們使用注冊監(jiān)聽
即新建一個類:KafkaListener.java

@Configuration
public class KafkaListener {
    private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    @Autowired
    KafkaTemplate kafkaTemplate;
    //配置監(jiān)聽
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                logger.info("ok,message={}", producerRecord.value());
            }
            public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
                logger.error("error!message={}", producerRecord.value());
        });
    }
}

查看控制臺,等待一段時間后,異步發(fā)送失敗的消息會被回調(diào)給注冊過的listener

如果是正常發(fā)送異步消息,則會獲得該消息??梢钥吹剑趦?nèi)部類 KafkaListener$1 中,即注冊的Listener的消息。

2.2 序列化

消費者使用:KafkaConsumer.java

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,默認取yml里配置的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

1)序列化詳解

  • 前面用到的是Kafka自帶的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer
  • 除此之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 這些序列化器都實現(xiàn)了接口(org.apache.kafka.common.serialization.Serializer
  • 基本上,可以滿足絕大多數(shù)場景

2)自定義序列化
自己實現(xiàn),實現(xiàn)對應(yīng)的接口即可,有以下方法:

public interface Serializer<T> extends Closeable {
	default void configure(Map<String, ?> configs, Boolean isKey) {
	}
	//理論上,只實現(xiàn)這個即可正常運行
	byte[] serialize(String var1, T var2);
	//默認調(diào)上面的方法
	default byte[] serialize(String topic, Headers headers, T data) {
		return this.serialize(topic, data);
	}
	default void close() {
	}
}

我們來自己實現(xiàn)一個序列化器:MySerializer.java

public class MySerializer implements Serializer {

    @Override
    public byte[] serialize(String s, Object o) {
        String json = JSON.toJSONString(o);
        return json.getBytes();
    }
}

3)解碼
MyDeserializer.java,實現(xiàn)方式與編碼器幾乎一樣.

public class MyDeserializer implements Deserializer {
    private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);
    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            String json = new String(bytes,"utf-8");
            return JSON.parse(json);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

4)在yaml中配置自己的編碼器、解碼器

再次收發(fā),消息正常

2.3 分區(qū)策略

分區(qū)策略決定了消息根據(jù)key投放到哪個分區(qū),也是順序消費保障的基石。

  • 給定了分區(qū)號,直接將數(shù)據(jù)發(fā)送到指定的分區(qū)里面去
  • 沒有給定分區(qū)號,給定數(shù)據(jù)的key值,通過key取上hashCode進行分區(qū)
  • 既沒有給定分區(qū)號,也沒有給定key值,直接輪循進行分區(qū)(默認)
  • 自定義分區(qū),你想怎么做就怎么做

1)驗證默認分區(qū)規(guī)則
發(fā)送者代碼參考:PartitionProducer.java

//測試分區(qū)發(fā)送
@RestController
public class PartitionProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //    指定分區(qū)發(fā)送
//    不管你key是什么,到同一個分區(qū)
    @GetMapping("/kafka/partitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0號分區(qū)");
    }
    //    指定key發(fā)送,不指定分區(qū)
//    根據(jù)key做hash,相同的key到同一個分區(qū)
    @GetMapping("/kafka/keysend/{key}")
    public void setKey(@PathVariable("key") String key) {
        kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分區(qū)");
}

消費者代碼使用:PartitionConsumer.java

@Component
public class PartitionConsumer {
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);

    //分區(qū)消費
    @KafkaListener(topics = {"test"},topicPattern = "0")
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=0,message:[{}]", msg);
        }
    }
    @KafkaListener(topics = {"test"},topicPattern = "1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=1,message:[{}]", msg);
        }
    }
}

通過swagger訪問setKey(也就是只給了key的方法):

可以看到key相同的被hash到了同一個分區(qū)

再訪問setPartition來設(shè)置分區(qū)號0來發(fā)送:

可以看到無論key是什么,都是分區(qū)0來消費

2)自定義分區(qū)
參考代碼:MyPartitioner.java , MyPartitionTemplate.java。
發(fā)送使用:MyPartitionProducer.java。

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//        定義自己的分區(qū)策略
//                如果key以0開頭,發(fā)到0號分區(qū)
//                其他都扔到1號分區(qū)
        String keyStr = key+"";
        if (keyStr.startsWith("0")){
            return 0;
        }else {
            return 1;
        }
    }
    public void close() {
    public void configure(Map<String, ?> map) {
}
@Configuration
public class MyPartitionTemplate {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    KafkaTemplate kafkaTemplate;
    @PostConstruct
    public void setKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //注意分區(qū)器在這里?。?!
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
    }
    public KafkaTemplate getKafkaTemplate(){
        return kafkaTemplate;
}
//測試自定義分區(qū)發(fā)送
@RestController
public class MyPartitionProducer {

    @Autowired
    MyPartitionTemplate template;

//    使用0開頭和其他任意字母開頭的key發(fā)送消息
//    看控制臺的輸出,在哪個分區(qū)里?
    @GetMapping("/kafka/myPartitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定義分區(qū)策略");
    }
}

使用swagger,發(fā)送0開頭和非0開頭兩種key

3. 消息消費

3.1 消息組別

發(fā)送者使用:KafkaProducer.java

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

1)代碼參考:GroupConsumer.java,Listener拷貝3份,分別賦予兩組group,驗證分組消費:

//測試組消費
@Component
public class GroupConsumer {
    private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);

    //組1,消費者1
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-1 , message:{}", msg);
        }
    }
    //組1,消費者2
    public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
            logger.info("group:group1-2 , message:{}", msg);
    //組2,只有一個消費者
    @KafkaListener(topics = {"test"},groupId = "group2")
    public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
            logger.info("group:group2 , message:{}", msg);
}

2)啟動

3)通過swagger發(fā)送2條消息

  • 同一group下的兩個消費者,在group1均分消息
  • group2下只有一個消費者,得到全部消息

4)消費端閑置
注意分區(qū)數(shù)與消費者數(shù)的搭配,如果 ( 消費者數(shù) > 分區(qū)數(shù)量 ),將會出現(xiàn)消費者閑置(因為一個分區(qū)只能分配給一個消費者),浪費資源!

驗證方式:
停掉項目,刪掉test主題,重新建一個 ,這次只給它分配一個分區(qū)。
重新發(fā)送兩條消息,試一試

  • group2可以消費到1、2兩條消息
  • group1下有兩個消費者,但是只分配給了 1 , 2這個進程被閑置

3.2 位移提交

1)自動提交
前面的案例中,我們設(shè)置了以下兩個選項,則kafka會按延時設(shè)置自動提交

enable-auto-commit: true # 是否自動提交offset
auto-commit-interval: 100 # 提交offset延時(接收到消息后多久提交offset,默認單位為ms)

2)手動提交
有些時候,我們需要手動控制偏移量的提交時機,比如確保消息嚴格消費后再提交,以防止丟失或重復(fù)。
下面我們自己定義配置,覆蓋上面的參數(shù)
代碼參考:MyOffsetConfig.java

@Configuration
public class MyOffsetConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 注意這里!??!設(shè)置手動提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        // ack模式:
        //          AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有以下幾種:
        //
        //          RECORD
        //          每處理一條commit一次
        //          BATCH(默認)
        //          每次poll的時候批量提交一次,頻率取決于每次poll的調(diào)用頻率
        //          TIME
        //          每次間隔ackTime的時間去commit(跟auto commit interval有什么區(qū)別呢?)
        //          COUNT
        //          累積達到ackCount次的ack去commit
        //          COUNT_TIME
        //          ackTime或ackCount哪個條件先滿足,就commit
        //          MANUAL
        //          listener負責ack,但是背后也是批量上去
        //          MANUAL_IMMEDIATE
        //          listner負責ack,每調(diào)用一次,就立即commit
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

然后通過在消費端的Consumer來提交偏移量
MyOffsetConsumer:

@Component
public class MyOffsetConsumer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")
    public void manualCommit(@Payload String message,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             Consumer consumer,
                             Acknowledgment ack) {
        logger.info("手動提交偏移量 , partition={}, msg={}", partition, message);
        // 同步提交
        consumer.commitSync();
        //異步提交
        //consumer.commitAsync();
        // ack提交也可以,會按設(shè)置的ack策略走(參考MyOffsetConfig.java里的ack模式)
        // ack.acknowledge();
    }
    @KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")
    public void noCommit(@Payload String message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                         Consumer consumer,
                         Acknowledgment ack) {
        logger.info("忘記提交偏移量, partition={}, msg={}", partition, message);
        // 不做commit!
    /**
     * 現(xiàn)實狀況:
     * commitSync和commitAsync組合使用
     * <p>
     * 手工提交異步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     * commitSync()方法提交最后一個偏移量。在成功提交或碰到無怯恢復(fù)的錯誤之前,
     * commitSync()會一直重試,但是commitAsync()不會。
     * 一般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題
     * 因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。
     * 但如果這是發(fā)生在關(guān)閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。否則就會造成重復(fù)消費
     * 因此,在消費者關(guān)閉前一般會組合使用commitAsync()和commitSync()。
     */
//   @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
    public void manualOffset(@Payload String message,
        try {
            logger.info("同步異步搭配 , partition={}, msg={}", partition, message);
            //先異步提交
            consumer.commitAsync();
            //繼續(xù)做別的事
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
     * 甚至可以手動提交,指定任意位置的偏移量
     * 不推薦日常使用?。?!
//    @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        logger.info("手動指定任意偏移量, partition={}, msg={}", record.partition(), record);
        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
}

3)重復(fù)消費問題
如果手動提交模式被打開,一定不要忘記提交偏移量。否則會造成重復(fù)消費!

用km將test主題刪除,新建一個test空主題。方便觀察消息偏移 注釋掉其他Consumer的Component注解,只保留當前MyOffsetConsumer.java 啟動項目,使用swagger的KafkaProducer發(fā)送連續(xù)幾條消息 留心控制臺,都能消費,沒問題:

但是!重啟項目:

無論重啟多少次,不提交偏移量的消費組,會重復(fù)消費一遍?。。?/p>

再通過命令行查詢偏移量

4)經(jīng)驗與總結(jié)
commitSync()方法,即同步提交,會提交最后一個偏移量。在成功提交或碰到無怯恢復(fù)的錯誤之前,commitSync()會一直重試,但是commitAsync()不會。

這就造成一個陷阱:
如果異步提交,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。只要成功一次,偏移量就會提交上去。

但是!如果這是發(fā)生在關(guān)閉消費者時的最后一次提交,就要確保能夠提交成功,如果還沒提交完就停掉了進程。就會造成重復(fù)消費!

因此,在消費者關(guān)閉前一般會組合使用commitAsync()和commitSync()。
詳細代碼參考:MyOffsetConsumer.manualOffset()

到此這篇關(guān)于Springboot整合kafka的文章就介紹到這了,更多相關(guān)Springboot整合kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 關(guān)于Mybatis與JPA的優(yōu)缺點說明

    關(guān)于Mybatis與JPA的優(yōu)缺點說明

    這篇文章主要介紹了關(guān)于Mybatis與JPA的優(yōu)缺點說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • 基于logback.xml不生效問題的解決

    基于logback.xml不生效問題的解決

    這篇文章主要介紹了基于logback.xml不生效問題的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • Spring實現(xiàn)IoC和DI的方法詳解

    Spring實現(xiàn)IoC和DI的方法詳解

    IoC全稱Inversion of Control (控制反轉(zhuǎn)) ,這里的控制其實是控制權(quán)的意思,可以理解為對象的獲取權(quán)力和方式發(fā)生了發(fā)轉(zhuǎn),DI依賴注?是?個過程,是指IoC容器在創(chuàng)建Bean時, 去提供運?時所依賴的資源,?資源指的就是對象,本文介紹了Spring實現(xiàn)IoC和DI的方法
    2024-08-08
  • Maven打jar包的三種方式(小結(jié))

    Maven打jar包的三種方式(小結(jié))

    這篇文章主要介紹了Maven打jar包的三種方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • JVM調(diào)優(yōu)實戰(zhàn)

    JVM調(diào)優(yōu)實戰(zhàn)

    本文主要介紹了JVM調(diào)優(yōu)實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-06-06
  • IDEA Project不顯示/缺失文件問題及解決

    IDEA Project不顯示/缺失文件問題及解決

    在側(cè)邊欄的project模式下,如果發(fā)現(xiàn)缺少部分文件,可以嘗試關(guān)閉項目,打開項目所在目錄,刪除目錄下的.idea文件夾,然后重新打開項目即可解決
    2024-11-11
  • Tomcat安裝配置及Eclipse配置詳解

    Tomcat安裝配置及Eclipse配置詳解

    給大家介紹一下Tomcat安裝配置及Eclipse配置的全部圖文過程,如果你對這個還有不明白,一起跟著小編學習下。
    2017-11-11
  • SpringCloud用Zookeeper搭建配置中心的方法

    SpringCloud用Zookeeper搭建配置中心的方法

    本篇文章主要介紹了SpringCloud用Zookeeper搭建配置中心的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-04-04
  • 詳談java中File類getPath()、getAbsolutePath()、getCanonical的區(qū)別

    詳談java中File類getPath()、getAbsolutePath()、getCanonical的區(qū)別

    下面小編就為大家?guī)硪黄斦刯ava中File類getPath()、getAbsolutePath()、getCanonical的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • 在springboot中使用p6spy方式

    在springboot中使用p6spy方式

    這篇文章主要介紹了在springboot中使用p6spy方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01

最新評論