如何在不重启集群的情况下在Databricks上重新安装相同版本的轮子
我正在开发一些python代码,这些代码将用作各种基于车轮的工作流程 databricks.鉴于它正在开发中,在进行代码进行测试以进行测试之后,我需要在Databricks群集上构建轮子来运行它(我使用了仅在Databricks运行时可用的功能,因此无法在本地运行). 这是我要做的: REMOTE_ROOT='dbfs:/user/kash@company.com/wheels' cd /home/kash/workspaces/project rm -rf dist poetry build whl_file=$(ls -1tr dist/project-*-py3-none-any.whl | tail -1 | xargs basename) echo 'copying..' && databricks fs cp --overwrite dist/$whl_file $REMOTE_ROOT echo 'installing..' && databrick
0 2023-11-30
编程技术问答社区
Apache Spark警告 "在RowBasedKeyValueBatch上调用spill()"的含义
我正在使用Apache Spark本地模式运行Pyspark 2.2.0作业,并查看以下警告: WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. 此警告的原因是什么?这是我应该关心的东西还是可以安全地忽略它? 解决方案 如所示在这里此警告意味着您RAM已满,RAM内容的一部分已移至磁盘. 另请参阅 spark faq 我的数据是否需要适合内存才能使用Spark? 否.如果不适合内存,Spark的操作员将数据溢出到磁盘上,从而使其在任何尺寸的数据上都可以很好地运行.同样,根据RDD的存储级别确定的不适合内存的缓存数据集可以溢出到磁盘或随时重新计算. 确定. 其他解决方案 我猜该消息比一个简单的警告更糟糕:它是错误的边缘. 查看源代码: /** * Sometimes the Ta
0 2023-11-27
编程技术问答社区
无法找到PySpark内核-aws胶水与vscode的交互会话
我最近按照说明使用VSCODE安装胶水交互式会话,但是我找不到Pyspark内核(只能看到" Glue Spark"). 我想我已经在下面安装了所有内容(顺便说一句,我在Windows上) pip3 install --upgrade jupyter boto3 aws-glue-sessions pip3 show aws-glue-sessions cd \aws_glue_interactive_sessions jupyter-kernelspec install glue_pyspark jupyter-kernelspec install glue_spark 但是我可以在终端上运行jupyter notebook打开Jupyter笔记本电脑,与Pyspark一起工作而没有问题.我猜这个问题仅在使用VSCODE时发生. 任何人都可以提出建议该怎么办? vscode屏幕截图: 解决方案 根据此错误报告
0 2023-11-25
编程技术问答社区
E0401:在Windows 10中无法在VSCode中导入'pyspark
我已经在Windows 10机器上安装了使用Apache Spark. java, Python 3.6和 Spark(Spark-2.3.1-bin-Hadoop2.7) 我正在尝试在VSCODE中编写与Pyspark相关的代码.它显示"从"下方显示红色下划线,并显示错误消息 e0401:无法导入'pyspark' 我还使用了Ctrl+Shift+P,然后选择" Python:Update Workspace Pyspark库".它显示通知消息 确保您将Spark_Home环境变量设置为本地Spark安装的根路径! 怎么了? 解决方案 您需要使用 pyspark 使用pip install pyspark python package python package .实际上,这是vScode所需的唯一包装,除非您也想在同一台计算机上运行Spark应用程序.
0 2023-11-25
编程技术问答社区
Spark Worker要求获得荒谬的虚拟内存数量
我正在2个节点纱线群上运行火花作业.我的数据集并不大( 16/02/12 05:49:43警告Scheduler.taskSetManager:丢失任务0.0阶段2.1(TID 22,IP-172-31-6-141.EC2.INTERN)运行任务之一)原因:标记为失败的容器:container_1455246675722_0023_01_000003主机上:IP-172-31-6-141.ec2.internal.退出状态:143.诊断:容器[PID = 23206,ContainerId = Container_1455246675722_0023_01_000003]正在超越虚拟内存限制.当前用法:2.1 GB的11 GB物理内存; 305.3 GB的23.1 GB使用虚拟内存.杀死容器. container_14552466675722_0023_01_000003的conseg-tree的转储: | - PID PPID PGRPID SESSID CMD_NAME use
4 2023-11-20
编程技术问答社区
无法在virtualenv内加载pyspark
我已经在Python Virtualenv中安装了Pyspark.我还安装了jupyterlab,该jupyterlab是新发布的 解决方案 首先发射Virtualenv source venv/bin/activate export SPARK_HOME={path_to_venv}/lib/python2.7/site-packages/pyspark export PYSPARK_DRIVER_PYTHON=jupyter-lab 在此之前,我希望您已经完成了:pip install pyspark和pip install jupyterlab virtualenv 检查一下,一旦您的jupyterlab打开,在jupyterlab的盒子中键入sc,您应该具有SparkContext对象,并且输出应为: SparkContext Spark UI Version v2.2.1 Master local[*] AppName PySparkShell 其他
0 2023-11-19
编程技术问答社区
在pyspark工作中运送和使用virtualenv
问题:我正在尝试将Spark-Submit脚本从本地计算机运行到一组机器.集群完成的工作使用Numpy.我目前会收到以下错误: ImportError: Importing the multiarray numpy extension module failed. Most likely you are trying to import a failed build of numpy. If you're working with a numpy git repo, try `git clean -xdf` (removes all files not under version control). Otherwise reinstall numpy. Original error was: cannot import name multiarray 细节: 在我的本地环境中,我设置了一个Virtualenv,其中包括Numpy以及我在项目和其他各种库中使用的私人回购.
0 2023-11-19
编程技术问答社区
iPython笔记本中的PySpark在使用count()和first()时引发Py4JJavaError
我在Mac(Sierra 10.12.3 beta)中使用ipython笔记本(python v.3.6)中的pyspark(v.2.1.0). 1.我通过在终端拍摄ipython笔记本 - PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" /Applications/spark-2.1.0-bin-hadoop2.7/bin/pyspark 2.将我的文件加载到火花上下文并确保其加载 - >>>lines = sc.textFile("/Users/PanchusMac/Dropbox/Learn_py/Virtual_Env/pyspark/README.md") >>>for i in lines.collect(): print(i) 效果很好,并在我的控制台上打印了结果,如下所示: # Apache
2 2023-11-19
编程技术问答社区
怎样才能看到SPARK发送给我的数据库的SQL语句?
我有一个火花集群和一个Vertica数据库.我使用 spark.read.jdbc( # etc 将Spark DataFrames加载到集群中.当我执行某个组函数 时 df2 = df.groupby('factor').agg(F.stddev('sum(PnL)')) df2.show() i然后获得vertica语法异常 Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) at org.a
0 2023-11-18
编程技术问答社区
在一个pyspark数据框架中的两列相乘。其中一列包含一个向量,一列包含一个常数
i有一个pyspark数据框架,该框具有一个带有向量值的列,一个具有恒定数值的列.例如说 A | B 1 | [2,4,5] 5 | [6,5,3] 我想用常数列多进行多个矢量列.我正在尝试做到这一点,因为我在b列中有一个单词wmbeddings,并且在A列中有一些权重.我的最终目的是获得加权嵌入. 解决方案 如果将矢量数据存储为一系列双打,则可以执行此操作: import breeze.linalg.{Vector => BV} val data = spark.createDataset(Seq( (1, Array[Double](2, 4, 5)), (5, Array[Double](6, 5, 3)) )).toDF("A", "B") data.as[(Long, Array[Double])].map(r => { (BV(r._2) * r._1.toDouble).toArray }).show() 变成
0 2023-11-17
编程技术问答社区
在多行中用密集的向量爆破列
我有一个带有两个列的数据框:BrandWatchErwaehnungID和word_counts. word_counts列是`countvectorizer(稀疏向量)的输出.掉落空行后,我创建了两个新的列,其中稀疏向量的索引和一个带有其值的索引. help0 = countedwords_text['BrandWatchErwaehnungID','word_counts'].rdd\ .filter(lambda x : x[1].indices.size!=0)\ .map(lambda x : (x[0],x[1],DenseVector(x[1].indices) , DenseVector(x[1].values))).toDF()\ .withColumnRenamed("_1", "BrandWatchErwaenungID").withColumnRenamed("_2", "word_counts")\ .withColumnR
4 2023-11-17
编程技术问答社区
火花: 不支持变量数据类型
从Pyspark中的变体数据类型的SQL Server提取数据时.我得到了一个sqlserverexception:"不支持变体数据类型" 请为任何解决方法提供建议. 解决方案 在获取和工作时,将列数据类型转换为varchar SELECT CONVERT(varchar,Code,20) into Code from DBTable
0 2023-11-14
编程技术问答社区
在PySpark的pandas_udf中使用外部库
可以使用外部库,例如 valueerror:系列的真实价值是模棱两可的.使用A.Empty,A.Bool(),A.Item(),a.any()或a.all(). 我尝试使用Spark 2.3.1. 解决方案 您可以将textdistance与您自己的代码一起包装(使用setup.py and bdist_egg构建egg文件),并在运行Spark时使用option --py-files指定最终软件包. 顺便说一句,错误消息似乎完全没有textdistance. 其他解决方案 您可以使用Spark UDF,例如实现Ratcliff-Obershelp函数: import textdistance def my_ro(s1,s2): d = textdistance.ratcliff_obershelp(s1,s2) return d spark.udf.register("my_ro", my_ro, FloatType()) spark.
0 2023-11-13
编程技术问答社区
计算一次UDF
我想在一个仅计算一次的pyspark dataframe中拥有一个UUID列,以便我可以在其他数据框中选择列,并且使UUID相同.但是,当我选择列时,重新计算了UUID列的UDF. 这是我想做的: >>> uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()) >>> a = spark.createDataFrame([[1, 2]], ['col1', 'col2']) >>> a = a.withColumn('id', uuid_udf()) >>> a.collect() [Row(col1=1, col2=2, id='5ac8f818-e2d8-4c50-bae2-0ced7d72ef4f')] >>> b = a.select('id') >>> b.collect() [Row(id='12ec9913-21e1-47bd-9c59-6ddbe2365247')] # Wanted this to b
0 2023-11-12
编程技术问答社区
使用python的Spark流:如何添加UUID列?
我想在数据框架中添加带有生成ID的列.我尝试了: uuidUdf = udf(lambda x: str(uuid.uuid4()), StringType()) df = df.withColumn("id", uuidUdf()) 但是,当我这样做时,我的输出目录没有任何写作.当我删除这些行时,一切正常,所以必须有一些错误,但我在控制台上没有看到任何内容. 我尝试使用单调_increasing_id()而不是生成uuid,但是在我的测试中,这会产生许多重复.我需要一个唯一的标识符(不必具体是UUID). 我该怎么做? 解决方案 请尝试以下操作: import uuid from pyspark.sql.functions import udf uuidUdf= udf(lambda : str(uuid.uuid4()),StringType()) Df1 = Df.withColumn("id",uuidUdf()) 注意:添加新列后,您应
6 2023-11-12
编程技术问答社区
Pyspark: 从 pyspark 数据框中删除 UTF 空字符
我有一个类似于以下数据的Pyspark数据框架: df = sql_context.createDataFrame([ Row(a=3, b=[4,5,6],c=[10,11,12], d='bar', e='utf friendly'), Row(a=2, b=[1,2,3],c=[7,8,9], d='foo', e=u'ab\u0000the') ]) 列的值之一e包含UTF null字符\u0000.如果我尝试将此df加载到PostgreSQL数据库中,则会收到以下错误: ERROR: invalid byte sequence for encoding "UTF8": 0x00 这很有意义.在将数据加载到Postgres之前,我如何有效地从Pyspark数据框中删除空字符? 我已经尝试使用一些pyspark.sql.functions首先清洁数据而没有成功. encode,decode和regex_replace不起作用: df.s
4 2023-11-11
编程技术问答社区
如何在pyspark中使用Pandas UDF功能
我有一个带有两个列的火花框架,看起来像: +-------------------------------------------------------------+------------------------------------+ |docId |id | +-------------------------------------------------------------+------------------------------------+ |DYSDG6-RTB-91d663dd-949e-45da-94dd-e604b6050cb5-1537142434000|91d663dd-949e-45da-94dd-e604b6050cb5| |VAVLS7-RTB-8e2c1917-0
0 2023-11-10
编程技术问答社区
在Pyspark中的groupedBy对象中爆炸后使用Collect_set
我有一个具有这样的模式的数据框架: root |-- docId: string (nullable = true) |-- field_a: array (nullable = true) | |-- element: string (containsNull = true) |-- field_b: array (nullable = true) | |-- element: string (containsNull = true) 我想在field_a上执行groupBy,并使用collect_set将所有不同的值(基本上是列表中的内在值)保存在field_b中,我不想添加一个通过爆炸field_b然后在聚合中进行collect_set进行新列. 如何使用UDAF或PANDAS UDF实现这一目标? 例如. : +---------------------+----------------+------------+ |docId
PySpark : 当把一个字符串类型的DataFrame列转换为Double时,出现KeyError。
我正在尝试使用PySpark学习机器学习.我有一个数据集,该数据集具有几个String列,它们的值是True or False or Yes or No.我正在使用DecisionTree,我想将这些String值转换为相应的Double值,即True, Yes应该更改为1.0和False, No,应更改为0.0.我看到了一个教程,他们做了同样的事情,我想出了这个代码 df = sqlContext.read.csv("C:/../churn-bigml-20.csv",inferSchema=True,header=True) from pyspark.sql.types import DoubleType from pyspark.sql.functions import UserDefinedFunction binary_map = {'Yes':1.0, 'No':0.0, 'True':1.0, 'False':0.0} toNum = UserDefinedFun
通过udf将Spark数据帧转为numpy数组,或不收集到驱动中
现实生活DF是一个大量的数据框架,无法将其加载到驱动程序内存中. 可以使用常规或熊猫UDF吗? # Code to generate a sample dataframe from pyspark.sql import functions as F from pyspark.sql.types import * import pandas as pd import numpy as np sample = [['123',[[0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1], [0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1]]], ['345',[[1,0,0,0,0,1,1,1,0,1,1,0,1,0,0,0,1,1,1,1,1,1], [0,1,0,0,0,1,1,1,1,1,1,0,1,0,0,0,1,1,1,1,1,1]]], ['425',[[1,1,0,0,0,
4 2023-11-10
编程技术问答社区