欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成kafka全面實戰(zhàn)記錄

 更新時間:2021年11月26日 14:36:07   作者:Felix-Yuan  
在實際開發(fā)中,我們可能有這樣的需求,應用A從TopicA獲取到消息,經(jīng)過處理后轉(zhuǎn)發(fā)到TopicB,再由應用B監(jiān)聽處理消息,即一個應用處理完成后將該消息轉(zhuǎn)發(fā)至其他應用,完成消息的轉(zhuǎn)發(fā),這篇文章主要介紹了SpringBoot集成kafka全面實戰(zhàn),需要的朋友可以參考下

本文是SpringBoot+Kafka的實戰(zhàn)講解,如果對kafka的架構(gòu)原理還不了解的讀者,建議先看一下《大白話kafka架構(gòu)原理》、《秒懂kafka HA(高可用)》兩篇文章。

一、生產(chǎn)者實踐

  • 普通生產(chǎn)者
  • 帶回調(diào)的生產(chǎn)者
  • 自定義分區(qū)器
  • kafka事務提交

二、消費者實踐

  • 簡單消費
  • 指定topic、partition、offset消費
  • 批量消費
  • 監(jiān)聽異常處理器
  • 消息過濾器
  • 消息轉(zhuǎn)發(fā)
  • 定時啟動/停止監(jiān)聽器

一、前戲

1、在項目中連接kafka,因為是外網(wǎng),首先要開放kafka配置文件中的如下配置(其中IP為公網(wǎng)IP),

advertised.listeners=PLAINTEXT://112.126.74.249:9092

2、在開始前我們先創(chuàng)建兩個topic:topic1、topic2,其分區(qū)和副本數(shù)都設置為

2,用來測試,

[root@iZ2zegzlkedbo3e64vkbefZ ~]#  cd /usr/local/kafka-cluster/kafka1/bin/
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
Created topic topic1.
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
Created topic topic2.

當然我們也可以不手動創(chuàng)建topic,在執(zhí)行代碼kafkaTemplate.send("topic1", normalMessage)發(fā)送消息時,kafka會幫我們自動完成topic的創(chuàng)建工作,但這種情況下創(chuàng)建的topic默認只有一個分區(qū),分區(qū)也沒有副本。所以,我們可以在項目中新建一個配置類專門用來初始化topic,如下,

@Configuration
public class KafkaInitialConfiguration {
    // 創(chuàng)建一個名為testtopic的Topic并設置分區(qū)數(shù)為8,分區(qū)副本數(shù)為2
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("testtopic",8, (short) 2 );
    }
?
     // 如果要修改分區(qū)數(shù),只需修改配置值重啟項目即可
    // 修改分區(qū)數(shù)并不會導致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減小
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("testtopic",10, (short) 2 );
    }
}

3、新建SpringBoot項目

① 引入pom依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

② application.propertise配置(本文用到的配置項這里全列了出來)

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生產(chǎn)者配置】###########
# 重試次數(shù)
spring.kafka.producer.retries=0
# 應答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(可選0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延時
spring.kafka.producer.properties.linger.ms=0
# 當生產(chǎn)端積累的消息達到batch-size或接收到消息linger.ms后,生產(chǎn)者就會將消息提交給kafka
# linger.ms為0表示每接收到一條消息就提交給kafka,這時候batch-size其實就沒用了
?
# 生產(chǎn)端緩沖區(qū)大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定義分區(qū)器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
?
###########【初始化消費者配置】###########
# 默認的消費組ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自動提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延時(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 當kafka中沒有初始offset或offset超出范圍時將自動重置offset
# earliest:重置為分區(qū)中最小的offset;
# latest:重置為分區(qū)中最新的offset(消費分區(qū)中新產(chǎn)生的數(shù)據(jù));
# none:只要有一個分區(qū)不存在已提交的offset,就拋出異常;
spring.kafka.consumer.auto-offset-reset=latest
# 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消費請求超時時間
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化類
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消費端監(jiān)聽的topic不存在時,項目啟動會報錯(關掉)
spring.kafka.listener.missing-topics-fatal=false
# 設置批量消費
# spring.kafka.listener.type=batch
# 批量消費每次最多消費多少條消息
# spring.kafka.consumer.max-poll-records=50

