Storm框架整合springboot的方法
Storm:最火的流式處理框架
伴隨著信息科技日新月異的發(fā)展,信息呈現(xiàn)出爆發(fā)式的膨脹,人們獲取信息的途徑也更加多樣、更加便捷,同時(shí)對(duì)于信息的時(shí)效性要求也越來(lái)越高。舉個(gè)搜索場(chǎng)景中的例子,當(dāng)一個(gè)賣(mài)家發(fā)布了一條寶貝信息時(shí),他希望的當(dāng)然是這個(gè)寶貝馬上就可以被賣(mài)家搜索出來(lái)、點(diǎn)擊、購(gòu)買(mǎi)啦,相反,如果這個(gè)寶貝要等到第二天或者更久才可以被搜出來(lái),估計(jì)這個(gè)大哥就要罵娘了。再舉一個(gè)推薦的例子,如果用戶(hù)昨天在淘寶上買(mǎi)了一雙襪子,今天想買(mǎi)一副泳鏡去游泳,但是卻發(fā)現(xiàn)系統(tǒng)在不遺余力地給他推薦襪子、鞋子,根本對(duì)他今天尋找泳鏡的行為視而不見(jiàn),估計(jì)這哥們心里就會(huì)想推薦你妹呀。其實(shí)稍微了解點(diǎn)背景知識(shí)的碼農(nóng)們都知道,這是因?yàn)楹笈_(tái)系統(tǒng)做的是每天一次的全量處理,而且大多是在夜深人靜之時(shí)做的,那么你今天白天做的事情當(dāng)然要明天才能反映出來(lái)啦。
•實(shí)現(xiàn)一個(gè)實(shí)時(shí)計(jì)算系統(tǒng)
全量數(shù)據(jù)處理使用的大多是鼎鼎大名的hadoop或者h(yuǎn)ive,作為一個(gè)批處理系統(tǒng),hadoop以其吞吐量大、自動(dòng)容錯(cuò)等優(yōu)點(diǎn),在海量數(shù)據(jù)處理上得到了廣泛的使用。但是,hadoop不擅長(zhǎng)實(shí)時(shí)計(jì)算,因?yàn)樗烊痪褪菫榕幚矶?,這也是業(yè)界一致的共識(shí)。否則最近這兩年也不會(huì)有s4,storm,puma這些實(shí)時(shí)計(jì)算系統(tǒng)如雨后春筍般冒出來(lái)啦。先拋開(kāi)s4,storm,puma這些系統(tǒng)不談,我們首先來(lái)看一下,如果讓我們自己設(shè)計(jì)一個(gè)實(shí)時(shí)計(jì)算系統(tǒng),我們要解決哪些問(wèn)題。
1.低延遲。都說(shuō)了是實(shí)時(shí)計(jì)算系統(tǒng)了,延遲是一定要低的。
2.高性能。性能不高就是浪費(fèi)機(jī)器,浪費(fèi)機(jī)器是要受批評(píng)的哦。
3.分布式。系統(tǒng)都是為應(yīng)用場(chǎng)景而生的,如果你的應(yīng)用場(chǎng)景、你的數(shù)據(jù)和計(jì)算單機(jī)就能搞定,那么不用考慮這些復(fù)雜的問(wèn)題了。我們所說(shuō)的是單機(jī)搞不定的情況。
4.可擴(kuò)展。伴隨著業(yè)務(wù)的發(fā)展,我們的數(shù)據(jù)量、計(jì)算量可能會(huì)越來(lái)越大,所以希望這個(gè)系統(tǒng)是可擴(kuò)展的。
5.容錯(cuò)。這是分布式系統(tǒng)中通用問(wèn)題。一個(gè)節(jié)點(diǎn)掛了不能影響我的應(yīng)用。
好,如果僅僅需要解決這5個(gè)問(wèn)題,可能會(huì)有無(wú)數(shù)種方案,而且各有千秋,隨便舉一種方案,使用消息隊(duì)列+分布在各個(gè)機(jī)器上的工作進(jìn)程就ok啦。我們?cè)倮^續(xù)往下看。
1.容易在上面開(kāi)發(fā)應(yīng)用程序。親,你設(shè)計(jì)的系統(tǒng)需要應(yīng)用程序開(kāi)發(fā)人員考慮各個(gè)處理組件的分布、消息的傳遞嗎?如果是,那有點(diǎn)麻煩啊,開(kāi)發(fā)人員可能會(huì)用不好,也不會(huì)想去用。
2.消息不丟失。用戶(hù)發(fā)布的一個(gè)寶貝消息不能在實(shí)時(shí)處理的時(shí)候給丟了,對(duì)吧?更嚴(yán)格一點(diǎn),如果是一個(gè)精確數(shù)據(jù)統(tǒng)計(jì)的應(yīng)用,那么它處理的消息要不多不少才行。這個(gè)要求有點(diǎn)高哦。
誕 生
在2011年Storm開(kāi)源之前,由于Hadoop的火紅,整個(gè)業(yè)界都在喋喋不休地談?wù)摯髷?shù)據(jù)。Hadoop的高吞吐,海量數(shù)據(jù)處理的能力使得人們可以方便地處理海量數(shù)據(jù)。但是,Hadoop的缺點(diǎn)也和它的優(yōu)點(diǎn)同樣鮮明——延遲大,響應(yīng)緩慢,運(yùn)維復(fù)雜。
有需求也就有創(chuàng)造,在Hadoop基本奠定了大數(shù)據(jù)霸主地位的時(shí)候,很多的開(kāi)源項(xiàng)目都是以彌補(bǔ)Hadoop的實(shí)時(shí)性為目標(biāo)而被創(chuàng)造出來(lái)。而在這個(gè)節(jié)骨眼上Storm橫空出世了。
Storm帶著流式計(jì)算的標(biāo)簽華麗麗滴出場(chǎng)了,看看它的一些賣(mài)點(diǎn):
•分布式系統(tǒng):可橫向拓展,現(xiàn)在的項(xiàng)目不帶個(gè)分布式特性都不好意思開(kāi)源。
•運(yùn)維簡(jiǎn)單:Storm的部署的確簡(jiǎn)單。雖然沒(méi)有Mongodb的解壓即用那么簡(jiǎn)單,但是它也就是多安裝兩個(gè)依賴(lài)庫(kù)而已。
•高度容錯(cuò):模塊都是無(wú)狀態(tài)的,隨時(shí)宕機(jī)重啟。
•無(wú)數(shù)據(jù)丟失:Storm創(chuàng)新性提出的ack消息追蹤框架和復(fù)雜的事務(wù)性處理,能夠滿(mǎn)足很多級(jí)別的數(shù)據(jù)處理需求。不過(guò),越高的數(shù)據(jù)處理需求,性能下降越嚴(yán)重。
•多語(yǔ)言:實(shí)際上,Storm的多語(yǔ)言更像是臨時(shí)添加上去似的。因?yàn)椋愕奶峤徊糠诌€是要使用Java實(shí)現(xiàn)。
下面介紹下Storm框架整合springboot的方法
我們知道Storm本身是一個(gè)獨(dú)立運(yùn)行的分布式流式數(shù)據(jù)處理框架,Springboot也是一個(gè)獨(dú)立運(yùn)行的web框架。那么如何在Strom框架中集成Springboot使得我們能夠在Storm開(kāi)發(fā)中運(yùn)用Spring的Ioc容器及其他如Spring Jpa等功能呢?我們先來(lái)了解以下概念:
•Storm主要的三個(gè)Component:Topology、Spout、Bolt。Topology作為主進(jìn)程控制著spout、bolt線(xiàn)程的運(yùn)行,他們相當(dāng)于獨(dú)立運(yùn)行的容器分布于storm集群中的各個(gè)機(jī)器節(jié)點(diǎn)。
•SpringApplication:是配置Spring應(yīng)用上下文的起點(diǎn)。通過(guò)調(diào)用SpringApplication.run()方法它將創(chuàng)建ApplicationContext實(shí)例,這是我們能夠使用Ioc容器的主要BeanFactory。之后Spring將會(huì)加載所有單例模式的beans,并啟動(dòng)后臺(tái)運(yùn)行的CommandLineRunner beans等。
•ApplicationContextAware:這是我們能夠在普通Java類(lèi)中調(diào)用Spring容器里的beans的關(guān)鍵接口。

