python - python - 在Pyspark中使用udf和numpy排序列表

我有一个PySpark dataframe,其中第二列是列表的列表。

以下是我拥有的PySpark dataframe:


+---+------------------------------+


|A |B |


+---+------------------------------+


|a |[[95.0], [25.0, 25.0], [40.0]]|


|a |[[95.0], [20.0, 80.0]] |


|a |[[95.0], [25.0, 75.0]] |


|b |[[95.0], [25.0, 75.0]] |


|b |[[95.0], [12.0, 88.0]] |


+---+------------------------------+




下面是我期望的输出:


+---+------------------------------+


|A |B |


+---+------------------------------+


|a |[25.0, 25.0, 40.0] |


|a |[20.0, 80.0] |


|a |[25.0, 75.0] |


|b |[25.0, 75.0] |


|b |[12.0, 88.0] |


+---+------------------------------+




下面是我目前使用的udf:


def remove_highest(col):


 return np.sort( np.asarray([item for sublist in col for item in sublist]) )[:-1]



udf_remove_highest = F.udf( remove_highest , T.ArrayType() )



尝试创建此udf时出现以下错误:


---------------------------------------------------------------------------


TypeError Traceback (most recent call last)


<ipython-input-20-6984c2f41293> in <module>()


 2 return np.sort( np.asarray([item for sublist in col for item in sublist]) )[:-1]


 3 


----> 4 udf_remove_highest = F.udf( remove_highest , T.ArrayType() )



TypeError: __init__() missing 1 required positional argument: 'elementType'




如何实现上述目标?

时间:

我不建议使用UDF,只需添加FloatType


udf_remove_highest = F.udf( remove_highest , T.ArrayType(T.FloatType()) )



这段代码将给出相同的结果,并且它使用pyspark函数,这将更加高效,快速和便宜:


df1=df.withColumn("B",F.flatten("B")).withColumn("B", F.sort_array("B")).withColumn("Max",F.array((F.array_max("B"))))


df1.withColumn("B",F.array_except("B","Max")).drop("Max").show()



+---+------------------+


| A| B|


+---+------------------+


| a|[25.0, 25.0, 40.0]|


| a| [20.0, 80.0]|


| a| [25.0, 75.0]|


| b| [25.0, 75.0]|


| b| [12.0, 88.0]|


+---+------------------+



...