欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot整合mqtt服務(wù)的示例代碼

 更新時間:2022年03月18日 15:56:52   作者:李泰山  
MQTT是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。本文為大家分享了Springboot整合mqtt服務(wù)的示例代碼,需要的可以參考一下

首先在pom文件里引入mqtt的依賴配置

        <!--mqtt-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.4</version>
        </dependency>

其次在springboot 的配置yml文件,配置mqtt的服務(wù)配置

spring:  
  mqtt:
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    username:
    password:
    topic: [/unify/test]

創(chuàng)建 MqttProperties配置參數(shù)類

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
 
@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
    private String url;
    private String clientId;
    private String username;
    private String password;
    private String[] topic;
}

創(chuàng)建 MqttConfiguration 配置類

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.listener.MqttSubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
    @Autowired
    private MqttProperties mqttProperties;
 
    public MqttConfiguration() {
    }
 
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
        if (Func.isNotBlank(this.mqttProperties.getUrl())) {
            connectOptions.setUserName(this.mqttProperties.getUsername());
        }
 
        if (Func.isNotBlank(this.mqttProperties.getPassword())) {
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        }
 
        connectOptions.setKeepAliveInterval(60);
        return connectOptions;
    }
 
    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);
        for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) {
            mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener());
        }
        return mqttClient;
    }
}

創(chuàng)建 訂閱事件類

import org.springframework.context.ApplicationEvent;
 
 
public class UWBMqttSubscribeEvent extends ApplicationEvent {
    private String topic;
 
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }
 
    public String getTopic() {
        return this.topic;
    }
}

創(chuàng)建訂閱事件監(jiān)聽器

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
 
 
public class MqttSubscribeListener implements IMqttMessageListener {
 
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        String content = new String(mqttMessage.getPayload());
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}

創(chuàng)建mqtt消息事件異步處理監(jiān)聽器

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.config.MqttProperties;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import org.springblade.ubw.service.MqttService;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
 
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
 
 
@Configuration
public class MqttEventListener {
 
 
    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);
 
    @Resource
    private MqttProperties mqttProperties;
 
    @Resource
    private MqttService mqttService;
 
    private String processTopic (String topic) {
        List<String> topics = Arrays.asList(mqttProperties.getTopic());
        for (String wild : topics) {
            wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(wild)) {
                return topic.replace(wild, StringPool.EMPTY);
            }
        }
        return StringPool.EMPTY;
    }
 
 
    @Async
    @EventListener(UWBMqttSubscribeEvent.class)
    public void listen (UWBMqttSubscribeEvent event) {
        String topic = processTopic(event.getTopic());
        Object source = event.getSource();
        if (Func.isEmpty(source)) {
            return;
        }
        mqttService.issue(topic,source);
//        log.info("mqtt接收到 通道 {} 的信息為:{}",topic,source);
    }
}

創(chuàng)建MqttService 數(shù)據(jù)處理服務(wù)類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.area.entity.WorkArea;
import org.springblade.ubw.area.entity.WorkSite;
import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;
import org.springblade.ubw.area.entity.WorkSitePassInfo;
import org.springblade.ubw.area.service.WorkAreaService;
import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;
import org.springblade.ubw.area.service.WorkSitePassInfoService;
import org.springblade.ubw.area.service.WorkSiteService;
import org.springblade.ubw.constant.UbwConstant;
import org.springblade.ubw.history.entity.HistoryLocusInfo;
import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;
import org.springblade.ubw.history.service.HistoryLocusInfoService;
import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;
import org.springblade.ubw.loc.entity.LocStatusInfo;
import org.springblade.ubw.loc.entity.LocStatusInfoHistory;
import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;
import org.springblade.ubw.loc.service.LocStatusInfoService;
import org.springblade.ubw.msg.entity.*;
import org.springblade.ubw.msg.service.*;
import org.springblade.ubw.system.entity.*;
import org.springblade.ubw.system.service.*;
import org.springblade.ubw.system.wrapper.MqttWrapper;
import org.springframework.stereotype.Service;
 
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
 
 
@Service
public class MqttService {
 
    private static final Logger log = LoggerFactory.getLogger(MqttService.class);
 
    @Resource
    private EmployeeAndDepartmentService employeeAndDepartmentService;
 
    @Resource
    private VehicleInfoService vehicleInfoService;
 
    @Resource
    private WorkSiteService workSiteService;
 
    @Resource
    private LocStatusInfoService locStatusInfoService;
 
    @Resource
    private LocStatusInfoHistoryService locStatusInfoHistoryService;
 
