为什么每个Spark任务没有利用所有分配的内核?
假设我每个执行人有36个内核,每个节点一个遗嘱执行人,每个节点有3个节点,每个节点有48个可用的内核.我注意到的是,当我将每个任务设置为使用1个核心(默认值)时,我对工人的CPU利用率约为70%,每个执行人同时执行36个任务(正如我预期的那样) .但是,当我将配置更改为每个任务有6个核心(--conf spark.task.cpus=6)时,我每次执行程序一次下降到6个任务(如预期的那样),但是我的CPU利用率也下降到10%以下(意外)以下.我会以为Spark会知道如何在6个内核上平行工作负载. 重要的实现详细信息是我在DataFrame的列上运行一个UDF函数,并将结果作为该数据框架上的新列附加.此UDF函数使用@transient对象,该对象提供了我正在使用的机器学习算法.此UDF功能不是聚合或联合操作的一部分,它只是实现的列的map操作: : def myUdf = udf { ... } val resultSet = myUdf(dataFrame.col("o
44 2023-03-27
编程技术问答社区
Spark和Cassandra的Java应用异常提供者org.apache.hadoop.fs.s3.S3FileSystem未找到。
我想将Cassandra表加载到Spark中的DataFram中,我遵循下面的示例程序(在此 >),但是我在下面提到的执行, 我试图将表加载到RDD,然后将其转换为DataFrme,加载RDD成功,但是当我尝试将其转换为DataFrame时,我会在第一个甲基化的第一个甲基科学中得到相同的执行,任何建议吗?我正在使用Spark 2.0.0,Cassandra 3.7和Java 8. public class SparkCassandraDatasetApplication { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .appName("SparkCassandraDatasetApplication") .config("spark.sql.warehouse.dir", "/file:C
40 2023-03-07
编程技术问答社区
在Pyspark中有效计算连接部件
我试图在城市中找到为朋友的连接组件.我的数据是带有城市属性的边缘列表. 城市| SRC | dest 休斯顿·凯尔 - >本尼 休斯顿本尼 - >查尔斯 休斯顿查尔斯 - >丹尼 奥马哈颂歌 - > Brian 等. 我知道Pyspark的GraphX库的连接程序功能将在图形的所有边缘上迭代以找到连接的组件,我想避免使用.我该怎么办? 编辑: 我以为我可以做 之类的事情 从dataframe中选择Connected_components(*) Groupby City connected_components生成项目列表. 解决方案 假设您的数据就像这个 import org.apache.spark._ import org.graphframes._ val l = List(("Houston","Kyle","Benny"),("Houston","Benny","charles"),
38 2022-12-21
编程技术问答社区
java.lang.IllegalArgumentException: 无法获得数组<string>的JDBC类型
我想将输出数据导入mysql数据库,但发生以下错误,我不会将数组转换为所需的字符串类型,可以帮助我? val Array(trainingData, testData) = msgDF.randomSplit(Array(0.9, 0.1)) val pipeline = new Pipeline().setStages(Array(labelIndexer, word2Vec, mlpc, labelConverter)) val model = pipeline.fit(trainingData) val predictionResultDF = model.transform(testData) val rows = predictionResultDF.select("song", "label", "predictedLabel") val df = rows.registerTempTable("song_classify")
192 2022-12-21
编程技术问答社区
在pyspark中查找并删除匹配的列值
我有一个pyspark dataframe,偶尔列将具有错误的值与另一列匹配.它看起来像这样: | Date | Latitude | | 2017-01-01 | 43.4553 | | 2017-01-02 | 42.9399 | | 2017-01-03 | 43.0091 | | 2017-01-04 | 2017-01-04 | 显然,最后一个纬度值不正确.我需要删除这样的所有行.我想过使用.isin(),但我似乎无法努力工作.如果我尝试 df['Date'].isin(['Latitude']) 我得到: Column 任何建议? 解决方案 如果您对SQL语法更舒服,这里是使用filter(): 内部的pyspark-sql条件的替代方式 df = df.filter("Date NOT IN (Latitu
44 2022-12-21
编程技术问答社区
如何在JAVA中加入没有重复列的Spark数据框架
如何合并2个dataframes而无需重复列 a.show() +-----+-------------------+--------+------+ | Name| LastTime|Duration|Status| +-----+-------------------+--------+------+ | Bob|2015-04-23 12:33:00| 1|logout| |Alice|2015-04-20 12:33:00| 5| login| +-----+-------------------+--------+------+ b.show() +-----+-------------------+--------+------+ | Name| LastTime|Duration|Status| +-----+-------------------+--------+------+ | Bob|2
28 2022-12-21
编程技术问答社区
如何通过包含其他数据框架/集的任何值的数组列来过滤Spark数据框架
我有一个dataframe a,它包含一列数组字符串. ... |-- browse: array (nullable = true) | |-- element: string (containsNull = true) ... 例如,三个样本行为 +---------+--------+---------+ | column 1| browse| column n| +---------+--------+---------+ | foo1| [X,Y,Z]| bar1| | foo2| [K,L]| bar2| | foo3| [M]| bar3| 和另一个包含字符串列 的dataframe b |-- browsenodeid: string (nullable = true) 一些样本行将是 +------------+ |browsenodeid| +----------
40 2022-12-21
编程技术问答社区
Spark CSV 2.1文件名
我正在尝试使用新的火花2.1 CSV选项将Dataframe保存到CSV中 df.select(myColumns: _*).write .mode(SaveMode.Overwrite) .option("header", "true") .option("codec", "org.apache.hadoop.io.compress.GzipCodec") .csv(absolutePath) 一切正常,我不介意haivng part-000xx前缀 但现在似乎一些uuid被添加为后缀 i.e part-00032-10309cf5-a373-4233-8b28-9e10ed279d2b.csv.gz ==> part-00032.csv.gz 任何人都知道如何删除此文件EXT并只使用Part-000xx Concenti
44 2022-12-21
编程技术问答社区
将spark数据框架的列传递给geohash函数-pyspark。不能将列转换为bool。
import pygeohash as pgh pgh.encode(45,55) 'tpzpgxczbzur' 上述步骤很棒.下面我正在尝试创建一个数据框: l = [(45,25),(75,22),(85,20),(89,26)] rdd = sc.parallelize(l) geoCords = rdd.map(lambda x: Row(lat=x[0], long=int(x[1]))) geoCordsSchema = sqlContext.createDataFrame(geoCords) geoCordsSchema.show() +---+----+ |lat|long| +---+----+ | 45| 25| | 75| 22| | 85| 20| | 89| 26| +---+----+ 这成功创建了一个火花dataframe.现在我正在使用Pygeohash编码,并抛出错误,如下所示: pgh.encode(geoCords
26 2022-12-21
编程技术问答社区
pyspark-create DataFrame 在地图类型结构中分组列
我的 dataframe 具有以下结构: ------------------------- | Brand | type | amount| ------------------------- | B | a | 10 | | B | b | 20 | | C | c | 30 | ------------------------- 我希望通过将type和amount分组为类型:Map的单列来减少行数 所以Brand将是唯一的,并且MAP_type_AMOUNT将为每个type amount组合具有key,value. 我认为spark.sql可能有一些功能来帮助在这个过程中,或者我必须让rdd是dataframe,并使我的"自己的"转换为地图类型? 预期: ------------------------- | Brand | MAP_type_AMOUNT ------
84 2022-12-21
编程技术问答社区
上调一个列表,从Spark数据框架中选择多个列
我有一个火花数据框架df.是否有一种方法是使用这些列的列表浏览几列? scala> df.columns res0: Array[String] = Array("a", "b", "c", "d") 我知道我可以做像df.select("b", "c")的东西.但假设我有一个包含几列名称val cols = List("b", "c")的列表,有没有办法将此传递给df.select? df.select(cols)抛出错误.像df.select(*cols)一样的东西,如python 解决方案 使用df.select(cols.head, cols.tail: _*) 让我知道它是否有效:) 解释@ Ben : 密钥是选择的方法签名: select(col: String, cols: String*) cols:String*条目需要一个可变数量的参数. :_*解压缩参数,以便它们可以通过此参数处理.与*args的Python中的解包非常
28 2022-12-21
编程技术问答社区
使用Python从Dataricks写到Postgres
我在databreicks中有一个名为customerdetail的dataframe. +--------------------+-----------+ | customerName| customerId| +--------------------+-----------+ |John Smith | 0001| |Jane Burns | 0002| |Frank Jones | 0003| +--------------------+-----------+ 我希望能够将此从databricks复制到Postgres内的表. 我发现这个 post 使用PSYCOPG2将单个行复制到Postgres,我正在尝试将每行从DataFrame复制到Postgres表? import psycopg2 v1 = 'testing_name' v2 = 'tes
14 2022-12-21
编程技术问答社区
星火数据框架的搜索列以一个字符串开始
我需要基于列值应以预定义的字符串从一个条件进行过滤数据帧. 我正在尝试以下: val domainConfigJSON = sqlContext.read .jdbc(url, "CONFIG", prop) .select("DID", "CONF", "KEY").filter("key like 'config.*'") 并获得异常: 由:com.mysql.jdbc.exceptions.jdbc4.mysqlsyntaxerrorexception: 您的SQL语法中有错误;检查手册 对应于您的MariaDB服务器版本进行右语法使用 近'key ='config.*''在第1行 Using spark: 1.6.1 解决方案 您可以使用列类中存在的startsWith函数. myDataFrame.filter(col("columnName").startswith("PREFIX")) 其他解决方案 我使用了相
20 2022-12-21
编程技术问答社区
在Pyspark中扁平化组
我有一个 pyspark 数据框.例如, d= hiveContext.createDataFrame([("A", 1), ("B", 2), ("D", 3), ("D", 3), ("A", 4), ("D", 3)],["Col1", "Col2"]) +----+----+ |Col1|Col2| +----+----+ | A| 1| | B| 2| | D| 3| | D| 3| | A| 4| | D| 3| +----+----+ 我想按 Col1 分组,然后创建一个 Col2 列表.我需要扁平化组.我确实有很多专栏. +----+----------+ |Col1| Col2| +----+----------+ | A| [1,4] | | B| [2] | | D| [3,3,3]| +----+----------+ 解决方案 你可以做一个 groupBy
102 2022-09-02
编程技术问答社区
如何基于Json对象调用scala spark的方法?
我有如下两个函数 def method1(ip:String,r:Double,op:String)={ val data = spark.read.option("header", true).csv(ip).toDF() val r3= data.select("c", "S").dropDuplicates("C", "S").withColumn("R", lit(r)) r3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(op) } def method2(ip:String,op:String)={ val data = spark.read.option("header", true).csv(ip).toDF() val r3= data.select("c", "S").dropDuplicates("C", "StockCode") r3.coa
在Java中使用数据帧在Spark中对n列进行求和
String[] col = {"a","b","c"} 数据: id a b c d e 101 1 1 1 1 1 102 2 2 2 2 2 103 3 3 3 3 3 预期输出:- 列字符串中指定列总和的 id id (a+b+c) 101 3 102 6 103 9 如何使用数据框做到这一点? 解决方案 如果你使用的是java,你可以这样做 import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; static SparkConf conf = new Sp
100 2022-08-21
编程技术问答社区
如何用spark DF或DS读取".gz "压缩文件?
我有一个 .gz 格式的压缩文件,是否可以直接使用 spark DF/DS 读取文件? 详细信息:文件是带有制表符分隔的 csv. 解决方案 读取压缩的 csv 与读取未压缩的 csv 文件的方式相同.对于 Spark 2.0+ 版本,可以使用 Scala 进行如下操作(注意制表符分隔符的额外选项): val df = spark.read.option("sep", "\t").csv("file.csv.gz") PySpark: df = spark.read.csv("file.csv.gz", sep='\t') 唯一需要考虑的额外考虑因素是 gz 文件不可拆分,因此 Spark 需要使用单个内核读取整个文件,这会减慢速度.读取完成后,可以对数据进行混洗以增加并行度.
128 2022-08-21
编程技术问答社区
如何将Spark应用程序作为守护程序运行
我有一个关于运行 spark 应用程序的基本问题. 我有一个 Java 客户端,它将向我发送对驻留在 HDFS 中的查询数据的请求. 我收到的请求是基于 HTTP 的 REST API,我需要解释请求并形成 Spark SQL 查询并将响应返回给客户端. 我无法理解如何将我的 spark 应用程序设置为等待请求并可以使用预实例化的 SQL 上下文执行查询的守护程序? 解决方案 你可以有一个无限循环运行的线程来使用 Spark 进行计算. while (true) { request = incomingQueue.poll() // Process the request with Spark val result = ... outgoingQueue.put(result) } 然后在处理 REST 请求的线程中,将请求放在 incomingQueue 中并等待来自输出队列的结果. // Create the reque
198 2022-08-21
编程技术问答社区
在Python中获取Parquet文件的模式,而无需将文件加载到Spark数据框架中?
是否有任何 python 库可用于获取 parquet 文件的架构. 目前我们正在将 parquet 文件加载到 Spark 中的数据框中,并从数据框中获取模式以显示在应用程序的某些 UI 中.但是初始化 spark-context 和加载数据框并从数据框获取模式是耗时的活动.所以寻找一种替代方法来获取架构. 解决方案 除了@mehdio的回答,如果你的parquet是一个目录(例如spark生成的parquet),读取schema/列名: import pyarrow.parquet as pq pfile = pq.read_table("file.parquet") print("Column names: {}".format(pfile.column_names)) print("Schema: {}".format(pfile.schema))
114 2022-08-21
编程技术问答社区
Spark异常 在加载parquet时不支持复杂的类型
我正在尝试在 Spark 中加载 Parquet 文件作为数据框- val df= spark.read.parquet(path) 我得到- org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 12, 10.250.2.32): java.lang.UnsupportedOperationException: Complex types not supported. 在浏览代码时,我意识到在 spark VectorizedParquetRecordReader.java (initializeInternal) 中有一个检查- Type t = requestedSchema.getFields().get(i);
236 2022-08-21
编程技术问答社区