内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

storm-kafka-client使用的示例分析

storm-kafka-client使用的示例分析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

巴林左旗ssl適用于網站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應用場景,ssl證書未來市場廣闊!成為成都創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18982081108(備注:SSL證書合作)期待與您的合作!

package hgs.core.sk;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
//參考如下
//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html
//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52
public class StormKafkaMainTest {
	
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		//該類將傳入的kafka記錄轉換為storm的tuple
		ByTopicRecordTranslator<String,String> brt = 
				new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7"));
		//設置要消費的topic即test7
		brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7"));
		//類似之前的SpoutConfig
		KafkaSpoutConfig<String,String> ksc = KafkaSpoutConfig
				//bootstrapServers 以及topic(test7)
				.builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")
				//設置group.id
				.setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
				//設置開始消費的氣勢位置
				.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
				//設置提交消費邊界的時長間隔
				.setOffsetCommitPeriodMs(10_000)
				//Translator
				.setRecordTranslator(brt)
				.build();
		
		builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);
		builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");
		
     	Config config = new Config();
     	config.setNumWorkers(2);
     	config.setNumAckers(0);
     	try {
			StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());
		} catch (Exception e) {
			e.printStackTrace();
		}
     	
 /*    	LocalCluster cu  = new LocalCluster();
     	cu.submitTopology("test", config, builder.createTopology());*/
	}
}
class  MyboltO extends  BaseRichBolt{
	private static final long serialVersionUID = 1L;
	OutputCollector collector = null;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}
	public void execute(Tuple input) {
		//這里把消息大一出來,在對應的woker下面的日志可以找到打印的內容
		String out = input.getString(0);
		System.out.println(out);
		//collector.ack(input);
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
	
}

pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>hgs</groupId>
  <artifactId>core.sk</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>core.sk</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
	<!--    
	<dependency>
    	<groupId>org.apache.storm</groupId>
    	<artifactId>storm-kafka</artifactId>
    	<version>1.1.3</version>
	</dependency> 
	-->
	
	<dependency>
   		<groupId>org.apache.storm</groupId>
   	 	<artifactId>storm-kafka-client</artifactId>
    	<version>1.1.3</version>
	</dependency>
	<dependency>
  		<groupId>org.apache.storm</groupId>
 		 <artifactId>storm-core</artifactId>
  		<version>1.1.3</version>
  		<scope>provided</scope>
	</dependency>
	<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka_2.11</artifactId>
    	<version>1.0.0</version>
    <exclusions>
    		<exclusion>
          		<groupId>org.slf4j</groupId>
          		<artifactId>slf4j-log4j12</artifactId>
        	</exclusion>
        	<exclusion>
            	<groupId>org.apache.zookeeper</groupId>
            	<artifactId>zookeeper</artifactId>
       		</exclusion>
    	</exclusions>
	</dependency>
	
<!-- 	<dependency>
    	<groupId>org.apache.storm</groupId>
    	<artifactId>storm-kafka-monitor</artifactId>
    	<version>1.2.2</version>
	</dependency> -->
<!-- 	<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka-clients</artifactId>
    	<version>0.8.2.1</version>
	</dependency> -->
	
	<dependency>
	    <groupId>org.clojure</groupId>
	    <artifactId>clojure</artifactId>
	    <version>1.7.0</version>
	</dependency>
	<!-- 嘗試了很多次 都會有這個錯誤:
	java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)
	最后修改為kafka相應的kafka-clients版本后問題得到解決,應該是該出的問題
	-->
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>1.0.0</version>
	</dependency>
	
 </dependencies>
  
  
  
  <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- 我運行這個jar所運行的主類 -->
                            <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>
                            <!-- 必須是這樣寫 -->
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            
             <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
//以下為lambda表達式,因為在上面用大了,所以在這兒記錄一下,以免以后看不懂
import java.util.UUID;
import org.junit.jupiter.api.Test;
public class TEst {
	@Test
	public void sysConfig() {
		String[] ags = {"his is my first storm program so i hope it will success",
				"i love bascketball",
				"the day of my birthday i was alone"};
		String uuid = UUID.randomUUID().toString();
		String nexttuple= ags[new Random().nextInt(ags.length)];
		System.out.println(nexttuple);
	}
	
	@Test
	public void lambdaTest() {
		int b  = 100;
		//該出返回10*a的值、
		//"(a) -> 10*a" 相當于 new  testinter<T>();
		printPerson((a) -> 10*a) ;
	}
	
	void printPerson( testinter<Integer> t) {
		//穿過來的t需要一個參數(shù)a 即下面借口中定義的方法sysoutitems(int a )
		System.out.println(t.sysoutitems(100));
	};
	
}
//定義接口,在lambda表達式運用中,必須為借口,并且借口只能有一個方法
interface testinter<T>{
	T sysoutitems(int a );
	//void aAndb(int a, int b );
}

看完上述內容,你們掌握storm-kafka-client使用的示例分析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

名稱欄目:storm-kafka-client使用的示例分析
鏈接分享:http://www.rwnh.cn/article6/ggohig.html

成都網站建設公司_創(chuàng)新互聯(lián),為您提供微信小程序企業(yè)網站制作、用戶體驗手機網站建設、軟件開發(fā)、網站策劃

廣告

聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

外貿網站建設
菏泽市| 都匀市| 天镇县| 子长县| 宾阳县| 册亨县| 金坛市| 新乐市| 高密市| 长沙市| 苍南县| 九龙城区| 东台市| 防城港市| 石狮市| 西安市| 昌乐县| 柘荣县| 会理县| 通河县| 遵化市| 红安县| 丹凤县| 长阳| 宝山区| 龙井市| 康平县| 东光县| 墨竹工卡县| 五大连池市| 和林格尔县| 手机| 通城县| 仲巴县| 海林市| 蓝田县| 临沧市| 沙河市| 哈密市| 玛多县| 微山县|