diff --git a/R/run-tests.sh b/R/run-tests.sh index 51ca7d600caf0..11fe6355e8568 100755 --- a/R/run-tests.sh +++ b/R/run-tests.sh @@ -23,7 +23,7 @@ FAILED=0 LOGFILE=$FWDIR/unit-tests.out rm -f $LOGFILE -SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE +SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.sql.sources.ignoreDataLocality="true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE FAILED=$((PIPESTATUS[0]||$FAILED)) NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)" diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index c0a135e04bac5..428afb72045ca 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -207,18 +207,14 @@ private[spark] object HadoopFSUtils extends Logging { // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses: Array[FileStatus] = try { - fs match { - // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode - // to retrieve the file status with the file block location. The reason to still fallback - // to listStatus is because the default implementation would potentially throw a - // FileNotFoundException which is better handled by doing the lookups manually below. - case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => - val remoteIter = fs.listLocatedStatus(path) - new Iterator[LocatedFileStatus]() { - def next(): LocatedFileStatus = remoteIter.next - def hasNext(): Boolean = remoteIter.hasNext - }.toArray - case _ => fs.listStatus(path) + if (ignoreLocality) { + fs.listStatus(path) + } else { + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + def hasNext(): Boolean = remoteIter.hasNext + }.toArray } } catch { // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to @@ -288,55 +284,7 @@ private[spark] object HadoopFSUtils extends Logging { if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } - val missingFiles = mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = doFilter(allLeafStatuses) - val resolvedLeafStatuses = filteredLeafStatuses.flatMap { - case f: LocatedFileStatus => - Some(f) - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `parallelListLeafFiles` when the number of - // paths exceeds threshold. - case f if !ignoreLocality => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - try { - val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => - // Store BlockLocation objects to consume less memory - if (loc.getClass == classOf[BlockLocation]) { - loc - } else { - new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) - } - } - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - Some(lfs) - } catch { - case _: FileNotFoundException if ignoreMissingFiles => - missingFiles += f.getPath.toString - None - } - - case f => Some(f) - } - - if (missingFiles.nonEmpty) { - logWarning( - s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") - } - - resolvedLeafStatuses + doFilter(allLeafStatuses) } // scalastyle:on argcount diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 02be8c9221704..5c885027f22cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession { assert(leafFiles.isEmpty) } else { assert(raceCondition == classOf[FileDeletionRaceFileSystem]) - // One of the two leaf files was missing, but we should still list the other: - assert(leafFiles.size == 1) - assert(leafFiles.head.getPath == nonDeletedLeafFilePath) + // listLocatedStatus will fail as a whole because the default impl calls + // getFileBlockLocations + assert(leafFiles.isEmpty) } } else { // We're NOT ignoring missing files, so catalog construction should fail diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index cf9664a9764be..123fe2567ef06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1282,6 +1282,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // This is to avoid running a spark job to list of files in parallel // by the InMemoryFileIndex. spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2) + spark.sessionState.conf.setConf(SQLConf.IGNORE_DATA_LOCALITY, true) withTempDirs { case (root, tmp) => val src = new File(root, "a=1") @@ -1946,6 +1947,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val originClassForLocalFileSystem = spark.conf.getOption(optionKey) try { spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName) + spark.conf.set(SQLConf.IGNORE_DATA_LOCALITY.key, "true") body } finally { originClassForLocalFileSystem match {