Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Refactor Content.rec(..) function to support tail-optimisation #417

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
54 changes: 41 additions & 13 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.Content.recFilesApply
import com.microsoft.hyperspace.util.PathUtils

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
Expand All @@ -45,27 +46,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
@JsonIgnore
lazy val files: Seq[Path] = {
// Recursively find files from directory tree.
rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
}

@JsonIgnore
lazy val fileInfos: Set[FileInfo] = {
rec(
recFilesApply(
new Path(root.name),
root,
(f, prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
}

private def rec[T](
prefixPath: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
val files = directory.files.map(f => func(f, prefixPath))
files ++ directory.subDirs.flatMap { dir =>
rec(new Path(prefixPath, dir.name), dir, func)
}
}
}

object Content {
Expand Down Expand Up @@ -111,6 +102,43 @@ object Content {
None
}
}

/**
* Apply `func` to each file in directory recursively.
*
* @param prefixPath Root prefix
* @param directory Root directory
* @param func Function which would apply to current prefix and file
* @tparam T
* @return Result list of applying function to all files
*/
private[hyperspace] def recFilesApply[T](
Copy link
Collaborator

@sezruby sezruby Apr 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this function back to case class Content? as this is a private function that should be only used in case class Content.

Other than that, this change looks good to me :) Could you rebase this PR onto master?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, sorry for delay. I consider it as just static function, so it could live separately from class. It also simplify testing

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this being a generic utility function, but:

  1. The name recFilesApply seems unusual. Maybe applyToFilesRecursively?
  2. The meaning of "prefix" here seems unclear. What is the "root prefix" and what is the "current prefix"? Without looking at the code, it is hard to catch the meaning of the parameters.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @clee704. Valid point, I have inherited prefixPath from original function. I have changed it to just path and added a short description why we need it

prefixPath: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
@tailrec
def recAcc[A](
dirMap: List[(Path, Seq[Directory])],
func: (FileInfo, Path) => A,
acc: Seq[A] = Seq.empty): Seq[A] = {
dirMap match {
case Nil => acc
case (curPrefixPath, curDirs) :: otherDirs =>
val curAcc = for {
dir <- curDirs
file <- dir.files
} yield func(file, new Path(curPrefixPath, dir.name))

val newLevels = curDirs
.filter(_.subDirs.nonEmpty)
.map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs))

recAcc(otherDirs ++ newLevels, func, curAcc ++ acc)
}
}

recAcc(List((prefixPath, Seq(directory))), func)
}
}

/**
Expand Down Expand Up @@ -230,7 +258,7 @@ object Directory {
* @param files List of leaf files.
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* Note: If a new leaf file is discovered, the input fileIdTracker gets
* updated by adding it to the files it is tracking.
* updated by adding it to the files it is tracking.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
assert(actual.sourceFilesSizeInBytes == 200L)
}

test("Content.recFilesApply returns a result list of applying function to all files.") {
val directory = Directory(
"file:/",
files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"a",
files =
Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"b",
files =
Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(Directory("c"))),
Directory("d")))))

def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name)

val res = Content.recFilesApply(new Path("file:/"), directory, theFunction)

val expected =
Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4")
.map(new Path(_))
.toSet

val actual = res.toSet
assert(actual.equals(expected))
}

test("Content.recFilesApply returns empty list for directories without files.") {
val directory = Directory("file:/")

val res = Content.recFilesApply(
new Path("file:/"),
directory,
(f, prefix) => new Path(prefix, f.name))

assert(res.isEmpty)
}

test("Content.files api lists all files from Content object.") {
val content = Content(Directory("file:/", subDirs =
Seq(
Expand Down