pyspark:数据框架中like()方法的对应部分
在Spark DataFrame中有类似的计数器方法(某种东西不是不真熟())? 或者除了使用Traditonal SQL查询之外还有其他方法可以做到吗? 我想做以下的相反: df.where(col("_c2").like("XY6%")).show(5) 解决方案 它工作:) 我不得不使用否定运算符(〜)而不是'不是'关键字. df.where(~ col("_c2").like("XY6%")).show(5) 其他解决方案 或您可以执行: df.where( col("_c2").like("XY6%") == False ).show(5) 其他解决方案 两个条件我们可以这样做: df.where((~col("_c2").like("XY6%")) & (~col("_c2").like("X6%")))
28 2023-01-27
编程技术问答社区
是否有办法使用Databricks将多个文本文件加载到一个数据框中?
我正在尝试通过文件夹和子文件夹中的所有文件来测试一些想法来递归循环,并将所有内容加载到单个数据帧中.我有12种不同的文件,差异基于文件命名约定.因此,我有文件名以'abc'开头,以'cn'开头的文件名,以'cz'为开头的文件名等.我尝试了以下3个想法. import pyspark import os.path from pyspark.sql import SQLContext from pyspark.sql.functions import input_file_name df = sqlContext.read.format("com.databricks.spark.text").option("header", "false").load("dbfs/mnt/rawdata/2019/06/28/Parent/ABC*.gz") df.withColumn('input', input_file_name()) print(dfCW) 或 df = sc
28 2023-01-21
编程技术问答社区
NameError: name 'dbutils' is not defined in pyspark
我正在Databricks Cloud中运行Pyspark作业.我需要将一些CSV文件写入Databricks文件系统(DBFS),作为此作业的一部分,我还需要使用一些dbutils本机命令,例如 #mount azure blob to dbfs location dbutils.fs.mount (source="...",mount_point="/mnt/...",extra_configs="{key:value}") 将文件写入MOUNT目录后,我也试图卸载.但是,当我直接在Pyspark作业中使用dbutils时,它会失败 NameError: name 'dbutils' is not defined 我应该导入任何包装中使用dbutils中的dbutils吗?预先感谢. 解决方案 尝试使用以下方式: def get_dbutils(spark): try: from pyspark.dbutils
272 2023-01-21
编程技术问答社区
如何列出AWS Glue Catalog中的所有数据库和表?
我在AWS胶水控制台中创建了一个开发端点,现在我可以访问GluepySpark控制台中的SparkContext和SQLContext. 如何访问目录并列出所有数据库和表?通常的sqlContext.sql("show tables").show()不起作用. 什么可能有助于 CatalogConnection类,但我不知道它是哪个包.我尝试从awsglue.context导入. 解决方案 我花了几个小时试图找到关于CatalogConnection类的一些信息,但尚未找到任何内容. (即使在AWS-glue-lib存储库 https://github.com/awslabs/aws-grue -libs ) 在我的情况下,我需要胶水作业脚本控制台中的表名 最后我使用了Boto库和检索到的数据库和表名,其中胶合客户端: import boto3 client = boto3.client('glue',region_name='us-east-1')
32 2023-01-20
编程技术问答社区
在2个Pyspark数据框架之间生成一个不匹配列的报告
团队,我们有要求基于2 Pyspark DataFrame的键字段生成一个不匹配列的报告. 这是第一个dataframe - >>> df.show() +--------+----+----+----+----+----+----+----+----+ | key|col1|col2|col3|col4|col5|col6|col7|col8| +--------+----+----+----+----+----+----+----+----+ | abcd| 123| xyz| a| ab| abc| def| qew| uvw| | abcd1| 123| xyz| a| ab| abc| def| qew| uvw| | abcd12| 123| xyz| a| ab| abc| def| qew| uvw| | abcd123| 123| xyz| a| ab| abc| def| qew| uvw| |abcd1234|
34 2022-12-21
编程技术问答社区
pyspark-java.lang.IllegalStateException。输入行不具备模式所要求的预期值数
我在霍顿沙盒上运行pyspark-sql代码 18/08/11 17:02:22信息spark.sparkcontext:运行火花版1.6.3 # code from pyspark.sql import * from pyspark.sql.types import * rdd1 = sc.textFile ("/user/maria_dev/spark_data/products.csv") rdd2 = rdd1.map( lambda x : x.split("," ) ) df1 = sqlContext.createDataFrame(rdd2, ["id","cat_id","name","desc","price", "url"]) df1.printSchema() root |-- id: string (nullable = true) |-- cat_id: string (nullable = true) |-- name: string
98 2022-12-21
编程技术问答社区
在新的列上过滤Spark DataFrame
上下文:我的数据集太大,无法适合内存我正在培训一个keras rnn.我正在使用AWS EMR集群上的Pyspark,以批量培训模型,足以存储在内存中.我无法使用elephas分发的模型,我怀疑这与我的模型有关.我并不完全肯定. DataFrame为每个用户的每个用户都有一行,从0到29的安装日期.查询数据库后,我在dataframe上执行许多操作: query = """WITH max_days_elapsed AS ( SELECT user_id, max(days_elapsed) as max_de FROM table GROUP BY user_id ) SELECT table.* FROM table LEFT OUTER JOIN max_days_elapsed USING (user_id)
32 2022-12-21
编程技术问答社区
定义sparksql数据框架模式的语法错误
我的pyspark控制台告诉我,我在循环后面的行上的语法无效.控制台未执行for循环,直到架构= structtype(字段)行,它具有syntaxError,但对于循环对我来说很好... from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * sqlContext = SQLContext(sc) lines = sc.textFile('file:///home/w205/hospital_compare/surveys_responses.csv') parts = lines.map(lambda l: l.split(',')) surveys_responses = parts.map(lambda p: (p[0:33])) schemaString = 'Provider Number, Hospital Name, Ad
22 2022-12-21
编程技术问答社区
如何在CASE语句中使用数组类型的列值
我有一个具有两个列的Dataframe,listA存储为Seq[String]和valB存储为String.我想创建第三列valC,它将是int类型,它的值是 iff valB is present in listA then 1 otherwise 0 我尝试过执行以下操作: val dfWithAdditionalColumn = df.withColumn("valC", when($"listA".contains($"valB"), 1).otherwise(0)) 但Spark无法执行此操作并提供以下错误: cannot resolve 'contains('listA', 'valB')' due to data type mismatch: argument 1 requires string type, however, 'listA' is of array type.; 如何在案例语句中使用数组类型列值? 谢谢, devj
84 2022-12-21
编程技术问答社区
Spark sql查询导致巨大的数据洗牌读/写
我正在使用Spark SQL来处理数据.这是查询 select /*+ BROADCAST (C) */ A.party_id, IF(B.master_id is NOT NULL, B.master_id, 'MISSING_LINK') as master_id, B.is_matched, D.partner_name, A.partner_id, A.event_time_utc, A.funnel_stage_type, A.product_id_set, A.ip_address, A.session_id, A.tdm_retailer_id, C.product_name , CASE WHEN C.product_category_lvl_01 is NULL THEN 'OUTOFSALE' ELSE product_category
16 2022-12-21
编程技术问答社区
由于连接键为空值,Spark SQL-1任务运行时间过长
我在两个表之间执行左连接,每个表格中有13亿条记录,每个表1中为null,为大约600百万条记录,并且由于这个所有空记录都会分配给1个任务,因此数据歪斜发生在这1任务运行几个小时. from pyspark.sql import SparkSession spark = SparkSession.builder.appName("report").enableHiveSupport() tbl1 = spark.sql("""select a.col1,b.col2,a.Col3 from table1 a left join table2 b on a.col1 = b.col2""") tbl1.write.mode("overwrite").saveAsTable("db.tbl3") 没有其他连接条件,这是使用唯一的使用.有没有办法,我可以使火花在不同的任务中分发这些空记录而不是一个或任何其他方法? 解决方案 有一个优秀的答案通过@mikhail dubk
8 2022-12-21
编程技术问答社区
pyspark数据框架中的rdd是什么意思?
我是新的去脓包.我想知道rdd在pyspark dataframe中的意思是什么. weatherData = spark.read.csv('weather.csv', header=True, inferSchema=True) 这两行代码具有相同的输出.我想知道具有RDD的效果 weatherData.collect() weatherData.rdd.collect() 解决方案 数据帧是表,或二维数组类似的结构,其中每列包含一个变量上的测量,每行包含一个案例. 所以,由于其片状格式,DataFrame具有额外的元数据,这允许火花在最终确定的查询上运行某些优化. 另一方面, RDD仅仅是一个 r eSirient d 可被分配 d ataset,即更多的黑框无法优化为可以针对它执行的操作的数据不是约束. 但是,您可以通过其.rdd方法从DataFrame到RDD,您可以通过.toDF()从RDD从RDD转到DataFrame(如果RDD处于表
24 2022-12-21
编程技术问答社区
火花。并行创建多个DataFrames
我当前正在基于ID列表生成DataFrame - 基于一个ID的每个查询给出了一个可管理的a 非常大的PostgreSQL表.然后,我根据我需要写出的文件结构来分区该输出.问题是我正在击中速度限制,主要是利用我的执行者资源. 我不确定这是否是重新思考我的体系结构的问题,或者如果有一些简单的方法来解决这个问题,而且基本上我想得到更加平行的任务,但我无法保持所有16个求职者忙碌在尝试尽快执行此ETL工作时. 所以......这是我认为我可以做到这一点的原因: 并行化列表. 然后在列表中的每个元素,在executor上out,通过jdbc选择(相对较小的)dataframe. 然后foreachpartition(必然很少),我需要做一些动作(包括来自每个分区的数据的原子写入),并且这些分区操作也可以分支为工作节点/executors. 当前代码看起来像这样,但当然抛出"py4j.py4jexception:方法 getNewargs ([])不存在"因为Spark
40 2022-12-21
编程技术问答社区
如何解除数据集的堆叠(使用透视法)?
我尝试了较大的堆叠数据集.它有5,656,458行,IndicatorCode列有1344个不同的代码. 这个想法是使用枢轴到"unstack"(在Pandas术语中)此数据集并为每个指标码具有列. schema = StructType([ \ StructField("CountryName", StringType(), True), \ StructField("CountryCode", StringType(), True), \ StructField("IndicatorName", StringType(), True), \ StructField("IndicatorCode", StringType(), True), \ StructField("Year", IntegerType(), True), \ StructField("Value", DoubleType(), True) \ ]) data =
28 2022-12-21
编程技术问答社区
如何在Spark 2.1上更新pyspark数据框元数据?
我面临着sparkml的onehotencoder的问题,因为它读取了dataframe元数据,以便确定它应该为其创建的稀疏传染料对象分配. 更具体地说,我使用包含0到23之间的所有单个值的训练集来编码"小时"字段. 现在,我现在使用"转换"方法OD管道来评分单行数据帧. 不幸的是,这导致oneHotEncoder的不同编码的稀疏向量对象 (24,[5],[1.0])与(11,[10],[1.0]) 我记录了这个这里,但这被确定为重复.所以在这个 thread 有一个解决方案更新DataFrames的元数据以反映"小时"字段的实际范围: from pyspark.sql.functions import col meta = {"ml_attr": { "vals": [str(x) for x in range(6)], # Provide a set of levels "type": "nominal", "name":
32 2022-12-21
编程技术问答社区
Pyspark的sql计数返回的行数与纯sql不同
我已经开始在我的一个项目中使用Pyspark.我正在测试不同的命令来探索图书馆的功能,我发现了一些我不了解的东西. 采用此代码: from pyspark import SparkContext from pyspark.sql import HiveContext from pyspark.sql.dataframe import Dataframe sc = SparkContext(sc) hc = HiveContext(sc) hc.sql("use test_schema") hc.table("diamonds").count() 最后一个 count()操作返回53941记录.如果我从Hive中的Diamonds 中选择A 选择计数(*),我得到了53940. 这是包括标题在内的Pyspark计数吗? 我试图研究: df = hc.sql("select * from diamonds").collect() df[0] df[1]
22 2022-12-21
编程技术问答社区
删除重复的行,不管是否有新的信息 -PySpark
说我有类似的数据框架: ID Media 1 imgix.com/20830dk 2 imgix.com/202398pwe 3 imgix.com/lvw0923dk 4 imgix.com/082kldcm 4 imgix.com/lks032m 4 imgix.com/903248 我想结束: ID Media 1 imgix.com/20830dk 2 imgix.com/202398pwe 3 imgix.com/lvw0923dk 4 imgix.com/082kldcm ,即使这会导致我失去2个链接,但我不在乎.在Python/pyspark中有什么简单的方法? 解决方案 by on Col('id') 将collect_list与agg一起汇总列表
36 2022-12-21
编程技术问答社区
如何改变一个数据框架的列名与其他数据框架的关系
我需要使用pyspark 更改dataframe df的列名df df +----+---+----+----+ |code| id|name|work| +----+---+----+----+ | ASD|101|John| DEV| | klj|102| ben|prod| +----+---+----+----+ df_col +-----------+-----------+ |col_current|col_updated| +-----------+-----------+ | id| Row_id| | name| Name| | code| Row_code| | Work| Work_Code| +-----------+-----------+ 如果df列匹配col_current,则DF列应替换为col_updated.例如:如果df.id匹配df.col_cur
22 2022-12-21
编程技术问答社区
PySpark试图将前一个字段的模式应用于下一个字段
用pyspark拥有这个奇怪的问题.它似乎正在尝试将前一个字段的模式应用于下一个字段,就像它正在处理一样. 最简单的测试用例我可以提出: %pyspark from pyspark.sql.types import ( DateType, StructType, StructField, StringType, ) from datetime import date from pyspark.sql import Row schema = StructType( [ StructField("date", DateType(), True), StructField("country", StringType(), True), ] ) test = spark.createDataFrame( [ Row( date=date(2019,
22 2022-12-21
编程技术问答社区