Dask 数据帧连接 parquet 文件时内存不足
我有1024个镶木文件,每个1Mbin尺寸.我正在使用Python dask将这些1024个文件合并到一个文件中,我有很多磁盘空间,但是RAM有限. . 有没有有效的方法可以使用Python dask? 解决此问题. import dask.dataframe as dd def generatePath(): for i in range(0,1024): return "data/2000-" + i +".parquet" def readDF(): paths = generatePath() for x in paths: df = dd.read_parquet(x, columns=['name', 'address'], engine='pyarrow') yield df def mergeDF(): allDF = readDF() df = next(allDF)
0 2024-02-29
编程技术问答社区
UserWarning: pyarrow.open_stream已被废弃,请使用pyarrow.ipc.open_stream警告
我正在通过pyspark本地运行pyspark用于NLP中的ML项目.管道中的预处理步骤的一部分涉及通过pandas_udf优化的pandas_udf函数的使用.每次我使用预处理的Spark DataFrame操作时,都会出现以下警告: 用户效力:pyarrow.open_stream已弃用,请使用pyarrow.ipc.open_stream warnings.warn(" pyarrow.open_stream已弃用,请使用" 我尝试更新pyarrow,但没有设法避免警告.我的Pyarrow版本是0.14.我想知道该警告的含义,如果有人找到了解决方案?非常感谢您. 火花会话详细信息: conf = SparkConf(). \ setAppName('map'). \ setMaster('local[*]'). \ set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '~/anaconda3/bin/python').
0 2024-02-29
编程技术问答社区
PySpark 2.4.5: 使用PandasUDF时出现IllegalArgumentException
我正在尝试熊猫UDF,并面对非法分解.我还尝试从Pyspark文档中复制示例 groupeddata 要检查但仍会遇到错误. 以下是环境配置 python3.7 安装Pyspark == 2.4.5使用PIP 使用pip安装pyarrow == 0.16.0 from pyspark.sql.functions import pandas_udf, PandasUDFType @pandas_udf('int', PandasUDFType.GROUPED_AGG) def min_udf(v): return v.min() sorted(gdf.agg(min_udf(df.age)).collect()) 输出 Py4JJavaError Traceback (most recent call last) in
0 2024-02-29
编程技术问答社区
如何本地读取feather/arrow文件?
i有 feather 格式sales.feather我正在用于在python和r. 之间交换数据 在r i使用以下命令: df = arrow::read_feather("sales.feather", as_data_frame=TRUE) 在Python中,我使用了: df = pandas.read_feather("sales.feather") 将数据从该文件加载到内存的最佳方法是从pyspark操作的Spark实例? 我也想控制pyspark.StorageLevel以获取从羽毛读取的数据. 我不想使用熊猫加载数据,因为它为我的19GB羽毛文件提供了从45GB CSV创建的. . 解决方案 丑陋hack-使用 mapinarrow . import pyarrow as pa def read_arrow(spark, filename, schema=None): def mapper(iterator):
0 2024-02-29
编程技术问答社区
pandasUDF和pyarrow 0.15.0
我最近开始在EMR簇上运行的许多pyspark作业上遇到很多错误. erros是 java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.
0 2024-02-29
编程技术问答社区
AWS EMR-ModuleNotFoundError。没有名为'pyarrow'的模块。
我遇到了这个问题,没有apache箭头集成. 使用w/spark 2.4.3 的AWS EMR 在本地Spark单机器实例和Cloudera群集上测试了此问题,一切正常. 将它们设置在Spark-Env.sh 中 export PYSPARK_PYTHON=python3 export PYSPARK_PYTHON_DRIVER=python3 在Spark Shell 中确认了这一点 spark.version 2.4.3 sc.pythonExec python3 SC.pythonVer python3 使用Apache Arrow集成运行基本PANDAS_UDF导致错误 from pyspark.sql.functions import pandas_udf, PandasUDFType df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2,
2 2024-02-29
编程技术问答社区
如何将一个巨大的pandas数据帧保存到hdfs?
IM与Pandas和Spark DataFrames一起工作.数据范围总是很大(> 20 GB),标准火花功能不足以适合这些尺寸.目前,我将我的pandas dataframe转换为这样的火花数据框架: dataframe = spark.createDataFrame(pandas_dataframe) 我这样做的转换是因为将Spark写入数据范围非常容易: dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy") 但是,大于2 GB的数据范围的转换失败了. 如果我将火花数据帧转换为pandas,我可以使用pyarrow: // temporary write spark dataframe to hdfs dataframe.write.parquet(path, mode="overwrite", compression="snappy") // open
0 2024-02-29
编程技术问答社区
如何在Python中从parquet中获取模式 apache-beam
我目前在Python中有一个Apache-Beam管道,其中我正在阅读Parquet,将其转换为DataFrame进行一些熊猫的清洁,然后转换回Parquet,然后我想在那里写文件.看起来这样: with beam.Pipeline(options=pipeline_options) as p: dataframes = p \ | 'Read' >> beam.io.ReadFromParquetBatched(known_args.input) \ | 'Convert to pandas' >> beam.Map(lambda table: table.to_pandas()) \ | 'Process df' >> beam.ParDo(ProcessDataFrame()) \ | 'Convert to parquet' >> beam.Map(lambda table: table.to_parquet()
0 2024-02-28
编程技术问答社区
读取 S3 拼花表的最后 N 行
如果我应用了讨论的内容在这里要在s3 buck中读取parquet文件到pandas dataframe,尤其是: import pyarrow.parquet as pq import s3fs s3 = s3fs.S3FileSystem() pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas() 随着时间的流逝,表越来越大,我需要定期进行此检索,我只想将最后的n行读取到数据框架中.这可能吗? 解决方案 是的,这是完全可能的. S3允许部分对象读取. Parquet文件允许根据行组进行部分读取(Pyarrow公开了此功能).此外,如果您有多个文件(无论文件格式如何),则Pyarrow允许部分读取.但是,这些方法将为如何创建输入文件提供一些要求(请参见底部). . 简单的方式 最简单的事情是使用较新的import p
0 2024-01-26
编程技术问答社区
在S3上由pyarrow创建的日志parquet文件名
我们将数据附加到使用Pyarrow中存储在S3(分区)中的现有Parquet数据集中.这每小时几次在AWS Lambda上运行.最小的例子是: import pyarrow as pa import pyarrow.parquet as pq import s3fs df = ... # Existing pandas df table = pa.Table.from_pandas(df) pq.write_to_dataset( table, filesystem=s3, root_path=f"s3://s3-path/", partition_cols=['year', "month"] ) 因此,根据内部数据值,许多镶木木材文件将写入S3.我们的目的是通过输出结果文件名(S3键)来跟踪哪些文件已写入文件系统. 有什么方法可以捕获由pyarrow或s3fs编写的实际文件名?镶木点文件名是根据计算的哈希名称任意命名的,我看不到提到的两
4 2024-01-25
编程技术问答社区
我如何使用awswrangler只读取存储在S3中的parquet文件的前几行?
我正在尝试使用Awswrangler来读取pandas dataFrame,该框架中存储在S3中的任意大型镶木文件,但由于文件的大小而将我的查询限制为第一个N行. P> 我看不到该怎么做,还是在不重新搬迁的情况下是否可能做到. 我可以使用chunked=INTEGER读书后,说明了,如果是的话,如何? 我使用pyarrow遇到了这个不完整的解决方案(最后一个n行;)) - 读取S3 Parquet表的最后一行 - 但是基于时间的过滤器对我而言并不理想,并且被接受的解决方案甚至无法达到故事的结尾(尽管有帮助) . 或者如果没有先下载文件(我现在可能已经完成)? 是否有其他方式? 谢谢! 解决方案 您可以使用awswrangler使用 S3选择.例如: import awswrangler as wr df = wr.s3.select_query( sql="SELECT * FROM s3object s limit 5",
4 2024-01-25
编程技术问答社区
用AWS Lambda读取存储在S3的Parquet文件(Python 3)
我正在尝试使用AWS lambda在S3中加载,处理和编写Parquet文件.我的测试/部署过程是: https://github.com/lambci/lambci/docker-lambda 作为一个容器亚马逊环境,因为需要安装本机库(Numpy等). 此过程生成zip文件: http://docs.aws.amazon.com/lambda/lambda/latest/dg/with-s3-example-deployment-pkg.html#with-s3-with-s3-epample-部署pkg-python 将测试python函数添加到ZIP,将其发送到S3,更新lambda并测试 看来有两种可能的方法,在docker容器: 的本地工作 带有S3FS的FastParquet:不幸的是,包装的未拉链尺寸大于256MB,因此我无法使用它更新Lambda代码. pyarrow with s3fs:我跟随 https://github.com/apach
8 2024-01-25
编程技术问答社区
使用谓词来过滤来自pyarrow.parquet.ParquetDataset的记录
我有一个存储在S3上的Parquet数据集,我想从数据集中查询特定的行.我能够使用petastorm来做到这一点,但是现在我只想使用pyarrow. 来做到这一点. 这是我的尝试: import pyarrow.parquet as pq import s3fs fs = s3fs.S3FileSystem() dataset = pq.ParquetDataset( 'analytics.xxx', filesystem=fs, validate_schema=False, filters=[('event_name', '=', 'SomeEvent')] ) df = dataset.read_pandas().to_pandas() 但这返回pandas dataframe,好像过滤器无法正常工作一样,即我的行具有各种值event_name.我缺少一些东西还是被误解的东西?我可以在获得熊猫数据框后进行过滤,但我会使用比所需
4 2024-01-25
编程技术问答社区
RedShift的FastParquet出口
我有一个非常简单的想法:使用python pandas(为方便起见),用中等数据量进行一些简单的数据库操作,并以parquet格式将数据写回S3. 然后,数据应作为外部表接触红移,以免从实际的红移群集中占据存储空间. 我找到了两种方法. 给定数据: data = { 'int': [1, 2, 3, 4, None], 'float': [1.1, None, 3.4, 4.0, 5.5], 'str': [None, 'two', 'three', 'four', 'five'], 'boolean': [True, None, True, False, False], 'date': [ date(2000, 1, 1), date(2000, 1, 2), date(2000, 1, 3), date(2000, 1, 4), None,
8 2024-01-25
编程技术问答社区
Python pip install pyarrow error, unable to execute 'cmake'
我正在尝试在我的EMR群集的主实例上安装Pyarrow,但是我总是会收到此错误. [hadoop@ip-XXX-XXX-XXX-XXX ~]$ sudo /usr/bin/pip-3.4 install pyarrow Collecting pyarrow Downloading https://files.pythonhosted.org/packages/c0/a0/f7e9dfd8988d94f4952f9b50eb04e14a80fbe39218520725aab53daab57c/pyarrow-0.10.0.tar.gz (2.1MB) 100% |████████████████████████████████| 2.2MB 643kB/s Requirement already satisfied: numpy>=1.10 in /usr/local/lib64/python3.4/site-packages (from pyarrow) Requirement
22 2024-01-24
编程技术问答社区
用pyarrow和pyspark创建的parquet文件是否兼容?
我必须通过两个步骤将JSON中的分析数据转换为Parquet.对于大量现有数据,我正在写一份pyspark工作并做 df.repartition(*partitionby).write.partitionBy(partitionby). mode("append").parquet(output,compression=codec) 但是,对于增量数据,我计划使用AWS lambda.大概,Pyspark对此会过分杀伤,因此我计划使用Pyarrow(我知道它不必要地涉及熊猫,但是我找不到更好的选择).因此,基本上: import pyarrow.parquet as pq pq.write_table(table, outputPath, compression='snappy', use_deprecated_int96_timestamps=True) 我想知道Pyspark和Pyarrow编写的Parquet文件是否兼容(相对于雅典娜)?
4 2024-01-24
编程技术问答社区
将新列添加到HuggingFace数据集中
在数据集中,我有5000000行,我想在我的数据集中添加一个名为"嵌入"的列. dataset = dataset.add_column('embeddings', embeddings) 变量嵌入是一个大小的numpy memmap阵列(5000000,512). 但是我得到了这个错误: arrowinvalidtraceback(最近的最新电话) 在 ----> 1个dataset = dataset.add_column('embeddings',embeddings) /opt/conda/lib/python3.8/site-packages/datasets/arrow_dataset.py in wrapper(*args,** kwargs) 486} 487#应用实际功能 - > 488 OUT:UNION [" DATASET"," DATASETDICT"] = func(self, *args,** kwargs) 489数据集:li
4 2023-12-11
编程技术问答社区
如何在Windows上为Python 3.7正确设置pyarrow?
我一直在尝试通过pip(pip install pyarrow,以及Yagav:py -3.7 -m pip install --user pyarrow>)和conda(conda install -c conda-forge pyarrow,也使用conda install pyarrow),从src构建lib(使用conda环境和conda环境和我并不真正理解的一些魔术),但是在安装(没有错误的情况下)之后,它一直以一个和同一问题结尾,当我打电话时: import pyarrow as pa fs = pa.hdfs.connect(host='my_host', user='my_user@my_host', kerb_ticket='path_to_kerb_ticket') 下一个消息失败: Traceback (most recent call last): File "", line 1, in File "C:\ProgramData\Anac
2 2023-12-06
编程技术问答社区
如何在 pyarrow.compute.assume_timezone 中使用 "tzdata "文件
im尝试使用方法pyarrow.compute.assume_timezone,但我得到了错误: pyarrow.lib.ArrowInvalid: Cannot locate timezone 'UTC': Unable to get Timezone database version from C:\Users\Nick\Downloads\tzdata\ 我尝试从 https://www.iana.org/time-zones- a>没有成功 有人做到了吗? import pyarrow import pyarrow.compute as pc import numpy dt = pyarrow.array([numpy.datetime64("2022-10-10T12:00:12.123456789")], pyarrow.timestamp("ns")) print(pc.assume_timezone(dt, "UTC")) 解决方案 确实有
18 2023-10-22
编程技术问答社区
将 python-polars 连接到 SQL 服务器(目前不支持)
如何将MS SQL Server直接连接到Polars? 该文档未列出任何支持的连接,但建议使用熊猫. 更新: SQL Server身份验证每个答案有效,但是Windows域身份验证行不通.请参阅问题 解决方案 下面的类允许轻松连接连接. import keyring from typing import Optional class KeyringSQL: def __init__(self, username: str, server: str, database: str, server_type: Optional[str]= 'mssql' ,driver: Optional[str] = 'SQL+Server', trusted_connection: Optional[bool] = False): self.server_type = server_type self.username =
2 2023-09-23
编程技术问答社区