保证Kafka生产者的全球唯一交易
使用Kafka的最后一个版本0.11.0.0 Apache团队正在介绍IDEMPOTENT生产者和交易. 是否可以保证我们要登录的整个消息(例如100万),仅在最后才能实现? 我希望这样做,例如生产者将与经纪人的联系失去并且无法恢复原状,那么消费者将看不到任何消息.是否可以? 解决方案 是的,使用生产者中的交易是可能的.您开始交易,发布所有消息,然后提交交易.所有消息一次写给Kafka一个,但新的read_comment模式中的消费者只会在生产者投入交易后看到消息,并且将特殊的交易标记添加到Kafka Commit Log中. . 不在read_commitd模式中的消费者可以看到消息是单独编写的,即使他们可能还没有(或曾经)进行. 开放交易可以保持多长时间是有限制的,因此最终,如果生产者死亡并且没有明确结束交易,它将超时,回滚和read_comment的消费者将永远不会看到这些消息.
6 2023-10-25
编程技术问答社区
等待Kafka发送API返回的List of ListenAbleFuture
我有可聆听的列表.我想等待此列表 ListenableFuture>如果他们没有comepled,则在15分钟内持续15分钟. 我该如何实现. 目前我正在这样做,但是这是我不想要的每个可听图15分钟. for (ListenableFuture> m : myFutureList) { m.get(15, TimeUnit.MINUTES) ; } ListenableFuture> is from import org.springframework.util.concurrent.ListenableFuture; 我已经经历了等待未来列表用于完整的future 解决方案 创建CountDownLatch,例如new CountDownLat
4 2023-09-14
编程技术问答社区
为每个Http请求提供多个Kafka Producer实例
我有一个休息终点,可以同时由多个用户调用.此休息终点是交易Kafka生产商. 我了解的是,如果我们使用交易,我不能同时使用同一kafka生产者实例. 如何有效地为每个HTTP请求创建一个新的KAFKA生产者实例? //Kafka Transaction enabled producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" ); @Service public class ProducerService { @Autowired private KafkaTemplate kafkaTemplate; public void postMessage(final MyUser message) {
4 2023-09-14
编程技术问答社区
为什么当我重启我的kafka broker时,我们会丢失一些信息?
i m在单节点中运行kafka,我想在关闭kafka经纪人时看到kafka生产者的行为打印每次发送的JSON对象的偏移. 我的应用程序正常工作,但是当我关闭Kafka经纪人时,几秒钟后,我重新启动了经纪人,我的制作人返回以正常的方式从最新偏移量发送对象. 对于我的示例,当偏移量= 983中的示例中,我沿着kafka降低了我的kafka,当我再次启动经纪人时,我看到kafka开始从 offset = 984 开始发送消息,但我发现3或此消息错误丢失了4条消息!! 983 offset acked 2019-04-01 16:20:34.635 INFO 7656 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=jsa-group] Group coordinator dali-X556UJ:9092 (id: 2147483
4 2023-09-14
编程技术问答社区
默认的KafkaProducerFactory与transactionIdPrefix在bootstrap服务器关闭时有无尽的等待
hy, 我正在使用Spring-Kafka 1.3.0.Release来创建交易生产商. Bootstrap服务器关闭时,DefaultKafKaproDucerFactory无休止地等待直到Bootstrap服务器启动. 我在做什么错? 我可以设置超时和/或其他类似属性吗? 这是我的代码重现方案的一个示例: public static void main(String[] args) { final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); producerFactory.setTransactionIdPrefix("transactionIdPrefix"); final Producer produc
0 2023-09-14
编程技术问答社区
交易Kafka生产商
我试图使我的Kafka生产商交易. 我要发送10条消息.如果发生任何错误 我正在使用Spring Boot Kafkatemplate. @Configuration @EnableKafka public class KakfaConfiguration { @Bean public ProducerFactory producerFactory() { Map config = new HashMap(); // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, // appProps.getJksLocation());
4 2023-09-14
编程技术问答社区
@SentTo如何将信息发送到相关主题?
我正在使用ReplyingKafkatemplate在我的REST控制器中返回同步响应.我也在设置标题回复_Topic.对于听众微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}") @SendTo public Model listen(Model request) throws InterruptedException { SumModel model = request.getRequest(); int sum = model.getNumber1() + model.getNumber2(); SumResp resp = new SumResp(sum); request.setReply(resp); request.setAdditionalProperty("sum", sum); return request
6 2023-09-14
编程技术问答社区
Kafka更新元数据失败
我正在使用 spring-boot . 我正在尝试使用以下生产者代码在Kafka主题中产生消息: 主题mobile-user有5个分区和2个复制因子.我已经在问题结束时附上了 kafka设置. package com.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.Listenab
8 2023-09-13
编程技术问答社区
spring kafka模板生产者性能
我正在使用Spring Kafka模板来生产消息.以及它产生的消息的速度太慢.生产15000条消息大约需要8分钟. 以下是我创建kafka模板的方式: @Bean public ProducerFactory highSpeedAvroProducerFactory( @Qualifier("highSpeedProducerProperties") KafkaProperties properties) { final Map kafkaPropertiesMap = properties.getKafkaPropertiesMap(); System.out.println(kafkaPropertiesMap); kafkaPropertiesMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Str
6 2023-09-13
编程技术问答社区
我们可以在spring boot中使用多个kafka模板吗?
在我的Spring Boot Kafka Publisher应用程序中,我想为在字符串(JSON)或字节格式中发布消息提供支持,因为我想为JSON和AVRO提供支持.但是弹簧靴中的Kafka模板让我们仅定义任何一个模板.有没有一种方法可以使用两个模板或任何其他方法来为JSON和AVRO提供支持? KafkaTemplate仅适用于字符串,但我也想发布avro,它应该像KafkaTemplate 一样 解决方案 您可以尝试使用不同的配置来创建Kafkatemplate: @Bean public ProducerFactory producerFactoryString() { Map configProps = new HashMap(); //additional config parameters .... c
10 2023-09-13
编程技术问答社区
Kafka模板和Kafka生产者之间的区别是什么?
正如我所看到的,Kafka模板内部使用的Kafka生产商.我只想知道确切的区别. 另外,与Kafka生产商相比,我发现了Kafka模板中的许多Send()方法. 请帮助我.如果有人知道更多. 解决方案 生产者是模式,而Kafkatemplate包装了生产者实例,并提供了将消息发送到Kafka主题的便利方法. ( source ) kafka kafkatemplate "> kafkatemplate 是Spring的ITS Spring spring of It springs (尽管它不能直接实施生产者),因此它为您提供了更多方法.因此,您可以使用Kafkatemplate来开始或通过自己实施生产者实施自己的解决方案. 其他解决方案 kafka生产者是PubSub模式中生产者的术语. 如果您想使用Spring实现Apache Kafka,Spring在此生产商周围提供了包装器,我们称其为 kafka Template . kafka Template 是Spr
6 2023-09-13
编程技术问答社区
交易型生产者与单纯的闲置型生产者的比较 Java (Exception OutOfOrderSequenceException)
我将Spring-Kafka与IDEMPOTENT生产者配置: 这些是我的配置道具: Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers())); //configure the following three settings for SSL Encryption props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation()); props.put(SslConfigs.SSL_TRUSTSTORE_PA
32 2023-09-13
编程技术问答社区
发送到kafka主题时,序列化消息的错误
我需要测试包含标头的消息,因此我需要使用MessageBuilder,但我无法序列化. 我尝试在生产者道具上添加序列化设置,但不起作用. 有人可以帮我吗? 此错误: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer 我的测试课: public class TransactionMastercardAdapterTest extends AbstractTest{ @Autowired private KafkaTemplate
2 2023-09-13
编程技术问答社区
ExecutionException:由于:org.springframework.kafka.requestreply.KafkaReplyTimeoutException: 使用ReplyingKafkaTemplate的回复超时了
我正在使用kafka向经纪人发布异步和同步消息. 我在两个模板上都使用相同的请求主题.. 当使用火和忘记时(​​async)我看不到任何问题,因为听众会从主题中随机收听消息.当使用同步呼叫时,我会得到超时. 我需要维护不同模板的多个听众吗? 具有同步和异步操作的同一主题会有任何问题吗? kafkaconfig.java //同步调用的模板 @Bean public ReplyingKafkaTemplate replyingKafkaTemplate ( ProducerFactory pf, ConcurrentMessageListenerContainer repliesContainer) { ReplyingKafkaTemplate replyTemplate =
如何为托管在Kubernetes中的几个应用程序选择Kafka事务ID?
我有一个经典的微服务体系结构.因此,有不同的应用程序.每个应用程序可能具有1..N实例.该系统已部署到Kubernetes.,因此,我们有很多不同的PODs,可以在任何时候启动和停止. 我想实现 read-process-write 我需要卡夫卡交易. 要配置交易,我需要为每个Kafka生产商设置一些transaction id. (实际上,我需要transaction-id-prefix,因为我将Spring用于我的应用程序,并且它具有这样的API).重新启动应用后,这些IDs必须相同. 那么,如何为多种应用程序选择KAFKA事务ID,在Kubernetes中托管? 解决方案 如果消费者开始交易(阅读过程 - 写入),则对于同一应用程序的所有实例,事务ID前缀必须相同(以便僵尸围栏在重新平衡后正确起作用).实际的交易ID是... 如果您有多个应用程序,则它们应该具有唯一的前缀(尽管如果它们
8 2023-09-13
编程技术问答社区
如何解决kafka.common.errors.TimeoutException。自批处理创建以来已过了1条记录(s)xxx毫秒,加上徘徊时间,即将到期。
我正在使用kafka_2.11-2.1.1 和生产者使用弹簧2.1.0.Release. 我在将消息发送到Kafka主题时正在使用春季,我的制作人生成了很多TimeoutExceptions org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time 我正在使用以下Kafka生产者设置 acks: 1 retries: 1 batchSize: 100 lingerMs: 5 bufferMemory: 33554432 requestTimeoutMs: 60 我尝试了许多组合(特别是batchSize&lingerMs),但没有任何效果.任何帮助,请在上述方案中设置什么. 再次使用以下配置尝试...但是没有运气相同
2 2023-09-13
编程技术问答社区
Kafka生产者TimeoutException。过期的1条记录
我正在使用弹簧靴的kafka: kafka生产者类: @Service public class MyKafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class); // Send Message public void sendMessage(String topicName, String message) throws Exception { LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
8 2023-09-13
编程技术问答社区
带有消费者/生产者API的Spring Cloud Stream for Kafka具有事务id前缀的精确一次语义未按预期工作
我有看到不同行为的方案.像总共3种不同的服务 第一条服务将从Solace队列中聆听并将其生产给Kafka 主题1(启用了交易的情况) 第二次服务将从Kafka主题1上方收听,并将其写入另一个Kafka主题2(我们没有手动提交,交易 启用了其他主题,自动提交为false& 隔离.级别设置为read_commited) 以前删除 第三次服务将从Kafka主题2聆听,然后将其写回Solace队列(我们没有手动提交,Auto Funs offset as as Auto offse false&隔离.级别设置为read_commited). 现在,如果我在第二次服务中禁用了交易,那么我在第二服务上启用了交易和隔离级别后的问题将无法读取任何消息. 我们可以在一项服务中启用交易和隔离级 如果我的服务只是生产者或消费者(EOS保证这些服务的保证) 编辑: 以下是我的YML的外观 - kafka: - binder: - transaction:
20 2023-09-13
编程技术问答社区
对KafkaConsumer poll()行为的理解
试图理解(新的Kafka),Kafka中的民意调查事件循环如何工作. 用例:关于该主题的25个记录,最大民意调查大小设置为5. max.poll.interval.ms = 5000 //5 seconds by default max.poll.records = 5 任务序列 调查该主题的记录. 处理循环中的记录. 逻辑要么通过或失败的一些处理登录. 如果逻辑通过(带有偏移)将添加到地图中. 然后将使用Commitsync呼叫进行. 如果失败,则循环将破裂,任何成功之前的成功. 下一个民意调查只会在错误之后继续批量移动,这是预期的吗? 我们基本上期望的是,循环断开和偏移,直到成功过程逻辑应提交,然后下一个民意调查应从失败的消息中继续. 示例,第一批次民意调查5条调查和1,2个偏移成功并成功了,然后第三次失败.因此,如果我们期望有任何错误它要在这一点上停止和民意调查应在第一种情况下从3开始,或者如果在第8期中的第二批次失败,那么下一个民意调
6 2023-09-13
编程技术问答社区