为什么在Spark Streaming中读取广播变量在运行数天后出现异常?
我正在使用Spark流(Spark V1.6.0)以及我的项目中的HBase,而HBASE(HBASE v1.1.2)配置在具有广播变量的执行者之间传输.火花流应用程序作品一开始,而大约2天后,将出现例外. val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration()) private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = { hBaseContext.streamBulkIncrement[(String, Int)]( dStream, hTableName, (t) => { val rowKey = t._1 val incVal = t._
2 2024-04-03
编程技术问答社区
如何在spark streaming中更新一个广播变量?
我相信,火花流的一个相对常见的用例: 我有一系列对象,我想根据某些参考数据过滤 最初,我认为使用广播变量: 实现这将是一件非常简单的事情. public void startSparkEngine { Broadcast refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> {
6 2024-04-03
编程技术问答社区
读取二进制文件到Spark
我有一组文件,每个文件都包含Marc21二进制格式的特定记录.我想将一组文件作为RDD摄取,其中每个元素将作为二进制数据的记录对象.稍后,我将使用Marc库将对象转换为Java Object以进行进一步处理. 到目前为止,我对如何阅读二进制文件感到困惑. 我已经看到以下功能: binaryRecord(path: string, recordLength: int, conf) 但是,它假定它是一个具有相同长度的记录的文件.我的记录绝对是不同的大小.在每个文件旁边,都在一个单独的文件上. 有办法解决这个问题吗?我该如何为每个文件付出一个长度?唯一的方法是仅计算文件的长度,然后读取记录? 我看到的另一个解决方案显然是以Java格式阅读记录,然后将其序列化为任何舒适的格式. 请建议. 解决方案 您是否尝试过Spark? 的sc.binaryfiles() 这是文档的链接 https://spark.apache.org/docs/2.1.1
2 2024-03-30
编程技术问答社区
streaming.StreamingContext。启动上下文出错,将其标记为停止 [Spark Streaming]
我试图运行样品火花流码.但是我得到了这个错误: 16/06/02 15:25:42 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute at scala.Predef$.require(Predef.scala:233) at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542) at org.
10 2024-03-30
编程技术问答社区
在Spark Streaming作业中获取broadcast_1的broadcast_piece0失败了
我在集群模式下在纱线上运行火花作业.该作业从Kafka Direct流中获取消息.我正在使用广播变量并每30秒进行一次检查点.当我第一次开始工作时,它可以很好地运行,没有任何问题.如果我杀死工作并重新启动,则在收到卡夫卡消息后,在执行人中的异常下方投掷: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) at org.apache.spark.broadcast.TorrentBroadca
8 2024-03-30
编程技术问答社区
用mapWithState Spark Streaming过滤部分重复内容
我们有一个Dstream,例如 val ssc = new StreamingContext(sc, Seconds(1)) val kS = KafkaUtils.createDirectStream[String, TMapRecord]( ssc, PreferConsistent, Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)). mapPartitions(part => { part.map(_.value()) }). mapPartitions(part1 => { part1.map(c => { TMsg(1, c.field1, c.field2, //And others c.startTimeSeconds ) }) }) 因此,每个RDD都有一堆TMsg对象
0 2024-03-30
编程技术问答社区
火花流:长队列/活动批次
任何人都可以指出,这种主动批次在那里悬挂数周而从未处理过的原因是什么原因?非常感谢. 我的猜测还不够执行者,更多的工人/执行者会解决问题吗?或在其任务调度程序中的不同批次上分配优先级? 但是,这里的情况是,最近的批次(6月底)成功地处理了,但是批次仍在5月份被排队. 我刚刚检查了我的火花设置,调度程序策略是fifo spark.scheduler.mode FIFO 解决方案 事实证明主节点是瓶颈. 主节点缺乏内存,然后调度程序也不能足够快. 解决方案:将主节点更改为更强大的EC2实例 其他解决方案 在Spark-Submit中 设置-Diver-Memory和-Executor-Memory到适当的值 基于您的活动数量,使得10000m
0 2024-03-29
编程技术问答社区
无法将Eventhub的数据写入Databricks的ADLS中,得到运行时错误
当我在数据映中运行Writestream代码时,该代码如下: df_out = cdf.writeStream\ .format("json")\ .outputMode("append")\ .option("checkpointLocation", "/path/to/checkpoint/directory")\ .start("/mnt/container-name/folder-name") 我在数据映中遇到运行时错误: 现在运行Writestream代码后,我会收到以下错误: 我如何解决此错误 org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:289) at org.apache.spark.sql
0 2024-03-25
编程技术问答社区
如何使用Spark/Scala从Azure blob中获取文件列表?
如何从Spark和Scala中的Azure Blob存储中获取文件列表. 我没有任何想法来解决这个问题. 解决方案 我不知道您使用的火花在Azure或本地使用.因此它们是两种情况,但类似. 对于在本地运行的Spark,有一个官方 blog 介绍了如何从Spark访问Azure Blob存储.关键是您需要将Azure存储帐户配置为core-site.xml文件中的HDFS兼容存储,然后将两个Jars hadoop-azure&azure-storage添加到您的类路径中,以通过协议wasb[s]访问HDFS.您可以参考官方 tutorial 使用wasb了解HDFS-Compatible Storage,然后博客关于hdinsight的配置更多详细信息. 对于在Azure上运行的Spark,差异只是使用wasb访问HDF,其他准备工作是通过使用Spark创建HDinsight群集的Azure完成的. 列出文件的方法为 listFiles 或 wholeTex
4 2024-03-24
编程技术问答社区
用Spark从Azure Blob中读取数据
我在通过火花流读取来自Azure Blobs的数据时遇到问题 JavaDStream lines = ssc.textFileStream("hdfs://ip:8020/directory"); 类似上述代码适用于HDFS,但无法从Azure Blob读取文件 https://blobstorage.blob.core.windows.net/containerid/folder1/ 上面是Azure UI中显示的路径,但这不起作用,我是否缺少某些内容,我们如何访问它. 我知道EventHub是流数据流数据的理想选择,但是我当前的情况需要使用存储而不是队列 解决方案 为了从Blob存储中读取数据,需要完成两件事.首先,您需要告诉Spark在基础Hadoop配置中使用哪个本机文件系统.这意味着您还需要 hadoop-azure jar 要在您的classpath上可用(请注意,可能有与Hadoop家族有关的更多罐子的运行时要求):
2 2024-03-24
编程技术问答社区
从检查点加载的Spark StreamContext没有HadoopConf设置
无法使用wasbs://... url 从检查点恢复到Azure Blob存储 在群集模式下使用独立火​​花2.0.2. val ssc = StreamingContext.getOrCreate(checkpointPath, () => createSSC(), hadoopConf) i通过hadoopConf in hadoopConf.set in fs.azure和fs.azure.account.key.$account.blob.core.windows.net通过hadoopConf在createssc函数中通过sparkSession.sparkContext.hadoopConfiguration.set 进行冗余 工作在运行并运行时成功地写了检查点,直到我停止. 重新启动时,从检查点数据创建的上下文没有HadoopConf信息重新访问wasbs://存储并引发错误,说它不能使用匿名访问创建容器. 我想念什么?我发现了一些有关S3但没
0 2024-03-24
编程技术问答社区
在Azure的流分析中,规则引擎意味着什么?
我是Azure和分析的新手. 我试图了解流动警报规则引擎.我已经使用了一些示例数据作为输入,并有查询来过滤数据. 但是,我不确定rules engine的含义,它只是查询还是还有其他内容,如果是,我们是否可以将所有规则都包含在一个中,如果是,如何? ? 解决方案 定义ASA逻辑的主要方法是使用SQL,它提供了一种使用SQL语句定义规则的方法(例如,选择DeviceID ...其中温度> 50).可以在同一查询中使用多个条件,并且可以在同一作业中定义多个查询. 这种方法非常灵活,但是在工作开始之前,需要定义规则本身,因为ASA将编译工作. 请参阅然后,当我们谈论规则引擎时,用户通常需要更多的规则动态性.为了提供这种动态性,可以动态地注入规则条件,甚至可以为运行的作业提供完整的规则(以后的作业是为高级用户).以下一些详细信息: 使用参考数据动态配置阈值规则:ASA可以使用参考数据来获取最新条件.查看更多详细信息使用JavaScript UDF动态注入规则(这更高级):用户可以
2 2024-03-23
编程技术问答社区
来自Kafka的Spark流并以Avro格式写入HDFS
我基本上想消费来自Kafka的数据并将其写给HDFS.但是发生的情况是,它没有在HDFS中编写任何文件.它创建空文件. ,也请指导我,如果我想在HDFS中以Avro格式写作,该如何修改代码. 为了简单起见,请写入本地C驱动器. import org.apache.spark.SparkConf import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkContext import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.str
2 2024-03-21
编程技术问答社区
当试图写一个通用记录类型的rdd时出现任务不可序列化异常
val file = File.createTempFile("temp", ".avro") val schema = new Schema.Parser().parse(st) val datumWriter = new GenericDatumWriter[GenericData.Record](schema) val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter) dataFileWriter.create(schema , file) rdd.foreach(r => { dataFileWriter.append(r) }) dataFileWriter.close() 我有一个类型GenericData.Record的A DStream,我试图以Avro格式写入HDFS,但我会收到此Task Not Serializable错误: org.apache.spark.Spa
0 2024-03-21
编程技术问答社区
在运行的Spark Streaming应用程序中处理模式变化
我希望使用Spark 1.6上的DataFrames API构建火花流应用程序.在我走得太远之前,我希望有人可以帮助我了解数据范围如何处理具有不同模式的数据. 想法是,消息将带有AVRO模式流入Kafka.我们应该能够以向后兼容的方式发展模式,而不必重新启动流式应用程序(应用程序逻辑仍然可以使用). 使用架构注册表和使用kafkautils嵌入的架构ID的新版本对消息进行了挑选,从而创建直接流和Avrokafkadecoder(来自Conlantuent).那让我到了Dstream. 问题#1: 在该dstream中,将有具有不同版本的模式的对象.因此,当我将每个对象转换为行对象时,我应该将读取器架构传递到正确迁移数据的最新读取器架构中,并且我需要将最新的模式传递到SQLContext.CreateDataFrame(Rowrdd,schema)调用中. dstream中的对象是类型的genericdata.record,据我所知,没有简单的方法可以判断哪个是最新版本.
0 2024-03-21
编程技术问答社区
Spark Python Avro Kafka反序列化器
我已经在Python Spark应用程序中创建了一个Kafka流,可以解析任何通过它的文本. kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) 我想将其更改为能够从Kafka主题中解析Avro消息.从文件中解析Avro消息时,我会喜欢: reader = DataFileReader(open("customer.avro", "r"), DatumReader()) 我是Python和Spark的新手,我该如何更改流以解析Avro消息?另外,在阅读Kafka的AVRO消息时,该如何指定用于使用的模式???我以前在Java做过所有这一切,但是Python让我感到困惑. 编辑: 我尝试更改以包括AVRO解码器 kafkaStream = Kaf
6 2024-03-21
编程技术问答社区
使用模式将Spark的AVRO消息转换为DataFrame
是否有一种方法可以使用模式转换 avro 消息来自 kafka with a href = "/问题/标记/spark" class =" post-tag" title ="显示问题标记'spark'" rel =" tag'> spark to dataframe ?用户记录的架构文件: { "fields": [ { "name": "firstName", "type": "string" }, { "name": "lastName", "type": "string" } ], "name": "user", "type": "record" } 和代码段来自 sqlnetworkwordcount示例和 kafka,spark and avro-第3部分,生产和消费avro消息以阅读消息. object Injection { val parser = new Schema.Parser() val schema =
4 2024-03-21
编程技术问答社区
pySpark Kafka直接流更新Zookeeper/Kafka偏移量
目前,我正在与Kafka/Zookeeper和Pyspark(1.6.0)合作. 我已经成功创建了一个使用KafkaUtils.createDirectStream()的KAFKA消费者. 所有流媒体都没有问题,但是我认识到,在我消耗一些消息后,我的kafka主题未更新到当前偏移量. 由于我们需要更新的主题才能在此处进行监视,这很奇怪. 在Spark的文档中,我找到了以下评论: offsetRanges = [] def storeOffsetRanges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd def printOffsetRanges(rdd): for o in offsetRanges: print "%s %s %s %s" %
2 2024-03-01
编程技术问答社区
Apache Zeppelin & Spark Streaming: Twitter的例子只适用于本地
我刚刚从我使用以下安装: Spark 1.5.1(Hadoop 2.6+的预建),Master + 2X奴隶 Zeppelin 0.5.5(安装在Spark的主节点上) 编辑 另外,以下安装对我不起作用: Spark 1.5.0(Hadoop 2.6+的Prebuild),Master + 2X奴隶 Zeppelin 0.5.5(安装在Spark的主节点上) 屏幕截图:本地设置(工作!) 屏幕截图:群集设置(不起作用!) 该作业似乎在集群模式下正确运行: 解决方案 我经过2天的尝试! 本地Zeppelin Spark解释器与Spark群集之间的区别似乎是,本地Zecuts ofers over twitter utter over ofstecuts over over ofstrapt executs of执行Twitter流式示例所需. 因此,在用Spark Cluster启动应用程序之前,您必须在Zeppelin Notebook中手
4 2024-03-01
编程技术问答社区
Apache Zeppelin 0.6.1: 运行Spark 2.0 Twitter流应用程序
我有一个带有Spark 2.0的簇,安装了Zeppelin 0.6.1.由于类TwitterUtils.scala从Spark Project转移到Apache Bahir,因此我无法在Zeppelin Notebook中使用Twitterutil. 在这里我笔记本的摘要: 依赖性加载: %dep z.reset z.load("org.apache.bahir:spark-streaming-twitter_2.11:2.0.0") DepInterpreter(%dep) deprecated. Remove dependencies and repositories through GUI interpreter menu instead. DepInterpreter(%dep) deprecated. Load dependency through GUI interpreter menu instead. res1: org.apache.zeppelin
4 2024-03-01
编程技术问答社区