Java Spark广播和连接两个RDDs
我有一个大桌子JavaPairRDD RDD1,一个较小的JavaPairRDD RDD2.我想加入这两个RDD,我知道最好的方法是使RDD2成为广播变量,然后加入以减少改组.如何处理广播部分?我的意思是在广播之后,我将获得一个变量(列表或集合),而不再是RDD.如何使用RDD加入广播变量? // I ignored the parsing part, just simplified it as loading from the files. JavaPairRDD RDD1 = sc.textFile ("path_to_small_dataset"); JavaPairRDD RDD2 = sc.textFile("path_to_large_dataset"); // Broadcast RDD2 Set
14 2024-04-03
编程技术问答社区
在spark RDD中,如何在没有combinedByKey和aggregateByKey的情况下获得指定的输出?
以下是我的数据: val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", bar=C","bar=D", "bar=D") 现在,我想要以下输出类型,但不使用combineByKey和aggregateByKey: 1) Array[(String, Int)] = Array((foo,5), (bar,3)) 2) Array((foo,Set(B, A)), (bar,Set(C, D))) 以下是我的尝试: scala> val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", | "bar=D", "bar=D") scala> val sample=keysWithValuesList.map(_.split("="))
8 2024-03-30
编程技术问答社区
如何使用Spark Scala连接3个RDD's
我想使用spark rdd加入3 tables.我使用Spark SQL实现了目标,但是当我尝试使用RDD加入时,我没有得到所需的结果.以下是我使用spark SQL和output的查询: scala> actorDF.as("df1").join(movieCastDF.as("df2"),$"df1.act_id"===$"df2.act_id").join(movieDF.as("df3"),$"df2.mov_id"===$"df3.mov_id"). filter(col("df3.mov_title")==="Annie Hall").select($"df1.act_fname",$"df1.act_lname",$"df2.role").show(false) +---------+---------+-----------+ |act_fname|act_lname|ro
8 2024-03-30
编程技术问答社区
如何对Java Spark的RDD进行标准偏差和平均值操作?
我有一个看起来像这样的javardd., [ [A,8] [B,3] [C,5] [A,2] [B,8] ... ... ] 我希望我的结果是 平均 [ [A,5] [B,5.5] [C,5] ] 仅使用Java RDDS我该执行此操作. P.S:我想避免使用GroupBy操作,因此我不使用DataFrames. 解决方案 您去这里: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.util.StatCounter; import scala.Tuple2; import scala.Tuple3; impor
10 2024-03-30
编程技术问答社区
我如何将RDD保存到HDFS中,然后再读回它?
我有一个RDD,其元素是类型(长,字符串).由于某种原因,我想将整个RDD保存到HDFS中,后来还阅读了SPARK程序中的RDD.可以这样做吗?如果是这样,如何? 解决方案 这是可能的. 在RDD中,您具有saveAsObjectFile和saveAsTextFile函数.元组存储为(value1, value2),因此您以后可以解析. 可以使用SparkContext的textFile函数进行读取,然后.map消除() so: 版本1: rdd.saveAsTextFile ("hdfs:///test1/"); // later, in other program val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => { // here remove () and parse long / strings }) 版本2: rdd.saveAsObje
10 2024-03-30
编程技术问答社区
为什么Spark在字数上很快?
测试案例:spark. 我了解 mapReduce , fp 和 stream 编程模型,但无法弄清楚这个词计数是如此之快. 在这种情况下,我认为这是I/O密集型计算,并且不可能在20秒内扫描6G文件.我想像Lucene一样,在单词计数之前进行了索引.魔术应该是在 rdd (弹性分布式数据集)设计中,我对我的理解不够. 我感谢有人可以解释 rdd 单词计数案例.谢谢! 解决方案 首先是启动时间. Hadoop MapReduce Job Startup需要启动许多不快的JVM. Spark Job Startup(在现有的Spark群集上)导致现有的JVM到叉新任务线程,这比启动JVM 要快得多. 接下来,没有索引,也没有魔术. 6GB文件存储在每个128MB的47个块中.想象一下,您有一个足够大的Hadoop群集,所有这47个HDF块都驻留在不同的Jbod HDD上.他们每个人都会为您提供70 MB/秒的扫描率,这意味着您可以在约2秒内阅读此数据.在群集中使用
6 2024-03-30
编程技术问答社区
Spark的RDD是如何工作的?
我有一个小的Scala程序,可以在单个节点上运行良好.但是,我正在将其缩放,因此它可以在多个节点上运行.这是我的第一次尝试.我只是想了解RDD在Spark中的工作原理,因此这个问题是基于理论,并且可能不是100%正确的. 假设我创建了一个RDD: val rdd = sc.textFile(file) 现在,一旦完成了,这是否意味着file的文件现在已在节点上分区(假设所有节点都可以访问文件路径)? 其次,我想计算RDD中的对象数量(足够简单),但是,我需要在计算中使用该数字,该计算需要应用于RDD中的对象 - 伪代码示例: rdd.map(x => x / rdd.size) 假设rdd中有100个对象,并说有10个节点,因此每个节点的10个对象计数(假设这是RDD概念的工作方式),现在当我调用该方法时,每个节点是每个节点用rdd.size为10或100执行计算?因为总体而言,RDD是大小100,但在每个节点上本地仅是10.在进行计算之前,我是否需要进行广播
10 2024-03-30
编程技术问答社区
斯帕克分区:大量的文件
我试图利用火花分区.我试图做 之类的事情 data.write.partitionBy("key").parquet("/location") 此处的问题每个分区都会创建大量的镶木件文件,如果我试图从根目录中读取 避免我尝试过 data.coalese(numPart).write.partitionBy("key").parquet("/location") 然而,这会在每个分区中创建数量的小木木数字. 现在我的分区尺寸不同.因此,理想情况下,我希望每个分区分开结合.但是,这看起来并不容易.我需要访问所有分区结合到一个特定数字,然后在一个单独的位置存储. 写作后我应该如何使用分区来避免多个文件? 解决方案 首先,我真的会避免使用coalesce,因为这通常在转型链中进一步推动,并可能会破坏您的工作的并行性(我在这里询问了此问题:每个斑点 - 分区编写1个文件非常简单(请参阅 Spark DataFrame写入方法写许多小文件): data.r
10 2024-03-30
编程技术问答社区
PySpark DataFrames--如何在不转换为Pandas的情况下进行枚举?
我有一个非常大的 pyspark.sql.dataframe.dataframe nater df. 我需要某种枚举记录的方式 - 因此,能够使用某些索引访问记录. (或选择具有索引范围的记录组) 在熊猫中,我可以做 indexes=[2,3,6,7] df[indexes] 在这里,我想要类似的东西,(并且不转换为pandas) 我能达到的最接近: 枚举原始数据框中的所有对象: indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 搜索我需要使用Where()函数的值. 问题: 为什么它不起作用以及如何使其正常工作?如何将行添加到dataframe? 稍后会使用以下操作: indexes=[2,3,6,7] df1.where("index in indexes").collect() 任何更快,更简单的处理方式?
12 2024-03-30
编程技术问答社区
如何在PySpark中读取一个大的JSON数组文件
发行 我最近在Azure Data Lake Analytics中遇到了一个挑战,当时我试图在大型UTF-8 JSON数组文件中阅读并切换到Hdinsight Pyspark(v2.x,而不是3)来处理该文件.该文件是〜110G,具有〜150m的JSON对象. hdinsight pyspark似乎并不支持输入的JSON文件格式的数组,因此我被困了.另外,我在每个包含数百列中都有不同的模式的"许多"此类文件,因此为此不是为此创建模式. 问题 如何在HDinsight上使用PySpark 2中的开箱即用功能来允许将这些文件读取为JSON? 谢谢, J 我尝试的东西 我使用了此页面底部的方法: import json df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF() display(df) 我尝试了上述内容,不了解" whole
8 2024-03-24
编程技术问答社区
如何在 spark 中将 Avro 模式对象转换为 StructType
我有一个类型行的rdd,即rdd [row]和avro架构对象.我需要使用此信息创建一个数据框架. 我需要将AVRO架构对象转换为structType,以创建数据框架. 您可以帮忙. 解决方案 com.databricks.spark.avro有一个可以帮助您解决此 的课程 StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType(); 请浏览这个具体示例: http:http:http:http:http:http:http:http:http://bytepadding.com/big-data/spark/read-write-parquet-files-using-spark/ 其他解决方案 在Pyspark 2.4.7中,我的求解是用Avroschema创建一个空数据框 with open('
0 2024-03-21
编程技术问答社区
在Spark中用Python计算配对(K,V)RDD中每个KEY的平均数
我想与python解决方案共享这种特殊的apache spark,因为它的文档很差. 我想通过键计算K/V对的平均值(存储在成对RDD中).这是示例数据的样子: >>> rdd1.take(10) # Show a small sample. [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.249999999999
4 2024-03-21
编程技术问答社区
通过传入一个需要匹配的值列表,过滤掉数据框架(JSON)中的嵌套数组条目
我在数据框架中读取,其中每行的每行都有一个巨大的文件,如下所示: { "userId": "12345", "vars": { "test_group": "group1", "brand": "xband" }, "modules": [ { "id": "New" }, { "id": "Default" }, { "id": "BestValue" }, { "id": "Rating" }, { "id": "DeliveryMin" }, { "id": "Distance" } ] } 我想传递到一个模块ID-S列表并清除所有项目的方法,这并不能成为该模块ID-S列表的一部分.它应该删除所有其他模块,该模块的ID不等于列表中传递的任何值. 您有解决方案吗?
12 2024-03-01
编程技术问答社区
Spark DataFrame列转换为Map类型和List of Map类型
我有以下数据框架,并感谢有人可以帮助我以低于不同格式的输出. 输入: |customerId|transHeader|transLine| |1001 |1001aa |1001aa1 | |1001 |1001aa |1001aa2 | |1001 |1001aa |1001aa3 | |1001 |1001aa |1001aa4 | |1002 |1002bb |1002bb1 | |1002 |1002bb |1002bb2 | |1002 |1002bb |1002bb3 | |1002 |1002bb |1002bb4 | |1003 |1003cc |1003cc1 | |1003 |1003cc |1003cc2 | |1003 |1
8 2024-03-01
编程技术问答社区
在spark中加载csv文件到RDD和Dataframe的区别
我不确定是否早些时候问这个具体问题.可能是可能的重复,但我找不到这样的用例. 我们知道,我们可以将CSV文件直接加载到DataFrame,并可以将其加载到RDD中,然后将该RDD转换为DataFrame. RDD = sc.textFile("pathlocation") 我们可以在此RDD上应用一些地图,过滤器和其他操作,并可以将其转换为DataFrame. 我们也可以直接创建一个数据框,直接读取CSV文件 Dataframe = spark.read.format("csv").schema(schema).option("header","false").load("pathlocation") 我的问题是,当我们必须先使用RDD加载文件并将其转换为数据框架时,用例是什么? 我只知道TextFile逐行读取数据. 当我们不得不选择RDD方法而不是数据框架时,该方案是什么? 解决方案 dataFrames/数据集对RDD的性能进行了巨大的
22 2024-03-01
编程技术问答社区
火花如何删除csv文件中的最后一行
我是新来的火花,我想从CSV文件中删除标头和最后一行 Notes xyz "id","member_id" "60045257","63989975", "60981766","65023535", Total amount:4444228900 Total amount: 133826689 我想删除行注意xyz ,总金额:44444228900 和总金额:133826689 从文件中删除了第一个.文件中的线 val dfRetail = sc.textFile("file:////home/cloudera/Projects/Project3/test/test_3.csv"); var header=dfRetail.first(); var final_data=dfRetail.filter(row => row!=header); 如何删除最后一行? 解决方案 使用ZipWithIn
10 2024-03-01
编程技术问答社区
自制的DataFrame聚合/删除重复数据 Spark
我想对我的dataframe df进行转换,以便在最终数据框架中只有一次和仅一次. 出于机器学习目的,我不想在数据集中有偏见.这绝不应该发生,但是我从数据源获得的数据包含了这种"怪异".因此,如果我的行具有相同的键,我希望能够选择两个(例如平均值)或字符串串联(例如标签)或设置的随机值的组合. . 说我的dataframe df看起来像这样: +---+----+-----------+---------+ |ID1| ID2| VAL1| VAL2| +---+----+-----------+---------+ | A| U| PIERRE| 1| | A| U| THOMAS| 2| | A| U| MICHAEL| 3| | A| V| TOM| 2| | A| V| JACK| 3| |
4 2024-03-01
编程技术问答社区
斯帕克为每个(项目1,项目2,分数)得到前N个最高分的结果。
我有以下格式的 dataFrame : item_id1: Long, item_id2: Long, similarity_score: Double 我要做的是获取每个item_id1的顶级n最高相似_score记录. 因此,例如: 1 2 0.5 1 3 0.4 1 4 0.3 2 1 0.5 2 3 0.4 2 4 0.3 具有前两个类似项目的 将给出: 1 2 0.5 1 3 0.4 2 1 0.5 2 3 0.4 我模糊地猜测可以通过将记录首先按Item_id1进行分组,然后通过得分进行反向排序,然后限制结果.但是我坚持如何在Spark Scala中实现它. 谢谢. 解决方案 我建议为此使用窗口功能: df .withColumn("rnk",row_number().over(Window.partitionBy($"item_id1").orderBy($"similarity_score"))) .where
12 2024-03-01
编程技术问答社区
在Spark中用复杂的过滤方式从elasticsearch中获取esJsonRDD
我目前正在根据单行弹性查询(例如)在我们的Spark Job过滤中获取elasticsearch rdd(例如): val elasticRdds = sparkContext.esJsonRDD(esIndex, s"?default_operator=AND&q=director.name:DAVID + \n movie.name:SEVEN") 现在,如果我们的搜索查询变得复杂如: { "query": { "filtered": { "query": { "query_string": { "default_operator": "AND", "query": "director.name:DAVID + \n movie.name:SEVEN" }
14 2024-03-01
编程技术问答社区