Springboot整合mqtt服務的示例代碼
首先在pom文件里引入mqtt的依賴配置
<!--mqtt-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
其次在springboot 的配置yml文件,配置mqtt的服務配置
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    username:
    password:
    topic: [/unify/test]
創建 MqttProperties 配置參數類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * MQTT settings bound from configuration keys under the {@code spring.mqtt} prefix.
 *
 * <p>Getters, setters, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
    /** Broker address, e.g. {@code tcp://127.0.0.1:1883}. */
    private String url;
    /** Client identifier used when connecting to the broker. */
    private String clientId;
    /** Login user name; may be left empty in the configuration. */
    private String username;
    /** Login password; may be left empty in the configuration. */
    private String password;
    /** Topic filters to subscribe to. */
    private String[] topic;
}
創建 MqttConfiguration 配置類
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.listener.MqttSubscribeListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableConfigurationProperties({MqttProperties.class}) public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class); @Autowired private MqttProperties mqttProperties; public MqttConfiguration() { } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()}); if (Func.isNotBlank(this.mqttProperties.getUrl())) { connectOptions.setUserName(this.mqttProperties.getUsername()); } if (Func.isNotBlank(this.mqttProperties.getPassword())) { connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray()); } connectOptions.setKeepAliveInterval(60); return connectOptions; } @Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId()); mqttClient.connect(options); for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) { mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; } }
創建訂閱事件類
import org.springframework.context.ApplicationEvent;

/**
 * Application event carrying one received MQTT message: the topic it arrived on plus the
 * event source supplied by the publisher.
 */
public class UWBMqttSubscribeEvent extends ApplicationEvent {
    /** Topic the message arrived on; set once at construction. */
    private final String topic;

    /**
     * @param topic  topic the message arrived on
     * @param source event source, passed to {@link ApplicationEvent}
     */
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }

    /**
     * @return the topic the message arrived on
     */
    public String getTopic() {
        return this.topic;
    }
}
創建訂閱事件監聽器
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;

import java.nio.charset.StandardCharsets;

