了解flink的保存点和检查点
考虑使用这样的管道考虑Apache Flink流式应用: Kafka-Source -> flatMap 1 -> flatMap 2 -> flatMap 3 -> Kafka-Sink 其中每个flatMap函数都是非国家运算符(例如,Datastream的正常.flatMap函数). 如果传入消息将在flatMap 3上进行待处理,则检查点/保存点如何工作?从flatMap 1开始重新启动后,该消息会被重新处理,还是会跳至flatMap 3? 我有点困惑,因为,或者将在失败/重新启动后重新处理整个管道? 这是故障( - > Flink恢复检查点)和使用SavePoint的手动重新启动之间的差异吗? 我试图通过将Thread.sleep()放置在flatMap 3中,然后用SavePoint取消作业,从而发现自己(使用EXACTLY_ONCE和RocksDB-Backend的启用检查点).但是,这导致flink命令行工具悬挂在sleep结束之前,即使
0 2023-05-25
编程技术问答社区
如何在Kafka和Flink环境下测试性能?
如何用kafka作为输入源测试flink的性能.另外,建议使用此情况的任何性能测试工具. 解决方案 flink包括吞吐量(numRecordSinperSecond和numRecordSoutpersecond)的指标和延迟. 如果您想更仔细地测量端到端的延迟,则可以在接收器(或其他终端节点)中添加一个自定义度量标准,将事件中的时间戳与当前时间进行比较.看起来像这样: public class LatencyMeasuringSink extends RichSinkFunction { private transient DescriptiveStatisticsHistogram eventTimeLag; private static final int EVENT_TIME_LAG_WINDOW_SIZE = 10_000; @Override public void open(Configuration parameters)
0 2023-05-24
编程技术问答社区
java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/scala/StreamExecutionEnvironment
package com.knoldus import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object SocketWindowWordCount { def main(args: Array[String]) : Unit = { var hostname: String = "localhost" var port: Int = 9000 try { val params = ParameterTool.fromArgs(args) hostname = if (params.has("hostname")) params.get("hostname") else "
10 2023-05-23
编程技术问答社区
Apache Flink-将流平等地划分为输入的Kafka主题
我想在Apache Flink中实现以下方案: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区数据,具体取决于事件的类型. 特别是假设输入KAFKA主题包含上一个图像中描述的事件.每个事件都有不同的结构:分区1具有" a "字段,作为键,分区2具有字段" b "作为键等.在Flink中,我想要要根据事件应用不同的业务逻辑,所以我认为我应该以某种方式将流分开.为了实现图片中描述的内容,我认为只使用一个消费者做类似的事情(我不明白为什么我应该使用更多): FlinkKafkaConsumer consumer = ... DataStream stream = flinkEnv.addSource(consumer); stream.keyBy("a").map(new AEventMapper()).addSink(...); stream.keyBy("b").map(new BEventMapper()).addS
2 2023-05-20
编程技术问答社区
Flink StreamingFilesink -Parquetavrowriters
我正在使用Flink -Streaming文件接收器来编写传入数据S3存储桶.我的代码完美地使用forRowFormat选项. 现在,我正在尝试设置forBulkFormat选项,以将数据编写为S3.我的水槽功能如下. private static SinkFunction getS3Sink() { final StreamingFileSink sink = StreamingFileSink .forBulkFormat(new Path(s3SinkPath), ParquetAvroWriters.forSpecificRecord(Pojo.class) ) .withBucketAssigner(new CustomBucketAssigner()) .build(); return sink; } 我正在Intellij
2 2023-05-20
编程技术问答社区
如何在flink中管理许多avsc文件,并优雅地消费多个主题
这是我的情况: 我使用Flink用SimpleStringschema在Kafka中消费许多主题.使用outputTag,因为我们需要通过主题将数据将数据存储到parquet + Snappy中的数据中.然后,在使用AVSC架构文件处理每个主题时,我们会浏览所有主题. 现在,当添加了一些新列时,我必须修改AVSC架构文件.当需要修改十或一百个文件时,这会让我陷入困境. 因此,是否有一种更优雅的方法来避免更改AVSC文件或如何更好地管理它们? 解决方案 通常,我避免在同一源中使用不同的模式摄入数据.对于同一主题中的多个模式尤其如此. 避免它的常见且可扩展的方法是使用某种信封格式. { "namespace": "example", "name": "Envelope", "type": "record", "fields": [ { "name": "type1", "type": ["null", {
0 2023-05-20
编程技术问答社区
在Flink的DataStream API中连续处理parquet文件作为Datastreams
我在HDFS上有一个镶木quet文件.每天都有一个新的.我的目标是使用dataStream api在flink作业中更改时,不断地发射此木板文件 - . 最终目标是在广播状态下使用文件内容,但这不符合此问题的范围. 要处理一个文件连续,有一个非常有用的API: data-sources 关于数据源.更具体地说, fileprocessingmode.process_contlyly :这正是我所需要的.这适用于读取/监视文本文件,没问题,但对于镶木quet文件而不是: // Partial version 1: the raw file is processed continuously val path: String = "hdfs://hostname/path_to_file_dir/" val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path)) // monitor the f
2 2023-05-19
编程技术问答社区
在Apache Flink中,一个有三个槽的任务管理器与三个有一个槽的任务管理器是否相同?
在Flink中,如我的理解,JobManager可以在必要时为具有多个插槽的多个任务管理员分配作业.例如,可以使用五个插槽分配一个作业. 现在,说我用三个插槽执行了一个任务管理器(TM),该插槽分配给3G RAM和一个CPU. . 这与执行三个TaskManager,共享一个CPU完全相同,并且每个CPU分配给1 g ram? case 1 --------------- | 3G RAM | | one CPU | | three slots | | TM | --------------- case 2 --------------------------------------------| | one CPU | | ------------ ------------ ------------ | | | 1G RAM | | 1G RAM
0 2023-05-19
编程技术问答社区
Flink Kafka-如何使应用程序并行运行?
我正在创建一个flink to 的应用程序 阅读主题的消息 对此做一些简单的过程 将结果写入另一个主题 我的代码确实有效,但是 不在并行运行 我该怎么做? 看来我的代码仅在一个线程/块上运行? 在Flink Web仪表板上: 应用程序转到运行状态 但是,概述子任务中只有一个块 和接收/发送的字节,收到/发送的记录始终为零(无更新) 这是我的代码,请帮助我学习如何将我的应用分开以并行运行,并且我是否正确编写了该应用程序? public class SimpleApp { public static void main(String[] args) throws Exception { // create execution environment INPUT StreamExecutionEnvironment env_in = StreamExecutio
0 2023-05-19
编程技术问答社区
Apache Flink。在KeyedStream上的偏斜数据分布
我有这个Java代码: env.setParallelism(6); //Read from Kafka topic with 12 partitions DataStream line = env.addSource(myConsumer); //Filter half of the records DataStream> line_Num_Odd = line_Num.filter(new FilterOdd()); DataStream> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder()); //Filter the other half DataStream> line_Num_Even = line_Num.filter(new FilterE
0 2023-05-19
编程技术问答社区
Flink:Jobmanager用户界面中设置的并行性与任务槽有什么关系?
假设我有8位任务经理,其中有16个任务插槽.如果我使用JobManager UI提交工作并将并行性设置为8,我只使用8个任务插槽吗? 如果我有8个带有8个插槽的任务经理,并以8个并行性提交同一工作?是完全一样的吗?还是处理数据的方式有差异? 谢谢. 解决方案 弗林克集群中任务插槽的总数定义了最大并行性,但是所使用的插槽数可能超过实际的并行性.例如,考虑此工作: 如果在带有2个任务经理的集群中以两个并行性运行,则每个提供3个插槽,调度程序将使用5个任务插槽,例如: 但是,如果基本并行性增加到六个,则调度程序将执行此操作(请注意,在本示例中,水槽保持在一个并行性): ): 参见 flink的分布式运行时环境更多信息.
2 2023-05-19
编程技术问答社区
Flink Hbase连接器:在Hbase水槽表中写入数据 : 无法为写入表创建水槽
我想在HBASE接收器表中写入数据,我有兼容Flink版本1.14.4的HBASE版本2.2.0 我定义了水槽hbase表,如下: sink_ddl = """ CREATE TABLE hTable ( datemin STRING, family2 ROW, family3 ROW, PRIMARY KEY (datemin) NOT ENFORCED ) WITH ( 'connector' = 'hbase-2.2', 'table-name' = 'test', 'zookeeper.quorum' = '127.0.0.1:2181' ) "
14 2023-05-16
编程技术问答社区
org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hTable';
我试图将Flink 1.14.4与HBase版本2.2.14连接起来; 我被添加了HBase SQL连接器jar flink-sql-connector-hbase-hbase-hbase-2.2-1.15.2.2.2.jar,但对于版本2.2.x,因为这是jar的最后版本. 但是我有以下错误: py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql. : org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hTable'. Table options are: 'connector'='hbase-2.2' 'table-name'='test' 'zookeeper.quorum'=
12 2023-05-16
编程技术问答社区
Apache Flink: java.lang.NoClassDefFoundError
我正在尝试遵循此Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction 错误是由于此代码: private static DataStream createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
广播流的替代方法
我的弗林克工作中有两个不同的流; 第一个代表将应用于实际流的规则集.我刚刚播放了这些规则.更改来自Kafka,每小时可能会有一些更改(例如每小时100-200个). 第二个是称为客户流的实际流,其中包含每个客户的数值.这基本上是基于CustomerId的键流. 因此,基本上我正在准备实际的客户流数据,然后在键流上应用some rules并获得计算结果. ,而且,我还知道应该通过检查客户流数据字段来计算哪些规则.例如;客户数据字段包含值X,这意味着作业必须仅应用rule1, rule2, rule5,而不是为给定客户计算所有规则(假设有90个规则).当然,在这种情况下,我必须按照传入数据的字段值来获取和过滤所有规则. 在这种情况下,一切都可以,并且非常适合广播模式使用情况.但是这里的问题是广播的大小.有时它可能非常大,例如20 GB或更多.它认为对于广播状态来说是非常巨大的. 是否有其他方法可以解决此限制?就像,使用Rocks DB后端(我知道它不支持,但是
24 2023-04-21
编程技术问答社区
FLINK-SQL窗口将定期刷新元素进行处理。
我很困惑,如果滚筒窗口会定期计算并发出处理元素以进行处理.示例我有一个查询,预计将在10秒内进行间隔工作. select id, key from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ; 现在说:申请接收事件 e1 @10:00:00 e2 @10:00:05 e3 @12:00:10 您可以看到E1和E2在5秒内到达,E3在 @12:00:15. 到达. 当E1和E2发出处理时,您能帮我吗?会 @10:00:11吗?或者何时到达E3,然后查询将评估窗口并发出? 如果是在E3之后,那么是否有任何方法可以确保每10秒执行查询? 感谢您对此的帮助. 解决方案 如果您使用的是事件时间处理,则当水印通过10:00:10时,将在10:00:10结束的窗口发射.如果以通常的订购方式进行水印,并且如果没有其他事件,则水印将在处理E3之前不会推进.
16 2023-04-21
编程技术问答社区
Flink CEP事件未触发
我已经在Flink中实现了CEP模式,该模式正常与与本地Kafka经纪人连接的预期工作.但是,当我连接到基于群集的云Kafka设置时,Flink CEP不会触发. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 我正在使用AscendingTimestampExtractor,
10 2023-04-21
编程技术问答社区
Apache Flink模式检测没有发现任何匹配的内容
我正在尝试使用Apache Flink CEP(复杂事件处理)库来捕捉模式.我从以下结构开始,我希望看到ID [1,2]和[3,4]的2匹匹配.但是我认为没有结果. public class StreamingJob { private static Logger logger = LoggerFactory.getLogger(StreamingJob.class); public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); ArrayList
Apache flink: 为RocksDB后端从保存点懒散加载
我们想将Apache Flink与RockSDB后端(HDFS)一起使用,以进行状态流处理.但是,我们的申请状态(钥匙状态)将按照trabytes的顺序. 据我了解,当我们从保存点还原作业时,所有运算符状态数据将从HDFS上的Savepoint位置运送到每个任务管理人员.如果状态按照trabytes的顺序,则如果需要转移所有这些状态,则每个部署将导致很长的停机时间. 我想理解,如果在RockSDB的情况下,可以配置懒惰加载,其中需要在需要时从HDF中检索键状状态,然后在本地磁盘上进行缓存. . 谢谢! 解决方案 如果您使用的是RocksDB并配置Flink群集以使用本地恢复,则可以阅读有关在这里,然后,RockSDB文件的副本将保留在每个任务管理器的本地磁盘上,并且恢复将几乎是恢复立即(除了必须旋转的任何新节点外). 但是,这实际上并不适用于保存点,因为这种机制需要增量快照才能真正正常工作. 您可能需要阅读文档的整页,大约是如何配置和调整使用大量状
12 2023-04-21
编程技术问答社区
Flink广播状态RocksDB状态后端
在广播模式的文档中,提到没有RockSDB状态后端: No RocksDB state backend: Broadcast state is kept in-memory at runtime and memory provisioning should be done accordingly. This holds for all operator states. 如果应用程序使用RockSDB作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存期间不存储并因此无法恢复吗? 解决方案 广播状态 IS 包含在保存点和检查点中. flink区分键入和非键状态.保存点和检查点中都包含了由Flink管理的所有状态,包括钥匙和非钥匙. . 广播状态是一种非钥匙状态,与所有非钥匙状态一样,并未存储在RockSDB中.当将RockSDB用作状态后端时,这意味着将钥匙状态的工作状态保存在RockSDB中,而不是在堆中.当拍摄状态快照(即保存点或检查点)时,Rock
10 2023-04-21
编程技术问答社区