二、Hello Kafka

1、簡單生產(chǎn)者

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
?
    // 發(fā)送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
}

?2、簡單消費

@Component
public class KafkaConsumer {
    // 消費監(jiān)聽
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消費的哪個topic、partition的消息,打印出消息內(nèi)容
        System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}

上面示例創(chuàng)建了一個生產(chǎn)者,發(fā)送消息到topic1,消費者監(jiān)聽topic1消費消息。監(jiān)聽器用@KafkaListener注解,topics表示監(jiān)聽的topic,支持同時監(jiān)聽多個,用英文逗號分隔。啟動項目,postman調(diào)接口觸發(fā)生產(chǎn)者發(fā)送消息,

可以看到監(jiān)聽器消費成功,

三、生產(chǎn)者

1、帶回調(diào)的生產(chǎn)者

kafkaTemplate提供了一個回調(diào)方法addCallback,我們可以在回調(diào)方法中監(jiān)控消息是否發(fā)送成功 或 失敗時做補償處理,有兩種寫法,

@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
        // 消息發(fā)送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息發(fā)送到的分區(qū)
        int partition = success.getRecordMetadata().partition();
        // 消息在分區(qū)內(nèi)的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("發(fā)送消息失敗:" + failure.getMessage());
    });
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("發(fā)送消息失?。?+ex.getMessage());
        }
 
        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("發(fā)送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}

2、自定義分區(qū)器

我們知道,kafka中每個topic被劃分為多個分區(qū),那么生產(chǎn)者將消息發(fā)送到topic時,具體追加到哪個分區(qū)呢?這就是所謂的分區(qū)策略,Kafka 為我們提供了默認的分區(qū)策略,同時它也支持自定義分區(qū)策略。其路由機制為:

① 若發(fā)送消息時指定了分區(qū)(即自定義分區(qū)策略),則直接將消息append到指定分區(qū);

② 若發(fā)送消息時未指定 patition,但指定了 key(kafka允許為每條消息設置一個key),則對key值進行hash計算,根據(jù)計算結(jié)果路由到指定分區(qū),這種情況下可以保證同一個 Key 的所有消息都進入到相同的分區(qū);

③ ?patition 和 key 都未指定,則使用kafka默認的分區(qū)策略,輪詢選出一個 patition;

※ 我們來自定義一個分區(qū)策略,將消息發(fā)送到我們指定的partition,首先新建一個分區(qū)器類實現(xiàn)Partitioner接口,重寫方法,其中partition方法的返回值就表示將消息發(fā)送到幾號分區(qū),

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定義分區(qū)規(guī)則(這里假設全部發(fā)到0號分區(qū))
        // ......
        return 0;
    }
?
    @Override
    public void close() {
?
    }
?
    @Override
    public void configure(Map<String, ?> configs) {
?
    }
}

在application.propertise中配置自定義分區(qū)器,配置的值就是分區(qū)器類的全路徑名,

# 自定義分區(qū)器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3、kafka事務提交

如果在發(fā)送消息時需要創(chuàng)建事務,可以使用 KafkaTemplate 的 executeInTransaction 方法來聲明事務,

@GetMapping("/kafka/transaction")
public void sendMessage7(){
    // 聲明事務:后面報錯消息不會發(fā)出去
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    });
?
    // 不聲明事務:后面報錯但前面消息已經(jīng)發(fā)送成功了
   kafkaTemplate.send("topic1","test executeInTransaction");
   throw new RuntimeException("fail");
}

四、消費者

1、指定topic、partition、offset消費

前面我們在監(jiān)聽消費topic1的時候,監(jiān)聽的是topic1上所有的消息,如果我們想指定topic、指定partition、指定offset來消費呢?也很簡單,@KafkaListener注解已全部為我們提供,

# 設置批量消費
spring.kafka.listener.type=batch
# 批量消費每次最多消費多少條消息
spring.kafka.consumer.max-poll-records=50

屬性解釋:

① id:消費者ID;

② groupId:消費組ID;

③ topics:監(jiān)聽的topic,可監(jiān)聽多個;

④ topicPartitions:可配置更加詳細的監(jiān)聽信息,可指定topic、parition、offset監(jiān)聽。

