Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Refactor Content.rec(..) function to support tail-optimisation #417

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
56 changes: 43 additions & 13 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.Content.applyToFilesRecursively
import com.microsoft.hyperspace.util.PathUtils

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
Expand All @@ -45,27 +46,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
@JsonIgnore
lazy val files: Seq[Path] = {
// Recursively find files from directory tree.
rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
applyToFilesRecursively(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
}

@JsonIgnore
lazy val fileInfos: Set[FileInfo] = {
rec(
applyToFilesRecursively(
new Path(root.name),
root,
(f, prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
}

private def rec[T](
prefixPath: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
val files = directory.files.map(f => func(f, prefixPath))
files ++ directory.subDirs.flatMap { dir =>
rec(new Path(prefixPath, dir.name), dir, func)
}
}
}

object Content {
Expand Down Expand Up @@ -111,6 +102,45 @@ object Content {
None
}
}

/**
* Apply `func` to each file in directory recursively. As far as FileInfo does not contain
* information about path, we need to build it also recursively when we switch to deeper
* levels.
*
* @param path Root path
* @param directory Root directory
* @param func Function which would apply to current path and file
* @tparam T
* @return Result list of applying function to all files
*/
private[hyperspace] def applyToFilesRecursively[T](
path: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
@tailrec
def recAcc[A](
dirMap: List[(Path, Seq[Directory])],
func: (FileInfo, Path) => A,
acc: Seq[A] = Seq.empty): Seq[A] = {
dirMap match {
case Nil => acc
case (curPath, curDirs) :: otherDirs =>
val curAcc = for {
dir <- curDirs
file <- dir.files
} yield func(file, new Path(curPath, dir.name))

val newLevels = curDirs
.filter(_.subDirs.nonEmpty)
.map(dir => (new Path(curPath, dir.name), dir.subDirs))

recAcc(otherDirs ++ newLevels, func, curAcc ++ acc)
}
}

recAcc(List((path, Seq(directory))), func)
}
}

/**
Expand Down Expand Up @@ -230,7 +260,7 @@ object Directory {
* @param files List of leaf files.
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* Note: If a new leaf file is discovered, the input fileIdTracker gets
* updated by adding it to the files it is tracking.
* updated by adding it to the files it is tracking.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
assert(actual.sourceFilesSizeInBytes == 200L)
}

test("Content.recFilesApply returns a result list of applying function to all files.") {
val directory = Directory(
"file:/",
files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"a",
files =
Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"b",
files =
Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(Directory("c"))),
Directory("d")))))

def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name)

val res = Content.applyToFilesRecursively(new Path("file:/"), directory, theFunction)

val expected =
Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4")
.map(new Path(_))
.toSet

val actual = res.toSet
assert(actual.equals(expected))
}

test("Content.recFilesApply returns empty list for directories without files.") {
val directory = Directory("file:/")

val res = Content.applyToFilesRecursively(
new Path("file:/"),
directory,
(f, prefix) => new Path(prefix, f.name))

assert(res.isEmpty)
}

test("Content.files api lists all files from Content object.") {
val content = Content(Directory("file:/", subDirs =
Seq(
Expand Down