From 263353a611104299f633271ed8b7285607b808b0 Mon Sep 17 00:00:00 2001 From: Keyang Date: Mon, 23 Jan 2017 21:35:35 +0000 Subject: [PATCH] fix 134 --- libs/core/Converter.js | 480 +++++++++++++++++++------------------- libs/core/fileline.js | 10 +- package.json | 2 +- test/data/dataIgnoreEmpty | 3 + test/data/longHeader | 2 + test/testCSVConverter.js | 3 +- test/testCSVConverter2.js | 15 ++ 7 files changed, 275 insertions(+), 240 deletions(-) create mode 100644 test/data/longHeader diff --git a/libs/core/Converter.js b/libs/core/Converter.js index 87e9651..f16f311 100644 --- a/libs/core/Converter.js +++ b/libs/core/Converter.js @@ -3,311 +3,319 @@ var Transform = require("stream").Transform; var os = require("os"); var eol = os.EOL; // var Processor = require("./Processor.js"); -var defParam=require("./defParam"); -var csvline=require("./csvline"); -var fileline=require("./fileline"); -var dataToCSVLine=require("./dataToCSVLine"); -var fileLineToCSVLine=require("./fileLineToCSVLine"); -var linesToJson=require("./linesToJson"); -var CSVError=require("./CSVError"); -var workerMgr=require("./workerMgr"); -function Converter(params,options) { - Transform.call(this,options); - _param=defParam(params); - this._options=options || {}; +var defParam = require("./defParam"); +var csvline = require("./csvline"); +var fileline = require("./fileline"); +var dataToCSVLine = require("./dataToCSVLine"); +var fileLineToCSVLine = require("./fileLineToCSVLine"); +var linesToJson = require("./linesToJson"); +var CSVError = require("./CSVError"); +var workerMgr = require("./workerMgr"); +function Converter(params, options) { + Transform.call(this, options); + _param = defParam(params); + this._options = options || {}; this.param = _param; - this.param._options=this._options; + this.param._options = this._options; // this.resultObject = new Result(this); // this.pipe(this.resultObject); // it is important to have downstream for a transform otherwise it will stuck this.started = false;//indicate if parsing has started. this.recordNum = 0; - this.lineNumber=0; //file line number - this._csvLineBuffer=""; - this.lastIndex=0; // index in result json array + this.lineNumber = 0; //file line number + this._csvLineBuffer = ""; + this.lastIndex = 0; // index in result json array //this._pipe(this.lineParser).pipe(this.processor); // this.initNoFork(); - if (this.param.forked){ - this.param.forked=false; - this.workerNum=2; - } + if (this.param.forked) { + this.param.forked = false; + this.workerNum = 2; + } this.flushCb = null; this.processEnd = false; this.sequenceBuffer = []; - this._needJson=null; - this._needEmitResult=null; - this._needEmitFinalResult=null; - this._needEmitJson=null; - this._needPush=null; - this._needEmitCsv=null; - this._csvTransf=null; - this.finalResult=[]; + this._needJson = null; + this._needEmitResult = null; + this._needEmitFinalResult = null; + this._needEmitJson = null; + this._needPush = null; + this._needEmitCsv = null; + this._csvTransf = null; + this.finalResult = []; // this.on("data", function() {}); this.on("error", emitDone(this)); this.on("end", emitDone(this)); this.initWorker(); - process.nextTick(function(){ - if (this._needEmitFinalResult === null){ - this._needEmitFinalResult=this.listeners("end_parsed").length > 0 + process.nextTick(function () { + if (this._needEmitFinalResult === null) { + this._needEmitFinalResult = this.listeners("end_parsed").length > 0 } - if (this._needEmitResult===null){ - this._needEmitResult=this.listeners("record_parsed").length>0 + if (this._needEmitResult === null) { + this._needEmitResult = this.listeners("record_parsed").length > 0 } - if (this._needEmitJson === null){ - this._needEmitJson=this.listeners("json").length>0 + if (this._needEmitJson === null) { + this._needEmitJson = this.listeners("json").length > 0 } - if (this._needEmitCsv === null){ - this._needEmitCsv=this.listeners("csv").length>0 + if (this._needEmitCsv === null) { + this._needEmitCsv = this.listeners("csv").length > 0 } - if (this._needJson === null){ - this._needJson=this._needEmitJson || this._needEmitFinalResult || this._needEmitResult || this.transform || this._options.objectMode; + if (this._needJson === null) { + this._needJson = this._needEmitJson || this._needEmitFinalResult || this._needEmitResult || this.transform || this._options.objectMode; } - if (this._needPush === null){ - this._needPush = this.listeners("data").length > 0 || this.listeners("readable").length>0 + if (this._needPush === null) { + this._needPush = this.listeners("data").length > 0 || this.listeners("readable").length > 0 // this._needPush=false; } - this.param._needParseJson=this._needJson || this._needPush; + this.param._needParseJson = this._needJson || this._needPush; }.bind(this)) return this; } util.inherits(Converter, Transform); -function emitDone(conv){ - return function(err){ - process.nextTick(function(){ - conv.emit('done',err) +function emitDone(conv) { + return function (err) { + process.nextTick(function () { + conv.emit('done', err) }) } } -Converter.prototype._transform = function(data, encoding, cb) { +Converter.prototype._transform = function (data, encoding, cb) { if (this.param.toArrayString && this.started === false) { this.started = true; - if (this._needPush){ + if (this._needPush) { this.push("[" + eol, "utf8"); } } - data=data.toString("utf8"); - var self=this; - this.preProcessRaw(data,function(d){ - if (d && d.length>0){ + data = data.toString("utf8"); + var self = this; + this.preProcessRaw(data, function (d) { + if (d && d.length > 0) { self.processData(self.prepareData(d), cb); - }else{ + } else { cb(); } }) }; -Converter.prototype.prepareData=function(data){ - return this._csvLineBuffer+data; +Converter.prototype.prepareData = function (data) { + return this._csvLineBuffer + data; } -Converter.prototype.setPartialData=function(d){ - this._csvLineBuffer=d; +Converter.prototype.setPartialData = function (d) { + this._csvLineBuffer = d; } -Converter.prototype.processData=function(data,cb){ - var params=this.param; - var fileLines=fileline(data,this.param) - if (this.preProcessLine && typeof this.preProcessLine === "function"){ - fileLines.lines=this._preProcessLines(fileLines.lines,this.lastIndex) +Converter.prototype.processData = function (data, cb) { + var params = this.param; + if (params.ignoreEmpty && !params._headers){ + data=data.trimLeft(); } - if (!params._headers){ //header is not inited. init header - this.processHead(fileLines,cb); - }else{ - if (params.workerNum<=1){ - var lines=fileLineToCSVLine(fileLines,params); - this.setPartialData(lines.partial); - var jsonArr=linesToJson(lines.lines,params,this.recordNum); - this.processResult(jsonArr) - this.lastIndex+=jsonArr.length; - this.recordNum+=jsonArr.length; - cb(); - }else{ - this.workerProcess(fileLines,cb); + var fileLines = fileline(data, this.param) + if (fileLines.lines.length > 0) { + if (this.preProcessLine && typeof this.preProcessLine === "function") { + fileLines.lines = this._preProcessLines(fileLines.lines, this.lastIndex) } + if (!params._headers) { //header is not inited. init header + this.processHead(fileLines, cb); + } else { + if (params.workerNum <= 1) { + var lines = fileLineToCSVLine(fileLines, params); + this.setPartialData(lines.partial); + var jsonArr = linesToJson(lines.lines, params, this.recordNum); + this.processResult(jsonArr) + this.lastIndex += jsonArr.length; + this.recordNum += jsonArr.length; + cb(); + } else { + this.workerProcess(fileLines, cb); + } + } + }else{ + this.setPartialData(fileLines.partial) + cb(); } } -Converter.prototype._preProcessLines=function(lines,startIdx){ - var rtn=[] - for (var i=0;i0){ - this.workerMgr=workerMgr(); - this.workerMgr.initWorker(workerNum,this.param); +Converter.prototype.initWorker = function () { + var workerNum = this.param.workerNum - 1; + if (workerNum > 0) { + this.workerMgr = workerMgr(); + this.workerMgr.initWorker(workerNum, this.param); } } -Converter.prototype.preRawData=function(func){ - this.preProcessRaw=func; +Converter.prototype.preRawData = function (func) { + this.preProcessRaw = func; return this; } -Converter.prototype.preFileLine=function(func){ - this.preProcessLine=func; +Converter.prototype.preFileLine = function (func) { + this.preProcessLine = func; return this; } /** * workerpRocess does not support embeded multiple lines. */ -Converter.prototype.workerProcess=function(fileLine,cb){ - var self=this; - var line=fileLine - var eol=this.getEol() +Converter.prototype.workerProcess = function (fileLine, cb) { + var self = this; + var line = fileLine + var eol = this.getEol() this.setPartialData(line.partial) - this.workerMgr.sendWorker(line.lines.join(eol)+eol,this.lastIndex,cb,function(results,lastIndex){ - var cur=self.sequenceBuffer[0]; - if (cur.idx === lastIndex){ - cur.result=results; - var records=[]; - while (self.sequenceBuffer[0] && self.sequenceBuffer[0].result){ - var buf=self.sequenceBuffer.shift(); - records=records.concat(buf.result) - } - self.processResult(records) - self.recordNum+=records.length; - }else{ - for (var i=0;i1){ + if (this.param.workerNum > 1) { this.workerMgr.setParams(params); } - var res=linesToJson(lines.lines,params,0); + var res = linesToJson(lines.lines, params, 0); this.processResult(res); - this.lastIndex+=res.length; - this.recordNum+=res.length; + this.lastIndex += res.length; + this.recordNum += res.length; cb(); - }else{ + } else { cb(); } } -Converter.prototype.processResult=function(result){ - - for (var i=0;i 0 && this._needPush) { this.push("," + eol); } - if (this._options && this._options.objectMode){ + if (this._options && this._options.objectMode) { this.push(resultJson); - }else{ - if (this._needPush){ - if (resultStr===null){ - resultStr=JSON.stringify(resultJson) + } else { + if (this._needPush) { + if (resultStr === null) { + resultStr = JSON.stringify(resultJson) } - this.push(!this.param.toArrayString?resultStr+eol:resultStr, "utf8"); + this.push(!this.param.toArrayString ? resultStr + eol : resultStr, "utf8"); } } } -Converter.prototype.preProcessRaw=function(data,cb){ +Converter.prototype.preProcessRaw = function (data, cb) { cb(data); } -Converter.prototype.preProcessLine=function(line,lineNumber){ - return line; +Converter.prototype.preProcessLine = function (line, lineNumber) { + return line; } -Converter.prototype._flush = function(cb) { +Converter.prototype._flush = function (cb) { var self = this; - this.flushCb=function(){ - self.emit("end_parsed",self.finalResult); - if (self.workerMgr){ + this.flushCb = function () { + self.emit("end_parsed", self.finalResult); + if (self.workerMgr) { self.workerMgr.destroyWorker(); } cb() - if (!self._needPush){ + if (!self._needPush) { self.emit("end") } }; if (this._csvLineBuffer.length > 0) { - if (this._csvLineBuffer[this._csvLineBuffer.length-1] != this.getEol()){ - this._csvLineBuffer+=this.getEol(); + if (this._csvLineBuffer[this._csvLineBuffer.length - 1] != this.getEol()) { + this._csvLineBuffer += this.getEol(); } - this.processData(this._csvLineBuffer,function(){ + this.processData(this._csvLineBuffer, function () { this.checkAndFlush(); }.bind(this)); } else { @@ -322,100 +330,98 @@ Converter.prototype._flush = function(cb) { // this.child.stdin.end(); // this.child.on("exit", cb); // } -Converter.prototype.checkAndFlush = function() { - if (this._csvLineBuffer.length !== 0) { - this.emit("error", CSVError.unclosed_quote(this.recordNum,this._csvLineBuffer), this._csvLineBuffer); - } - if (this.param.toArrayString && this._needPush) { - this.push(eol + "]", "utf8"); - } - if (this.workerMgr && this.workerMgr.isRunning()){ - this.workerMgr.drain=function(){ - this.flushCb(); - }.bind(this); - }else{ +Converter.prototype.checkAndFlush = function () { + if (this._csvLineBuffer.length !== 0) { + this.emit("error", CSVError.unclosed_quote(this.recordNum, this._csvLineBuffer), this._csvLineBuffer); + } + if (this.param.toArrayString && this._needPush) { + this.push(eol + "]", "utf8"); + } + if (this.workerMgr && this.workerMgr.isRunning()) { + this.workerMgr.drain = function () { this.flushCb(); - } + }.bind(this); + } else { + this.flushCb(); + } } -Converter.prototype.getEol = function(data) { +Converter.prototype.getEol = function (data) { if (!this.param.eol && data) { - for (var i=0;i