原創(chuàng)|使用教程|編輯:陳俊吉|2016-07-13 10:17:20.000|閱讀 498 次
概述:IBM InfoSphere Streams是一個(gè)高級(jí)計(jì)算平臺(tái),幫助用戶開發(fā)的應(yīng)用程序快速攝取、分析和關(guān)聯(lián)來自數(shù)千個(gè)實(shí)時(shí)源的信息。該解決方案可處理非常高的數(shù)據(jù)吞吐率,最高可達(dá)每秒數(shù)百萬個(gè)事件或消息。
# 界面/圖表報(bào)表/文檔/IDE等千款熱門軟控件火熱銷售中 >>
相關(guān)鏈接:
是一個(gè)高級(jí)計(jì)算平臺(tái),幫助用戶開發(fā)的應(yīng)用程序快速攝取、分析和關(guān)聯(lián)來自數(shù)千個(gè)實(shí)時(shí)源的信息。該解決方案可處理非常高的數(shù)據(jù)吞吐率,最高可達(dá)每秒數(shù)百萬個(gè)事件或消息。該平臺(tái)支持流數(shù)據(jù)的實(shí)時(shí)處理,支持不斷更新持續(xù)查詢的結(jié)果,可在仍在移動(dòng)的數(shù)據(jù)流中檢測(cè)洞察。Streams旨在從一個(gè)幾分鐘到幾小時(shí)的窗口中的移動(dòng)信息(數(shù)據(jù)流)中揭示有意義的模式。該平臺(tái)能夠獲取低延遲洞察,并為注重時(shí)效的應(yīng)用程序(比如欺詐檢測(cè)或網(wǎng)絡(luò)管理)獲取更好的成果,從而提供業(yè)務(wù)價(jià)值。流處理的演示如下圖所示:
Streams 的主要設(shè)計(jì)目的是:
提供了一種編程模型和 IDE 來定義數(shù)據(jù)來源,還提供了已融合到處理執(zhí)行單元中的稱為運(yùn)算符的軟件分析模塊。它還提供了基礎(chǔ)架構(gòu)來支持從這些組件合成可擴(kuò)展的流處理應(yīng)用程序。主要平臺(tái)組件包括:
Streams Processing Language (SPL),Streams 的編程語言,是一種分布式數(shù)據(jù)流合成語言。它是一種類似 C++ 或 Java™ 的可擴(kuò)展且全功能的語言,支持用戶定義的數(shù)據(jù)類型。您可以使用 SPL 或原生語言(C++ 或Java)編寫自定義函數(shù)。也可以使用 C++ 或 Java 編寫用戶定義的運(yùn)算符。
Streams 通過SPL將應(yīng)用程序會(huì)描述一個(gè)導(dǎo)向圖,該圖由各個(gè)互聯(lián)且處理多個(gè)數(shù)據(jù)流的運(yùn)算符組成。數(shù)據(jù)流可來自系統(tǒng)外部,或者在應(yīng)用程序內(nèi)部生成。SPL 程序的基本構(gòu)建塊包括:
Java 作為面向?qū)ο蟮母呒?jí)編程語言,以其使用簡(jiǎn)單、完全面象對(duì)象、平臺(tái)可移植性、健壯的沙盒安全機(jī)制、動(dòng)態(tài)性,以及大量可用的開發(fā)包等一系列優(yōu)勢(shì),在互聯(lián)網(wǎng)分布式環(huán)境下得到了極其廣泛的應(yīng)用,具有廣泛的用戶基礎(chǔ)。為了Streams用戶重用已有的Java開發(fā)技能、保護(hù)已有的Java資產(chǎn),IBM Streams平臺(tái)提供了使用 Java 編程語言來構(gòu)建 Streams 應(yīng)用程序的框架,具體包括 Java 運(yùn)算符模型描述文件以及 Java 運(yùn)算符 API(JavaOp)兩種方式。這兩種方式在一定程度上讓開發(fā)人員集成Java功能模塊。
雖然Streams所提供的Java運(yùn)算符模型描述文件以及Java運(yùn)算符API(JavaOp)方式支持了Java代碼調(diào)用,但是,傳統(tǒng)的Java是面向?qū)ο蟮木幊陶Z言,它只能幫助開發(fā)人員實(shí)現(xiàn)業(yè)務(wù)邏輯或重用Java代碼,但它無法以“流處理”的思維,直接進(jìn)行類似SPL的流應(yīng)用開發(fā)。
streamsx.topology開源項(xiàng)目的出現(xiàn),豐富了Streams的開發(fā)方式,為流應(yīng)用的開發(fā)者提供更多的語言選擇。streamsx.topology項(xiàng)目提供Java Application API,面向流處理應(yīng)用的將Java封裝成一套類庫(kù),使得開發(fā)者完全使用Java和Scala語言并按照“流處理”的思維創(chuàng)建IBM Streams流處理應(yīng)用。
streamsx.topology開源項(xiàng)目參考網(wǎng)址:
//ibmstreams.github.io/streamsx.topology/
1. 從www.ibm.com/software/data/infosphere/stream-computing/trials.html下載“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一個(gè)免費(fèi)的、可下載的非生產(chǎn)版本,它沒有數(shù)據(jù)或時(shí)間限制,支持您在自己的獨(dú)特環(huán)境中試驗(yàn)流計(jì)算,構(gòu)建一個(gè)強(qiáng)大的分析平臺(tái)。該平臺(tái)能夠處理難以置信的高數(shù)據(jù)吞吐量,高達(dá)每秒數(shù)百萬個(gè)事件或消息。InfoSphere Streams QuickStart Edition 沒有提供支持選項(xiàng),僅適用于非生產(chǎn)用途。要獲得相應(yīng)的支持,請(qǐng)購(gòu)買 InfoSphereStreams。
2.解壓VM鏡像,并在VMPlayer啟動(dòng)VM。
該VM已經(jīng)安裝com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:
1)在桌面雙擊InfoSphere Streams Studio (Eclipse)圖標(biāo)啟動(dòng)Streams Studio.
2)指定workspace為:/home/streamsadmin/Workspaces/topology/
3) 運(yùn)行"Hello World" 示例程序:在Project Explorer標(biāo)簽, 打開src->simple->HelloWorld->HelloWorld.java,代碼如下:
package simple; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; publicclass HelloWorld { publicstaticvoid main(String[] args) throws Exception { /* * Create the container for the topology that will * hold the streams of tuples. */ Topology topology = new Topology("HelloWorld"); /* * Declare a source stream (hw) with String tuples containing two tuples, * "Hello" and "World!". */ TStream<String> hw = topology.strings("Hello", "World!"); /* * Sink hw by printing each of its tuples to System.out. */ hw.print(); if (args.length == 0) StreamsContextFactory.getEmbedded().submit(topology).get(); else StreamsContextFactory.getStreamsContext(args[0]).submit(topology) .get(); } } |
4) 運(yùn)行"Hello World" 示例程序:右擊HelloWorld.java,選擇Run As-> Run Configurations. 在Run Configurations 'Main' 標(biāo)簽頁(yè)面,確保Main class填 simple.HelloWorld. 在 arguments標(biāo)簽頁(yè)面, 設(shè)置Program arguments為EMBEDDED (EMBEDDED表示程序獨(dú)立編譯并嵌入到JVM運(yùn)行,而不依賴Streams運(yùn)行時(shí)環(huán)境)。
5) 設(shè)置必要參數(shù)后,運(yùn)行該應(yīng)用您會(huì)看到以下的輸出:
Hello
world!
我們創(chuàng)建一個(gè)名叫MyGrep的Sample應(yīng)用,用于指導(dǎo)關(guān)鍵字搜索某個(gè)文件夾下的文件,搜索到則顯示相應(yīng)內(nèi)容所在的行數(shù)和內(nèi)容。具體步驟如下:
1)創(chuàng)建Java項(xiàng)目: File->New->Project->Java->JavaProject,點(diǎn)擊Next,在Create a Java Project填寫MySamples,點(diǎn)擊Next。
2)在Libraies標(biāo)簽頁(yè):
點(diǎn)擊External Jar按鈕,選擇com.ibm.streams.topology.jar
點(diǎn)擊Add Library按鈕,選擇IBM InfoSphere Streams
點(diǎn)擊Next和Finish完成項(xiàng)目的創(chuàng)建。新創(chuàng)建項(xiàng)目視圖如下圖所示:
3)創(chuàng)建命名空間:右擊src->New->Package->JavaPackage的Name填寫:mysapce
4)創(chuàng)建Java主類:src->右擊myspace->New->Class,在Name填寫:mysapce,確保勾選“public static void main(String[]args)”。確定后生成MyGrep.java。
5)創(chuàng)建Java類:src->右擊myspace->New->Class,在Name填寫:GrepInfo,不要勾選“public static void main(String[]args)”,確定后生成GrepInfo.java。
6)MyGrep.java和GrepInfo.java的代碼內(nèi)容如下:
MyGrep.java
package myspace; import java.io.ObjectStreamException; import java.util.Arrays; import java.util.concurrent.Future; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; import com.ibm.streamsx.topology.file.FileStreams; import com.ibm.streamsx.topology.function7.Function; publicclass MyGrep { publicstaticvoid main(String[] args) throws Exception { String contextType = args[0]; String directory = args[1]; final String term = args[2]; Topology topology = new Topology("MyGrep"); TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory); TStream<String> lines = FileStreams.textFileReader(filePaths); TStream<GrepInfo> grepInfo = lines.multiTransform( new Function<String, Iterable<GrepInfo>>() { privatestaticfinallongserialVersionUID = 1L; privateintlineNum = 0; @Override public Iterable<GrepInfo> apply(String line) { ++lineNum; if(line.contains(term)){ return Arrays.asList(new GrepInfo(lineNum, line)); } else returnnull; } private Object readResolve() throws ObjectStreamException { returnthis; } }, GrepInfo.class); grepInfo.print(); Future<?> future = StreamsContextFactory.getStreamsContext(contextType) .submit(topology); Thread.sleep(30 * 1000); future.cancel(true); } } |
GrepInfo.java
package myspace; import java.io.Serializable; import com.ibm.streamsx.topology.tuple.Keyable; publicclass GrepInfo implements Keyable<GrepInfo>, Serializable { privatestaticfinallongserialVersionUID = 1L; intlineNum; String lineStr; public GrepInfo(int ln, String ls) { this.lineNum = ln; this.lineStr = ls; } @Override public String toString() { return"Line Num " + lineNum + " : " + lineStr; } @Override public GrepInfo getKey() { // TODO Auto-generated method stub returnnull; } } |
7)運(yùn)行MyGrep之前,請(qǐng)確保Streams Instance已經(jīng)啟動(dòng),并在/home/streamsadmin/test創(chuàng)建一個(gè)文本文件并寫如若干內(nèi)容。
8)運(yùn)行程序:右擊MyGrep.java,選擇Run As -> RunConfigurations. 在Run Configurations 'Main' 標(biāo)簽頁(yè)面,確保Project填寫MySamples和Main class填 myspace.MyGrep。
在 arguments標(biāo)簽頁(yè)面, 設(shè)置Program arguments為DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams運(yùn)行時(shí)環(huán)境,/home/streamsadmin/test是程序搜索關(guān)鍵的目錄;China是搜索關(guān)鍵字)。
9)查看結(jié)果:
在Streams Exploere -> StreamsInstances ->右擊default:<instance>@<Domain>,選擇Show Instance Graph
在Instance Graph窗口,我們能看到MyGrep最終運(yùn)行圖。右擊最后的Print PE->Show Log->Show PEConsole
在Console將會(huì)顯現(xiàn)MyGrep運(yùn)行的結(jié)果
streams.topology開源項(xiàng)目所提供的Java Application API使得Streams開發(fā)者對(duì)流應(yīng)用的編程語言有了新的選擇,它能幫助開發(fā)者重用Java編程能力,并按照“流處理”的思路簡(jiǎn)化流應(yīng)用的開發(fā)過程,讓開發(fā)者更專注于業(yè)務(wù)的處理邏輯而不是流處理的框架。然而,該項(xiàng)目還處于早期階段,很多功能和接口尚未實(shí)現(xiàn);對(duì)比成熟的、完善的SPL,Java Application API的功能和成熟性還有很大差距。相信在不久的將來,streams.topology將會(huì)逐漸完善并成為IBM Streams平臺(tái)的一個(gè)重要補(bǔ)充。
詳情請(qǐng)咨詢!
客服熱線:023-66090381
本站文章除注明轉(zhuǎn)載外,均為本站原創(chuàng)或翻譯。歡迎任何形式的轉(zhuǎn)載,但請(qǐng)務(wù)必注明出處、不得修改原文相關(guān)鏈接,如果存在內(nèi)容上的異議請(qǐng)郵件反饋至chenjj@fc6vip.cn