S4是Yahoo!在2010年10月開(kāi)源的一套通用、分布式、可擴展、部分容錯、具備可插拔功能的平臺。這套平臺主要是為了方便開(kāi)發(fā)者開(kāi)發(fā)處理流式數據(continuous unbounded streams of data)的應用。項目官方網(wǎng)站為:http://s4.io/。同時(shí),S4的開(kāi)發(fā)者也發(fā)表了一篇技術(shù)論文《S4![]() 開(kāi)發(fā)動(dòng)機 “We designed this engine to solve real-world problems in the context of search applications that use data mining and machine learning algorithms.” … “To process user feedback, we developed S4, a low latency, scalable stream processing engine.” Yahoo!之所以開(kāi)發(fā)S4系統,主要是為了解決它現實(shí)的問(wèn)題:搜索廣告的展現。搜索廣告是當前各大搜索引擎的主要收入來(lái)源,用戶(hù)發(fā)出查詢(xún)請求,搜索引擎在返回正常結果的同時(shí)也會(huì )返回相關(guān)廣告,而廣告是按照點(diǎn)擊付費。為了在最好的位置,放置最相關(guān)(也就是用戶(hù)最有可能點(diǎn)擊)的廣告,各大搜索引擎使用了大量的數據挖掘和機器學(xué)習算法來(lái)進(jìn)行相關(guān)性計算,以便提高收入,滿(mǎn)足用戶(hù)需求。其中很重要的一點(diǎn)就是要不斷分析用戶(hù)的點(diǎn)擊反饋,以便捕獲用戶(hù)的行為。S4最初主要還只是用來(lái)處理用戶(hù)的點(diǎn)擊反饋。 “The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general- purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.” 那么Yahoo!為什么沒(méi)有選擇Hadoop來(lái)處理呢? MapReduce系統主要解決的是對靜態(tài)數據的批量處理,即當前的MapReduce系統實(shí)現啟動(dòng)計算時(shí),一般數據已經(jīng)到位了(比如保存到了分布式文件系統上)。 而流式計算系統在啟動(dòng)時(shí),一般數據并沒(méi)有完全到位,而是源源不斷地流入,并且不像批處理系統重視的是總數據處理的吞吐,而是對數據處理的latency,即希望進(jìn)入的數據越快處理越好。 當然,現在也有很多基于Hadoop系統來(lái)處理流式數據。一般有以下幾種方式。
隨著(zhù)大量實(shí)時(shí)應用的發(fā)展,比如實(shí)時(shí)搜索、實(shí)時(shí)交易系統、實(shí)時(shí)欺騙分析、實(shí)時(shí)監控、社交網(wǎng)絡(luò )等,都需要一個(gè)高度可擴展的流式計算解決方案。不同于原來(lái)的流式計算系統,S4主要解決的是高數據率和大數據量的流式處理。 設計假設和目標 為了簡(jiǎn)化設計,S4給出了下面的假設。 Lossy failover is acceptable,即一旦一個(gè)節點(diǎn)失敗,會(huì )failover到另一個(gè)standby節點(diǎn),但是會(huì )丟失原節點(diǎn)的內存狀態(tài)。這也是為什么說(shuō)S4是一個(gè)部分容錯的系統。 節點(diǎn)不能動(dòng)態(tài)增加和減少。 設計目標包括以下幾個(gè)方面。
Event Stream 一個(gè)Stream是Events的序列流。每個(gè)Event是一個(gè)(K,A)數據,通過(guò)EventType來(lái)標示其類(lèi)型。K、A分別表示這種類(lèi)型的Event的keys和attributes。key和attribute都是tuple-valued,即key=value這種元組值。下面給出一個(gè)event的例子: EV:ClickLog → event type KEY:product=“search”, type=”online” → keys VAL: userid=”123”, ip=”10.0.0.0”, cookieid=”3” → attributes Processing Elements Processing Element(PE)是S4中的基本運算單元。一個(gè)PE通過(guò)下面四個(gè)組件來(lái)表示。
![]() 每個(gè)PE只負責處理自己所關(guān)心的eventtype,并且只處理自己所對應的key值的event。PE處理后可能輸出一個(gè)或多個(gè)event。當平臺處理一個(gè)key值時(shí),會(huì )先檢查相應的PE是否已經(jīng)存在,如果不存在,會(huì )先初始化相應的PE,然后交由這個(gè)PE進(jìn)行處理。舉例如圖1所示。 在圖1中,PE2負責處理相應的單詞事件(WordEvent),主要邏輯是統計所關(guān)心單詞的個(gè)數,然后輸出給下游的PE。PE2所關(guān)心的eventtype為WorkEvent,所關(guān)心的key為word,所關(guān)心的key值為“said”。假如又來(lái)了一個(gè)WordEvent,key為word=“l(fā)isten”,那么這個(gè)事件就不是PE2所關(guān)心的,所以平臺可能會(huì )為“l(fā)isten”值啟動(dòng)一個(gè)新的PE來(lái)處理。 有一類(lèi)特殊的PE,即keylessPE(沒(méi)有key和key值),這些PE會(huì )接收相應eventtype的所有event進(jìn)行處理。這類(lèi)PE主要用來(lái)作為S4cluster的輸入層(InputLayer),即外圍應用會(huì )產(chǎn)生相應的event(keylessevent),將這些event發(fā)到任何一個(gè)節點(diǎn)。而S4cluster中的每個(gè)節點(diǎn)都會(huì )啟動(dòng)一個(gè)keylessPE,這些PE做簡(jiǎn)單的輸入處理后,轉化為keyedevent,交給集群中的其他PE類(lèi)型進(jìn)行處理。 PE的邏輯主要由應用程序員來(lái)開(kāi)發(fā)。 Processing Node Processing Node是一個(gè)邏輯節點(diǎn),負責監聽(tīng)消息的到來(lái),對消息進(jìn)行處理,然后通過(guò)Communication Layer將event在集群中分發(fā)。S4主要依據上面提到的eventtype和key/key值,對key值求hash,在集群中進(jìn)行分發(fā)。關(guān)注的key集合通過(guò)配置文件來(lái)得到。對于需要處理的event,會(huì )交給PN中的Processing Element Container(PEC),然后PEC調用相應的PE進(jìn)行處理。PN功能框如圖2所示。 ![]() 通過(guò)圖2的設計,可以保證,對應于相同event type,key和key值的event一定會(huì )被路由到對應的PN。 底下的Communication Layer和Zookeeper共同完成了集群管理和自動(dòng)failover功能。 編程模型 應用的主要任務(wù)就是實(shí)現一些相應的PE。PE一般提供如下接口供應用實(shí)現。
論文中給出了一個(gè)Word Count的例子,大家可以仔細研究一下。在性能測試部分,論文總結了將S4應用到實(shí)際的CTR(Click-Through Rate)預估中的效果。在應用舉例中,給出了S4在在線(xiàn)參數優(yōu)化的應用。 隨著(zhù)大量實(shí)時(shí)計算需求的增加,分布式流式計算將會(huì )成為分布式計算的下一個(gè)主要研究重點(diǎn),將會(huì )成為類(lèi)似Hadoop這類(lèi)MapReduce框架的有力補充。這一方向的工作還處在初級發(fā)展階段,大家需要多加關(guān)注。 |