中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

如何實(shí)現(xiàn)FluneClient開發(fā)-創(chuàng)新互聯(lián)

如何實(shí)現(xiàn)Flune Client 開發(fā),很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

創(chuàng)新互聯(lián)是一家以網(wǎng)絡(luò)技術(shù)公司,為中小企業(yè)提供網(wǎng)站維護(hù)、成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站備案、服務(wù)器租用、主機(jī)域名、軟件開發(fā)、重慶小程序開發(fā)等企業(yè)互聯(lián)網(wǎng)相關(guān)業(yè)務(wù),是一家有著豐富的互聯(lián)網(wǎng)運(yùn)營(yíng)推廣經(jīng)驗(yàn)的科技公司,有著多年的網(wǎng)站建站經(jīng)驗(yàn),致力于幫助中小企業(yè)在互聯(lián)網(wǎng)讓打出自已的品牌和口碑,讓企業(yè)在互聯(lián)網(wǎng)上打開一個(gè)面向全國(guó)乃至全球的業(yè)務(wù)窗口:建站歡迎聯(lián)系:18980820575

由于在實(shí)際工作中,數(shù)據(jù)的生產(chǎn)方式極具多樣性,F(xiàn)lume 雖然包含了一些內(nèi)置的機(jī)制來采集數(shù)據(jù),但是更多的時(shí)候用戶更希望能將應(yīng)用程序和flume直接相通。所以這邊運(yùn)行用戶開發(fā)應(yīng)用程序,通過IPC或者RPC連接flume并往flume發(fā)送數(shù)據(jù)。

一、RPC client interface

Flume的RpcClient實(shí)現(xiàn)了Flume的RPC機(jī)制。用戶的應(yīng)用程序可以很簡(jiǎn)單的調(diào)用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法發(fā)送數(shù)據(jù),不用擔(dān)心底層信息交換的細(xì)節(jié)。用戶可以提供所需的event通過直接實(shí)現(xiàn)Event接口,例如可以使用簡(jiǎn)單的方便的實(shí)現(xiàn)SimpleEvent類或者使用EventBuilder的writeBody()靜態(tài)輔助方法。

自Flume 1.4.0起,Avro是默認(rèn)的RPC協(xié)議。NettyAvroRpcClient和ThriftRpcClient實(shí)現(xiàn)了RpcClient接口。實(shí)現(xiàn)中我們需要知道我們將要連接的目標(biāo)flume agent的host和port用于創(chuàng)建client實(shí)例,然后使用RpcClient發(fā)送數(shù)據(jù)到flume agent。

官網(wǎng)給了一個(gè)Avro RPCclients的例子,這邊直接拿來做實(shí)際測(cè)試?yán)印?/p>

這里我們把client.init("host.example.org",41414);

改成 client.init("192.168.233.128",50000); 與我們的主機(jī)對(duì)接

