我有一个带有多列的火花数据框.我想找出并删除在列中具有重复值的行(其他列可能不同). 我尝试使用dropDuplicates(col_name),但它只会丢弃重复条目,但仍保留一个记录在数据框架中.我需要的是删除所有最初包含重复条目的条目. 我正在使用Spark 1.6和Scala 2.10. 解决方案 我将使用窗口功能.假设您要删除重复id行: import org.apache.spark.sql.expressions.Window df .withColumn("cnt", count("*").over(Window.partitionBy($"id"))) .where($"cnt"===1).drop($"cnt") .show() 其他解决方案 这可以通过按列(或列)进行分组以在中查找重复项,然后汇总并过滤结果. 示例dataframe df: +---+---+ | id|num| +---+---+ | 1|
以下是关于 spark-dataframe 的编程技术问答
我正在尝试将数据框架的列表转换为单个数据框,下面给出了 dflist是列表[sql.dataframe] dfList=List([ID: bigint, A: string], [ID: bigint, B: string], [ID: bigint, C: string], [ID: bigint, D: string]) dfList = List( +--------+-------------+ +--------+-------------+ +--------+--------+ +--------+--------+ | ID | A | ID | B | | ID | C | | ID | D | +--------+-------------+ +--------+-------------+ +-
是否有一种简单有效的方法来检查仅根据列的重复(不删除)python数据框架? 我想检查数据框是否具有基于列组合的DUP,如果确实如此,则该过程失败. tia. 解决方案 最简单的方法是检查数据框中的行数是否等于删除重复项后的行数. if df.count() > df.dropDuplicates([listOfColumns]).count(): raise ValueError('Data has duplicates') 其他解决方案 如果您也想实际检查重复项,则可以做 df \ .groupby(['column1', 'column2']) \ .count() \ .where('count > 1') \ .sort('count', ascending=False) \ .show() 其他解决方案 另一种检查重复项的方法是: df.exceptAll(df.dropDupl
我试图在数据括号中运行一些带有JAR的代码.我得到的错误与类路径中过时的jar相关联.我已经上传了最新的罐子,但是显然过时的版本仍在类路径中.有没有办法访问,查看或编辑Databricks中的类路径? 会删除该集群并构建新集群解决问题吗? 对不起,我对班级路径的熟悉为0%.谢谢 解决方案 我弄清楚了. Scala笔记本,使用此代码: val jarfiles = dbutils.fs.ls("dbfs:/FileStore/jars") .map(_.path) .filter(_.indexOf("your pattern") > -1) jarfiles.foreach(dbutils.fs.rm(_)) 用罐子名称中的字符串替换"图案"(但不足以删除其他罐子). 重新启动集群,繁荣
关于火花计算不一致的问题.这是否存在?例如,我两次运行完全相同的命令,例如: imp_sample.where(col("location").isNotNull()).count() 每次运行时,我的结果略有不同(141,830,然后是142,314)! 或以下: imp_sample.where(col("location").isNull()).count() 获得2,587,013,然后获得2,586,943.怎么可能? 谢谢! 解决方案 根据您的评论,您在管道中使用sampleBy. sampleBy不能保证您会获得行的确切分数.它采用一个概率的样本,每个记录都等于分数,并且可能因运行而异. . 关于您的monotonically_increasing_id问题,它只能保证下一个ID大于上一个ID,但是,它不能保证ID是连续的(I,I+I,I+2,Ext. ..). 最后,您可以通过在其上持续persist()来坚持一个数据框.
我正在创建一个新的数据框,并带有少数记录. val joined_df = first_df.join(second_df, first_df.col("key") === second_df.col("key") && second_df.col("key").isNull, "left_outer") joined_df.repartition(1) joined_df.cache() joined_df.count() 除了计数操作外,一切都很快(一秒钟以下). RDD转换开始了,实际上需要几个小时才能完成.有什么办法可以加快事物的速度? INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB) INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52
我想以流媒体方式消耗的Web服务器上有一个.gz文件,并将数据插入Couchbase. .gz文件中只有一个文件,又包含一个每行JSON对象. 由于Spark没有HTTP接收器,所以我自己写了一个(如下所示).我正在使用 couchbase spark connector 进行插入.但是,运行时,作业实际上并没有插入任何内容.我怀疑这是由于我对Spark的经验不足,并且不知道如何开始和等待终止.正如您在下面看到的那样,可以进行两个这样的呼叫. 接收者: public class HttpReceiver extends Receiver { private final String url; public HttpReceiver(String url) { super(MEMORY_AND_DISK()); this.url = url; } @Override public
假设我每个执行人有36个内核,每个节点一个遗嘱执行人,每个节点有3个节点,每个节点有48个可用的内核.我注意到的是,当我将每个任务设置为使用1个核心(默认值)时,我对工人的CPU利用率约为70%,每个执行人同时执行36个任务(正如我预期的那样) .但是,当我将配置更改为每个任务有6个核心(--conf spark.task.cpus=6)时,我每次执行程序一次下降到6个任务(如预期的那样),但是我的CPU利用率也下降到10%以下(意外)以下.我会以为Spark会知道如何在6个内核上平行工作负载. 重要的实现详细信息是我在DataFrame的列上运行一个UDF函数,并将结果作为该数据框架上的新列附加.此UDF函数使用@transient对象,该对象提供了我正在使用的机器学习算法.此UDF功能不是聚合或联合操作的一部分,它只是实现的列的map操作: : def myUdf = udf { ... } val resultSet = myUdf(dataFrame.col("o
IM使用Spark 2.0. 我有一个包含float包装的WrappedArray的dataframe列. 一行的示例是: [[1.0 2.0 2.0][6.0 5.0 2.0][4.0 2.0 3.0]] 我试图将此列转换为 Array[Array[Float]] . 我到目前为止尝试的是以下内容: dataframe.select("mycolumn").rdd.map(r => r.asInstanceOf[Array[Array[Float]]]) ,但我会收到以下错误: Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to [[F 任何想法都将不胜感激.谢谢 解决方案 尝试以下方法: val wawa: WrappedArray[
我想将Cassandra表加载到Spark中的DataFram中,我遵循下面的示例程序(在此 >),但是我在下面提到的执行, 我试图将表加载到RDD,然后将其转换为DataFrme,加载RDD成功,但是当我尝试将其转换为DataFrame时,我会在第一个甲基化的第一个甲基科学中得到相同的执行,任何建议吗?我正在使用Spark 2.0.0,Cassandra 3.7和Java 8. public class SparkCassandraDatasetApplication { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .appName("SparkCassandraDatasetApplication") .config("spark.sql.warehouse.dir", "/file:C
我正在尝试在两个数据范围中为每个行应用Pyspark SQL函数哈希算法,以识别差异.哈希算法是案例敏感的.如果列包含" Apple"和" Apple"被认为是两个不同的值,因此我想将两个数据范围的案例更改为上或更低的情况.我只能为数据帧标题实现,而不是为数据框架值实现.请帮助 #Code for Dataframe column headers self.df_db1 =self.df_db1.toDF(*[c.lower() for c in self.df_db1.columns]) 解决方案 两个答案似乎都可以使用一个例外 - 如果您具有数字列,则将转换为字符串列.为避免这种情况,请尝试: import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ val fields = df.schema.fields val stringFields = df.schema.fie
对于Spark 1.6.0 中的两个DataFrames之间的以下连接 val df0Rep = df0.repartition(32, col("a")).cache val df1Rep = df1.repartition(32, col("a")).cache val dfJoin = df0Rep.join(df1Rep, "a") println(dfJoin.count) 这不仅加入了共同分配,而且还共同存在吗?我知道,对于RDD,如果使用同一分区者并在同一操作中进行洗牌,则将共同置于联接.但是数据框架呢?谢谢. 解决方案 [ https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-park-park-park-part-3-1d40c1e51e1c] 根据上面提供的文章链接,排序合并加入是默认加入,想添加重要的点 为了理想的分类合并性能,所有人都必须 具有相同值的连
我是python和pyspark的新手.我在pyspark中有一个dataframe,如下所示: ## +---+---+------+ ## | x1| x2| x3 | ## +---+---+------+ ## | 0| a | 13.0| ## | 2| B | -33.0| ## | 1| B | -63.0| ## +---+---+------+ 我有一个数组: arr = [10,12,13] 我想在dataframe中创建一个列x4,使得它应该基于x1作为索引的值与列表中的相应值.最终数据集应如下所示: ## +---+---+------+-----+ ## | x1| x2| x3 | x4 | ## +---+---+------+-----+ ## | 0| a | 13.0| 10 | ## | 2| B | -33.0| 13 | ## | 1| B | -63.0| 12 | ## +---+---+
我试图将火花数据集写入存在的PostgreSQL表中(无法像列类型一样更改表元数据).该表的一列之一是类型 . 我在启动写入时会看到以下异常(这里的原始地图为空,当逃脱时给出一个空字符串): Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause. at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136) at org.postgresql.core.v3.QueryExecutorImpl$1.hand
我们使用spark来解析一个大csv文件,可能包含无效数据. 我们希望将有效数据保存到数据存储中,并返回我们导入的有效数据以及数量无效数据. 我想知道我们如何在火花中做到这一点,读取数据时的标准方法是什么? 我的当前方法使用Accumulator,但由于Accumulator在火花中的工作原因,它不准确. // we define case class CSVInputData: all fields are defined as string val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData] val newDS = csvInput .flatMap { row => Try { val data = new DomainData() data.setScore(row.score.trim.toDouble)
我有一个文件.gz我需要阅读这个文件并将时间和文件名添加到这个文件中我有一些问题,需要您的帮助来推荐这一点. 因为文件被压缩了第一行是读取的不是正确的格式我认为由于编码问题我尝试了以下代码但不起作用 implicit val codec = Codec("UTF-8") codec.onMalformedInput(CodingErrorAction.REPLACE) codec.onUnmappableCharacter(CodingErrorAction.REPLACE) 文件具有特殊格式,我需要使用正则表达式读取它进入datafame ==>我发现的唯一方法是使用RDD读取它并将其映射到正则表达式有没有方法可以直接读取它df并通过正则表达式? val Test_special_format_RawData = sc.textFile("file://"+filename.toString()) .map(line ⇒ line.replace("||",
我收到"java.lang.nosuchmethoderror:scala.reflect.api.javauniverse.runtimemirror(ljava/lang/classloader;)"在Scala应用程序中使用Dataframe时出错并使用Spark运行它.但是,如果我使用只使用RDD和NOT DataFrame工作,则不会出现相同的POM和设置错误.同时经过其他帖子的错误,提到Scala版本必须是2.10,因为Spark与2.11 Scala不兼容,我使用2.10 Scala版本,其中2.0.0火花. Below is the snip from pom: /usr/lib/spark/lib/spark-assembly.jar UTF-8 2.7.1
假设我在dataset目录中的file1中有2个文件: val file = sc.wholeTextFiles("file:///root/data/dataset").map((x,y) => y + "," + x) 在上面的代码中,我正在尝试获得具有值的RDD: - >值,键作为单个值进入RDD 假设文件名是file1并说出2条记录: file1: 1,30,ssr 2,43,svr 和 file2: 1,30,psr 2,43,pvr 所需的RDD输出为: (1,30,ssr,file1),(2,43,svr,file1),(1,30,psr,file2),(2,43,pvr,file2) 我们可以实现这个吗? 如果可能请帮助我! 解决方案 var files = sc.wholeTextFiles("file:///root/data/dataset") var yourNeededRdd = files .f
1)最初过滤的RDD为空值. val rddWithOutNull2 = rddSlices.filter(x => x(0) != null) 2)然后将此RDD转换为行 的RDD 3)使用scala将rdd转换为dataframe后: val df = spark.createDataFrame(rddRow,schema) df.printSchema() 输出: root |-- name: string (nullable = false) println(df.count()) 输出: Error : count : : [Stage 11:==================================> (3 + 2) / 5][error] o.a.s.e.Executor - Exception in task 4.0 in stage 11.0 (TID 16) jav
我在阅读大6GB单行JSON文件中获取以下错误: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost): java.io.IOException: Too many bytes before newline: 2147483648 火花不读取新行的JSON文件,因此整个6 GB JSON文件位于单行: jf = sqlContext.read.json("jlrn2.json") 配置: spark.driver.memory 20g 解决方案 YEP,您的行中有多于Integer.MAX_VALUE字节.你需要拆分它. 请记住,火花期待每行成为有效的JSON文档,而不是整个文件.以下是来自Spark sql progamming指南