/**
 * Paho message listener that republishes each received message as a
 * {@link UWBMqttSubscribeEvent} through {@code SpringUtil.publishEvent}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and publishes it as an application event.
     *
     * @param s           topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
        // NOTE(review): assumes publishers send UTF-8 text payloads — confirm.
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}
創建mqtt消息事件異步處理監聽器
import com.baomidou.mybatisplus.core.toolkit.StringPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.config.MqttProperties; import org.springblade.ubw.event.UWBMqttSubscribeEvent; import org.springblade.ubw.service.MqttService; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import javax.annotation.Resource; import java.util.Arrays; import java.util.List; @Configuration public class MqttEventListener { private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class); @Resource private MqttProperties mqttProperties; @Resource private MqttService mqttService; private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; } @Async @EventListener(UWBMqttSubscribeEvent.class) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return; } mqttService.issue(topic,source); // log.info("mqtt接收到 通道 {} 的信息為:{}",topic,source); } }
創建 MqttService 數據處理服務類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.area.entity.WorkArea; import org.springblade.ubw.area.entity.WorkSite; import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo; import org.springblade.ubw.area.entity.WorkSitePassInfo; import org.springblade.ubw.area.service.WorkAreaService; import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService; import org.springblade.ubw.area.service.WorkSitePassInfoService; import org.springblade.ubw.area.service.WorkSiteService; import org.springblade.ubw.constant.UbwConstant; import org.springblade.ubw.history.entity.HistoryLocusInfo; import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo; import org.springblade.ubw.history.service.HistoryLocusInfoService; import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService; import org.springblade.ubw.loc.entity.LocStatusInfo; import org.springblade.ubw.loc.entity.LocStatusInfoHistory; import org.springblade.ubw.loc.service.LocStatusInfoHistoryService; import org.springblade.ubw.loc.service.LocStatusInfoService; import org.springblade.ubw.msg.entity.*; import org.springblade.ubw.msg.service.*; import org.springblade.ubw.system.entity.*; import org.springblade.ubw.system.service.*; import org.springblade.ubw.system.wrapper.MqttWrapper; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Service public class MqttService { private static final Logger log = LoggerFactory.getLogger(MqttService.class); @Resource private EmployeeAndDepartmentService employeeAndDepartmentService; @Resource private VehicleInfoService vehicleInfoService; @Resource private WorkSiteService workSiteService; @Resource private LocStatusInfoService locStatusInfoService; @Resource private LocStatusInfoHistoryService locStatusInfoHistoryService; @Resource private LocOverTimeSosAlarminfoService 
locOverTimeSosAlarminfoService; @Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService; @Resource private LocSosAlarminfoService locSosAlarminfoService; @Resource private AttendanceInfoService attendanceInfoService; @Resource private HistoryLocusInfoService historyLocusInfoService; @Resource private WorkSitePassInfoService workSitePassInfoService; @Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService; @Resource private TrAlertService trAlertService; @Resource private AddEvacuateInfoService addEvacuateInfoService; @Resource private CancelEvacuateInfoService cancelEvacuateInfoService; @Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService; @Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService; @Resource private LeaderEmployeeInfoService leaderEmployeeInfoService; @Resource private ElectricMsgInfoService electricMsgInfoService; @Resource private WorkAreaService workAreaService; @Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService; @Resource private SpecialWorksService specialWorksService; @Resource private AttendanceLocusInfoService attendanceLocusInfoService; @Resource private WorkTypeService workTypeService; @Resource private OfficePositionService officePositionService; @Resource private ClassTeamService classTeamService; /** * 方法描述: 消息分發(fā) * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch(topic){ case UbwConstant.TOPIC_EMP : //人員和部門信息 employeeAndDepartmentService.saveBatch(source); break; case UbwConstant.TOPIC_VEHICLE : //車輛信息 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break; case UbwConstant.TOPIC_WORK_SITE : //基站信息 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite()); workSiteService.deleteAll(); 
workSiteService.saveBatch(workSites); break; case UbwConstant.TOPIC_LOC_STATUS: //井下車輛人員實(shí)時 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break; } locStatusInfoService.deleteAll(); //篩選入井人員列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人員歷史數(shù)據(jù)入庫 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break; case UbwConstant.TOPIC_LOC_OVER_TIME: //超時報警信息 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_OVER_AREA: //超員報警信息 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_SOS: //求救報警信息 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break; case UbwConstant.TOPIC_ATTEND: //考勤信息 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break; case UbwConstant.TOPIC_HISTORY_LOCUS: //精確軌跡信息 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站經(jīng)過信息 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); 
break; case UbwConstant.TOPIC_ENV_MON: //環(huán)境監(jiān)測信息 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break; case UbwConstant.TOPIC_TR_ALERT: //環(huán)境監(jiān)測報警信息 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert()); trAlertService.saveBatch(trAlerts); break; case UbwConstant.TOPIC_ADD_EVA: //下發(fā)撤離信息 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤離信息 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break; case UbwConstant.TOPIC_WORK_SITE_NEI: //相鄰基站關(guān)系信息 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break; case UbwConstant.TOPIC_LINK_MSG: //基站鏈路信息 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break; case UbwConstant.TOPIC_LEADER_EMP: //帶班領(lǐng)導(dǎo)信息 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo()); leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break; case UbwConstant.TOPIC_ELE_MSG: //低電報警信息 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break; case UbwConstant.TOPIC_WORK_AREA: //區(qū)域信息 workAreaService.deleteAll(); List<WorkArea> workAreas = 
MqttWrapper.build().toEntityList(source,new WorkArea()); workAreaService.saveBatch(workAreas); break; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //歷史超時報警信息 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_SPECIAL_WORK: //特種人員預(yù)設(shè)線路信息 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break; case UbwConstant.TOPIC_ATTEND_LOC: //歷史考勤軌跡信息 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break; case UbwConstant.TOPIC_WORK_TYPE: //工種信息 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType()); workTypeService.saveBatch(workTypes); break; case UbwConstant.TOPIC_OFFICE_POS: //職務(wù)信息 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition()); officePositionService.saveBatch(officePositions); break; case UbwConstant.TOPIC_CLASS_TEAM: //班組信息 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam()); classTeamService.saveBatch(classTeams); break; default : //可選 break; } } }
完結,小伙伴們,可以根據這個demo 改造自己的mqtt服務處理!!!
以上就是Springboot整合mqtt服務的示例代碼的詳細內容,更多關於Springboot整合mqtt的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot2整合JTA組件實現多數據源事務管理
這篇文章主要介紹了SpringBoot2整合JTA組件實現多數據源事務管理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2021-03-03 java實現將ftp和http的文件直接傳送到hdfs
前面幾篇文章,我們已經做了很好的鋪墊了,幾個要用到的工具我們都做了出來,本文就是將他們集合起來,說下具體的用法,小伙伴們可以參考下。 2015-03-03 Spring Boot如何使用JDBC獲取相關的數據詳解
這篇文章主要給大家介紹了關於Spring Boot如何使用JDBC獲取相關數據的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧 2019-03-03 SpringBoot默認使用HikariDataSource數據源方式
這篇文章主要介紹了SpringBoot默認使用HikariDataSource數據源方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教 2021-10-10 如何使用Idea搭建全注解式開發的SpringMVC項目
這篇文章主要介紹了如何使用Idea搭建全注解式開發的SpringMVC項目,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下 2023-03-03 springboot2.0和springcloud Finchley版項目搭建(包含eureka,gateWay,Fre
這篇文章主要介紹了springboot2.0和springcloud Finchley版項目搭建(包含eureka,gateWay,Freign,Hystrix),小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧 2019-05-05