實(shí)現(xiàn)原理
Storm框架中的每個(gè)Spout和Bolt都相當(dāng)于獨(dú)立的應(yīng)用,Strom在啟動(dòng)spout和bolt時(shí)提供了一個(gè)open方法(spout)和prepare方法(bolt)。我們可以把初始化Spring應(yīng)用的操作放在這里,這樣可以保證每個(gè)spout/bolt應(yīng)用在后續(xù)執(zhí)行過(guò)程中都能獲取到Spring的ApplicationContext,有了ApplicationContext實(shí)例對(duì)象,Spring的所有功能就都能用上了。
•Spout.open方法實(shí)現(xiàn)
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
//啟動(dòng)Springboot應(yīng)用
SpringStormApplication.run();
this.map = map;
this.topologyContext = topologyContext;
this.spoutOutputCollector = spoutOutputCollector;
}
•Bolt.prepare方法實(shí)現(xiàn)
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
//啟動(dòng)Springboot應(yīng)用
SpringStormApplication.run();
this.map = map;
this.topologyContext = topologyContext;
this.outputCollector = outputCollector;
}
•SpringStormApplication啟動(dòng)類(lèi)
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
/**
* 非工程啟動(dòng)入口,所以不用main方法
* @param args
*/
public static void run(String ...args) {
SpringApplication app = new SpringApplication(SpringStormApplication.class);
//我們并不需要web servlet功能,所以設(shè)置為WebApplicationType.NONE
app.setWebApplicationType(WebApplicationType.NONE);
//忽略掉banner輸出
app.setBannerMode(Banner.Mode.OFF);
//忽略Spring啟動(dòng)信息日志
app.setLogStartupInfo(false);
app.run(args);
}
}
與我們傳統(tǒng)的Springboot應(yīng)用啟動(dòng)入口稍微有點(diǎn)區(qū)別,主要禁用了web功能,看下正常的啟動(dòng)方式:
@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);
}
}
•在spout/bolt中調(diào)用了SpringStormApplication.run方法后,我們還需要能夠拿到ApplicationContext容器對(duì)象,這時(shí)候我們還需要實(shí)現(xiàn)ApplicationContextAware接口,寫(xiě)個(gè)工具類(lèi)BeanUtils:
@Component
public class BeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (BeanUtils.applicationContext == null) {
BeanUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
通過(guò)@Component注解使得Spring在啟動(dòng)時(shí)能夠掃描到該bean,因?yàn)锽eanUtils實(shí)現(xiàn)了ApplicationContextAware接口,Spring會(huì)在啟動(dòng)成功時(shí)自動(dòng)調(diào)用BeanUtils.setApplicationContext方法,將ApplicationContext對(duì)象保存到工具類(lèi)的靜態(tài)變量中,之后我們就可以使用BeanUtils.getBean()去獲取Spring容器中的bean了。
寫(xiě)個(gè)簡(jiǎn)單例子
•在FilterBolt的execute方法中獲取Spring bean
@Override
public void execute(Tuple tuple) {
FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
filterService.deleteAll();
}
•定義FilterService類(lèi),這時(shí)候我們就可以使用Spring的相關(guān)注解,自動(dòng)注入,Spring Jpa等功能了。
@Service("filterService")
public class FilterService {
@Autowired
UserRepository userRepository;
public void deleteAll() {
userRepository.deleteAll();
}
}
將storm應(yīng)用作為Springboot工程的一個(gè)子模塊
工程主目錄的pom文件還是springboot相關(guān)的依賴(lài),在storm子模塊中引入storm依賴(lài),這時(shí)候啟動(dòng)Strom的topology應(yīng)用會(huì)有一個(gè)日志包依賴(lài)沖突。
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
我們需要在storm子模塊的pom文件中重寫(xiě)org.springframework.boot:spring-boot-starter包依賴(lài),將Springboot的相關(guān)日志包排除掉,如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j2</artifactId> </exclusion> <exclusion> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic2</artifactId> </exclusion> </exclusions> </dependency>
總結(jié)
以上所述是小編給大家介紹的Storm框架整合springboot的方法,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
- Spring boot集成Kafka+Storm的示例代碼
- spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的方法
- SpringBoot整合Elasticsearch并實(shí)現(xiàn)CRUD操作
- SpringBoot整合Shiro實(shí)現(xiàn)登錄認(rèn)證的方法
- spring boot整合Shiro實(shí)現(xiàn)單點(diǎn)登錄的示例代碼
- spring boot整合CAS Client實(shí)現(xiàn)單點(diǎn)登陸驗(yàn)證的示例
- spring boot整合quartz實(shí)現(xiàn)多個(gè)定時(shí)任務(wù)的方法
- 淺談Springboot整合RocketMQ使用心得
相關(guān)文章
Spring?Data?Jpa?復(fù)雜查詢(xún)方式總結(jié)(多表關(guān)聯(lián)及自定義分頁(yè))
這篇文章主要介紹了Spring?Data?Jpa?復(fù)雜查詢(xún)方式總結(jié)(多表關(guān)聯(lián)及自定義分頁(yè)),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
IDEA 自定義方法注解模板的實(shí)現(xiàn)方法
這篇文章主要介紹了IDEA 自定義方法注解模板的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
一篇文章帶你搞懂Java線(xiàn)程池實(shí)現(xiàn)原理
線(xiàn)程池?zé)o論是工作還是面試都是必備的技能,但是很多人對(duì)于線(xiàn)程池的實(shí)現(xiàn)原理卻一知半解,并不了解線(xiàn)程池內(nèi)部的工作原理,今天就帶大家一塊剖析線(xiàn)程池底層實(shí)現(xiàn)原理2022-11-11
基于Java實(shí)現(xiàn)收發(fā)電子郵件功能
Email就是電子郵件,我們平常使用的QQ郵箱,網(wǎng)易郵箱,F(xiàn)oxmail都是用來(lái)收發(fā)郵件的,利用Java程序也可以完成收發(fā)電子郵件的功能,本文就來(lái)為大家詳細(xì)講講實(shí)現(xiàn)步驟2022-07-07
Spring?Cloud?Alibaba負(fù)載均衡實(shí)現(xiàn)方式
這篇文章主要為大家介紹了Spring?Cloud?Alibaba負(fù)載均衡實(shí)現(xiàn)方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10