[java] view plain copy

  1. import org.apache.flume.Event;

  2. import org.apache.flume.EventDeliveryException;

  3. import org.apache.flume.api.RpcClient;

  4. import org.apache.flume.api.RpcClientFactory;

  5. import org.apache.flume.event.EventBuilder;

  6. import java.nio.charset.Charset;

  7. public class MyApp {

  8.   public static voidmain(String[] args) {

  9.    MyRpcClientFacade client = new MyRpcClientFacade();

  10.    // Initializeclient with the remote Flume agent's host and port

  11. //client.init("host.example.org",41414);

  12. client.init("192.168.233.128",50000);

  13.    // Send 10events to the remote Flume agent. That agent should be

  14.    // configured tolisten with an AvroSource.

  15.    String sampleData = "Hello Flume!";

  16.    for (int i =0; i < 10; i++) {

  17.      client.sendDataToFlume(sampleData);

  18.    }

  19.    client.cleanUp();

  20.   }

  21. }

  22. class MyRpcClientFacade {

  23.   private RpcClient client;

  24.   private String hostname;

  25.   private int port;

  26.   public void init(String hostname, int port) {

  27.    // Setup the RPCconnection

  28.    this.hostname = hostname;

  29.    this.port = port;

  30.    this.client = RpcClientFactory.getDefaultInstance(hostname, port);

  31.    // Use thefollowing method to create a thrift client (instead of the above line):

  32.     // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  33.   }

  34.   public void sendDataToFlume(String data) {

  35.    // Create aFlume Event object that encapsulates the sample data

  36.    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

  37.    // Send theevent

  38.    try {

  39.      client.append(event);

  40.    } catch (EventDeliveryException e) {

  41.      // clean up andrecreate the client

  42.      client.close();

  43.      client = null;

  44.      client = RpcClientFactory.getDefaultInstance(hostname, port);

  45.      // Use thefollowing method to create a thrift client (instead of the above line):

  46.      // this.client =RpcClientFactory.getThriftInstance(hostname, port);

  47.    }

  48.   }

  49.   public void cleanUp() {

  50.    // Close the RPCconnection

  51.    client.close();

  52.   }

  53. }

這邊代碼不解釋了,主要是將HelloFlume 發(fā)送10遍給flume,同時(shí)記得將flume 安裝主目錄下的lib 文件都添加進(jìn)項(xiàng)目,才能正常運(yùn)行程序。

下面是代理配置:

[html] view plain copy

  1. #配置文件:avro_client_case20.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = avro

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. # Describe the sink

  12. a1.sinks.k1.channel = c1

  13. a1.sinks.k1.type = logger

  14. # Use a channel which buffers events inmemory

  15. a1.channels.c1.type = memory

  16. a1.channels.c1.capacity = 1000

  17. a1.channels.c1.transactionCapacity = 100

這里要注意下,之前說了,在接收端需要AvroSource或者Thrift Source來監(jiān)聽接口。所以配置代理的時(shí)候要把a(bǔ)1.sources.r1.type 寫成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console


啟動(dòng)成功后

在eclipse 里運(yùn)行Java程序,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的代理終端查看console輸出

如何實(shí)現(xiàn)Flune Client 開發(fā)

可以看到10條數(shù)據(jù)正常發(fā)送。

這里要說明下,開發(fā)代碼中client.append(event)不僅僅可以發(fā)送一條數(shù)據(jù),也可以發(fā)送一個(gè)List(string) 的數(shù)據(jù)信息,也就是批量發(fā)送。這邊就不做演示了。

二、Failover Client

這個(gè)類包封裝了Avro RPCclient的類默認(rèn)提供故障處理能力。hosts采用空格分開host:port所代表的flume agent,構(gòu)成一個(gè)故障處理組。這Failover RPC Client目前不支持thrift。如果當(dāng)前選擇的host agent有問題,這個(gè)failover client會(huì)自動(dòng)負(fù)載到組中下一個(gè)host中。

下面是官網(wǎng)開發(fā)例子:

[java] view plain copy

  1. // Setup properties for the failover

  2. Properties props = new Properties();

  3. props.put("client.type", "default_failover");

  4. // List of hosts (space-separated list of user-chosen host aliases)

  5. props.put("hosts", "h2 h3 h4");

  6. // host/port pair for each host alias

  7. String host1 = "host1.example.org:41414";

  8. String host2 = "host2.example.org:41414";

  9. String host3 = "host3.example.org:41414";

  10. props.put("hosts.h2", host1);

  11. props.put("hosts.h3", host2);

  12. props.put("hosts.h4", host3);

  13. // create the client with failover properties

  14. RpcClient client = RpcClientFactory.getInstance(props);

下面是測(cè)試的開發(fā)例子

[java] view plain copy

  1. import org.apache.flume.Event;

  2. import org.apache.flume.EventDeliveryException;

  3. import org.apache.flume.api.RpcClient;

  4. import org.apache.flume.api.RpcClientFactory;

  5. import org.apache.flume.event.EventBuilder;

  6. import java.nio.charset.Charset;

  7. import java.util.Properties;

  8. public class Failover_Client {

  9.     public static void main(String[] args) {

  10.         MyRpcClientFacade2 client = new MyRpcClientFacade2();

  11.         // Initialize client with the remote Flume agent's host and port

  12.         client.init();

  13.         // Send 10 events to the remote Flume agent. That agent should be

  14.         // configured to listen with an AvroSource.

  15.         String sampleData = "Hello Flume!";

  16.         for (int i = 0; i < 10; i++) {

  17.           client.sendDataToFlume(sampleData);

  18.         }

  19.         client.cleanUp();

  20.       }

  21.     }

  22.     class MyRpcClientFacade2 {

  23.       private RpcClient client;

  24.       private String hostname;

  25.       private int port;

  26.       public void init() {

  27.         // Setup the RPC connection

  28.         // Use the following method to create a thrift client (instead of the above line):

  29.         // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  30.      // Setup properties for the failover

  31.         Properties props = new Properties();

  32.         props.put("client.type", "default_failover");

  33.         // List of hosts (space-separated list of user-chosen host aliases)

  34.         props.put("hosts", "h2 h3 h4");

  35.         // host/port pair for each host alias

  36.         String host1 = "192.168.233.128:50000";

  37.         String host2 = "192.168.233.128:50001";

  38.         String host3 = "192.168.233.128:50002";

  39.         props.put("hosts.h2", host1);

  40.         props.put("hosts.h3", host2);

  41.         props.put("hosts.h4", host3);

  42.         // create the client with failover properties

  43.         client = RpcClientFactory.getInstance(props);

  44.       }

  45.       public void sendDataToFlume(String data) {

  46.         // Create a Flume Event object that encapsulates the sample data

  47.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

  48.         // Send the event

  49.         try {

  50.           client.append(event);

  51.         } catch (EventDeliveryException e) {

  52.           // clean up and recreate the client

  53.           client.close();

  54.           client = null;

  55.           client = RpcClientFactory.getDefaultInstance(hostname, port);

  56.           // Use the following method to create a thrift client (instead of the above line):

  57.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  58.         }

  59.       }

  60.       public void cleanUp() {

  61.         // Close the RPC connection

  62.         client.close();

  63.       }

  64. }

這邊代碼設(shè)三個(gè)host用于故障轉(zhuǎn)移,這里偷懶,用同一個(gè)主機(jī)的3個(gè)端口模擬。代碼還是將Hello Flume 發(fā)送10遍給第一個(gè)flume代理,當(dāng)?shù)谝粋€(gè)代理故障的時(shí)候,則發(fā)送給第二個(gè)代理,以順序進(jìn)行故障轉(zhuǎn)移。

