我如何从一个单子服务中调用一个方法,使其在整个应用生命周期内运行?
我已经实施了Kafka事件总线作为Net Core的单身服务.该服务本身在startup.cs中使用autoFac配置.该服务具有Listen()方法: public void Listen() { using(var consumer = new Consumer(_config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Subscribe(new string[] { "business-write-topic" }); consumer.OnMessage += (_, msg) => { Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}")
8 2024-04-26
编程技术问答社区
消息在Confluent Kafka Dotnet中丢失
我在最近的C#项目中使用Confluent Kafka软件包.我以以下方式创建了一个生产者: prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"}; foreach(msg in msglist){ using(var producer = new ProducerBuilder(prodConfig).Build()){ producer.ProduceAsync(topic, new Message {Value = msg}); } } ,但问题是我的某些消息没有传达给消费者.他们在某个地方迷路了.但是,如果我与生产者一起使用 等待 ,那么所有消息都将传递.如何在不等待的情况下传递我的所有消息. (我有一个分区) 解决方案 首先,您应该仅使用一个Producer发送您的msgLi
0 2024-04-26
编程技术问答社区
Confluent Kafka Dotnet Kerberos支持 Dockerfile(没有SASL机制GSSAPI的提供者)。
I am trying to create a dockerfile which is a kafka client using confluent-kafka-dotnet. 它必须使用kerberos keytabs进行连接,因此我已经阅读 wiki . 这是我的dockerfile: # ---- dotnet build stage ---- FROM mcr.microsoft.com/dotnet/core/sdk:3.1 as build ARG BUILDCONFIG=RELEASE ARG VERSION=1.0.0 # Installing dependencies for li RUN apt-get update && apt-get install libsasl2-modules-gssapi-mit libsasl2-dev unzip build-essential -y COPY ./lib/ / RUN unzip librdkafk
10 2024-04-26
编程技术问答社区
我如何使用AdminClient在Event Hub/Kafka上创建一个主题?
我正在尝试使用adminclient.createTopicsAsync从kafka界面编程创建一个主题(事件中心).连接到Kafka时,这起作用,但不能与事件中心相连.我遇到以下错误: 经纪人不支持的默认分区计数(KIP-464) Broker版本 using Confluent.Kafka; using Confluent.Kafka.Admin; var adminClient = new AdminClientBuilder( new[] { ("sasl.mechanism","PLAIN"), ("security.protocol","SASL_SSL"), ("bootstrap.servers", Address), ("sasl.username", "$ConnectionString"), ("sasl.pas
0 2024-04-26
编程技术问答社区
Kafka消费者和消费者建设的编码差异
我正在使用旧的kafka,我在下面使用的一块代码,其中可能是UTF-7,UTF-8,Unicode等. new Consumer(mConfig, null, new StringDeserializer(mEncoding))) 我正在将我的kafka升级到1.4.0版本. 我发现消费者被方法SetValueDeserializer所取代,但仅接受UTF-8(deserializers.utf8).有什么方法可以发送其他编码? 解决方案 您应该只实现自己的求职者.它看起来像这样: public class MyValueDeserializer : IDeserializer { private readonly Encoding _encoding; public MyValueDeserializer(Encoding encoding) { _encoding
4 2024-04-26
编程技术问答社区
从 Spark 流读取 Kafka SSL 客户端信任库文件时出错
我有kafka的火花流应用程序.我正在从EMR运行它.最近,我实施了Kafka SSL.我正在创建KAFKA客户端,如下所示.当应用程序试图读取TrustStore文件时,我会遇到一个奇怪的错误. 错误是: - Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: /tmp/kafka.client.truststore.jks (No such file or directory) 是什么引起了这个问题? DataStreamReader df = session.readStream() .format("kafka") .option("kafka.bootstrap.servers",kafka_server) .option("subscribe",
10 2024-04-23
编程技术问答社区
由于在与正在读取的主题不同的 Kafka 主题上出现错误,导致 Spark Streaming 失败
有关以下写作主题/阅读主题air2008rand串联: import org.apache.spark.sql.streaming.Trigger (spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("startingOffsets", "earliest") .option("subscribe", "air2008rand") .load() .groupBy('value.cast("string").as('key)) .agg(count("*").cast("string") as 'value) .writeStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("startingOffsets", "earliest
0 2024-04-23
编程技术问答社区
如何避免连续出现 "重置偏移量 "和 "寻找最新偏移量"?
我正在尝试遵循本指南: 我需要配置一些东西吗? 这是我的代码: SparkSession spark = SparkSession .builder() .appName("Testing") .config("spark.master", "local") .getOrCreate(); StructType recordSchema = new StructType() .add("description", "string") .add("location", "string") .add("id", "string") .add("title", "string") .add("company", "string") .add("place", "string") .add("date", "string") .add("senorityLevel", "string") .add("function", "string
10 2024-04-23
编程技术问答社区
Spark 结构化流与 Kafka SASL/PLAIN 身份验证
是否有一种方法可以将火花结构化的流式作业连接到由SASL/Plain身份验证确保的Kafka群集? 我正在考虑类似的东西: val df2 = spark.read.format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("kafka.sasl.mechanism", "PLAIN") .option("kafka.security.protocol", "SASL_PLAINTEXT") .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...") .option("subscribe", "topic1") .load(); 似乎虽然Spark结构化流识别ka
4 2024-04-23
编程技术问答社区
Spark 流式 jdbc 在数据到来时读取数据流--数据源 jdbc 不支持流式读取
我将PostGre用作数据库.我想捕获每个批次的一个表数据,并将其转换为镶木quet文件并将其存储在S3中.我尝试使用Spark和ReadStream的JDBC选项如下... val jdbcDF = spark.readStream .format("jdbc") .option("url", "jdbc:postgresql://myserver:5432/mydatabase") .option("dbtable", "database.schema.table") .option("user", "xxxxx") .option("password", "xxxxx") .load() ,但它抛出了不支持的例外 Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support st
8 2024-04-23
编程技术问答社区
C# Confluent.Kafka SetValueDeserializer对象反序列化
在我的消费者中,我想对kafka Protobuf消息进行挑选.关键是字符串类型,但消息值是Protobuf对象.我知道我必须为消息价值创建自己的自定义求职者,但不知道该如何创建一个.这是我的消费者实施,我需要替换标记的行: using Confluent.Kafka; using System; using System.Threading; namespace EventHubsForKafkaSample { class Worker1 { public static void Consumer(string brokerList, string connStr, string consumergroup, string topic, string cacertlocation) { var config = new ConsumerConfig {
2 2024-04-22
编程技术问答社区
Kafka消费者启动延迟 confluent dotnet
在启动汇合点消费者时,在订阅和随后的民意调查后,似乎需要很长时间才能从服务器接收"分配"事件,因此消息(大约10-15秒) . 起初,我认为有一个汽车主题创建开销,但是是否已经存在是否存在的主题/消费者组相同. 我使用此配置启动消费者,其余代码与Contruent Advanced消费者示例相同: var kafkaConfig = new Dictionary { {"group.id", config.ConsumerGroup}, {"statistics.interval.ms", 60000}, {"fetch.wait.max.ms", 10}, {"bootstrap.servers", config.BrokerList}, {"enable.auto.commit
6 2024-04-19
编程技术问答社区
如何使用Confluent.Kafka.Net客户端创建一个Kafka主题
似乎最受欢迎的.NET客户端是Kafka(/confluent-kafka-dotnet )缺少设置和创建主题的方法. 当调用Producer.ProduceAsync()时,主题是自动创建的,但我找不到一种设置分区,保留策略和其他设置的方法. 我试图在网上找到任何示例,但我发现的只是使用默认值. 也许我可以使用另一个.NET客户端? 解决方案 现在可以在Confluent.kafka .net客户端库的最新版本中获得. See: https://github. com/confluentinc/contruent-kafka-dotnet/blob/b7b04fed82762c67c2841d7481eee59dee3e4e20/examples/examples/acminclient/program.cs.cs using (var adminClient = new AdminClientBuilder(new AdminClien
0 2024-04-18
编程技术问答社区
Kafka配置,只看最后5分钟的数据
对不起,我在卡夫卡(Kafka)是新来的,这个问题是如此简单,但我需要一些帮助. 我没有找出一些配置. 有一个流数据, 我希望消费者仅看到探测器发送的最后5分钟消息.我正在使用contruent.kafka for .net, var config = new Dictionary{ {"group.id","Test1Costumers"}, {"bootstrap.servers",brokerEndpoint}, { "auto.commit.interval.ms", 60000}, { "auto.offset.reset", "earliest" } }; 这是github示例中消费者的配置字典, 另一个问题是,我不想在主题中存储消息超过5分钟,因为如果它们年龄在5分钟以上,我就不需要
2 2024-04-14
编程技术问答社区
Karaf-Kafka OSGI bundle-Producer问题
我正在尝试为Apache Karaf版本4.0.3中的Kafka生产商创建一个简单的捆绑包. 这是我的Java代码 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); //props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //props.put("value.serializer", "org.apache.kafka.common.seria
10 2024-04-05
编程技术问答社区
Bitnami Kafka for Kibernetes-java.net.UnknownHostException: kafka-0.kafka-headless.kafka.svc.cluster.local
我在我的K8群集上安装了Bitnami Kafka.但是我不断看到错误java.net.unknownhostexception:kafka-0.kafka-headless.kafka.kafka.svc.cluster.local in Kafka-o logs. i登录到pod(kafka-o),并为kafka-0.kafka-0.kafka-headless.kafka.kafka.svc.cluster.local.local.local ran nsookup登录,但没有问题就可以解决 root@kafka-0:/# nslookup kafka-0.kafka-headless.kafka.svc.cluster.local Server: 10.43.0.10 Address: 10.43.0.10#53 Name: kafka-0.kafka-headless.kafka.svc.mosip02.local Addres
4 2024-03-31
编程技术问答社区
我们应该运行一个有3个副本的Kafka节点还是3个有1个副本的Kafka节点?
我不明白运行1个带有3个副本的Kafka节点和3个kafka节点之间的区别. 我们要维护自己的kubernetes群集,我们想在其中运行一个kafka群集.我们正在使用我们可以设置: ... 3不同的Kafka服务,带有1个副本,每个副本都有自己的URL(例如Localhost:9092,Localhost:9093和Localhost:9094). ... 1 kafka服务在3个复制品中运行(只有1个URL Localhost:9092用于所有副本). 同步方式有什么区别,什么是配置的更好方法? 解决方案 为了提供高可用性并利用Kafka的并行性用于多个消费者,您应该扩大规模,我建议使用3台服务器. 多个经纪人设置将在同一主题的不同经纪人中传播消息/分区,因此消费者组可以从具有高平行性的不同经纪人那里接收消息. 此外,请注意,复制仅帮助您获得高可用性,因此在服务器故障的情况下,分区replica/s将带头. 对于3个节点群集,我会推荐以2个复制品
10 2024-03-31
编程技术问答社区
如何使用Bitnami Helm Chart安装Contruent模式注册表以备外部Kafka?
我有一个由strimzi创建的kafka at hm-kafka-kafka-bootstrap.hm-kafka.svc:9092. 我正在尝试使用bitnami Helm在 这是我的步骤: helm upgrade \ confluent-schema-registry \ oci://registry-1.docker.io/bitnamicharts/schema-registry \ --install \ --namespace=hm-confluent-schema-registry \ --create-namespace \ --values=my-values.yaml my-values.yaml : kafka: enabled: false externalKafka: brokers: - PLAINTEXT://hm-kafka-kafka-bootstrap.hm-kafka.svc:90
10 2024-03-31
编程技术问答社区
在kubernetes集群外访问bitnami/kafka
我目前正在使用bitnami/kafka映像( Kubernetes Master:1 kubernetes工人:3 在集群中,另一个应用程序能够找到kafka.试图从集群外部访问Kafka容器时,就会发生问题.阅读一点时,我读到我们需要为外部Kafka客户端设置属性" advertied.listener = plainttext://hostname:port_number". 我目前正在引用" values.yaml AdvertiedListeners1:10.21.0.191 和 statefulset.yaml - name: KAFKA_CFG_ADVERTISED_LISTENERS value: 'PLAINTEXT://{{ .Values.advertisedListeners }}:9092' 对于单个kafka实例,它工作正常. 但是对于3个节点kafka群集,我更改了一些配置,如下: value
6 2024-03-31
编程技术问答社区
Kafka的分区和偏移量消失了
我的KAFKA客户端正在使用启用自动刻度的GCP App Engine Flex环境中运行(GCP将实例计数至少为两个,并且主要是由于CPU使用率较低).在这2个VM中运行的消费者群体已经在20个分区中消耗了来自各个主题的消息,但最近我注意到,旧主题中的分区缩水到1(!),该消费者组的偏移重置为0. [分区]目录也从Kafka-Logs目录中消失了.奇怪的是,最近创建的主题分区完好无损.我有3种不同的环境(全部在GCP中),这三个环境都发生在这三个环境中.我们没有看到任何丢失的消息或数据问题,但想了解发生了什么事,以免发生这种情况. 卡夫卡经纪人和Zookeeper正在以相同和单个GCP计算引擎实例运行(我知道这不是最好的做法,并且有改进的计划),我怀疑这与机器重新启动有关,这会删除一些信息.但是,我验证了数据文件是在/opt/opt/bitnami/(kafka | bitnami)目录中编写的,而不是/tmp,可以通过机器重新启动删除. Spring Kafka 1.1.
14 2024-03-31
编程技术问答社区