    @Resource
    private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;
 
    @Resource
    private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;
 
    @Resource
    private LocSosAlarminfoService locSosAlarminfoService;
 
    @Resource
    private AttendanceInfoService attendanceInfoService;
 
    @Resource
    private HistoryLocusInfoService historyLocusInfoService;
 
    @Resource
    private WorkSitePassInfoService workSitePassInfoService;
 
    @Resource
    private EnvironmentalMonitorInfoService environmentalMonitorInfoService;
 
    @Resource
    private TrAlertService trAlertService;
 
    @Resource
    private AddEvacuateInfoService addEvacuateInfoService;
 
    @Resource
    private CancelEvacuateInfoService cancelEvacuateInfoService;
 
    @Resource
    private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;
 
    @Resource
    private LinkMsgAlarmInfoService linkMsgAlarmInfoService;
 
    @Resource
    private LeaderEmployeeInfoService leaderEmployeeInfoService;
 
    @Resource
    private ElectricMsgInfoService electricMsgInfoService;
 
    @Resource
    private WorkAreaService workAreaService;
 
    @Resource
    private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;
 
    @Resource
    private SpecialWorksService specialWorksService;
 
    @Resource
    private AttendanceLocusInfoService attendanceLocusInfoService;
 
    @Resource
    private WorkTypeService workTypeService;
 
    @Resource
    private OfficePositionService officePositionService;
 
    @Resource
    private ClassTeamService classTeamService;
 
    /**
     * 方法描述: 消息分發(fā)
     *
     * @param topic
     * @param source
     * @author liwenbin
     * @date 2021年12月14日 14:14:09
     */
    public void issue(String topic,Object source){
        switch(topic){
            case UbwConstant.TOPIC_EMP :
                //人員和部門信息
                employeeAndDepartmentService.saveBatch(source);
                break;
            case UbwConstant.TOPIC_VEHICLE :
                //車輛信息
                List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo());
                vehicleInfoService.deleteAll();
                vehicleInfoService.saveBatch(vehicleInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE :
                //基站信息
                List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite());
                workSiteService.deleteAll();
                workSiteService.saveBatch(workSites);
                break;
            case UbwConstant.TOPIC_LOC_STATUS:
                //井下車輛人員實(shí)時
                List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo());
                if (Func.isEmpty(locStatusInfos)){
                    break;
                }
                locStatusInfoService.deleteAll();
                //篩選入井人員列表
                List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList());
                locStatusInfoService.saveBatch(inWellList);
                //人員歷史數(shù)據(jù)入庫
                List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory());
                locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
                break;
            case UbwConstant.TOPIC_LOC_OVER_TIME:
                //超時報警信息
                List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo());
                locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_OVER_AREA:
                //超員報警信息
                List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo());
                locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_SOS:
                //求救報警信息
                List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo());
                locSosAlarminfoService.saveBatch(locSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_ATTEND:
                //考勤信息
                List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo());
                attendanceInfoService.saveBatch(attendanceInfos);
                break;
            case UbwConstant.TOPIC_HISTORY_LOCUS:
                //精確軌跡信息
                List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo());
                historyLocusInfoService.saveBatch(historyLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_PASS:
                //基站經(jīng)過信息
                List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo());
                workSitePassInfoService.saveBatch(workSitePassInfos);
                break;
            case UbwConstant.TOPIC_ENV_MON:
                //環(huán)境監(jiān)測信息
                List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo());
                environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
                break;
            case UbwConstant.TOPIC_TR_ALERT:
                //環(huán)境監(jiān)測報警信息
                List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert());
                trAlertService.saveBatch(trAlerts);
                break;
            case UbwConstant.TOPIC_ADD_EVA:
                //下發(fā)撤離信息
                List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo());
                addEvacuateInfoService.saveBatch(addEvacuateInfos);
                break;
            case UbwConstant.TOPIC_CANCEL_EVA:
                //取消撤離信息
                List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo());
                cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_NEI:
                //相鄰基站關(guān)系信息
                workSiteNeighbourInfoService.deleteAll();
                List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo());
                workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
                break;
            case UbwConstant.TOPIC_LINK_MSG:
                //基站鏈路信息
                linkMsgAlarmInfoService.deleteAll();
                List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo());
                linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
                break;
            case UbwConstant.TOPIC_LEADER_EMP:
                //帶班領(lǐng)導(dǎo)信息
                leaderEmployeeInfoService.deleteAll();
                List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo());
                leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
                break;
            case UbwConstant.TOPIC_ELE_MSG:
                //低電報警信息
                List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo());
                electricMsgInfoService.saveBatch(electricMsgInfos);
                break;
            case UbwConstant.TOPIC_WORK_AREA:
                //區(qū)域信息
                workAreaService.deleteAll();
                List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea());
                workAreaService.saveBatch(workAreas);
                break;
            case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
                //歷史超時報警信息
                List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo());
                historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_SPECIAL_WORK:
                //特種人員預(yù)設(shè)線路信息
                specialWorksService.deleteAll();
                List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks());
                specialWorksService.saveBatch(specialWorks);
                break;
            case UbwConstant.TOPIC_ATTEND_LOC:
                //歷史考勤軌跡信息
                List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo());
                attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_TYPE:
                //工種信息
                workTypeService.deleteAll();
                List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType());
                workTypeService.saveBatch(workTypes);
                break;
            case UbwConstant.TOPIC_OFFICE_POS:
                //職務(wù)信息
                officePositionService.deleteAll();
                List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition());
                officePositionService.saveBatch(officePositions);
                break;
            case UbwConstant.TOPIC_CLASS_TEAM:
                //班組信息
                classTeamService.deleteAll();
                List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam());
                classTeamService.saveBatch(classTeams);
                break;
            default : //可選
                break;
        }
    }
}

