Flink StreamingFilesink -Parquetavrowriters
我正在使用Flink -Streaming文件接收器来编写传入数据S3存储桶.我的代码完美地使用forRowFormat选项. 现在,我正在尝试设置forBulkFormat选项,以将数据编写为S3.我的水槽功能如下. private static SinkFunction getS3Sink() { final StreamingFileSink sink = StreamingFileSink .forBulkFormat(new Path(s3SinkPath), ParquetAvroWriters.forSpecificRecord(Pojo.class) ) .withBucketAssigner(new CustomBucketAssigner()) .build(); return sink; } 我正在Intellij
2 2023-05-20
编程技术问答社区
Apache Flink。在KeyedStream上的偏斜数据分布
我有这个Java代码: env.setParallelism(6); //Read from Kafka topic with 12 partitions DataStream line = env.addSource(myConsumer); //Filter half of the records DataStream> line_Num_Odd = line_Num.filter(new FilterOdd()); DataStream> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder()); //Filter the other half DataStream> line_Num_Even = line_Num.filter(new FilterE
0 2023-05-19
编程技术问答社区
广播流的替代方法
我的弗林克工作中有两个不同的流; 第一个代表将应用于实际流的规则集.我刚刚播放了这些规则.更改来自Kafka,每小时可能会有一些更改(例如每小时100-200个). 第二个是称为客户流的实际流,其中包含每个客户的数值.这基本上是基于CustomerId的键流. 因此,基本上我正在准备实际的客户流数据,然后在键流上应用some rules并获得计算结果. ,而且,我还知道应该通过检查客户流数据字段来计算哪些规则.例如;客户数据字段包含值X,这意味着作业必须仅应用rule1, rule2, rule5,而不是为给定客户计算所有规则(假设有90个规则).当然,在这种情况下,我必须按照传入数据的字段值来获取和过滤所有规则. 在这种情况下,一切都可以,并且非常适合广播模式使用情况.但是这里的问题是广播的大小.有时它可能非常大,例如20 GB或更多.它认为对于广播状态来说是非常巨大的. 是否有其他方法可以解决此限制?就像,使用Rocks DB后端(我知道它不支持,但是
24 2023-04-21
编程技术问答社区
FLINK-SQL窗口将定期刷新元素进行处理。
我很困惑,如果滚筒窗口会定期计算并发出处理元素以进行处理.示例我有一个查询,预计将在10秒内进行间隔工作. select id, key from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ; 现在说:申请接收事件 e1 @10:00:00 e2 @10:00:05 e3 @12:00:10 您可以看到E1和E2在5秒内到达,E3在 @12:00:15. 到达. 当E1和E2发出处理时,您能帮我吗?会 @10:00:11吗?或者何时到达E3,然后查询将评估窗口并发出? 如果是在E3之后,那么是否有任何方法可以确保每10秒执行查询? 感谢您对此的帮助. 解决方案 如果您使用的是事件时间处理,则当水印通过10:00:10时,将在10:00:10结束的窗口发射.如果以通常的订购方式进行水印,并且如果没有其他事件,则水印将在处理E3之前不会推进.
16 2023-04-21
编程技术问答社区
Flink CEP事件未触发
我已经在Flink中实现了CEP模式,该模式正常与与本地Kafka经纪人连接的预期工作.但是,当我连接到基于群集的云Kafka设置时,Flink CEP不会触发. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 我正在使用AscendingTimestampExtractor,
10 2023-04-21
编程技术问答社区
Apache Flink模式检测没有发现任何匹配的内容
我正在尝试使用Apache Flink CEP(复杂事件处理)库来捕捉模式.我从以下结构开始,我希望看到ID [1,2]和[3,4]的2匹匹配.但是我认为没有结果. public class StreamingJob { private static Logger logger = LoggerFactory.getLogger(StreamingJob.class); public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); ArrayList
Apache flink: 为RocksDB后端从保存点懒散加载
我们想将Apache Flink与RockSDB后端(HDFS)一起使用,以进行状态流处理.但是,我们的申请状态(钥匙状态)将按照trabytes的顺序. 据我了解,当我们从保存点还原作业时,所有运算符状态数据将从HDFS上的Savepoint位置运送到每个任务管理人员.如果状态按照trabytes的顺序,则如果需要转移所有这些状态,则每个部署将导致很长的停机时间. 我想理解,如果在RockSDB的情况下,可以配置懒惰加载,其中需要在需要时从HDF中检索键状状态,然后在本地磁盘上进行缓存. . 谢谢! 解决方案 如果您使用的是RocksDB并配置Flink群集以使用本地恢复,则可以阅读有关在这里,然后,RockSDB文件的副本将保留在每个任务管理器的本地磁盘上,并且恢复将几乎是恢复立即(除了必须旋转的任何新节点外). 但是,这实际上并不适用于保存点,因为这种机制需要增量快照才能真正正常工作. 您可能需要阅读文档的整页,大约是如何配置和调整使用大量状
14 2023-04-21
编程技术问答社区
Flink广播状态RocksDB状态后端
在广播模式的文档中,提到没有RockSDB状态后端: No RocksDB state backend: Broadcast state is kept in-memory at runtime and memory provisioning should be done accordingly. This holds for all operator states. 如果应用程序使用RockSDB作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存期间不存储并因此无法恢复吗? 解决方案 广播状态 IS 包含在保存点和检查点中. flink区分键入和非键状态.保存点和检查点中都包含了由Flink管理的所有状态,包括钥匙和非钥匙. . 广播状态是一种非钥匙状态,与所有非钥匙状态一样,并未存储在RockSDB中.当将RockSDB用作状态后端时,这意味着将钥匙状态的工作状态保存在RockSDB中,而不是在堆中.当拍摄状态快照(即保存点或检查点)时,Rock
10 2023-04-21
编程技术问答社区
在flink 1.13中配置RocksDB
我已经在Flink 1.13版本中阅读了有关EmbeddedRocksDBStateBackend的信息,但具有尺寸限制,因此我想保留我以前的Flink版本1.11的当前配置,但要点是,将这种配置RockSDB的方式弃用(). 我已经尝试使用EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))使用新配置,并且有一个错误: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State b
0 2023-04-21
编程技术问答社区
如何在Flink中用SINGLE窗口做多个聚合?
我是新手Flink,我想在Spark多次做我做过的事情. 例如,在火花中,我可以做以下 之类的事情 ds.groupByKey(???).mapGroups(???) // aggregate 1 .groupByKey(???).mapGroups(???) // aggregate 2 第一个汇总处理一批输入数据,第二个汇总处理第一个汇总的输出.我需要的是第二个骨料的输出. 但在flink中,似乎任何聚集体都应使用像下面的特定窗口执行 ds.keyBy(???) .window(???) // window 1 .aggregate(???) // aggregate 1 .keyBy(???) .window(???) // window 2 .aggregate(???) // aggregate 2 如果我设置了窗口2,则第二个汇总的输入数据可能不是第一个骨料的输出,这将违反我的愿望. 我想使用相同的批处理数据进行多个
16 2023-04-21
编程技术问答社区
我可以使用Flink CEP对流进行分类吗?
我知道我可以使用Flink SQL要按时间戳进行分类,但是当我已经在使用CEP时,我想将其用于排序. 解决方案 用CEP分类非常容易,因为CEP始终按时间戳对其输入进行分类.这样的事情将有能力: DataStream streamWithTimestampsAndWatermarks = ... Pattern matchEverything = Pattern.begin("any") .where(new SimpleCondition() { @Override public boolean filter(Event event) throws Exception { return true;
8 2023-04-21
编程技术问答社区
Apache Flink: 即使在给定的聚合窗口中没有输入记录到达,Flink也会根据键入的状态发出输出记录
我正在尝试将Apache Flink用于IoT应用程序.我有一堆可以在几个州之一中的设备.当设备更改状态时,它会发出一条消息,其中包括事件时间戳及其更改为的状态.对于一种设备,这可能是这样的: {device_id:1,event_timestamp:9:01,state:state_1} {device_id:1,event_timestamp:9:03,state:state_2} 对于每个设备,我需要在给定的五分钟窗口上在每个状态上花费的设备在每个状态上花费的时间五分钟.为了做到这一点,我计划使用键控状态存储每个设备的最后一个状态更新,以便我知道该设备在聚合窗口的开头中处于什么状态.例如,假设具有ID" 1"的设备具有键值状态值,该值在8:58输入了" state_2".然后,9:00-9:05窗口的聚合输出希望这样(基于上面的两个示例事件): {device_id:1,时间戳:9:00,state:state_1,持续时间:120秒} {device
8 2023-04-21
编程技术问答社区
Flink scala地图与死信队列
我正在尝试制作一些Scala功能,以帮助使Flink map和filter操作将其错误重定向到死字母排队. 但是,我正在为Scala的类型擦除而苦苦挣扎,这使我无法使其成为通用.下面的mapWithDeadLetterQueue的实现未编译. sealed trait ProcessingResult[T] case class ProcessingSuccess[T,U](result: U) extends ProcessingResult[T] case class ProcessingError[T: TypeInformation](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T) extends ProcessingResult[T] object FlinkUtils { // https://stackoverflow.com/qu
14 2023-04-21
编程技术问答社区
关于StateTtlConfig
我正在为MapState配置我的StatettlConfig,我的兴趣是,进入状态的对象有3个小时的生命,然后它们应该从状态消失并传递给GC以进行清理并释放一些内存和检查点我认为也应该释放一些体重.我以前有这种配置,看来它不起作用,因为检查站总是成长: private final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(3)).cleanupFullSnapshot().build(); 然后,我才意识到该配置仅在从保存点读取状态而不是在我的方案中读取状态时起作用.我将把我的TTL配置更改为: private final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(3))
4 2023-04-21
编程技术问答社区
Apache Flink : 可用的任务槽 0
我已经通过Windows终端中的start-cluster.bat启动了一个flink群集. 使用一个JobManager进程和一个TaskManager进程开始本地群集. 您可以在产卵的外壳窗口中通过CTRL-C终止进程. Web界面默认为 我能够访问Localhost URL,但可用的任务插槽为0.默认情况下,预计它将具有一个任务(如Yamlfile中配置).有人有类似的问题吗? 解决方案 您的流程已经开始了吗?您的JobManager启动可能失败.您的系统也可能用尽了支票. 其他解决方案 一种可能的解决方案: 将远程JVM调试器附加到Taskmanager. 或:suspend=n 我的案子: FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.memory.process.size: 2728m taskmanager.memory.f
6 2023-04-21
编程技术问答社区
Apache Flink-将流写入S3的错误-null uri主机
我有一个flink数据管道,该管道将下载的日志文件从S3转换并以Parquet文件格式写回到另一个S3存储桶中.我已经在flink-conf.yaml中配置了 中的s3键和秘密 s3.access-key: "key" s3.secret-key: "secret" 另外将flink-s3-fs-hadoop-1.15.0.jar&aws-java-sdk-1.12.217.jar复制到FLINK_HOME/plugins/s3-fs-presto目录. 使用flink run 命令将作业提交到群集时,我得到以下异常 Caused by: java.io.IOException: null uri host. at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:166) at org.apache.fli
16 2023-04-21
编程技术问答社区
在嵌入式Flinkrunner(apache_beam [GCP])中使用pub/sub io运行beam流管道(Python)时出错。
我在FlinkRunner上的Apache Beam运行流媒体管道(Python)时面临以下错误.该管道包含GCP Pub/sub io源和酒吧/子目标. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter. ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "23 Read from Pub/Sub/Read.None" coder_id: "ref_Coder_BytesCoder_1" is_bounded: UNBOUNDED windowing_strategy_id: "ref_Windowing_Window
6 2023-04-21
编程技术问答社区
将配置参数传递给自定义Flink ProcessFunction
我需要能够将配置参数传递到扩展ProcessFunction的MyFunction.我是在参数中做到这一点的唯一方法吗?我不需要将其与每个元素传递.我可以以某种方式使用open方法? public class MyProcessFunction extends ProcessFunction, MyOutput> { public void open(Configuration parameters) { } @Override public void processElement(AbstractMap.SimpleEntry value, Context ctx, Collector out) throws Exception { 解决方案 添加传递参数的构造函数是一种很好的方法. 处理应用
8 2023-04-21
编程技术问答社区
Flink Python Datastream API Kafka Producer Sink Serializaion
我试图从一个Kafka主题中读取数据,并在进行一些处理后写给另一个主题. 当我尝试将其写入另一个主题时,我可以阅读数据并处理它.它给出了错误 如果我尝试编写数据,而没有对其进行任何处理. Kafka生产者SimpleStringSchema接受它. 但是我想将字符串转换为JSON.使用JSON播放,然后以字符串格式写入另一个主题. 我的代码: import json from pyflink.common import Row from pyflink.common.serialization import SimpleStringSchema, SerializationSchema,JsonRowSerializationSchema,Encoder from pyflink.common.typeinfo import Types,BasicType,TypeInformation,BasicTypeInfo from pyflink.datastream im
20 2023-04-21
编程技术问答社区
在kubernetes上持续部署有状态的apache flink应用程序
我想在Kubernetes上运行Apache Flink(1.11.1)流媒体应用程序.将文件系统状态后端保存到S3.检查点到S3正在工作 args: - "standalone-job" - "-s" - "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata" - "--job-classname" - "com.abc.def.MY_JOB" - "--kafka-broker" - "KAFKA_HOST:9092" 所以我面临的问题是: 我必须手动选择先前的状态.有可能使它变得更好吗? 作业会增加CHK DIR,但不使用检查点.表示我第一次看到一个事件并将其存储到ListState时,每当我通过gitlab部署我的应用程序的较新版本时,它再次抛出了此事件时. 当我定义状态时,为什么我必须在代码中明确启用检查点. e
8 2023-04-21
编程技术问答社区