下面是代理配置沿用之前的那個(gè),并對(duì)配置文件進(jìn)行拷貝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分別修改avro_client_case21.conf與avro_client_case22.conf中的

a1.sources.r1.port= 50001 與a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出

我們可以看到第一個(gè)代理終端收到了,數(shù)據(jù)而其他2個(gè)終端沒有數(shù)據(jù)。

如何實(shí)現(xiàn)Flune Client 開發(fā)



然后我們把第一個(gè)終端的進(jìn)程關(guān)掉,再運(yùn)行一遍client程序,然后會(huì)發(fā)現(xiàn)這個(gè)時(shí)候是發(fā)生到第二個(gè)終端中。當(dāng)?shù)诙€(gè)終端也關(guān)閉的時(shí)候,再發(fā)送數(shù)據(jù),則是發(fā)送到最后一個(gè)終端。這里我們可以看到,故障轉(zhuǎn)移的代理主機(jī)轉(zhuǎn)移是采用順序序列的。

三、LoadBalancing RPC client

Flume Client SDK也支持在多個(gè)host之間使用負(fù)載均衡的Rpc Client。這種類型的client帶有一個(gè)通過空格分隔的host:port主機(jī)列表并構(gòu)成了一個(gè)負(fù)載均衡組。這個(gè)client可以指定一個(gè)負(fù)載均衡的策略,既可以隨機(jī)的選擇一個(gè)配置的host,也可以循環(huán)選擇一個(gè)host。當(dāng)然你也可以自己編寫一個(gè)類實(shí)現(xiàn)LoadBalancingRpcClient$HostSelector接口以至于用戶可以使用自己編寫的選擇順序。在這種情況下,用戶自定義的類需要被指定為host-selector屬性的值。LoadBalancing RPC Client當(dāng)前不支持thrift。