完結(jié),小伙伴們,可以根據(jù)這個demo 改造自己的mqtt服務(wù)處理?。?!

以上就是Springboot整合mqtt服務(wù)的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Springboot整合mqtt的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot2整合JTA組件實(shí)現(xiàn)多數(shù)據(jù)源事務(wù)管理

    SpringBoot2整合JTA組件實(shí)現(xiàn)多數(shù)據(jù)源事務(wù)管理

    這篇文章主要介紹了SpringBoot2整合JTA組件實(shí)現(xiàn)多數(shù)據(jù)源事務(wù)管理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • spring是如何解析xml配置文件中的占位符

    spring是如何解析xml配置文件中的占位符

    這篇文章主要介紹了spring是如何解析xml配置文件中的占位符,幫助大家更好的理解和使用spring框架,感興趣的朋友可以了解下
    2020-11-11
  • JAVA如何定義構(gòu)造函數(shù)過程解析

    JAVA如何定義構(gòu)造函數(shù)過程解析

    這篇文章主要介紹了JAVA如何定義構(gòu)造函數(shù)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-02-02
  • java實(shí)現(xiàn)將ftp和http的文件直接傳送到hdfs

    java實(shí)現(xiàn)將ftp和http的文件直接傳送到hdfs

    前面幾篇文章,我們已經(jīng)做了很好的鋪墊了,幾個要用到的工具我們都做了出來,本文就是將他們集合起來,說下具體的用法,小伙伴們可以參考下。
    2015-03-03
  • Mybatis Plus插件三種方式的逆向工程的使用

    Mybatis Plus插件三種方式的逆向工程的使用

    這篇文章主要介紹了Mybatis Plus插件三種方式的逆向工程的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • Spring Boot如何使用JDBC獲取相關(guān)的數(shù)據(jù)詳解

    Spring Boot如何使用JDBC獲取相關(guān)的數(shù)據(jù)詳解

    這篇文章主要給大家介紹了關(guān)于Spring Boot如何使用JDBC獲取相關(guān)數(shù)據(jù)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • SpringBoot默認(rèn)使用HikariDataSource數(shù)據(jù)源方式

    SpringBoot默認(rèn)使用HikariDataSource數(shù)據(jù)源方式

    這篇文章主要介紹了SpringBoot默認(rèn)使用HikariDataSource數(shù)據(jù)源方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java集合框架之Collection接口詳解

    Java集合框架之Collection接口詳解

    這篇文章主要為大家詳細(xì)介紹了Java集合框架之Collection接口,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-12-12
  • 如何使用Idea搭建全注解式開發(fā)的SpringMVC項目

    如何使用Idea搭建全注解式開發(fā)的SpringMVC項目

    這篇文章主要介紹了如何使用Idea搭建全注解式開發(fā)的SpringMVC項目,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-03-03
  • springboot2.0和springcloud Finchley版項目搭建(包含eureka,gateWay,F(xiàn)reign,Hystrix)

    springboot2.0和springcloud Finchley版項目搭建(包含eureka,gateWay,F(xiàn)re

    這篇文章主要介紹了springboot2.0和springcloud Finchley版項目搭建(包含eureka,gateWay,F(xiàn)reign,Hystrix),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-05-05

最新評論