scala - Scala Spark: 只有在路径存在时才读取文件

我尝试读取Scala中路径Sequence的文件,下面是例子(伪)代码:


val paths = Seq[String] //Seq of paths


val dataframe = spark.read.parquet(paths: _*)



现在,按照上述顺序,存在一些路径,而有些则不存在。在读取parquet文件时,有没有方法忽略丢失的路径(为了避免,org.apache.spark.sql.AnalysisException: Path does not exist)?


val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)



我检查了DataFrameReaderoptions方法,但是没有类似于ignore_if_missing的选项。

时间:

你可以过滤掉那些不相关的文件,spark中,最好的方法是使用内部spark配置,假设spark会话变量被称为"spark",你可以执行以下操作:


import org.apache.hadoop.fs.FileSystem


import org.apache.hadoop.fs.Path



val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)



def testDirExist(path: String): Boolean = {


 val p = new Path(path)


 hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory


}


val filteredPaths = paths.filter(p => testDirExists(p))


val dataframe = spark.read.parquet(filteredPaths: _*)



先过滤paths怎么样


paths.filter(f => new java.io.File(f).exists)



举个例子:


Seq("/tmp","xx").filter(f => new java.io.File(f).exists)


// res18: List[String] = List(/tmp)



可能这样对你有用?


def read(path: Seq[String]): Try[DataFrame] = Try(spark.read.parquet(p))



read("somePath") match {


 case Success(df) => df.show()


 case Failure(_) => Unit


}



...