Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Procrustes improvements #2

Merged
merged 1 commit into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/main/kotlin/com/bardsoftware/papeeria/backend/FileStage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ typealias SaveTaskConsumer = (FileTask) -> Unit

private val LOG = LoggerFactory.getLogger("base.file")

class FileStageException(msg: String, cause: Throwable?) : Exception(msg, cause)
/**
* @author [email protected]
*/
Expand All @@ -47,13 +48,30 @@ class FileStage(
private val fetchContext = newFixedThreadPoolContext(args.postgresConnections, "FetchThread")

suspend fun process(taskId: String, task: FileRequestDto, resultChannel: Channel<Path>) {
val volumePath = procrustes.makeVolume(ProcrustesVolumeRequest(id = taskId))
fetch(task = task, saveTaskConsumer = createSaveTaskConsumer(volumePath))
resultChannel.send(volumePath)
val responseChannel = Channel<ProcrustesVolumeResponse>()
responseChannel.invokeOnClose { ex ->
if (ex != null) {
resultChannel.close(FileStageException("Unexpected exception", ex))
}
}
procrustes.makeVolume(ProcrustesVolumeRequest(id = taskId), responseChannel)
val response = responseChannel.receive()
when (response.code) {
200 -> {
fetch(task = task, saveTaskConsumer = createSaveTaskConsumer(response.path))
resultChannel.send(response.path)
}
502, 504 -> {
resultChannel.close(FileStageException("Procrustes timed out or failed to connect", null))
}
else -> {
resultChannel.close(FileStageException("Procrustes failed with code ${response.code}: ${response.message}", null))
}
}
}

suspend fun fetch(task: FileRequestDto, isFetchNeeded: FetchPredicate = { true },
saveTaskConsumer: SaveTaskConsumer) {
private suspend fun fetch(task: FileRequestDto, isFetchNeeded: FetchPredicate = { true },
saveTaskConsumer: SaveTaskConsumer) {
val taskChannel = Channel<FileTask>()
for (file in task.fileList) {
GlobalScope.launch(fetchContext) {
Expand Down Expand Up @@ -83,7 +101,7 @@ class FileStage(
LOG.info("All saved")
}

fun createSaveTaskConsumer(rootAbsPath: Path): SaveTaskConsumer {
private fun createSaveTaskConsumer(rootAbsPath: Path): SaveTaskConsumer {
return { saveTask ->
LOG.debug("Saving file {} at {}", saveTask.dto.name, rootAbsPath)
if (saveTask.fetchError.first) {
Expand Down
60 changes: 46 additions & 14 deletions src/main/kotlin/com/bardsoftware/papeeria/backend/Procrustes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import com.google.api.client.http.HttpStatusCodes
import com.google.common.io.Files
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import java.io.File
import java.io.IOException
import java.net.ConnectException
import java.net.SocketTimeoutException
import java.nio.file.Path
import java.nio.file.Paths

class ProcrustesArgs(parser: ArgParser) {
val procrustesImpl by parser.storing("--procrustes-impl", help = "Volume manager implementation").default { "plain" }
Expand All @@ -35,26 +41,31 @@ class ProcrustesArgs(parser: ArgParser) {
data class ProcrustesVolumeRequest(val id: String,
val sizeMb: Int = 64,
val makeClean: Boolean = false)
data class ProcrustesVolumeResponse(val path: Path, val code: Int, val message: String)


/**
* @author [email protected]
*/
interface Procrustes {
@Throws(IOException::class)
suspend fun makeVolume(req: ProcrustesVolumeRequest): Path
suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>)
suspend fun rmVolume(id: String)
}

class PlainProcrustes : Procrustes {
private val tempDir = Files.createTempDir()

override suspend fun makeVolume(req: ProcrustesVolumeRequest): Path {
val dir = File(tempDir, req.id)
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw IOException("Failed to mkdirs directory ${dir.path}")
override suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>) {
GlobalScope.launch {
val dir = File(tempDir, req.id)
if (!dir.exists()) {
if (!dir.mkdirs()) {
outputChannel.close(IOException("Failed to mkdirs directory ${dir.path}"))
}
}
outputChannel.send(ProcrustesVolumeResponse(dir.toPath(), 200, ""))
}
return dir.toPath()
}

override suspend fun rmVolume(id: String) {
Expand All @@ -65,26 +76,47 @@ class PlainProcrustes : Procrustes {
}
}

private val procrustesContext = newFixedThreadPoolContext(1, "ProcrustesThread")
private val EMPTY_PATH = Paths.get("")

class HttpProcrustes(private val procrustesRoot: Path,
private val procrustesAddress: String,
private val procrustesPassword: String) : Procrustes {
override suspend fun makeVolume(req: ProcrustesVolumeRequest): Path {
override suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>) {
val url = """http://$procrustesAddress/create"""
return runBlocking {
GlobalScope.launch(procrustesContext) {
val (_, resp, result) = Fuel.get(url, listOf(
"password" to procrustesPassword,
"namespace" to "tex",
"name" to req.id,
"paid" to if (req.sizeMb > 64) "true" else "false",
"withReset" to req.makeClean.toString()
)).timeout(5000).awaitStringResponseResult()
)).timeout(10000).awaitStringResponseResult()
result.fold({
when (resp.statusCode) {
HttpStatusCodes.STATUS_CODE_CREATED, HttpStatusCodes.STATUS_CODE_OK -> procrustesRoot.resolve(req.id)
else -> throw IOException(resp.responseMessage)
HttpStatusCodes.STATUS_CODE_CREATED,
HttpStatusCodes.STATUS_CODE_OK -> outputChannel.send(
ProcrustesVolumeResponse(procrustesRoot.resolve(req.id), 200, "")
)
else -> outputChannel.send(
ProcrustesVolumeResponse(EMPTY_PATH, resp.statusCode, resp.responseMessage)
)
}
}, {
throw IOException(it)
val cause = it.exception
when (cause) {
is SocketTimeoutException -> {
Fuel.reset()
outputChannel.send(ProcrustesVolumeResponse(EMPTY_PATH, 504, cause.message ?: ""))
}
is ConnectException -> {
Fuel.reset()
outputChannel.send(ProcrustesVolumeResponse(EMPTY_PATH, 502, cause.message ?: ""))
}
else -> {
outputChannel.close(cause)
}
}
})
}
}
Expand Down