python - python - 将PADAS数据帧转换为SPARK数据帧错误

我正在尝试将Pandas DF转换为Spark ,DF头:


10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543


10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611


10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691



代码:


dataset = pd.read_csv("data/AS/test_v2.csv")


sc = SparkContext(conf=conf)


sqlCtx = SQLContext(sc)


sdf = sqlCtx.createDataFrame(dataset)



我得到了一个错误:


TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>



时间:

你需要确保你的pandas数据帧列适用于spark推断的类型。如果你的Pandas dataframe列出如下内容:


pd.info()


<class 'pandas.core.frame.DataFrame'>


RangeIndex: 5062 entries, 0 to 5061


Data columns (total 51 columns):


SomeCol 5062 non-null object


Col2 5062 non-null object



并且你得到了错误的尝试:


df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)



确定.astype(str)实际上是你想要这些列的类型,基本上,当底层Java代码试图从python中的对象推断出类型时,它会使用一些观察并进行猜测,如果该猜测不适用于列中的所有数据,那么它正在尝试要从pandas转换为spark它就会失败。

通过按如下方式强制模式,可以避免相关的错误:

注:文本文件使用原始数据(如上所述)创建(test.csv ),假设的列名称插入("col1","col2","col25")。


import pyspark


from pyspark.sql import SparkSession


import pandas as pd



spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()



pdDF = pd.read_csv("test.csv")



Pandas数据帧的内容:


pdDF



col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25


0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543


1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611


2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691



接下来,创建schema:


from pyspark.sql.types import *



mySchema = StructType([ StructField("Col1", LongType(), True)


 ,StructField("Col2", IntegerType(), True)


 ,StructField("Col3", IntegerType(), True)


 ,StructField("Col4", IntegerType(), True)


 ,StructField("Col5", StringType(), True)


 ,StructField("Col6", StringType(), True)


 ,StructField("Col7", IntegerType(), True)


 ,StructField("Col8", IntegerType(), True)


 ,StructField("Col9", IntegerType(), True)


 ,StructField("Col10", IntegerType(), True)


 ,StructField("Col11", StringType(), True)


 ,StructField("Col12", StringType(), True)


 ,StructField("Col13", IntegerType(), True)


 ,StructField("Col14", IntegerType(), True)


 ,StructField("Col15", IntegerType(), True)


 ,StructField("Col16", IntegerType(), True)


 ,StructField("Col17", IntegerType(), True)


 ,StructField("Col18", IntegerType(), True)


 ,StructField("Col19", IntegerType(), True)


 ,StructField("Col20", IntegerType(), True)


 ,StructField("Col21", IntegerType(), True)


 ,StructField("Col22", IntegerType(), True)


 ,StructField("Col23", IntegerType(), True)


 ,StructField("Col24", IntegerType(), True)


 ,StructField("Col25", IntegerType(), True)])



注:True (表示允许为空)

创建pyspark数据帧:


df = spark.createDataFrame(pdDF,schema=mySchema)



确认Pandas数据帧现在是pyspark数据帧:

 
type(df)



 

输出:


pyspark.sql.dataframe.DataFrame



要通过以下方式解决Kate的注释-要强制使用一般的(字符串)架构,可以执行以下操作:


df=spark.createDataFrame(pdPD.astype(str)) 



我已经尝试了你的数据,并且它工作正常:


%pyspark


import pandas as pd


from pyspark.sql import SQLContext


print sc


df = pd.read_csv("test.csv")


print type(df)


print df


sqlCtx = SQLContext(sc)


sqlCtx.createDataFrame(df).show()



我曾经收到过类似的错误消息,在我的案例中,这是因为我的pandas数据帧包含空值,我建议在转换为spark之前在pandas中处理这个问题(这解决了我的问题)。

...