diff --git a/pom.xml b/pom.xml index 9babd2b..4ced8be 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.ohnlp.medxn medxn - 1.0.5 + 1.0.6 The MedXN medication Information extraction pipeline @@ -104,12 +104,12 @@ org.ohnlp.medtagger medtagger - 1.0.46 + 1.0.72 org.ohnlp.backbone api - 1.0.6 + 3.0.24 provided diff --git a/src/main/java/org/ohnlp/medxn/backbone/MedXNBackboneTransform.java b/src/main/java/org/ohnlp/medxn/backbone/MedXNBackboneTransform.java index 922ed00..fc24b9c 100644 --- a/src/main/java/org/ohnlp/medxn/backbone/MedXNBackboneTransform.java +++ b/src/main/java/org/ohnlp/medxn/backbone/MedXNBackboneTransform.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.uima.UIMAFramework; @@ -23,6 +24,10 @@ import org.apache.uima.util.CasCreationUtils; import org.apache.uima.util.InvalidXMLException; import org.ohnlp.backbone.api.Transform; +import org.ohnlp.backbone.api.annotations.ComponentDescription; +import org.ohnlp.backbone.api.annotations.ConfigurationProperty; +import org.ohnlp.backbone.api.components.OneToOneTransform; +import org.ohnlp.backbone.api.config.InputColumn; import org.ohnlp.backbone.api.exceptions.ComponentInitializationException; import org.ohnlp.medxn.type.Drug; import org.ohnlp.medxn.type.MedAttr; @@ -40,15 +45,36 @@ * Given an input row representing a document, duplicates row contents and adds a nlp_output_json column for each * drug mention in the input text. */ -public class MedXNBackboneTransform extends Transform { +@ComponentDescription( + name = "MedXN", + desc = "Extracts Drug Mentions and Associated Information from Text using MedXN" +) +public class MedXNBackboneTransform extends OneToOneTransform { + + @ConfigurationProperty( + path = "input", + desc = "Column to use as input" + ) + private InputColumn inputField; + + private Schema schema; + @Override - public void initFromConfig(JsonNode jsonNode) throws ComponentInitializationException { - // No Configurable Initialization + public Schema calculateOutputSchema(Schema schema) { + List fields = new LinkedList<>(schema.getFields()); + fields.add(Schema.Field.of("nlp_output_json", Schema.FieldType.STRING)); + this.schema = Schema.of(fields.toArray(new Schema.Field[0])); + return this.schema; } @Override public PCollection expand(PCollection input) { - return null; + return input.apply("Run MedXN", ParDo.of(new MedXNPipelineFunction(inputField.getSourceColumnName(), schema))); + } + + @Override + public void init() throws ComponentInitializationException { + } private static class MedXNPipelineFunction extends DoFn { @@ -56,6 +82,7 @@ private static class MedXNPipelineFunction extends DoFn { private static ThreadLocal sdf = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")); private final String textField; + private final Schema schema; // UIMA components are not serializable, and thus must be initialized per-executor via the @Setup annotation private transient AnalysisEngine aae; @@ -63,8 +90,9 @@ private static class MedXNPipelineFunction extends DoFn { private transient CAS cas; private transient ObjectMapper om; - private MedXNPipelineFunction(String textField) { + private MedXNPipelineFunction(String textField, Schema schema) { this.textField = textField; + this.schema = schema; } @Setup @@ -81,9 +109,6 @@ public void init() throws IOException, InvalidXMLException, ResourceInitializati @ProcessElement public void processElement(@Element Row input, OutputReceiver output) { // First create the output row schema - List fields = new LinkedList<>(input.getSchema().getFields()); - fields.add(Schema.Field.of("nlp_output_json", Schema.FieldType.STRING)); - Schema schema = Schema.of(fields.toArray(new Schema.Field[0])); String text = input.getString(this.textField); cas.reset(); cas.setDocumentText(text); diff --git a/src/main/java/org/ohnlp/medxn/backbone/MedXNOutputToOHDSIFormatTransform.java b/src/main/java/org/ohnlp/medxn/backbone/MedXNOutputToOHDSIFormatTransform.java index 536425c..57adf61 100644 --- a/src/main/java/org/ohnlp/medxn/backbone/MedXNOutputToOHDSIFormatTransform.java +++ b/src/main/java/org/ohnlp/medxn/backbone/MedXNOutputToOHDSIFormatTransform.java @@ -10,6 +10,7 @@ import org.apache.beam.sdk.values.Row; import org.joda.time.Instant; import org.ohnlp.backbone.api.Transform; +import org.ohnlp.backbone.api.components.OneToOneTransform; import org.ohnlp.backbone.api.exceptions.ComponentInitializationException; import java.io.BufferedReader; @@ -31,12 +32,24 @@ * Important: Requires that the OHDSI vocabulary load query first be run and loaded into backbone resources folder * as ohdsi_rxnorm_map.csv. Please refer to documentation for further details */ -public class MedXNOutputToOHDSIFormatTransform extends Transform { +public class MedXNOutputToOHDSIFormatTransform extends OneToOneTransform { private static ThreadLocal sdf = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")); + private Schema schema; @Override - public void initFromConfig(JsonNode config) throws ComponentInitializationException { + public Schema calculateOutputSchema(Schema input) { + // First transform row schemas + List fields = new LinkedList<>(input.getFields()); + fields.add(Schema.Field.of("section_concept_id", Schema.FieldType.INT32)); + fields.add(Schema.Field.of("lexical_variant", Schema.FieldType.STRING)); + fields.add(Schema.Field.of("snippet", Schema.FieldType.STRING)); + fields.add(Schema.Field.of("note_nlp_concept_id", Schema.FieldType.INT32)); + fields.add(Schema.Field.of("note_nlp_source_concept_id", Schema.FieldType.INT32)); + fields.add(Schema.Field.of("nlp_datetime", Schema.FieldType.DATETIME)); + fields.add(Schema.Field.of("term_modifiers", Schema.FieldType.STRING)); + this.schema = Schema.of(fields.toArray(new Schema.Field[0])); + return this.schema; } @Override @@ -65,16 +78,7 @@ public void init() { @ProcessElement public void processElement(@Element Row input, OutputReceiver output) throws JsonProcessingException, ParseException { - // First transform row schemas - List fields = new LinkedList<>(input.getSchema().getFields()); - fields.add(Schema.Field.of("section_concept_id", Schema.FieldType.INT32)); - fields.add(Schema.Field.of("lexical_variant", Schema.FieldType.STRING)); - fields.add(Schema.Field.of("snippet", Schema.FieldType.STRING)); - fields.add(Schema.Field.of("note_nlp_concept_id", Schema.FieldType.INT32)); - fields.add(Schema.Field.of("note_nlp_source_concept_id", Schema.FieldType.INT32)); - fields.add(Schema.Field.of("nlp_datetime", Schema.FieldType.DATETIME)); - fields.add(Schema.Field.of("term_modifiers", Schema.FieldType.STRING)); - Schema schema = Schema.of(fields.toArray(new Schema.Field[0])); + MedXNDrugBean bean = om.readValue(input.getString("nlp_output_json"), MedXNDrugBean.class); @@ -94,4 +98,8 @@ public void processElement(@Element Row input, OutputReceiver output) throw })); } + @Override + public void init() throws ComponentInitializationException { + + } }