01 Storm与Kafka集成(老版本)

老版本的依赖方式

 <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>

        <!--  use old kafka spout code -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

老版本直接使用KafkaSpout

topologyBuilder.setSpout("WordCountFileSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("zk01:2181,zk02:2181,zk03:2181"),"test","/test","storm")), 1);

02 Storm与Kafka集成(新版本)

新版本的依赖方式

 <!--  use new kafka spout code -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

新版本的代码

 KafkaSpoutConfig.Builder<String, String> builder = KafkaSpoutConfig.builder("node01:9092","test");
        builder.setGroupId("test_storm_wc");
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = builder.build();
        topologyBuilder.setSpout("WordCountFileSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);

标签: storm, kafka

相关文章推荐

已有 2 条评论

  1. kafka不再仅仅是一个类JMS的消息系统,kafka1.0.0前不久发布,开始向流式计算方向发展喽!

    边琪 回复
    1. 没有深入研究,感觉就是一个消费者

      毛祥溢 回复

添加新评论,含*的栏目为必填