From 50bc56a93cebe052a0f5c2272282694c5cd0cb0b Mon Sep 17 00:00:00 2001 From: ddrag Date: Tue, 20 Apr 2021 09:53:09 +0200 Subject: [PATCH 1/6] remove arrangement --- .../hyperspace/index/IndexLogEntry.scala | 52 ++++++++++++++----- .../hyperspace/index/IndexLogEntryTest.scala | 41 +++++++++++++++ 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4d9227dc9..f3e992e72 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.{BuildInfo, HyperspaceException} import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.Content.recFilesApply import com.microsoft.hyperspace.util.PathUtils // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. @@ -45,27 +46,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri @JsonIgnore lazy val files: Seq[Path] = { // Recursively find files from directory tree. - rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) + recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) } @JsonIgnore lazy val fileInfos: Set[FileInfo] = { - rec( + recFilesApply( new Path(root.name), root, (f, prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet } - - private def rec[T]( - prefixPath: Path, - directory: Directory, - func: (FileInfo, Path) => T): Seq[T] = { - val files = directory.files.map(f => func(f, prefixPath)) - files ++ directory.subDirs.flatMap { dir => - rec(new Path(prefixPath, dir.name), dir, func) - } - } } object Content { @@ -111,6 +102,43 @@ object Content { None } } + + /** + * Apply `func` to each file in directory recursively. + * + * @param prefixPath Root prefix + * @param directory Root directory + * @param func Function which would apply to current prefix and file + * @tparam T + * @return Result list of applying function to all files + */ + def recFilesApply[T]( + prefixPath: Path, + directory: Directory, + func: (FileInfo, Path) => T): Seq[T] = { + @tailrec + def recAcc[A]( + dirMap: List[(Path, Seq[Directory])], + func: (FileInfo, Path) => A, + acc: Seq[A] = Seq.empty): Seq[A] = { + dirMap match { + case Nil => acc + case (curPrefixPath, curDirs) :: otherDirs => + val curAcc = for { + dir <- curDirs + file <- dir.files + } yield func(file, new Path(curPrefixPath, dir.name)) + + val newLevels = curDirs + .filter(_.subDirs.nonEmpty) + .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) + + recAcc(otherDirs ++ newLevels, func, curAcc ++ acc) + } + } + + recAcc(List(prefixPath -> Seq(directory)), func) + } } /** diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index c4936d88f..f8e31750f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -242,6 +242,47 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(actual.sourceFilesSizeInBytes == 200L) } + test("Content.recFilesApply returns a result list of applying function to all files.") { + val directory = Directory( + "file:/", + files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq( + Directory( + "a", + files = + Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq( + Directory( + "b", + files = + Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq(Directory("c"))), + Directory("d"))))) + + def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name) + + val res = Content.recFilesApply(new Path("file:/"), directory, theFunction) + + val expected = + Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4") + .map(new Path(_)) + .toSet + + val actual = res.toSet + assert(actual.equals(expected)) + } + + test("Content.recFilesApply returns empty list for directories without files.") { + val directory = Directory("file:/") + + val res = Content.recFilesApply( + new Path("file:/"), + directory, + (f, prefix) => new Path(prefix, f.name)) + + assert(res.isEmpty) + } + test("Content.files api lists all files from Content object.") { val content = Content(Directory("file:/", subDirs = Seq( From d9e93a1702579538c3273e3016d9d651a20f804a Mon Sep 17 00:00:00 2001 From: ddrag Date: Mon, 26 Apr 2021 16:01:10 +0200 Subject: [PATCH 2/6] scalafmt only recFilesApply func --- .../microsoft/hyperspace/index/IndexLogEntry.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index f3e992e72..3ceff80d7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -113,14 +113,14 @@ object Content { * @return Result list of applying function to all files */ def recFilesApply[T]( - prefixPath: Path, - directory: Directory, - func: (FileInfo, Path) => T): Seq[T] = { + prefixPath: Path, + directory: Directory, + func: (FileInfo, Path) => T): Seq[T] = { @tailrec def recAcc[A]( - dirMap: List[(Path, Seq[Directory])], - func: (FileInfo, Path) => A, - acc: Seq[A] = Seq.empty): Seq[A] = { + dirMap: List[(Path, Seq[Directory])], + func: (FileInfo, Path) => A, + acc: Seq[A] = Seq.empty): Seq[A] = { dirMap match { case Nil => acc case (curPrefixPath, curDirs) :: otherDirs => @@ -258,7 +258,7 @@ object Directory { * @param files List of leaf files. * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * Note: If a new leaf file is discovered, the input fileIdTracker gets - * updated by adding it to the files it is tracking. + * updated by adding it to the files it is tracking. * @return Content object with Directory tree from leaf files. */ def fromLeafFiles( From 848b4930e869355c40ce3f2f35805e7eaf8465bc Mon Sep 17 00:00:00 2001 From: Dmytro Dragan <5161047+dmytroDragan@users.noreply.github.com> Date: Mon, 26 Apr 2021 16:02:31 +0200 Subject: [PATCH 3/6] Update src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala reduce visibility scope for recFilesApply Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com> --- .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 3ceff80d7..979e52cf9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -112,7 +112,7 @@ object Content { * @tparam T * @return Result list of applying function to all files */ - def recFilesApply[T]( + private[hyperspace] def recFilesApply[T]( prefixPath: Path, directory: Directory, func: (FileInfo, Path) => T): Seq[T] = { From 90046da78592d1ae5061ad3792be3fbcfe0f7d45 Mon Sep 17 00:00:00 2001 From: Dmytro Dragan <5161047+dmytroDragan@users.noreply.github.com> Date: Mon, 26 Apr 2021 16:02:39 +0200 Subject: [PATCH 4/6] Update src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com> --- .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 979e52cf9..aa7d2522b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -137,7 +137,7 @@ object Content { } } - recAcc(List(prefixPath -> Seq(directory)), func) + recAcc(List((prefixPath, Seq(directory))), func) } } From eb4d5d1c3b925fff37ece3dab77d40f87265b33f Mon Sep 17 00:00:00 2001 From: ddrag Date: Wed, 5 May 2021 18:00:30 +0200 Subject: [PATCH 5/6] rename function and added description --- .../hyperspace/index/IndexLogEntry.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index aa7d2522b..2d74ec81a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.{BuildInfo, HyperspaceException} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.Content.recFilesApply +import com.microsoft.hyperspace.index.Content.applyToFilesRecursively import com.microsoft.hyperspace.util.PathUtils // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. @@ -46,12 +46,12 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri @JsonIgnore lazy val files: Seq[Path] = { // Recursively find files from directory tree. - recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) + applyToFilesRecursively(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) } @JsonIgnore lazy val fileInfos: Set[FileInfo] = { - recFilesApply( + applyToFilesRecursively( new Path(root.name), root, (f, prefix) => @@ -104,16 +104,18 @@ object Content { } /** - * Apply `func` to each file in directory recursively. + * Apply `func` to each file in directory recursively. As far as FileInfo does not contain + * information about path, we need to build it also recursively when we switch to deeper + * levels. * - * @param prefixPath Root prefix + * @param path Root path * @param directory Root directory - * @param func Function which would apply to current prefix and file + * @param func Function which would apply to current path and file * @tparam T * @return Result list of applying function to all files */ - private[hyperspace] def recFilesApply[T]( - prefixPath: Path, + private[hyperspace] def applyToFilesRecursively[T]( + path: Path, directory: Directory, func: (FileInfo, Path) => T): Seq[T] = { @tailrec @@ -123,21 +125,21 @@ object Content { acc: Seq[A] = Seq.empty): Seq[A] = { dirMap match { case Nil => acc - case (curPrefixPath, curDirs) :: otherDirs => + case (curPath, curDirs) :: otherDirs => val curAcc = for { dir <- curDirs file <- dir.files - } yield func(file, new Path(curPrefixPath, dir.name)) + } yield func(file, new Path(curPath, dir.name)) val newLevels = curDirs .filter(_.subDirs.nonEmpty) - .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) + .map(dir => (new Path(curPath, dir.name), dir.subDirs)) recAcc(otherDirs ++ newLevels, func, curAcc ++ acc) } } - recAcc(List((prefixPath, Seq(directory))), func) + recAcc(List((path, Seq(directory))), func) } } From a0c4d21608616fdfab7a878b99a3e6ef065ada56 Mon Sep 17 00:00:00 2001 From: ddrag Date: Wed, 5 May 2021 18:25:56 +0200 Subject: [PATCH 6/6] fixed tests --- .../com/microsoft/hyperspace/index/IndexLogEntryTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index f8e31750f..0048d634d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -261,7 +261,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name) - val res = Content.recFilesApply(new Path("file:/"), directory, theFunction) + val res = Content.applyToFilesRecursively(new Path("file:/"), directory, theFunction) val expected = Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4") @@ -275,7 +275,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter test("Content.recFilesApply returns empty list for directories without files.") { val directory = Directory("file:/") - val res = Content.recFilesApply( + val res = Content.applyToFilesRecursively( new Path("file:/"), directory, (f, prefix) => new Path(prefix, f.name))