使用dask.delayed和pandas.DataFrame将dask.bag of dictionary转换为dask.dataframe[英] convert dask.bag of dictionaries to dask.dataframe using dask.delayed and pandas.DataFrame

本文是小编为大家收集整理的关于使用dask.delayed和pandas.DataFrame将dask.bag of dictionary转换为dask.dataframe的处理方法,想解了使用dask.delayed和pandas.DataFrame将dask.bag of dictionary转换为dask.dataframe的问题怎么解决?使用dask.delayed和pandas.DataFrame将dask.bag of dictionary转换为dask.dataframe问题的解决办法?那么可以参考本文帮助大家快速定位并解决问题。

问题描述

我正在努力将dask.bag词典转换为dask.delayed pandas.DataFrames转换为最终dask.dataframe

我有一个函数(make_dict),它将文件读取为相当复杂的嵌套词典结构和另一个函数(make_df),以将这些字典变成a pandas.DataFrame(每个文件的结果数据帧大约为100 mb).我想将所有数据范围附加到一个dask.dataframe中以进行进一步分析.

到目前为止,我一直在使用dask.delayed对象来加载,转换和附加所有工作正常的数据(请参见下面的示例).但是,对于将来的工作,我想使用dask.persist().

我设法将数据加载到dask.bag>中,导致dicts或pandas.DataFrame列表的列表,我可以在调用compute()后本地使用.但是,当我尝试使用to_delayed()将dask.bag转换为dask.dataframe时,我遇到了一个错误(请参阅下文).

感觉我在这里缺少一些简单的东西,或者我对dask.bag的方法是错误的?

下面的示例显示了我使用简化功能的方法,并引发了相同的错误.关于如何解决此问题的任何建议.

import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag

print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2

def make_dict(n=1):
    return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])

k = [1,2,3]

# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected

# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)

df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails

# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [   A  B
# 0  0  0]

我最终想使用分布式调度程序:

b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())

推荐答案

在袋子情况下,延迟对象指向元素列表,因此您有pandas dataframes列表的列表,这不是您想要的.两个建议

  1. 只是坚持使用dask.它似乎对您有效
  2. 使用本文地址:https://www.itbaoku.cn/post/1793920.html