Add clear dataset operator #830

Open · wants to merge 7 commits into base: develop
silk-core/src/main/scala/org/silkframework/config/Port.scala (6 changes: 5 additions & 1 deletion)
@@ -64,4 +64,8 @@ case class FixedNumberOfInputs(ports: Seq[Port]) extends InputPorts
   */
 case class FlexibleNumberOfInputs(portDefinition: Port = FlexibleSchemaPort,
                                   min: Int = 0,
-                                  max: Option[Int] = None) extends InputPorts
+                                  max: Option[Int] = None) extends InputPorts
+
+object InputPorts {
+  final val NoInputPorts = FixedNumberOfInputs(Seq.empty)
+}
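For context, `InputPorts.NoInputPorts` is a small convenience constant for tasks that accept no inputs at all. A minimal sketch of a task using it (`NoInputExampleTask` is a hypothetical illustration, not part of this PR):

```scala
import org.silkframework.config.{CustomTask, InputPorts, Port}

// Hypothetical task that accepts no inputs and produces no output,
// shown only to illustrate the new InputPorts.NoInputPorts constant.
case class NoInputExampleTask() extends CustomTask {
  override def inputPorts: InputPorts = InputPorts.NoInputPorts
  override def outputPort: Option[Port] = None
}
```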
@@ -27,6 +27,9 @@ object SilkVocab {
   val TripleSchemaType: String = namespace + "TripleSchemaType"
   val QuadSchemaType: String = namespace + "QuadSchemaType"
 
+  // Clear dataset
+  val ClearDatasetType: String = namespace + "ClearDatasetType"
+
   // Empty table
   val EmptySchemaType: String = namespace + "EmptySchemaType"
 
@@ -28,9 +28,9 @@ class CombinedEntitySink(val sinks: Seq[EntitySink]) extends EntitySink {
     }
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
     for(sink <- sinks) {
-      sink.clear()
+      sink.clear(force)
     }
   }
 
@@ -9,6 +9,8 @@ trait DataSink extends CloseableDataset {
 
   /**
    * Makes sure that the next write will start from an empty dataset.
+   *
+   * @param force If true, clear the dataset regardless of its configuration.
    */
-  def clear()(implicit userContext: UserContext): Unit
+  def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = { }
 }
@@ -78,7 +78,7 @@ case class DatasetSpec[+DatasetType <: Dataset](plugin: DatasetType,
   /** Datasets don't define input schemata, because any data can be written to them. */
   override def inputPorts: InputPorts = {
     if(readOnly || characteristics.readOnly) {
-      FixedNumberOfInputs(Seq.empty)
+      InputPorts.NoInputPorts
     } else if(characteristics.supportsMultipleWrites) {
       FlexibleNumberOfInputs()
     } else {
@@ -291,7 +291,12 @@ object DatasetSpec {
     /**
      * Makes sure that the next write will start from an empty dataset.
      */
-    override def clear()(implicit userContext: UserContext): Unit = entitySink.clear()
+    override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+      if(datasetSpec.readOnly) {
+        throw new RuntimeException(s"Cannot clear dataset, because it is configured as read-only.")
+      }
+      entitySink.clear(force)
+    }
 
     @inline
     private def prependUri(uri: String, values: IndexedSeq[Seq[String]]): IndexedSeq[Seq[String]] = {
@@ -354,7 +359,12 @@ object DatasetSpec {
     /**
      * Makes sure that the next write will start from an empty dataset.
      */
-    override def clear()(implicit userContext: UserContext): Unit = linkSink.clear()
+    override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+      if(datasetSpec.readOnly) {
+        throw new RuntimeException(s"Cannot clear dataset, because it is configured as read-only.")
+      }
+      linkSink.clear(force)
+    }
   }
 
   /**
@@ -15,6 +15,11 @@ trait DirtyTrackingFileDataSink extends DataSink {
     DirtyTrackingFileDataSink.addUpdatedFile(resource.name)
     super.close()
   }
+
+  abstract override def clear(force: Boolean)(implicit userContext: UserContext): Unit = {
+    DirtyTrackingFileDataSink.addUpdatedFile(resource.name)
+    super.clear(force)
+  }
 }
 
 object DirtyTrackingFileDataSink {
@@ -43,7 +43,7 @@ object EmptyDataset extends Dataset with Serializable {
      * Makes sure that the next write will start from an empty dataset.
      * Does nothing as this dataset is always empty
      */
-    override def clear()(implicit userContext: UserContext): Unit = {}
+    override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {}
   }
 
   /**
@@ -67,7 +67,7 @@ object EmptyDataset extends Dataset with Serializable {
      * Makes sure that the next write will start from an empty dataset.
      * Does nothing as this dataset is always empty
      */
-    override def clear()(implicit userContext: UserContext): Unit = {}
+    override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {}
   }
 
   override def characteristics: DatasetCharacteristics = DatasetCharacteristics.attributesOnly()
@@ -30,5 +30,5 @@ case class FilteredLinkSink(linkSink: LinkSink, filterFn: Link => Boolean) exten
   /**
    * Makes sure that the next write will start from an empty dataset.
    */
-  override def clear()(implicit userContext: UserContext): Unit = linkSink.clear()
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = linkSink.clear(force)
 }
@@ -37,7 +37,7 @@ object SafeModeDataSource extends DataSource {
 
 object SafeModeSink extends DataSink with LinkSink with EntitySink {
 
-  override def clear()(implicit userContext: UserContext): Unit = {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
     SafeModeException.throwSafeModeException()
   }
 
@@ -27,8 +27,8 @@ class TableLinkSink(entitySink: EntitySink) extends LinkSink {
     entitySink.close()
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = {
-    entitySink.clear()
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+    entitySink.clear(force)
   }
 }
 
@@ -0,0 +1,35 @@
+package org.silkframework.dataset.operations
+
+import org.silkframework.config._
+import org.silkframework.entity.EntitySchema
+import org.silkframework.execution.EmptyEntityHolder
+import org.silkframework.execution.local.LocalEntities
+import org.silkframework.runtime.plugin.annotations.Plugin
+
+@Plugin(
+  id = "clearDataset",
+  label = "Clear dataset",
+  description =
+    """Clears the dataset that is connected to the output of this operator."""
+)
+case class ClearDatasetOperator() extends CustomTask {
+
+  /**
+    * The input ports and their schemata.
+    */
+  override def inputPorts: InputPorts = InputPorts.NoInputPorts
+
+  /**
+    * The output port and its schema.
+    * None, if this operator does not generate any output.
+    */
+  override def outputPort: Option[Port] = Some(FixedSchemaPort(ClearDatasetOperator.clearDatasetSchema))
+}
+
+object ClearDatasetOperator {
+  private val clearDatasetSchema = EntitySchema(SilkVocab.ClearDatasetType, IndexedSeq.empty)
+
+  case class ClearDatasetTable(task: Task[TaskSpec]) extends LocalEntities with EmptyEntityHolder {
+    override def entitySchema: EntitySchema = clearDatasetSchema
+  }
+}
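The operator carries no clearing logic itself: it only emits an empty marker table typed as `SilkVocab.ClearDatasetType`, and the dataset executor that consumes the operator's output interprets that marker. A condensed sketch of the dispatch, mirroring the `LocalDatasetExecutor` change further down (the `handle` method is illustrative):

```scala
import org.silkframework.dataset.operations.ClearDatasetOperator.ClearDatasetTable
import org.silkframework.execution.local.LocalEntities

// Sketch: a dataset executor treats the marker table as a "clear"
// instruction rather than as entities to be written.
def handle(entities: LocalEntities): Unit = entities match {
  case _: ClearDatasetTable => () // clear the connected dataset (see executeClearDataset below)
  case _                    => () // write entities as usual
}
```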
@@ -0,0 +1,47 @@
+package org.silkframework.dataset.operations
+
+import org.silkframework.config.{Task, TaskSpec}
+import org.silkframework.dataset.operations.ClearDatasetOperator.ClearDatasetTable
+import org.silkframework.execution.local.{LocalEntities, LocalExecution, LocalExecutor}
+import org.silkframework.execution.{ExecutionReport, ExecutionReportUpdater, ExecutorOutput, SimpleExecutionReport}
+import org.silkframework.runtime.activity.ActivityContext
+import org.silkframework.runtime.plugin.PluginContext
+
+/** Executes a clear dataset operator. */
+case class ClearDatasetOperatorLocalExecutor() extends LocalExecutor[ClearDatasetOperator] {
+
+  override def execute(task: Task[ClearDatasetOperator],
+                       inputs: Seq[LocalEntities],
+                       output: ExecutorOutput,
+                       execution: LocalExecution,
+                       context: ActivityContext[ExecutionReport])
+                      (implicit pluginContext: PluginContext): Option[LocalEntities] = {
+    context.value.update(SimpleExecutionReport(
+      task = task,
+      summary = Seq.empty,
+      warnings = Seq.empty,
+      error = None,
+      isDone = true,
+      entityCount = 1,
+      operation = Some("generate clear instruction"),
+      operationDesc = "clear instruction generated"
+    ))
+    Some(ClearDatasetTable(task))
+  }
+}
+
+case class ClearDatasetOperatorExecutionReportUpdater(task: Task[TaskSpec],
+                                                      context: ActivityContext[ExecutionReport]) extends ExecutionReportUpdater {
+
+  override def operationLabel: Option[String] = Some("clear dataset")
+
+  override def entityLabelSingle: String = "dataset"
+  override def entityLabelPlural: String = "datasets"
+  override def entityProcessVerb: String = "cleared"
+
+  override def minEntitiesBetweenUpdates: Int = 1
+
+  override def additionalFields(): Seq[(String, String)] = Seq(
+    "Cleared dataset" -> task.fullLabel
+  )
+}
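For reference, the two pieces interact like this at runtime: the executor above only reports that a clear instruction was generated, while the report updater is driven by the dataset executor once the clear actually happens. A short sketch of that reporting flow (the `datasetTask` and `activityContext` values are assumed to be in scope):

```scala
// Sketch of the reporting flow after a successful clear, as used by
// executeClearDataset in LocalDatasetExecutor below.
val reportUpdater = ClearDatasetOperatorExecutionReportUpdater(datasetTask, activityContext)
reportUpdater.increaseEntityCounter() // one dataset was cleared
reportUpdater.executionDone()         // flush the final report
```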
@@ -4,6 +4,8 @@ import org.silkframework.config.{Prefixes, Task, TaskSpec}
 import org.silkframework.dataset.CloseableDataset.using
 import org.silkframework.dataset.DatasetSpec.{EntitySinkWrapper, GenericDatasetSpec}
 import org.silkframework.dataset._
+import org.silkframework.dataset.operations.ClearDatasetOperator.ClearDatasetTable
+import org.silkframework.dataset.operations.ClearDatasetOperatorExecutionReportUpdater
 import org.silkframework.dataset.rdf._
 import org.silkframework.entity._
 import org.silkframework.execution._
@@ -147,6 +149,8 @@ abstract class LocalDatasetExecutor[DatasetType <: Dataset] extends DatasetExecu
         uploadFilesViaGraphStore(dataset, graphStoreFiles, reportUpdater)
       case sparqlUpdateTable: SparqlUpdateEntityTable =>
         executeSparqlUpdateQueries(dataset, sparqlUpdateTable, execution)
+      case _: ClearDatasetTable =>
+        executeClearDataset(dataset)
       case et: LocalEntities =>
         writeGenericLocalEntities(dataset, et, execution)
     }
@@ -229,6 +233,17 @@ abstract class LocalDatasetExecutor[DatasetType <: Dataset] extends DatasetExecu
     }
   }
 
+  private def executeClearDataset(dataset: Task[DatasetSpec[DatasetType]])
+                                 (implicit userContext: UserContext, context: ActivityContext[ExecutionReport]): Unit = {
+    if(dataset.readOnly) {
+      throw new RuntimeException(s"Cannot clear dataset '${dataset.fullLabel}', because it is configured as read-only.")
+    }
+    val executionReport = ClearDatasetOperatorExecutionReportUpdater(dataset, context)
+    dataset.entitySink.clear(force = true)
+    executionReport.increaseEntityCounter()
+    executionReport.executionDone()
+  }
+
   /** Buffers queries to make a prediction about how many queries will be executed.
     *
     * @param bufferSize max size of queries that should be buffered
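Note that the read-only guard runs before the sink is touched, so even a forced clear cannot bypass a read-only dataset configuration. A test-style sketch of the expected behavior (assuming ScalaTest and an illustrative `readOnlyDatasetTask` fixture; since `executeClearDataset` is private, a real test would go through the executor):

```scala
import org.scalatest.Assertions.intercept

// Illustrative expectation: clearing a read-only dataset task fails fast
// with the RuntimeException thrown by executeClearDataset.
val thrown = intercept[RuntimeException] {
  executeClearDataset(readOnlyDatasetTask)
}
assert(thrown.getMessage.contains("read-only"))
```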
@@ -19,6 +19,7 @@ import org.silkframework.config.Task.GenericTaskFormat
 import org.silkframework.config.TaskSpec.TaskSpecXmlFormat
 import org.silkframework.dataset.DatasetSpec.{DatasetSpecFormat, DatasetTaskXmlFormat}
 import org.silkframework.dataset.VariableDataset
+import org.silkframework.dataset.operations.{ClearDatasetOperator, ClearDatasetOperatorLocalExecutor}
 import org.silkframework.entity.EntitySchema.EntitySchemaFormat
 import org.silkframework.entity.ValueType
 import org.silkframework.execution.local.LocalExecutionManager
@@ -32,13 +33,19 @@ import scala.language.existentials
   */
 class CorePlugins extends PluginModule {
 
-  override def pluginClasses: Seq[Class[_ <: AnyPlugin]] = datasets ++ serializers ++ valueTypes :+ classOf[LocalExecutionManager]
+  override def pluginClasses: Seq[Class[_ <: AnyPlugin]] = datasets ++ datasetOperations ++ serializers ++ valueTypes :+ classOf[LocalExecutionManager]
 
   private def datasets: Seq[Class[_ <: AnyPlugin]] =
     classOf[InternalDataset] ::
     classOf[VariableDataset] ::
     Nil
 
+  private def datasetOperations: Seq[Class[_ <: AnyPlugin]] = {
+    classOf[ClearDatasetOperator] ::
+    classOf[ClearDatasetOperatorLocalExecutor] ::
+    Nil
+  }
+
   private def serializers: Seq[Class[_ <: AnyPlugin]] =
     TaskSpecXmlFormat.getClass ::
     GenericTaskFormat.getClass ::
@@ -64,7 +64,7 @@ case class DummyLinkSink(writeLinkFn: (Link, String) => Unit,
     writeLinkFn(link, predicateUri)
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = { clearFn() }
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = { clearFn() }
 
   override def close()(implicit userContext: UserContext): Unit = {}
 }
@@ -79,7 +79,7 @@ case class DummyEntitySink(writeEntityFn: (String, Seq[Seq[String]]) => Unit,
     writeEntityFn(subject, values)
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = { clearFn() }
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = { clearFn() }
 
   override def closeTable()(implicit userContext: UserContext): Unit = {}
 
@@ -92,7 +92,7 @@ abstract class HierarchicalSink extends EntitySink {
   /**
    * Makes sure that the next write will start from an empty dataset.
    */
-  override def clear()(implicit userContext: UserContext): Unit = { }
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = { }
 
   /**
    * Outputs all entities in the cache to a HierarchicalEntityWriter.
@@ -45,8 +45,8 @@ case class CsvDataset (
   quoteEscapeCharacter: String = "\"",
   @Param(label = "ZIP file regex", value = "If the input resource is a ZIP file, files inside the file are filtered via this regex.", advanced = true)
   override val zipFileRegex: String = CsvDataset.defaultZipFileRegex,
-  @Param(label = "Delete file before workflow execution",
-    value = "If set to true this will clear the specified file before executing a workflow that writes to it.",
+  @Param(label = "Delete file before workflow execution (deprecated)",
+    value = "This parameter is deprecated; use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true, this will clear the specified file before executing a workflow that writes to it.",
     advanced = true)
   clearBeforeExecution: Boolean = false) extends Dataset with DatasetPluginAutoConfigurable[CsvDataset]
   with CsvDatasetTrait with TextBulkResourceBasedDataset with WritableResourceDataset {
@@ -9,7 +9,7 @@ import org.silkframework.util.Uri
 import java.io.{File, IOException}
 import java.util.logging.Logger
 
-class CsvSink(val resource: WritableResource, settings: CsvSettings) extends DataSink with DirtyTrackingFileDataSink {
+class CsvSink(val resource: WritableResource, settings: CsvSettings) extends DirtyTrackingFileDataSink {
   private val log: Logger = Logger.getLogger(getClass.getName)
 
   @volatile
@@ -41,8 +41,8 @@ class CsvSink(val resource: WritableResource, settings: CsvSettings) extends Dat
   /**
    * Makes sure that the next write will start from an empty dataset.
    */
-  override def clear()(implicit userContext: UserContext): Unit = {
-    if(settings.clearBeforeExecution) {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+    if(settings.clearBeforeExecution || force) {
       val resourceFile = new File(resource.path).getAbsoluteFile
       val resourcePath = resourceFile.toPath
       val crcFile = new File(resourcePath.getParent.toFile, s".${resourcePath.getFileName.toString}.crc")
@@ -56,6 +56,7 @@ class CsvSink(val resource: WritableResource, settings: CsvSettings) extends Dat
         case e: IOException =>
           log.warning("IO exception occurred when deleting CRC file: " + e.getMessage)
       }
+      super.clear(force)
     }
   }
 }
@@ -40,7 +40,7 @@ class TextFileSink(ds: TextFileDataset) extends EntitySink with LinkSink {
     writeEntity("", IndexedSeq(Seq(link.source), Seq(link.target)))
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
     ds.file.writeString("", codec = ds.codec)
   }
 
@@ -25,8 +25,9 @@ class JsonSink (val resource: WritableResource,
   /**
    * Makes sure that the next write will start from an empty dataset.
    */
-  override def clear()(implicit userContext: UserContext): Unit = {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
     resource.delete()
+    super.clear(force)
   }
 
 }
@@ -138,8 +138,8 @@ case class GraphStoreSink(graphStore: GraphStoreTrait,
     writeStatement(subject, predicate, obj, valueType)
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = {
-    if(dropGraphOnClear) {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+    if(dropGraphOnClear || force) {
       log.fine("Clearing graph " + graphUri)
       graphStore.deleteGraph(graphUri)
     }
@@ -66,8 +66,8 @@ class SparqlSink(params: SparqlParams,
     }
   }
 
-  override def clear()(implicit userContext: UserContext): Unit = {
-    if(dropGraphOnClear) {
+  override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
+    if(dropGraphOnClear || force) {
       params.graph match {
         case Some(graph) =>
           endpoint.update(s"DROP SILENT GRAPH <$graph>")