如果開啟了backoff,那么client失敗將被放入黑名單中,只有過了被指定的超時(shí)之間之后這個(gè)被選擇的失敗的主機(jī)才會(huì)從黑名單中被排除。當(dāng)超時(shí)到了,如果主機(jī)還是沒有反應(yīng),那么這被認(rèn)為是一個(gè)連續(xù)的失敗并且超時(shí)時(shí)間會(huì)成倍的增長(zhǎng),以避免可能陷入對(duì)反應(yīng)遲鈍主機(jī)的長(zhǎng)時(shí)間等待中。

這backoff的大超時(shí)時(shí)間可以通過maxBackoff屬性來配置,單位是毫秒。在默認(rèn)情況下maxBackoff的值是30秒(在orderSelector類里面指定)。

下面是官網(wǎng)例子

[java] view plain copy

  1. // Setup properties for the load balancing

  2. Properties props = new Properties();

  3. props.put("client.type", "default_loadbalance");

  4. // List of hosts (space-separated list of user-chosen host aliases)

  5. props.put("hosts", "h2 h3 h4");

  6. // host/port pair for each host alias

  7. String host1 = "host1.example.org:41414";

  8. String host2 = "host2.example.org:41414";

  9. String host3 = "host3.example.org:41414";

  10. props.put("hosts.h2", host1);

  11. props.put("hosts.h3", host2);

  12. props.put("hosts.h4", host3);

  13. props.put("host-selector", "random"); // For random host selection

  14. // props.put("host-selector", "round_robin"); // For round-robin host

  15. //                                            // selection

  16. props.put("backoff", "true"); // Disabled by default.

  17. props.put("maxBackoff", "10000"); // Defaults 0, which effectively

  18.                                   // becomes 30000 ms

  19. // Create the client with load balancing properties

  20. RpcClient client = RpcClientFactory.getInstance(props);

下面是測(cè)試的開發(fā)例子

