javakafka代码 java往kafka写数据-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

javakafka代码 java往kafka写数据

使用java实现kafka consumer时报错

public static void consumer(){

创新互联公司于2013年成立,是专业互联网技术服务公司,拥有项目成都网站设计、网站建设、外贸网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元苏尼特左做网站,已为上家服务,为苏尼特左各地企业和个人服务,联系电话:028-86922220

Properties props = new Properties();  

props.put("zk.connect", "hadoop-2:2181");  

props.put("zk.connectiontimeout.ms", "1000000");  

props.put("groupid", "fans_group");  

// Create the connection to the cluster  

ConsumerConfig consumerConfig = new ConsumerConfig(props);  

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

MapString, Integer map = new HashMapString, Integer();

map.put("fans", 1);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  

MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);  

ListKafkaStreamMessage streams = topicMessageStreams.get("fans");  

// create list of 4 threads to consume from each of the partitions   

ExecutorService executor = Executors.newFixedThreadPool(1);  

long startTime = System.currentTimeMillis();

// consume the messages in the threads  

for(final KafkaStreamMessage stream: streams) {  

executor.submit(new Runnable() {  

public void run() {  

ConsumerIteratorMessage it = stream.iterator();

while (it.hasNext()){

log.debug(byteBufferToString(it.next().message().payload()));

}

}); 

log.debug("use time="+(System.currentTimeMillis()-startTime));

}  

}

Java使用kafka发送消息没有生效

一般消息发不出去很大可能都是配置或环境的问题

1、排查环境是否有问题,zookeeper节点是否存活,kafka节点是否存活,通过命令行的方式能否发出去消息(使用kafka-console-producer.sh),如果通过命令行都发不出去那就是集群的问题了。

2、网络问题,调用机器和集群之间网络是否通畅

3、调用时配置的host、port和集群中配置的是否一致,是否需要使用主机名而不是ip

4、客户端api版本是否和服务端差别太大导致不兼容

5、防火墙问题,关闭集群的防火墙实时

诸如此类,可能性太多就不一 一列举了。

你这既然有打印堆栈,如果报错肯定有异常信息的,可能卡住的时间比较长,耐心等待吧,祝你早日解决bug。

如何写java程序代码测试kafka

我这里是使用的是,kafka自带的zookeeper。

以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的

1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps

2625 Jps

2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties

此刻,这时,会一直停在这,因为是前端运行。

另开一窗口,

3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties

也是前端运行。


本文题目:javakafka代码 java往kafka写数据
转载注明:http://kswsj.cn/article/doeecpe.html

其他资讯