apache-spark - Pyspark - 在大型数据帧上执行groupby和聚合时引发java堆内存不足

我是spark新手,在Java方面没有编程经验,我正在使用pyspark处理一个非常大的时间序列数据集,它有接近4000个数字(浮点)列和数十亿行。

我希望使用此数据集实现的内容如下:

时间序列数据为10ms间隔,我想将数据按1s间隔分组,并使用平均值作为聚合函数。

下面是我编写的groupby和聚合的代码:


col_list = [... list of numeric columns in the dataframe ...]



agg_funcs = [mean] # I also want to add other aggregation functions here later.



exprs = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]



result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])


 .agg(*exprs))



现在我想把上面的结果dataframe写到一个分区的parquet 上:


(result.write.mode("overwrite")


 .partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')


 .parquet('/out/'))



但是,我得到了一个java堆内存错误。

我尝试增加spark.sql.shuffle.partitions以便每个分区都有较小的大小,但是这并没有帮助。

我的spark集群配置:


2 workers + 1 master


Both the worker nodes have 256 GB RAM and 32 cores each.


Master node has 8 cores and 32 GB RAM.



我为spark作业指定的配置为:


{


"driverMemory":"8G", 


"driverCores": 4, 


"executorMemory":"20G", 


"executorCores": 4, 


"numExecutors": 14, 


"conf": {


"spark.sql.shuffle.partitions": 2000000


 }


}



有谁能帮助我理解为什么存在内存问题以及如何修复它? 谢谢

时间:

我相信这是由于数据偏斜( data skew )而发生的,其中一个分区正在运行。

Spark的groupBy()需要一次将所有的键值加载到内存中以执行。

由于你可能拥有有类似GROUP BY键的大型数据,所以增加分区无法工作,检查有没有类似GROUP BY键的数据倾斜。

最好再看看这篇文章

...