上面onMessage2監(jiān)聽的含義:監(jiān)聽topic1的0號分區(qū),同時監(jiān)聽topic2的0號分區(qū)和topic2的1號分區(qū)里面offset從8開始的消息。

注意:topics和topicPartitions不能同時使用;

2、批量消費

設置application.prpertise開啟批量消費即可,

@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消費一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}

接收消息時用List來接收,監(jiān)聽代碼如下,

// 新建一個異常處理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("消費異常:"+message.getPayload());
        return null;
    };
}
?
// 將這個異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("簡單消費-模擬異常");
}
?
// 批量消費也一樣,異常處理器的message.getPayload()也可以拿到各條消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消費一次...");
    throw new Exception("批量消費-模擬異常");
}

3、ConsumerAwareListenerErrorHandler 異常處理器

通過異常處理器,我們可以處理consumer在消費時發(fā)生的異常。

新建一個?ConsumerAwareListenerErrorHandler 類型的異常處理方法,用@Bean注入,BeanName默認就是方法名,然后我們將這個異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面,當監(jiān)聽拋出異常的時候,則會自動調(diào)用異常處理器,

@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;
?
    // 消息過濾器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被過濾的消息將被丟棄
        factory.setAckDiscarded(true);
        // 消息過濾策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息則被過濾
            return true;
        });
        return factory;
    }
?
    // 消息過濾監(jiān)聽
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

執(zhí)行看一下效果,

4、消息過濾器

消息過濾器可以在消息抵達consumer之前被攔截,在實際應用中,我們可以根據(jù)自己的業(yè)務邏輯,篩選出需要的信息再交由KafkaListener處理,不需要的消息則過濾掉。

配置消息過濾只需要為 監(jiān)聽器工廠 配置一個RecordFilterStrategy(消息過濾策略),返回true的時候消息將會被拋棄,返回false時,消息能正常抵達監(jiān)聽容器。

/**
 * @Title 消息轉(zhuǎn)發(fā)
 * @Description 從topic1接收到的消息經(jīng)過處理后轉(zhuǎn)發(fā)到topic2
 * @Author long.yuan
 * @Date 2020/3/23 22:15
 * @Param [record]
 * @return void
 **/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

上面實現(xiàn)了一個"過濾奇數(shù)、接收偶數(shù)"的過濾策略,我們向topic1發(fā)送0-99總共100條消息,看一下監(jiān)聽器的消費情況,可以看到監(jiān)聽器只消費了偶數(shù),

5、消息轉(zhuǎn)發(fā)

在實際開發(fā)中,我們可能有這樣的需求,應用A從TopicA獲取到消息,經(jīng)過處理后轉(zhuǎn)發(fā)到TopicB,再由應用B監(jiān)聽處理消息,即一個應用處理完成后將該消息轉(zhuǎn)發(fā)至其他應用,完成消息的轉(zhuǎn)發(fā)。

在SpringBoot集成Kafka實現(xiàn)消息的轉(zhuǎn)發(fā)也很簡單,只需要通過一個@SendTo注解,被注解方法的return值即轉(zhuǎn)發(fā)的消息內(nèi)容,如下,

/**
 * @Title 消息轉(zhuǎn)發(fā)
 * @Description 從topic1接收到的消息經(jīng)過處理后轉(zhuǎn)發(fā)到topic2
 * @Author long.yuan
 * @Date 2020/3/23 22:15
 * @Param [record]
 * @return void
 **/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

6、定時啟動、停止監(jiān)聽器

默認情況下,當消費者項目啟動的時候,監(jiān)聽器就開始工作,監(jiān)聽消費發(fā)送到指定topic的消息,那如果我們不想讓監(jiān)聽器立即工作,想讓它在我們指定的時間點開始工作,或者在我們指定的時間點停止工作,該怎么處理呢——使用KafkaListenerEndpointRegistry,下面我們就來實現(xiàn):

① 禁止監(jiān)聽器自啟動;

② 創(chuàng)建兩個定時任務,一個用來在指定時間點啟動定時器,另一個在指定時間點停止定時器;

新建一個定時任務類,用注解@EnableScheduling聲明,KafkaListenerEndpointRegistry 在SpringIO中已經(jīng)被注冊為Bean,直接注入,設置禁止KafkaListener自啟動,

