如何實(shí)現(xiàn)Flune Client 開發(fā),很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
創(chuàng)新互聯(lián)是一家以網(wǎng)絡(luò)技術(shù)公司,為中小企業(yè)提供網(wǎng)站維護(hù)、成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站備案、服務(wù)器租用、主機(jī)域名、軟件開發(fā)、重慶小程序開發(fā)等企業(yè)互聯(lián)網(wǎng)相關(guān)業(yè)務(wù),是一家有著豐富的互聯(lián)網(wǎng)運(yùn)營(yíng)推廣經(jīng)驗(yàn)的科技公司,有著多年的網(wǎng)站建站經(jīng)驗(yàn),致力于幫助中小企業(yè)在互聯(lián)網(wǎng)讓打出自已的品牌和口碑,讓企業(yè)在互聯(lián)網(wǎng)上打開一個(gè)面向全國(guó)乃至全球的業(yè)務(wù)窗口:建站歡迎聯(lián)系:18980820575由于在實(shí)際工作中,數(shù)據(jù)的生產(chǎn)方式極具多樣性,F(xiàn)lume 雖然包含了一些內(nèi)置的機(jī)制來采集數(shù)據(jù),但是更多的時(shí)候用戶更希望能將應(yīng)用程序和flume直接相通。所以這邊運(yùn)行用戶開發(fā)應(yīng)用程序,通過IPC或者RPC連接flume并往flume發(fā)送數(shù)據(jù)。
Flume的RpcClient實(shí)現(xiàn)了Flume的RPC機(jī)制。用戶的應(yīng)用程序可以很簡(jiǎn)單的調(diào)用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法發(fā)送數(shù)據(jù),不用擔(dān)心底層信息交換的細(xì)節(jié)。用戶可以提供所需的event通過直接實(shí)現(xiàn)Event接口,例如可以使用簡(jiǎn)單的方便的實(shí)現(xiàn)SimpleEvent類或者使用EventBuilder的writeBody()靜態(tài)輔助方法。
自Flume 1.4.0起,Avro是默認(rèn)的RPC協(xié)議。NettyAvroRpcClient和ThriftRpcClient實(shí)現(xiàn)了RpcClient接口。實(shí)現(xiàn)中我們需要知道我們將要連接的目標(biāo)flume agent的host和port用于創(chuàng)建client實(shí)例,然后使用RpcClient發(fā)送數(shù)據(jù)到flume agent。
官網(wǎng)給了一個(gè)Avro RPCclients的例子,這邊直接拿來做實(shí)際測(cè)試?yán)印?/p>
這里我們把client.init("host.example.org",41414);
改成 client.init("192.168.233.128",50000); 與我們的主機(jī)對(duì)接
[java] view plain copy
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
public class MyApp {
public static voidmain(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initializeclient with the remote Flume agent's host and port
//client.init("host.example.org",41414);
client.init("192.168.233.128",50000);
// Send 10events to the remote Flume agent. That agent should be
// configured tolisten with an AvroSource.
String sampleData = "Hello Flume!";
for (int i =0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPCconnection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create aFlume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send theevent
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up andrecreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use thefollowing method to create a thrift client (instead of the above line):
// this.client =RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPCconnection
client.close();
}
}
這邊代碼不解釋了,主要是將HelloFlume 發(fā)送10遍給flume,同時(shí)記得將flume 安裝主目錄下的lib 文件都添加進(jìn)項(xiàng)目,才能正常運(yùn)行程序。
下面是代理配置:
[html] view plain copy
#配置文件:avro_client_case20.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
這里要注意下,之前說了,在接收端需要AvroSource或者Thrift Source來監(jiān)聽接口。所以配置代理的時(shí)候要把a(bǔ)1.sources.r1.type 寫成avro或者thrift
#敲命令
flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console
啟動(dòng)成功后
在eclipse 里運(yùn)行Java程序,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。
#在啟動(dòng)源發(fā)送的代理終端查看console輸出
可以看到10條數(shù)據(jù)正常發(fā)送。
這里要說明下,開發(fā)代碼中client.append(event)不僅僅可以發(fā)送一條數(shù)據(jù),也可以發(fā)送一個(gè)List(string) 的數(shù)據(jù)信息,也就是批量發(fā)送。這邊就不做演示了。
這個(gè)類包封裝了Avro RPCclient的類默認(rèn)提供故障處理能力。hosts采用空格分開host:port所代表的flume agent,構(gòu)成一個(gè)故障處理組。這Failover RPC Client目前不支持thrift。如果當(dāng)前選擇的host agent有問題,這個(gè)failover client會(huì)自動(dòng)負(fù)載到組中下一個(gè)host中。
下面是官網(wǎng)開發(fā)例子:
[java] view plain copy
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);
// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
下面是測(cè)試的開發(fā)例子
[java] view plain copy
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
import java.util.Properties;
public class Failover_Client {
public static void main(String[] args) {
MyRpcClientFacade2 client = new MyRpcClientFacade2();
// Initialize client with the remote Flume agent's host and port
client.init();
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = "Hello Flume!";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade2 {
private RpcClient client;
private String hostname;
private int port;
public void init() {
// Setup the RPC connection
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");
// host/port pair for each host alias
String host1 = "192.168.233.128:50000";
String host2 = "192.168.233.128:50001";
String host3 = "192.168.233.128:50002";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);
// create the client with failover properties
client = RpcClientFactory.getInstance(props);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
這邊代碼設(shè)三個(gè)host用于故障轉(zhuǎn)移,這里偷懶,用同一個(gè)主機(jī)的3個(gè)端口模擬。代碼還是將Hello Flume 發(fā)送10遍給第一個(gè)flume代理,當(dāng)?shù)谝粋€(gè)代理故障的時(shí)候,則發(fā)送給第二個(gè)代理,以順序進(jìn)行故障轉(zhuǎn)移。
下面是代理配置沿用之前的那個(gè),并對(duì)配置文件進(jìn)行拷貝,
cp avro_client_case20.conf avro_client_case21.conf
cp avro_client_case20.conf avro_client_case22.conf
分別修改avro_client_case21.conf與avro_client_case22.conf中的
a1.sources.r1.port= 50001 與a1.sources.r1.port = 50002
#敲命令
flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console
啟動(dòng)成功后
在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。
#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出
我們可以看到第一個(gè)代理終端收到了,數(shù)據(jù)而其他2個(gè)終端沒有數(shù)據(jù)。
然后我們把第一個(gè)終端的進(jìn)程關(guān)掉,再運(yùn)行一遍client程序,然后會(huì)發(fā)現(xiàn)這個(gè)時(shí)候是發(fā)生到第二個(gè)終端中。當(dāng)?shù)诙€(gè)終端也關(guān)閉的時(shí)候,再發(fā)送數(shù)據(jù),則是發(fā)送到最后一個(gè)終端。這里我們可以看到,故障轉(zhuǎn)移的代理主機(jī)轉(zhuǎn)移是采用順序序列的。
Flume Client SDK也支持在多個(gè)host之間使用負(fù)載均衡的Rpc Client。這種類型的client帶有一個(gè)通過空格分隔的host:port主機(jī)列表并構(gòu)成了一個(gè)負(fù)載均衡組。這個(gè)client可以指定一個(gè)負(fù)載均衡的策略,既可以隨機(jī)的選擇一個(gè)配置的host,也可以循環(huán)選擇一個(gè)host。當(dāng)然你也可以自己編寫一個(gè)類實(shí)現(xiàn)LoadBalancingRpcClient$HostSelector接口以至于用戶可以使用自己編寫的選擇順序。在這種情況下,用戶自定義的類需要被指定為host-selector屬性的值。LoadBalancing RPC Client當(dāng)前不支持thrift。
如果開啟了backoff,那么client失敗將被放入黑名單中,只有過了被指定的超時(shí)之間之后這個(gè)被選擇的失敗的主機(jī)才會(huì)從黑名單中被排除。當(dāng)超時(shí)到了,如果主機(jī)還是沒有反應(yīng),那么這被認(rèn)為是一個(gè)連續(xù)的失敗并且超時(shí)時(shí)間會(huì)成倍的增長(zhǎng),以避免可能陷入對(duì)反應(yīng)遲鈍主機(jī)的長(zhǎng)時(shí)間等待中。
這backoff的大超時(shí)時(shí)間可以通過maxBackoff屬性來配置,單位是毫秒。在默認(rèn)情況下maxBackoff的值是30秒(在orderSelector類里面指定)。
下面是官網(wǎng)例子
[java] view plain copy
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);
props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
下面是測(cè)試的開發(fā)例子
[java] view plain copy
import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.Properties;
public class Load_Client {
public static void main(String[] args) {
MyRpcClientFacade3 client = new MyRpcClientFacade3();
// Initialize client with the remote Flume agent's host and port
client.init();
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
String sampleData = "Flume Load_Client";
for (int i = 0; i < 10; i++) {
client.sendDataToFlume(sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade3{
private RpcClient client;
private String hostname;
private int port;
public void init() {
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h2 h3 h4");
// host/port pair for each host alias
String host1 = "192.168.233.128:50000";
String host2 = "192.168.233.128:50001";
String host3 = "192.168.233.128:50002";
props.put("hosts.h2", host1);
props.put("hosts.h3", host2);
props.put("hosts.h4", host3);
props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
client = RpcClientFactory.getInstance(props);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
這里采用隨機(jī)的負(fù)載均衡props.put("host-selector","random") 。測(cè)試的時(shí)候沿用之前的3個(gè)接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并將他們起起來。
#敲命令
flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console
啟動(dòng)成功后
在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。
#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出
下面是Host1,收到了2條數(shù)據(jù)
下面是Host2,收到了2條數(shù)據(jù)
下面是Host3,收到了6條數(shù)據(jù)。
可以看到我們開發(fā)例子中,host-selector選擇的是隨機(jī),因此程序也是隨機(jī)發(fā)送數(shù)據(jù)。下面我們測(cè)試輪詢r(jià)ound_robin選項(xiàng)。
程序里我們修改這句
//props.put("host-selector","random"); // For random host selection
props.put("host-selector", "round_robin");// Forround-robin host
再運(yùn)行Java 程序
下面是Host1,收到了4條數(shù)據(jù)
下面是Host2,收到了3條數(shù)據(jù)
同樣Host3,收到了3條數(shù)據(jù),這邊就不放圖了。輪詢就是按照順序放圖。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
網(wǎng)站標(biāo)題:如何實(shí)現(xiàn)FluneClient開發(fā)-創(chuàng)新互聯(lián)
瀏覽路徑:http://www.rwnh.cn/article36/dghgpg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供標(biāo)簽優(yōu)化、響應(yīng)式網(wǎng)站、電子商務(wù)、Google、小程序開發(fā)、企業(yè)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容