java - kakfa Consumer can't rebalance after 5 retries
问 题 JavaAPI中Kafka的消费者不能正确的连接到服务器的Kafka的队列。百度、google了很多都无法解决。 这是配置信息: props.put("zookeeper.connect", "XXXXXX:2181"); //配置Zookeeper连接超时间隔 props.put("zookeeper.connection.timeout.ms", "1000000"); //配置Zookeeper的组id(The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.) props.put("group.id", "test-group"); props.put("zookeeper.session.timeout.ms", "4000");
1118 2022-07-19
编程技术问答社区
rabbitmq - Spring4为我们提供了@Async注解,我们还需要消息中间件作异步消息处理吗?
问 题 我们一般会用rabbitmq或者kafka作异常消息处理,但是今天看到spring4中有个@Async注解,作用就是异步调用,如果是这样的话我们还需要用消息中间件吗? 代码如下: public String method(){ String result = a(); // 现状:发送kafka事件,在kafka的consumer中处理b方法的内容 sendEvent(); // 问题:能不能把sendEvent方法删除,直接把b()方法放到这里来?? // b(); return result; } @Async public void b(){ ... ... // 这个方法相对比较耗时,并且对method()方法的调用返回值没有影响,完全可以异步处理 } 解决方案 消息中间件的使用不是简单为了异步调用方法,主要是为了解耦,构建分布式系统。 如果只是为了异步调用
350 2022-07-19
编程技术问答社区
kafka - The type scala.ScalaObject cannot be resolved.
初学者kafka的java客户端遇到了一个问题。 producer.send(data);报错 内容为: The type scala.ScalaObject cannot be resolved. It is indirectly referenced from required .class files 在网上查到过说是scala-library.jar添加到项目的classpath路径下即可,而且别人都解决了。 可是我的依然没有任何变化,依然报上述错误。 请各位大神帮帮小弟吧,万分感激。
202 2022-07-19
编程技术问答社区
如何在eclipse java中添加第三方包
我正在尝试在 Eclipse 编辑器中编写一个 java 程序.出于这个原因,我想添加 kfka 第三方库.我在程序顶部添加了这一行 import kafka.javaapi.producer.Producer; 但它给了我错误. “导入的kafka无法解析.". 我是 java 和 eclipse 的新手,从最近 1 天开始寻找解决方案. 我尝试过的: 我从这里下载了Jar文件,并使用 属性->Java 构建路径 ->添加外部jar 但是还是报同样的错误. 解决方案 可能你的代码和你下载的jar不匹配,见api - kafka.javaapi.* 和 org.apache.kafka.* 有什么区别?- 堆栈溢出[^].
344 2022-07-19
编程技术问答社区
logstash打数据到kafka失败
问 题 基本环境 centos 7.2 logstash kafka 0.9.1 logstash配置 demo.conf input{ stdin{} } output{ file{ path => "/data/demo.txt" } stdout{ codec => rubydebug } kafka{ bootstrap_servers => "172.19.170.3:31000" message_key => "mifi" topic_id => "test01" } } 报错信息 logstash向kafka打数据报错,报错如下 root@Data172-19-170-4:/data# /opt/logstash/bin/logstash agent -f de
1250 2022-07-19
编程技术问答社区
java - zkClient API问题?
问 题 public T readData(java.lang.String path, org.apache.zookeeper.data.Stat stat) 这个方法的作用是什么?为什么要有第二个参数stat? 解决方案 把path节点的Stat数据拷贝到参数stat中。因为java中方法不能返回多个值,所以要这么干
272 2022-07-19
编程技术问答社区
控制卡夫卡消费者监听器
Assume Kafka consumer has to consume 5 data and it has to process in A class & B class. 5 Data --> KAFKA CONSUMER --> PROCESS A CLASS ---> PROCESS B Class. Whether 5 data will be consumed immediately one by one or is there any time period to consume one data after next data. Reason for this question is if Data1 data is consumed in the above flow and PROCESS B class take longer timer whether DATA 2 will consume before PROCESS B class completes.
852 2022-07-19
编程技术问答社区
kafka-php - java使用kafka提交byte[]数据php怎么读取?
问 题 java中producer提交数据用的byte[]类型,为什么php-rdkafka读取的payload对象是string类型的? 解决方案 php中string类型是二进制安全的,所以使用string存储二进制数据和对数据进行操作就和操作普通的字节数组基本相同。
208 2022-07-19
编程技术问答社区
redis - 关于队列实时消费?
问 题 在项目中是用的cron每分次跑一次,大家做的是实时的吗还是也是跑定时? 解决方案 使用redis的阻塞队列实时消费,没有消息就会自动阻塞,有了消息会自动消费。 blpop和brpop命令
230 2022-07-19
编程技术问答社区
zookeeper - Java项目,kafka consume相关
问 题 1.Kafka的消费者什么时候开启? 是随着项目的启动就开启消费者吗还是通过监听有新的消息才开启消费者? 回答问题的不要看ID,本人要脸 解决方案 你不开启你从何途径监听有新的消息?如果其他途径也可以,为什么不统一使用 Consumer 呢?结构也简单,程序也可靠,也方便分布到不同服务器。 当然了,还得看你业务了,如果你 8 百年也没来一条消息,来一条处理几天,那是得考虑下。此种情况得到通知后可以 poll(0)
242 2022-07-19
编程技术问答社区
java - kafka spring 框架的集成
问 题 谁能描述一下kafka是怎么接受一个http请求,并在spring mvc的controller是怎么接受处理kafka过来的消息的? 解决方案 Kafka是不会直接接收Http请求的。一般是程序里接收数据然后写到kafka里,另一个程序再去kafka里读取,也就是Producer和Consumer。 至于怎么写到kafka里,用kafka提供的client就可以了。
110 2022-07-19
编程技术问答社区
java - kafka消息发送不出去
问 题 import java.util.Properties; import net.sf.json.JSONObject; import kafka.javaapi.producer.Producer; import kafka.javaapi.producer.ProducerData; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; //利用线程池 调用类的静态函数发送消息 但是发布出去 不知道什么原因 public class SendOid2Kafka { private static Producer producer = createProducer(); public static void sendMessage2CMS(JSONObject message)
112 2022-07-19
编程技术问答社区
java - scala给BigInt赋初值失败?
问 题 scala> val x : scala.math.BigInt = 1881676371789154860897069000 :1: error: integer number too large val x : scala.math.BigInt = 1881676371789154860897069000 使用Scala的解释器给BigInt赋初值失败,为什么BigInt类型却不能赋值超过integer的值? 解决方案 問題不在BigInt, 是右邊那個字面量無法求值. scala> 1881676371789154860897069000 :1: error: integer number too large 1881676371789154860897069000 BigInt可以用字符串初始化 scala> BigInt("1234566789008723457802308
534 2022-07-19
编程技术问答社区
如何用Java向kafka发送json数据
问 题 在网上都只看到一些Java生产STRING类型的消息。 按照Java的producer类来看,是可以自定义发送消息的类型,比如 producer.send(new KeyedMessage>(topic,message); 可是这样运行会报错,报错如下,请求高人解答: Exception in thread "Thread-4" java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46) at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scal
3188 2022-07-17
编程技术问答社区
Win32异常:找不到指定的模块
我正在尝试在统一环境中运行 Kafka 的代码示例,因此,我创建了一个消费者客户端(代码如下). using System.Collections; using System.Collections.Generic; using UnityEngine; using Confluent.Kafka; using Confluent.Kafka.Serialization; using System.Text; public class KafkaConsumer : MonoBehaviour { // Use this for initialization void Start () { /* The consumer application will then pick the messages from the same topic and write them to console output.
556 2022-07-17
编程技术问答社区
spark-streaming - spark streaming 集成 kafka,使用window时出现错误
问 题 当使用 spark streaming 2.0.0 集成 kafka 0.10.0时出现 KafkaConsumer 多线程争用的问题。 部分代码如下: val ssc = new StreamingContext(sc, Seconds(5)) val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)) val data = stream.map(_.value()) val word = data.map(word => (word,1)) val result = data.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y },
766 2022-07-17
编程技术问答社区
zookeeper - kafka启动未成功?
问 题 kafka启动卡在这一行不动 [2016-11-25 17:34:26,630] INFO [Kafka Server 0], started (kafka.server.KafkaServer) kafka的启动日志如下: [root@VM_102_155_centos kafka_2.11-0.10.1.0]# bin/kafka-server-start.sh config/server.properties [2016-11-25 17:34:25,730] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null authorizer.class.name = auto.create.topics.enable = t
300 2022-07-17
编程技术问答社区
java - kafka 客户端消费过慢
问 题 现在有个topic 队列里面的数据快速增加的时候客户端消费能力跟不上造成数据的积压,现在的扩展方案是增加了消费者但是都是属于同一个group下面的,查看了下数据还是有很多,我的分区数量是大于consumer 的数量的,想问下有什么好的方法可以提升消费速度, 解决方案 消费逻辑开多线程 加机器,加partition 消费跟不上生产, 看下消费逻辑在什么地方耗时比较高,一味地加机器或者加partition不能根本解决问题的
580 2022-07-17
编程技术问答社区
java - kafka不能连接写入
问 题 远程调用生产者写入数据时报错如下 ERROR Error when sending message to topic test with key: null, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) kafka本地连接时可以生产和消费的 解决方案 vi config/server.properties listeners监听注释打开并指定当前机器IP与端口 具体参见kafka不能远程连接问题
242 2022-07-17
编程技术问答社区