我知道collect()引起的一些火花动作引起了性能问题. 在 要打印驱动程序上的所有元素,可以使用 collect()方法首先将RDD带到驱动程序节点: rdd.collect().foreach(println) .这可能会导致驱动程序 , ,因为collect()将整个RDD获取到一台机器 ;如果您只需要打印一些rdd的元素,则更安全的方法是使用take():rdd.take(100).foreach(println). 以及另一个相关的SE问题: 我已经知道,如果未正确设置并行设置,groupByKey(), reduceByKey()可能会导致记忆中的出现. 我没有在其他转换和操作命令,必须谨慎使用. 这三个是唯一解决的命令?我对以下命令也有疑问 aggregateByKey() sortByKey() persist()/cache() 如果您提供有关密集命令的信息(跨分区而不是单个分区或低性能命令),则必须通过更好的保护
以下是关于 rdd 的编程技术问答
(特定于Scala特定问题.) 虽然Spark Docs在可能的情况下鼓励使用DataFrame API,但如果DataFrame API不足,则通常是在落入RDD API或使用UDF之间的选择.这两种选择之间是否存在固有的性能差异? RDD和UDF相似,因为它们都无法从催化剂和钨优化中受益.还有其他开销吗?如果有的话,这两种方法之间是否有所不同? 给出一个特定的示例,假设我有一个数据框架,其中包含具有自定义格式的文本数据列(不适合REGEXP匹配).我需要解析该列,并添加一个包含结果令牌的新向量列. 解决方案 他们俩都无法从催化剂和钨优化中受益 这不是完全正确的.尽管UDFS并不能从钨优化中受益(可以说是简单的SQL转换也不会在此处得到巨大的提升),但您仍然可以从Catalyst提供的执行计划优化中受益.让我们用一个简单的示例来说明这一点(注意:Spark 2.0和Scala.不要将其推断到早期版本,尤其是使用Pyspark): val f =
在Pyspark中,我可以从列表中创建一个RDD,并决定具有多少个分区: sc = SparkContext() sc.parallelize(xrange(0, 10), 4) 我决定如何在影响性能的影响下分区的分区数量? 这如何取决于我的机器的核心数量? 解决方案 主要效果是指定分区太少或 far 太多分区. 太少分区您不会利用群集中可用的所有内核. 太多的分区在管理许多小任务时会有过多的开销. 在两个之间,第一个对性能的影响要大得多.在这一点上,安排太多的小型任务对分区的影响相对较小.如果您按照数以万计的分区的顺序,那么Spark会得到 非常 慢速. 其他解决方案 要添加到 @westcoastproject的出色答案中,我记得文档建议将您的分区数设置为群集中CPU内核数的3或4倍CPU内核.意思是,如果您在群集中只有每个CPU核心1个分区,则必须等待完成一个最长的运行任务才能完成,但是如果您进一步打破了该任务,那么工作负载将在晚上快速且
我更喜欢Python而不是Scala.但是,由于Spark是在Scala中本地编写的,因此我期望我的代码在Scala中的运行速度比Python版本更快. 有了这个假设,我想学习并编写一些非常常见的预处理代码的Scala版本,以了解一些1 GB的数据.数据是从 kaggle 上挑选的.只是为了概述数据(其中包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,布尔值.我正在使用8个内核进行火花加工;这就是为什么我使用minPartitions=6以便每个核心都有可以处理的东西. scala代码 val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\
为什么在火花中匹配的模式匹配不与Scala中的相同?请参阅下面的示例...函数f()尝试在类上进行模式匹配,该匹配在Scala repl中起作用,但在Spark中失败并导致所有" ???". f2()是一种解决方法,可以使用.isInstanceOf()获得Spark的所需结果,但我知道这是Scala中的不良形式. 在这种情况下,在Spark中以正确方式匹配的模式的任何帮助将不胜感激. abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 尝试在Scala repl中起作用的模式匹配,但在Spark 中失败 def f(x: a) = x
我有2个带有不同分区的RDD. case class Person(name: String, age: Int, school: String) case class School(name: String, address: String) rdd1是Person的RDD,我根据人的age进行了分区,然后将钥匙转换为school. val rdd1: RDD[Person] = rdd1.keyBy(person => (person.age, person)) .partitionBy(new HashPartitioner(10)) .mapPartitions(persons => persons.map{case(age,person) =>
我已经阅读了某个地方,这些操作用于在单个RDD上进行操作,例如reduceByKey(),在预分赛的RDD上运行将导致每个键在本机上计算的所有值,仅需要最终,局部缩小的值将从每个工人节点发送回主.这意味着我必须声明一个分区者: val sc = new SparkContext(...) val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...") .partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist() 为了使reduceByKey正如我之前所解释的. 我的问题是,如果我想使用ReedbyKey(最佳),我是否需要每次分区者声明或不需要. 解决方案 实际上,您所谈论的两个品质有些无关. 对于reduceByKey(),同一键的第一个质量聚合元素具
list_1 = [[6, [3, 8, 7]], [5, [9, 7, 3]], [6, [7, 8, 5]], [5, [6, 7, 2]]] rdd1 = sc.parallelize(list_1) newpairRDD = rdd1.partitionBy(2,lambda k: int(k[0])) print("Partitions structure: {}".format(newpairRDD.glom().collect())) 我想按键分区. 我得到 typeError:'int'对象不可订阅 我在做什么错? 解决方案 提供给partitionBy的分区功能在RDD的每个条目的键上操作,即每个条目的第一个元素.因此,您在整数密钥上调用lambda k: int(k[0]),从而导致您遇到的错误. 如果您只想按键进行分区,则您的lambda功能应该是一个身份操作,例如 newpairRDD = rdd1.partitionB
我只是尝试找到一种方法来获取Spark中RDD隔板的局部性. 呼叫RDD.repartition()或PairRDD.combineByKey()返回的RDD被分区.我想知道分区所在的哪些工人实例(用于检查分区行为)?! 有人可以提示吗? 解决方案 一个有趣的问题,我敢肯定没有太多有趣的答案:) 首先,将转换应用于您的RDD与工人实例无关,因为它们是独立的"实体".转换创建 rdd Lineage (=逻辑计划),而执行者仅在执行操作后即将上台(无双关语)(并且Dagscheduler将逻辑计划转换为执行计划,作为一组阶段,并将其作为一组阶段任务). 所以,我相信知道执行分区的唯一方法是使用您可以写一个
考虑以下示例: JavaPairRDD R = input.textFile("test").mapToPair(new PairFunction() { public Tuple2 call(String arg0) throws Exception { String[] parts = arg0.split(" "); Row r = RowFactory.create(parts[0],parts[1]); return new Tuple2(r.get(0).toString(), r); }}).partitionBy(new HashPartitioner(20)); 上面的代码创建了一个名为R的RDD,该RDD通过在名为" test"的TXT文
当我使用reduceByKey或aggregateByKey时,我会面临分区问题. ex)reduceBykey(_+_).map(code) 特别是,如果输入数据偏斜,使用上述方法时,分区问题变得更糟. 因此,作为解决方案,我使用repartition方法. 例如,这对分区分配很有益,但是repartition也很昂贵. 有没有办法明智地解决分区问题? 解决方案 你是对的, 重新分区真的很昂贵.由于洗牌和其他次要步骤.如您所说的那样,创建一个示例: rdd.map(x => (x, x * x)).repartition(8).reduceByKey(_+_) 请参阅此处的DAG: 此步骤将在DAG,一张地图,一个重新分配和一个减少. 中创建. 但是,如果您在reduceByKey内使用重新分配,则可以对"免费"进行重新分配. 重复的主要部分是洗牌,而ReadbyKey的主要部分也是混乱.您可以看到,在Scala
sc = SparkContext("Local") rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8) 或 sc = SparkContext("Local") rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8) 使用上述任何一个代码,我都在尝试在我的rdd {其中,我希望数据均匀分布在所有分区} 上.当我打印 {rdd.getnumpartitions()}} 时,所示的分区数仅为8个,但是在 spark ui 上,我观察到,尽管制作了8个分区,但是整个二进制文件数据仅放在一个分区上. 注意: minpartition 属性不起作用.即使设置了MinPartitions = 5,RDD中制作的分区数量仅为1.因此,使用了分区/播放功能. 这是
根据火花文档,只有RDD操作才能触发火花作业,并在调用该动作时懒惰地评估转换. 我看到sortBy转换函数立即应用,并显示为SparkUI中的作业触发器.为什么? 解决方案 sortBy是使用sortByKey实现的,该>取决于RangePartitioner(JVM)或分区功能(Python).当您调用sortBy/sortByKey分区器(分区功能)时,请急切地初始化,并示例输入RDD以计算分区边界.您看到的工作与此过程相对应. 仅在您对新创建的RDD或其后代执行操作时才执行实际排序. 其他解决方案 根据火花文档,只有动作触发了Spark中的工作,当调用动作时,对转换进行了懒惰. 通常,您是对的,但是正如您刚刚经历的那样,很少有例外,sortBy在其中(zipWithIndex). 事实上,它是在Spark的Jira中报道的,并没有解决解决方案.请参阅 spark-1021 sortbykey()在不应该时启动集群作业. 您可以看到启
当我执行以下命令时: scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist() rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at :22 scala> rdd.partitions.size res9: Int = 10 scala> rdd.partitioner.isDefined res10: Boolean = true scala> rdd.partitioner.get res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a 它说有10个分区,使用HashPartitioner进行分区.但是当我在
我阅读了 HashPartitioner .不幸的是,除了API呼叫以外,什么也没解释.我的假设是HashPartitioner根据密钥的哈希分配了分布式集合.例如,如果我的数据像 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 因此,分区者会将其放入不同的分区中,而相同的键则落在同一分区中.但是我不理解构造函数参数的重要性 new HashPartitoner(numPartitions) //What does numPartitions do? 对于上述数据集 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 那么HashPartitioner实际上是如何工作的? 解决方案 好吧,让您的数据集更加有趣: val rdd = sc.parallelize(for { x
许多教程都提到RDD的细分>将优化Spark作业的数据改组.我感到困惑的是,因为我的理解前也会导致改组,为什么要提前改组会受益于某些操作?特别是激发它会对一组转换进行优化. 例如: 如果我想加入两个数据集国家(ID,国家)和收入(ID,(收入,月份,年)),这两种操作有什么区别? (我使用pyspark模式) ID前分区 country = country.partitionBy(10).persist() income = income.partitionBy(10).persist() income.join(country) 直接加入而无需分区: income.join(country) 如果我只需要计算一次加入,那么在加入之前使用前分区仍然有用吗?我认为partitionBy也需要改组吗?而且,如果我在加入后的进一步计算是使用country作为密钥的基础(以前使用的密钥ID将是没有用的,并且可以从RDD中删除),我该怎么做才能优化计算?
在这些情况下,大文件会发生什么? 1)Spark从Namenode获取数据以获取数据. Spark会在同一时间停止,因为数据大小的时间太长,根据Namenode的信息? 2)spark对数据块块大小进行数据分区,但所有数据都不能存储到主内存中.在这里,我们不使用Storagelevel.那么这里会发生什么? 3)Spark做数据,一旦此主存储器存储的数据将再次处理SPARK将从光盘加载其他数据. 解决方案 首先,当调用动作(例如count,collect或write)时,Spark才开始在数据中读取.一旦调用了一个操作, partitions 中的数据中就会加载 - 同时加载的分区的数量取决于您可用的内核数.因此,在火花中,您可以想到1个分区= 1核心= 1任务.请注意,所有同时加载的分区都必须适合内存,否则您将获得一个OOM. 假设您有几个阶段,Spark然后仅在加载分区的第一阶段运行转换.一旦在加载分区中应用了数据转换后,它将输出存储为Shuffle-
我不确定记忆脚打印的概念.加载Eg的镶木quet文件时. 1GB并在火花中创建RDD,每个RDD的记忆食品印花是什么? 解决方案 当您用镶木quet文件创建一个RDD时,直到在RDD上运行操作(例如,首先,收集)之前,将无需加载/执行. 现在,您的内存足迹很可能会随着时间而变化.假设您有100个分区,它们大小相同(每个10 MB).假设您正在使用20个内核的集群上运行,然后在任何时间点您只需要在内存中具有10MB x 20 = 200MB数据即可. 添加此基础,鉴于Java对象倾向于占用更多空间,因此确切说出1GB文件将在JVM堆中占用多少空间(假设您加载整个文件)并不容易.它可以2倍,也可以更多. 您可以做的一个技巧是迫使您的RDD被缓存.然后,您可以检查存储下的Spark UI,并查看RDD在缓存中的空间. 其他解决方案 Marios,在您的记忆投影中,您没有考虑到Parquet的压缩. 1GB很可能是5GB未压缩.
我正在努力挣扎,我想编写每个RDD分区以使用其自己的目录分开parquet文件.示例将是: data_file.parquet 这种格式的优势是我可以直接在SparkSQL中用作列,并且我不必在实际文件中重复此数据.这将是进入特定分区的好方法,而无需在其他地方存储单独的分区元数据. 作为前一个步骤,我从大量的GZIP文件中加载了所有数据,并根据上述密钥进行了分区. 可能的方法是将每个分区作为单独的RDD,然后写下它,尽管找不到任何好方法. 任何帮助将不胜感激.顺便说一句,我是这个堆栈的新手. 解决方案 我认为您要保存的rdd上打电话foreachPartition(f: Iterator[T] => Unit)是可能的. 在您提供到for