Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion R/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.sql.sources.ignoreDataLocality="true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
Expand Down
70 changes: 9 additions & 61 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,14 @@ private[spark] object HadoopFSUtils extends Logging {
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses: Array[FileStatus] = try {
fs match {
// DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
def hasNext(): Boolean = remoteIter.hasNext
}.toArray
case _ => fs.listStatus(path)
if (ignoreLocality) {
fs.listStatus(path)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch to listStatusIterator(path) and again, provide a remoteIterator. This will give you on paged downloads on hdfs, webhdfs, async page prefetch on latest S3A builds, and, at worst elsewhere, exactly the same performance a listStatus

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg - I'll switch to listStatusIterator and create a wrapper class for the returned RemoteIterator in both cases.

} else {
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to pull this out into something reusable for any RemoteIterator[T] as it gets used in a number of API calls (all because of java's checked exceptions...)

def next(): LocatedFileStatus = remoteIter.next
def hasNext(): Boolean = remoteIter.hasNext
}.toArray

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the longer you can incrementally do per entry in the remote iterator, the more latencies talking to the object stores can be hidden. See HADOOP-17074 and HADOOP-17023 for details; one of the PRs shows some numbers there.

If the spark API could return an iterator/yield and the processing of it used that, a lot of that listing cost could be absorbed entirely.

@sunchao sunchao Oct 20, 2020

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it would be lovely if we can get async listing here, but I think it requires a much bigger surgery - up to the top level currently Spark's RDD model requires all the input partitions to be ready before it can start processing (deeply embedded in its primitives such as map/reduce).

We can perhaps add the async logic here in this class but I think "local" processing we're doing here is far cheaper than the remote listing and perhaps can't gain much from the change.

We can wrap the iterator and make it looks like a lazy array until certain info is needed but again I think it won't go very far until we make extensive changes in upper stack like in PartitioningAwareFileIndex or DataSourceScanExec. Anyways I'll perhaps try this in a separate PR.

}
} catch {
// If we are listing a root path for SQL (e.g. a top level directory of a table), we need to
Expand Down Expand Up @@ -288,55 +284,7 @@ private[spark] object HadoopFSUtils extends Logging {
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

val missingFiles = mutable.ArrayBuffer.empty[String]
val filteredLeafStatuses = doFilter(allLeafStatuses)
val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `parallelListLeafFiles` when the number of
// paths exceeds threshold.
case f if !ignoreLocality =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file system (RawLocalFileSystem, which is launch a
// subprocess and parse the stdout).
try {
val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
// Store BlockLocation objects to consume less memory
if (loc.getClass == classOf[BlockLocation]) {
loc
} else {
new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
}
}
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
Some(lfs)
} catch {
case _: FileNotFoundException if ignoreMissingFiles =>
missingFiles += f.getPath.toString
None
}

case f => Some(f)
}

if (missingFiles.nonEmpty) {
logWarning(
s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
}

resolvedLeafStatuses
doFilter(allLeafStatuses)
}
// scalastyle:on argcount

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession {
assert(leafFiles.isEmpty)
} else {
assert(raceCondition == classOf[FileDeletionRaceFileSystem])
// One of the two leaf files was missing, but we should still list the other:
assert(leafFiles.size == 1)
assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
// listLocatedStatus will fail as a whole because the default impl calls
// getFileBlockLocations
assert(leafFiles.isEmpty)
}
} else {
// We're NOT ignoring missing files, so catalog construction should fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// This is to avoid running a spark job to list of files in parallel
// by the InMemoryFileIndex.
spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
spark.sessionState.conf.setConf(SQLConf.IGNORE_DATA_LOCALITY, true)

withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
Expand Down Expand Up @@ -1946,6 +1947,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
try {
spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
spark.conf.set(SQLConf.IGNORE_DATA_LOCALITY.key, "true")
body
} finally {
originClassForLocalFileSystem match {
Expand Down