问 题 JavaAPI中Kafka的消费者不能正确的连接到服务器的Kafka的队列。百度、google了很多都无法解决。 这是配置信息: props.put("zookeeper.connect", "XXXXXX:2181"); //配置Zookeeper连接超时间隔 props.put("zookeeper.connection.timeout.ms", "1000000"); //配置Zookeeper的组id(The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.) props.put("group.id", "test-group"); props.put("zookeeper.session.timeout.ms", "4000");
以下是关于 kafka 的编程技术问答
问 题 我们一般会用rabbitmq或者kafka作异常消息处理,但是今天看到spring4中有个@Async注解,作用就是异步调用,如果是这样的话我们还需要用消息中间件吗? 代码如下: public String method(){ String result = a(); // 现状:发送kafka事件,在kafka的consumer中处理b方法的内容 sendEvent(); // 问题:能不能把sendEvent方法删除,直接把b()方法放到这里来?? // b(); return result; } @Async public void b(){ ... ... // 这个方法相对比较耗时,并且对method()方法的调用返回值没有影响,完全可以异步处理 } 解决方案 消息中间件的使用不是简单为了异步调用方法,主要是为了解耦,构建分布式系统。 如果只是为了异步调用
我正在尝试在 Eclipse 编辑器中编写一个 java 程序.出于这个原因,我想添加 kfka 第三方库.我在程序顶部添加了这一行 import kafka.javaapi.producer.Producer; 但它给了我错误. “导入的kafka无法解析.". 我是 java 和 eclipse 的新手,从最近 1 天开始寻找解决方案. 我尝试过的: 我从这里下载了Jar文件,并使用 属性->Java 构建路径 ->添加外部jar 但是还是报同样的错误. 解决方案 可能你的代码和你下载的jar不匹配,见api - kafka.javaapi.* 和 org.apache.kafka.* 有什么区别?- 堆栈溢出[^].
问 题 基本环境 centos 7.2 logstash kafka 0.9.1 logstash配置 demo.conf input{ stdin{} } output{ file{ path => "/data/demo.txt" } stdout{ codec => rubydebug } kafka{ bootstrap_servers => "172.19.170.3:31000" message_key => "mifi" topic_id => "test01" } } 报错信息 logstash向kafka打数据报错,报错如下 root@Data172-19-170-4:/data# /opt/logstash/bin/logstash agent -f de
Assume Kafka consumer has to consume 5 data and it has to process in A class & B class. 5 Data --> KAFKA CONSUMER --> PROCESS A CLASS ---> PROCESS B Class. Whether 5 data will be consumed immediately one by one or is there any time period to consume one data after next data. Reason for this question is if Data1 data is consumed in the above flow and PROCESS B class take longer timer whether DATA 2 will consume before PROCESS B class completes.
问 题 import java.util.Properties; import net.sf.json.JSONObject; import kafka.javaapi.producer.Producer; import kafka.javaapi.producer.ProducerData; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; //利用线程池 调用类的静态函数发送消息 但是发布出去 不知道什么原因 public class SendOid2Kafka { private static Producer producer = createProducer(); public static void sendMessage2CMS(JSONObject message)
问 题 scala> val x : scala.math.BigInt = 1881676371789154860897069000 :1: error: integer number too large val x : scala.math.BigInt = 1881676371789154860897069000 使用Scala的解释器给BigInt赋初值失败,为什么BigInt类型却不能赋值超过integer的值? 解决方案 問題不在BigInt, 是右邊那個字面量無法求值. scala> 1881676371789154860897069000 :1: error: integer number too large 1881676371789154860897069000 BigInt可以用字符串初始化 scala> BigInt("1234566789008723457802308
问 题 在网上都只看到一些Java生产STRING类型的消息。 按照Java的producer类来看,是可以自定义发送消息的类型,比如 producer.send(new KeyedMessage>(topic,message); 可是这样运行会报错,报错如下,请求高人解答: Exception in thread "Thread-4" java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46) at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scal
我正在尝试在统一环境中运行 Kafka 的代码示例,因此,我创建了一个消费者客户端(代码如下).
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System.Text;
public class KafkaConsumer : MonoBehaviour {
// Use this for initialization
void Start () {
/*
The consumer application will then pick the messages from the same topic and write them to console output.
问 题 当使用 spark streaming 2.0.0 集成 kafka 0.10.0时出现 KafkaConsumer 多线程争用的问题。 部分代码如下: val ssc = new StreamingContext(sc, Seconds(5)) val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)) val data = stream.map(_.value()) val word = data.map(word => (word,1)) val result = data.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y },
问 题 kafka启动卡在这一行不动 [2016-11-25 17:34:26,630] INFO [Kafka Server 0], started (kafka.server.KafkaServer) kafka的启动日志如下: [root@VM_102_155_centos kafka_2.11-0.10.1.0]# bin/kafka-server-start.sh config/server.properties [2016-11-25 17:34:25,730] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null authorizer.class.name = auto.create.topics.enable = t