others - scala - 为有复杂列的DataFrame添加新列(Array<Map<String,String>>)

我正在使用以下模式从外部源加载数据框:


 |-- A: string (nullable = true)


 |-- B: timestamp (nullable = true)


 |-- C: long (nullable = true)


 |-- METADATA: array (nullable = true)


 | |-- element: struct (containsNull = true)


 | | |-- M_1: integer (nullable = true)


 | | |-- M_2: string (nullable = true)


 | | |-- M_3: string (nullable = true)


 | | |-- M_4: string (nullable = true)


 | | |-- M_5: double (nullable = true)


 | | |-- M_6: string (nullable = true)


 | | |-- M_7: double (nullable = true)


 | | |-- M_8: boolean (nullable = true)


 | | |-- M_9: boolean (nullable = true)


 |-- E: string (nullable = true)



现在,我需要添加新列METADATA_PARSED,其中包含列类型数组和以下事例类:

case class META_DATA_COL(M_1 : String ,M_2 : String ,M_3 , M_10:String )

我在此基于示例的方法是创建一个UDF,并且在METADATA列中传递。但是由于它类型复杂,我在解析它时遇到了很多麻烦。

解决这个问题的最好方法是什么?

时间:

你可以用这个


import org.apache.spark.sql.functions._



val tempdf = df.select(


 explode( col("METADATA")).as("flat") 


)



val processedDf = tempdf.select( col("flat.M_1"),col("flat.M_2"),col("flat.M_3"))



现在编写一个udf





def processudf = udf((col1:Int,col2:String,col3:String) => /* do the processing*/)



...