@EnableScheduling
@Component
public class CronTimer {
?
    /**
     * @KafkaListener注解所標注的方法并不會在IOC容器中被注冊為Bean,
     * 而是會被注冊在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已經(jīng)被注冊為Bean
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;
?
    @Autowired
    private ConsumerFactory consumerFactory;
?
    // 監(jiān)聽器容器工廠(設置禁止KafkaListener自啟動)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自啟動
        container.setAutoStartup(false);
        return container;
    }
?
    // 監(jiān)聽器
    @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("消費成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
?
    // 定時啟動監(jiān)聽器
    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("啟動監(jiān)聽器...");
        // "timingConsumer"是@KafkaListener注解后面設置的監(jiān)聽器ID,標識這個監(jiān)聽器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }
?
    // 定時停止監(jiān)聽器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("關閉監(jiān)聽器...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}

啟動項目,觸發(fā)生產(chǎn)者向topic1發(fā)送消息,可以看到consumer沒有消費,因為這時監(jiān)聽器還沒有開始工作,

11:42分監(jiān)聽器啟動開始工作,消費消息,

11:45分監(jiān)聽器停止工作,

到此這篇關于SpringBoot集成kafka全面實戰(zhàn)的文章就介紹到這了,更多相關SpringBoot集成kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java圖形界面AWT編寫計算器

    java圖形界面AWT編寫計算器

    這篇文章主要為大家詳細介紹了基于java語言下圖形界面AWT編寫計算器,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-12-12
  • @Value設置默認值后,獲取不到配置值的原因分析

    @Value設置默認值后,獲取不到配置值的原因分析

    這篇文章主要介紹了@Value設置默認值后,獲取不到配置值的原因,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Java實現(xiàn)汽車租賃系統(tǒng)

    Java實現(xiàn)汽車租賃系統(tǒng)

    這篇文章介紹了Java實現(xiàn)汽車租賃系統(tǒng)的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-03-03
  • Java多線程實現(xiàn)模擬12306火車站售票系統(tǒng)

    Java多線程實現(xiàn)模擬12306火車站售票系統(tǒng)

    12360火車票售票系統(tǒng)基本上大家都用過,那你知道是怎么實現(xiàn)的嗎,今天我們就模擬12306火車站售票系統(tǒng)來實現(xiàn),需要的朋友們下面隨著小編來一起學習學習吧
    2021-05-05
  • 一篇文章帶你學習JAVA MyBatis底層原理

    一篇文章帶你學習JAVA MyBatis底層原理

    近來想寫一個mybatis的分頁插件,但是在寫插件之前肯定要了解一下mybatis具體的工作原理吧,本文就詳細總結(jié)了MyBatis工作原理,,需要的朋友可以參考下
    2021-09-09
  • SpringBoot+Elasticsearch實現(xiàn)數(shù)據(jù)搜索的方法詳解

    SpringBoot+Elasticsearch實現(xiàn)數(shù)據(jù)搜索的方法詳解

    Elasticsearch是一個基于Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基于RESTful?web接口。本文將利用SpringBoot整合Elasticsearch實現(xiàn)海量級數(shù)據(jù)搜索,需要的可以參考一下
    2022-05-05
  • spring?cloud?使用oauth2?問題匯總

    spring?cloud?使用oauth2?問題匯總

    這篇文章主要介紹了spring?cloud?使用oauth2?問題匯總,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-09-09
  • SpringMVC ajax請求的處理方法介紹

    SpringMVC ajax請求的處理方法介紹

    Ajax即異步的 JavaScript和XML,是一種無需重新加載整個網(wǎng)頁的情況下,能夠更新部分模塊的網(wǎng)頁技術,下面這篇文章主要給大家介紹了關于SpringMVC Ajax請求的處理,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-11-11
  • MyBatis Generator介紹及使用方法

    MyBatis Generator介紹及使用方法

    MyBatis Generator 是一款針對 MyBatis 或 iBATIS 設計的代碼生成器,由 MyBatis 官方提供,這篇文章主要介紹了MyBatis Generator介紹及使用方法,需要的朋友可以參考下
    2023-06-06
  • Springboot FatJa原理機制源碼解析

    Springboot FatJa原理機制源碼解析

    這篇文章主要為大家介紹了Springboot FatJa原理機制源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-12-12

最新評論