Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
remove arrangement
Browse files Browse the repository at this point in the history
  • Loading branch information
ddrag-ai-dragonfly committed Apr 20, 2021
1 parent 336728d commit 50bc56a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
52 changes: 40 additions & 12 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.Content.recFilesApply
import com.microsoft.hyperspace.util.PathUtils

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
Expand All @@ -45,27 +46,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
@JsonIgnore
lazy val files: Seq[Path] = {
// Recursively find files from directory tree.
rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
}

@JsonIgnore
lazy val fileInfos: Set[FileInfo] = {
rec(
recFilesApply(
new Path(root.name),
root,
(f, prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
}

private def rec[T](
prefixPath: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
val files = directory.files.map(f => func(f, prefixPath))
files ++ directory.subDirs.flatMap { dir =>
rec(new Path(prefixPath, dir.name), dir, func)
}
}
}

object Content {
Expand Down Expand Up @@ -111,6 +102,43 @@ object Content {
None
}
}

/**
* Apply `func` to each file in directory recursively.
*
* @param prefixPath Root prefix
* @param directory Root directory
* @param func Function which would apply to current prefix and file
* @tparam T
* @return Result list of applying function to all files
*/
def recFilesApply[T](
prefixPath: Path,
directory: Directory,
func: (FileInfo, Path) => T): Seq[T] = {
@tailrec
def recAcc[A](
dirMap: List[(Path, Seq[Directory])],
func: (FileInfo, Path) => A,
acc: Seq[A] = Seq.empty): Seq[A] = {
dirMap match {
case Nil => acc
case (curPrefixPath, curDirs) :: otherDirs =>
val curAcc = for {
dir <- curDirs
file <- dir.files
} yield func(file, new Path(curPrefixPath, dir.name))

val newLevels = curDirs
.filter(_.subDirs.nonEmpty)
.map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs))

recAcc(otherDirs ++ newLevels, func, curAcc ++ acc)
}
}

recAcc(List(prefixPath -> Seq(directory)), func)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
assert(actual.sourceFilesSizeInBytes == 200L)
}

test("Content.recFilesApply returns a result list of applying function to all files.") {
val directory = Directory(
"file:/",
files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"a",
files =
Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"b",
files =
Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(Directory("c"))),
Directory("d")))))

def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name)

val res = Content.recFilesApply(new Path("file:/"), directory, theFunction)

val expected =
Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4")
.map(new Path(_))
.toSet

val actual = res.toSet
assert(actual.equals(expected))
}

test("Content.recFilesApply returns empty list for directories without files.") {
val directory = Directory("file:/")

val res = Content.recFilesApply(
new Path("file:/"),
directory,
(f, prefix) => new Path(prefix, f.name))

assert(res.isEmpty)
}

test("Content.files api lists all files from Content object.") {
val content = Content(Directory("file:/", subDirs =
Seq(
Expand Down

0 comments on commit 50bc56a

Please sign in to comment.