Skip to content

Commit

Permalink
Merge pull request #2 from bardsoftware/tkt-1-non-blocking-procrustes
Browse files Browse the repository at this point in the history
Procrustes improvements
  • Loading branch information
dbarashev committed Jul 15, 2019
2 parents 2ca1eaf + b522d49 commit e89f523
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 20 deletions.
30 changes: 24 additions & 6 deletions src/main/kotlin/com/bardsoftware/papeeria/backend/FileStage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ typealias SaveTaskConsumer = (FileTask) -> Unit

private val LOG = LoggerFactory.getLogger("base.file")

class FileStageException(msg: String, cause: Throwable?) : Exception(msg, cause)
/**
* @author [email protected]
*/
Expand All @@ -47,13 +48,30 @@ class FileStage(
private val fetchContext = newFixedThreadPoolContext(args.postgresConnections, "FetchThread")

suspend fun process(taskId: String, task: FileRequestDto, resultChannel: Channel<Path>) {
val volumePath = procrustes.makeVolume(ProcrustesVolumeRequest(id = taskId))
fetch(task = task, saveTaskConsumer = createSaveTaskConsumer(volumePath))
resultChannel.send(volumePath)
val responseChannel = Channel<ProcrustesVolumeResponse>()
responseChannel.invokeOnClose { ex ->
if (ex != null) {
resultChannel.close(FileStageException("Unexpected exception", ex))
}
}
procrustes.makeVolume(ProcrustesVolumeRequest(id = taskId), responseChannel)
val response = responseChannel.receive()
when (response.code) {
200 -> {
fetch(task = task, saveTaskConsumer = createSaveTaskConsumer(response.path))
resultChannel.send(response.path)
}
502, 504 -> {
resultChannel.close(FileStageException("Procrustes timed out or failed to connect", null))
}
else -> {
resultChannel.close(FileStageException("Procrustes failed with code ${response.code}: ${response.message}", null))
}
}
}

suspend fun fetch(task: FileRequestDto, isFetchNeeded: FetchPredicate = { true },
saveTaskConsumer: SaveTaskConsumer) {
private suspend fun fetch(task: FileRequestDto, isFetchNeeded: FetchPredicate = { true },
saveTaskConsumer: SaveTaskConsumer) {
val taskChannel = Channel<FileTask>()
for (file in task.fileList) {
GlobalScope.launch(fetchContext) {
Expand Down Expand Up @@ -83,7 +101,7 @@ class FileStage(
LOG.info("All saved")
}

fun createSaveTaskConsumer(rootAbsPath: Path): SaveTaskConsumer {
private fun createSaveTaskConsumer(rootAbsPath: Path): SaveTaskConsumer {
return { saveTask ->
LOG.debug("Saving file {} at {}", saveTask.dto.name, rootAbsPath)
if (saveTask.fetchError.first) {
Expand Down
60 changes: 46 additions & 14 deletions src/main/kotlin/com/bardsoftware/papeeria/backend/Procrustes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import com.google.api.client.http.HttpStatusCodes
import com.google.common.io.Files
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import java.io.File
import java.io.IOException
import java.net.ConnectException
import java.net.SocketTimeoutException
import java.nio.file.Path
import java.nio.file.Paths

class ProcrustesArgs(parser: ArgParser) {
val procrustesImpl by parser.storing("--procrustes-impl", help = "Volume manager implementation").default { "plain" }
Expand All @@ -35,26 +41,31 @@ class ProcrustesArgs(parser: ArgParser) {
data class ProcrustesVolumeRequest(val id: String,
val sizeMb: Int = 64,
val makeClean: Boolean = false)
data class ProcrustesVolumeResponse(val path: Path, val code: Int, val message: String)


/**
* @author [email protected]
*/
interface Procrustes {
@Throws(IOException::class)
suspend fun makeVolume(req: ProcrustesVolumeRequest): Path
suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>)
suspend fun rmVolume(id: String)
}

class PlainProcrustes : Procrustes {
private val tempDir = Files.createTempDir()

override suspend fun makeVolume(req: ProcrustesVolumeRequest): Path {
val dir = File(tempDir, req.id)
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw IOException("Failed to mkdirs directory ${dir.path}")
override suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>) {
GlobalScope.launch {
val dir = File(tempDir, req.id)
if (!dir.exists()) {
if (!dir.mkdirs()) {
outputChannel.close(IOException("Failed to mkdirs directory ${dir.path}"))
}
}
outputChannel.send(ProcrustesVolumeResponse(dir.toPath(), 200, ""))
}
return dir.toPath()
}

override suspend fun rmVolume(id: String) {
Expand All @@ -65,26 +76,47 @@ class PlainProcrustes : Procrustes {
}
}

private val procrustesContext = newFixedThreadPoolContext(1, "ProcrustesThread")
private val EMPTY_PATH = Paths.get("")

class HttpProcrustes(private val procrustesRoot: Path,
private val procrustesAddress: String,
private val procrustesPassword: String) : Procrustes {
override suspend fun makeVolume(req: ProcrustesVolumeRequest): Path {
override suspend fun makeVolume(req: ProcrustesVolumeRequest, outputChannel: Channel<ProcrustesVolumeResponse>) {
val url = """http://$procrustesAddress/create"""
return runBlocking {
GlobalScope.launch(procrustesContext) {
val (_, resp, result) = Fuel.get(url, listOf(
"password" to procrustesPassword,
"namespace" to "tex",
"name" to req.id,
"paid" to if (req.sizeMb > 64) "true" else "false",
"withReset" to req.makeClean.toString()
)).timeout(5000).awaitStringResponseResult()
)).timeout(10000).awaitStringResponseResult()
result.fold({
when (resp.statusCode) {
HttpStatusCodes.STATUS_CODE_CREATED, HttpStatusCodes.STATUS_CODE_OK -> procrustesRoot.resolve(req.id)
else -> throw IOException(resp.responseMessage)
HttpStatusCodes.STATUS_CODE_CREATED,
HttpStatusCodes.STATUS_CODE_OK -> outputChannel.send(
ProcrustesVolumeResponse(procrustesRoot.resolve(req.id), 200, "")
)
else -> outputChannel.send(
ProcrustesVolumeResponse(EMPTY_PATH, resp.statusCode, resp.responseMessage)
)
}
}, {
throw IOException(it)
val cause = it.exception
when (cause) {
is SocketTimeoutException -> {
Fuel.reset()
outputChannel.send(ProcrustesVolumeResponse(EMPTY_PATH, 504, cause.message ?: ""))
}
is ConnectException -> {
Fuel.reset()
outputChannel.send(ProcrustesVolumeResponse(EMPTY_PATH, 502, cause.message ?: ""))
}
else -> {
outputChannel.close(cause)
}
}
})
}
}
Expand Down

0 comments on commit e89f523

Please sign in to comment.