[java] view plain copy

  1. import java.nio.charset.Charset;

  2. import org.apache.flume.Event;

  3. import org.apache.flume.EventDeliveryException;

  4. import org.apache.flume.api.RpcClient;

  5. import org.apache.flume.api.RpcClientFactory;

  6. import org.apache.flume.event.EventBuilder;

  7. import java.util.Properties;

  8. public class Load_Client {

  9.     public static void main(String[] args) {

  10.         MyRpcClientFacade3 client = new MyRpcClientFacade3();

  11.         // Initialize client with the remote Flume agent's host and port

  12.         client.init();

  13.         // Send 10 events to the remote Flume agent. That agent should be

  14.         // configured to listen with an AvroSource.

  15.         String sampleData = "Flume Load_Client";

  16.         for (int i = 0; i < 10; i++) {

  17.           client.sendDataToFlume(sampleData);

  18.         }

  19.         client.cleanUp();

  20.       }

  21.     }

  22.     class MyRpcClientFacade3{

  23.       private RpcClient client;

  24.       private String hostname;

  25.       private int port;

  26.       public void init() {

  27.           Properties props = new Properties();

  28.           props.put("client.type", "default_loadbalance");

  29.           // List of hosts (space-separated list of user-chosen host aliases)

  30.           props.put("hosts", "h2 h3 h4");

  31.           // host/port pair for each host alias

  32.           String host1 = "192.168.233.128:50000";

  33.           String host2 = "192.168.233.128:50001";

  34.           String host3 = "192.168.233.128:50002";

  35.           props.put("hosts.h2", host1);

  36.           props.put("hosts.h3", host2);

  37.           props.put("hosts.h4", host3);

  38.           props.put("host-selector", "random"); // For random host selection

  39.           // props.put("host-selector", "round_robin"); // For round-robin host

  40. //                                                    // selection

  41.           props.put("backoff", "true"); // Disabled by default.

  42.           props.put("maxBackoff", "10000"); // Defaults 0, which effectively

  43.                                             // becomes 30000 ms

  44.           // Create the client with load balancing properties

  45.           client = RpcClientFactory.getInstance(props);

  46.       }

  47.       public void sendDataToFlume(String data) {

  48.         // Create a Flume Event object that encapsulates the sample data

  49.         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

  50.         // Send the event

  51.         try {

  52.           client.append(event);

  53.         } catch (EventDeliveryException e) {

  54.           // clean up and recreate the client

  55.           client.close();

  56.           client = null;

  57.           client = RpcClientFactory.getDefaultInstance(hostname, port);

  58.           // Use the following method to create a thrift client (instead of the above line):

  59.           // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  60.         }

  61.       }

  62.       public void cleanUp() {

  63.         // Close the RPC connection

  64.         client.close();

  65.       }

  66. }

這里采用隨機(jī)的負(fù)載均衡props.put("host-selector","random") 。測(cè)試的時(shí)候沿用之前的3個(gè)接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并將他們起起來。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

啟動(dòng)成功后

在eclipse 里運(yùn)行JAVA程序Failover_Client.java,當(dāng)然也可以打包后在服務(wù)器上運(yùn)行JAVA程序。

#在啟動(dòng)源發(fā)送的3個(gè)代理終端查看console輸出

下面是Host1,收到了2條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開發(fā)


下面是Host2,收到了2條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開發(fā)


下面是Host3,收到了6條數(shù)據(jù)。

如何實(shí)現(xiàn)Flune Client 開發(fā)


可以看到我們開發(fā)例子中,host-selector選擇的是隨機(jī),因此程序也是隨機(jī)發(fā)送數(shù)據(jù)。下面我們測(cè)試輪詢r(jià)ound_robin選項(xiàng)。

程序里我們修改這句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再運(yùn)行Java 程序

下面是Host1,收到了4條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開發(fā)


下面是Host2,收到了3條數(shù)據(jù)

如何實(shí)現(xiàn)Flune Client 開發(fā)


同樣Host3,收到了3條數(shù)據(jù),這邊就不放圖了。輪詢就是按照順序放圖。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)站標(biāo)題:如何實(shí)現(xiàn)FluneClient開發(fā)-創(chuàng)新互聯(lián)
瀏覽路徑:http://www.rwnh.cn/article36/dghgpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供標(biāo)簽優(yōu)化、響應(yīng)式網(wǎng)站電子商務(wù)、Google、小程序開發(fā)、企業(yè)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)
讷河市| 昌都县| 永善县| 油尖旺区| 手游| 富平县| 宜宾县| 康保县| 广西| 莒南县| 澜沧| 尚义县| 长岛县| 阳城县| 蓬安县| 桐柏县| 常山县| 科技| 土默特左旗| 秭归县| 界首市| 新巴尔虎左旗| 高台县| 阿拉善左旗| 札达县| 尉氏县| 永顺县| 色达县| 区。| 桂东县| 怀来县| 丹凤县| 桦川县| 宾川县| 定陶县| 衡南县| 滨海县| 西乡县| 余干县| 霍林郭勒市| 鸡泽县|