当我在Wildfly v 10中部署简单的Kafka项目(作为JAR的工作正常)时,我得到了一些Zookeeper连接例外[1]. [1]] 15:21:58,531 ERROR [org.jboss.msc.service.fail] (ServerService Thread Pool -- 82) MSC000001: Failed to start service jboss.deployment.unit."ratha.war".component.KafkaServiceBean.START: org.jboss.msc.service.StartException in service jboss.deployment.unit."ratha.war".component.KafkaServiceBean.START: java.lang.IllegalStateException: WFLYEE0042: Failed to construct component
以下是关于 kafka-consumer-api 的编程技术问答
consumer.subscribe(Pattern.compile(".*"),new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection clctn) { } @Override public void onPartitionsAssigned(Collection clctn) { } }); 如何在Apache/kafka中使用Regex消费所有主题? 我尝试了上述代码,但它不起作用. 解决方案 用于正则使用以下签名 KafkaConsumer.subscribe(Pattern pattern,
我正在尝试在Unity环境中运行 kafka 的代码样本,因此,我创建了 consumer client(下面给出的代码). using System.Collections; using System.Collections.Generic; using UnityEngine; using Confluent.Kafka; using Confluent.Kafka.Serialization; using System.Text; public class KafkaConsumer : MonoBehaviour { // Use this for initialization void Start () { /* * The consumer application will then pick the messages from the same topic and write them to consol
我正在尝试将我的原始流"文本"中获得的字符串值转换为使用函数" mapValues"到NewsTream中的Jonobject消息.然后将我在NewsTream中获得的任何内容流式传输到称为" TestOutput"的主题.但是,每次消息实际上都会通过转换块,我都会得到一个NullPoInterException,其中仅指向Kafka流库中的错误.不知道发生了什么:( P.S.当我从原始流创建新的Kafka流时,新的流属于原始建筑商?由于我需要构建器来创建kafkastreams对象并开始流式传输,因此我不确定我是否需要对新流进行其他操作,而只是指定其去向. //Testing a Kafka Stream Application public class testStream { public static void main(String[] args) throws Exception { //Configurations Properties props
我正在运行KAFKA 2.13-2.4.1,并在用Java编写的Kafka客户端(消费者)和一个Kafka群集(每个节点都有一个经纪人)之间配置SSL连接. 我通过package kafkaconsumerssl; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; public class KafkaConsumerSSLTest { public
我有一个Kafka消费者.它消耗了一个字符串.然后,基于字符串,我们将其转换为不同的AVRO对象,然后将它们发布到不同的主题.我们需要EOS,而我们得到的问题是标有@Primary Works的生产者,但是没有主要失败的情况下,下面的错误.无论如何是否有两个? kafkaconsumer @Configuration public class KafkaConsumerConfig { @Value("${kafka.server}") String server; @Value("${kafka.consumer.groupid}") String groupid; @Autowired Tracer tracer; @Bean public ConsumerFactory consumerFactory() { Map
在kafka文档中,我试图理解此属性 max.poll.interval.ms 使用消费者组管理时,Poll()调用()调用之间的最大延迟.这将消费者在获取更多记录之前可能会闲置的时间上有一个上限.如果在此超时到期之前未调用Poll(),则将消费者视为失败,并且该组将重新平衡以将分区重新分配给另一个成员. 这意味着每个民意调查将在poll-time-out之前进行,默认情况下为5分钟.所以我的问题确切地说是在两个连续的民意调查之间进行多少时间消费者线程? 例如:消费者线程1 第一次民意调查 - >有100个记录 - >处理100个记录(花了1分钟) - >消费者提交偏移 第二个民意调查 - >有100个记录 - >处理100个记录(花了1分钟) - >消费者提交偏移 消费者在第一民意调查之间是否需要时间?如果是,为什么?以及我们如何更改这段时间(当主题具有庞大的数据时,假设此时间) 解决方案 尚不清楚您的意思是"花时间";如果您谈论的是spri
我正在尝试在Spring Boot中开发Kafka消费者.我能够在Kafka工具中设置Kafka群集,并能够手动阅读消息.我也在Spring Boot中使用相同的配置,但最终出现以下错误和此警告. 2019-06-10 13:45:40.036 WARN 8364 --- [ id3-0-C-1] org.apache.kafka.clients.NetworkClient : Bootstrap broker XXXXXX.DEVHADOOP.XXXX.COM:6768 disconnected 2019-06-10 13:45:40.038 WARN 8364 --- [ id1-0-C-1] org.apache.kafka.clients.NetworkClient : Bootstrap broker XXXXXXX.DEVHADOOP.XXXXXXXXX.COM:6768 disconnected 2019-06-10 13:45:40.
我正在使用使用org.springframework.kafka.listener.ConcurrentMessageListenerContainer 从KAFKA主题中消费消息的"程序化"方式 我想知道是否有一种"春季"方式将偏移倒置以返回主题的特定分区以返回" n"消息? 想知道这样做的最干净的方式(以编程方式而不是使用CLI). 解决方案 如果要在应用程序启动期间重置偏移量,请使用ConsumerAwareRebalanceListener并在分配分区时对消费者进行搜索;您可以通过调用Consumer.position(). 找到当前偏移量. 如果您想在运行时任意倒带分区,请让侦听器实现ConsumerSeekAware并获取对ConsumerSeekCallback的引用. 参见 . ConsumerSeekAware也具有onIdleContainer(),当idleEventInterval期间未收到记录时,它将被调用;该回调为您提供当前的
我希望能够通过属性阅读主题,而无需在Kafka侦听器注释上指定任何内容.不使用弹簧靴. 我尝试通过"主题"键直接从属性对象读取主题.这给出了一个错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided. // some class @KafkaListener public void listener(List messages) { System.out.print(messages); } //some other class @Bean public ConsumerFactory consumerFactory(Properties topicProp) { return new DefaultKafkaConsumerFactory(topicProp); } @Bean public Concurr
使用Kafka经纪人:1.0.1 Spring-Kafka:2.1.6.Release 我正在使用以下设置的批处理消费者: // Other settings are not shown.. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); 我以以下方式使用春季听众: @KafkaListener(topics = "${topics}", groupId = "${consumer.group.id}") public void receive(final List data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List partitions, @Header(KafkaHeaders.RECEI
我们有一个使用org.apache.kafka.clients.consumer.KafkaConsumer 的Java应用程序,该应用程序消耗了Kafka消息 我们已经创建了具有Spring-Kafka依赖项的Spring Boot应用程序,但无法读取新项目中的消息.已经检查了明显的参数,包括主机名和bootstrap服务器的端口(日志显示为识别),组,主题,该弹簧启动(如原始消费者)使用StringDeserializer.这是我们的配置文件: spring: kafka: bootstrap-servers: hostname1:9092,hostname2:9092 consumer: auto-offset-reset: earliest group-id: our_group enable-auto-commit: false fetch-max-wait: 500 max-poll-r
我想设计一种解决方案,以向几个提供商发送不同类型的电子邮件.一般概述. 我有几个上游提供商sendgrid,zoho,mailgun等.它们将用于发送电子邮件等.例如: 注册新用户的电子邮件 删除用户的电子邮件 空间配额限制的电子邮件 (通常大约6种电子邮件) 每种类型的电子邮件都应生成生产者,转换为串行的Java对象,并将其发送到与上游提供商集成的适当的Kafka消费者. 问题是如何设计kafka以提高性能和可伸缩性? 1-ST解决方案到目前为止,我可以认为是否要为每种类型的电子邮件和每个网关都有主题(6x4 = 24个主题).将来,我希望添加更多类型的消息和网关.也许它将达到600个主题.这将使大量的Java源代码用于维护和许多要管理的主题.另一个不利的是,kafka日志将是巨大的. 2-nd解决方案将为每个消费者使用1个主题(集成网关).但是在这种情况下,我如何根据要发送的消息类型发送每种类型的不同序列化Java对象? 是否有一些
我正在尝试在弹簧靴中实施Kafka重试消费者,并使用SeekTocurrentErrorHandler进行重试. 我已经设定了进行5次重试的退缩政策. 我的问题是,可以说重试的第一次尝试,例外是"数据库不可用",第二次尝试DB可用,但是在另一个步骤中又发生了另一个失败,例如超时,在这种情况下,将重试计数返回到零,启动新鲜,或者将继续尝试从第一个重试剩下的算术. 我的要求是,每次异常都与以前投掷的异常不同. . 如何在kafka中实现这一目标? 这是我的消费者配置. @Bean public ConcurrentKafkaListenerContainerFactory kafkaRetryListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new Concurren
我正在使用基于注释的spring kafka侦听器来消耗kafka消息,而代码如下 消费员工对象 Class Employee{ private String name; private String address; private Object account; //getters //setters } 帐户对象在运行时决定是保存帐户还是当前帐户等. Class SavingAcc{ private BigDecimal balance; } Class CurrentAcc{ private BigDecimal balance; private BigDecimal limit; } 保存和经常账户具有大量的字段来存储余额. 因此,在从Kafka生产者那里发送员工对象时,所有字段均正确映射并以正确的bigdecimal形式出现,等等. 但是,在另一个服务中消耗员工对象时,帐户对象会以linkedhashmap和bigdecimal字段的形式
我正在尝试发布来自单个生产者的两个不同主题. 在这里我创建了两个主题: @Bean public NewTopic multi1() { return TopicBuilder.name("multi1").partitions(1).build(); } @Bean public NewTopic multi2() { return TopicBuilder.name("multi2").partitions(1).build(); } 这就是IAM将消息发送到两个主题的方式: public void sendingtomultitopic() { IntStream.range(0, 100).forEach(i->this.template.send("multi1", "mutli1 data value->"+i)); IntStream.range(
我希望我的消费者处理大批批量,因此我的目标是让消费者"醒着",例如,在1800MB的数据或每5分钟的数据中,以一项为准. . 我是一个Kafka-springboot应用程序,该主题有28个分区,这是我明确更改的配置: 参数 值我设置 默认值 为什么我以这种方式设置 fetch.max.bytes 1801MB 50MB fetch.min.bytes+1MB fetch.min.bytes 1800MB 1B 所需的批量尺寸 fetch.max.wait.ms 5min 500ms 所需的节奏 max.partition.fetch.bytes 1801MB 1MB 不平衡的分区 request.timeout.ms 5分钟+1 sec 30秒 fetch.max.wait.ms + 1 sec max.poll.records 10000 500 1500发现太低 max.poll.interval.ms 5分钟+1 sec 5min fetch.max.
Kakfa-broker在管理偏移方面是否有任何已知问题? BCZ,我们面临的问题是当我们尝试重新启动Kafka-Consumer(即应用程序重新启动)时,有时所有偏移量都重置为0. 关于为什么消费者无法从上次任命的偏移开始. 完全毫无意义. 我们最终在产品中面临此问题,其中整个Q事件再次重播: Spring-boot版本-2.2.6版本 Spring -Kafka -2.3.7版本 kafka -client -2.3.1 apache-kafka-kafka_2.12-2.3.1 我们有10个主题,每个主题属于同一组的50个分区,我们根据负载在运行时增加主题分区和消费者数量. auto-commit = false 同步处理后的每个偏移 Max-Poll Records设置为1 在所有这些配置之后,它按照本地设置的预期运行,在部署到产品后,我们会看到此类问题并非每个重新启动. 我缺少任何配置吗? 完全无知!!!!! 解决方案 请勿在另
我正在尝试从像卡夫卡这样的特定主题中进行调查数据 但是大多数情况下,它并没有获取所有记录. 我使用的超时为5000ms,每个100ms我都在调用此方法 注意:我也订阅了特定主题 @scheduled(fixedDelayString =" 100") public void pollRecords() { ConsumerRecords records = leadConsumer.poll("5000"); 如何从kafka获取所有数据? 解决方案 使用max.poll.records消费者配置参数指定了从Poll()返回的最大记录数. (默认值为500)此外,还有另一个消费者配置参数限制了从服务器返回的最大数据. (fetch.max.bytes和max.partition.fetch.bytes) 另一方面,在经纪人方面,还有另一个尺寸限制,称为message.max.bytes