Skip to content

Commit

Permalink
add disable, restart to importer
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas committed Apr 28, 2024
1 parent d2c94ab commit 5500fe7
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 17 deletions.
100 changes: 85 additions & 15 deletions server/handler/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,19 @@ def getConverterCommand(self, input, outname):
return [sys.executable,self._converter,self.getOutFileOrDir(outname),input]

class ConversionResult:
def __init__(self,md5,error=None,ts=None):
def __init__(self,md5,error=None,ts=None,disabled=False):
self.error=error
self.md5=md5
self.ts=ts if ts is not None else time.time()
self.disabled=disabled
def valid(self):
return self.md5 is not None
def dateStr(self):
return datetime.datetime.fromtimestamp(self.ts).strftime('%Y-%m-%d %H:%M')
def isOk(self):
return self.error is None
def isDisabled(self):
return self.disabled


class ConversionCandidate:
Expand All @@ -170,9 +173,11 @@ class State(Enum):
NOCONV=4
DONENC=5 #done but no converter
ERROR=6
DISABLED=7
def __init__(self,name,filename):
self.name=name # type: str
self.filename=filename # type: str
self.isDir=os.path.isdir(filename)
self.converter=None # type: ConverterApi
self.score=0
self.currentmd5=None # type: str
Expand All @@ -199,10 +204,16 @@ def getInfoKey(self):
def _isConverted(self):
return self.result.valid() and self.result.isOk()
def hasError(self):
return self.result.valid() and not self.result.isOk()
return not self.result.isOk()
def isDisabled(self):
return self.result.isDisabled()
def getState(self):
if self.running:
return self.State.CONVERTING
if self.hasError():
return self.State.ERROR
if self.isDisabled():
return self.State.DISABLED
if self.score == 0:
if self._isConverted():
return self.State.DONENC
Expand All @@ -226,6 +237,8 @@ def getWstate(self,st=None):
return WorkerStatus.RUNNING
if st == self.State.DONE:
return WorkerStatus.INACTIVE
if st == self.State.DISABLED:
return WorkerStatus.INACTIVE
return WorkerStatus.ERROR
def getStateInfo(self,st=None):
if st is None:
Expand All @@ -239,9 +252,16 @@ def getStateInfo(self,st=None):
if st == self.State.SETTLE:
return "changed, waiting to settle (%d files)"%self.score
if st == self.State.DONE:
return "already converted at %s"%self.result.dateStr()
return "converted at %s"%self.result.dateStr()
if st == self.State.DISABLED:
return "disabled"
return "conversion failed at %s: %s"%(self.result.dateStr(),str(self.result.error))

def couldConvert(self):
if self.isDisabled() or self.hasError() or self.score < 1:
return False
return self.md5changed()

class Conversion:
def __init__(self,process,candidate:ConversionCandidate):
self.process=process
Expand Down Expand Up @@ -270,8 +290,10 @@ class AVNImporter(AVNWorker):
P_DIR=WorkerParameter('converterDir','',editable=False)
P_IMPORTDIR=WorkerParameter('importDir','import',editable=False)
P_WORKDIR=WorkerParameter('workDir','',editable=False)
P_WAITTIME=WorkerParameter('waittime',30,type=WorkerParameter.T_NUMBER,
description='time to wait in seconds before a conversion is started after detecting a change (and no further change)')
P_WAITTIME=WorkerParameter('dirsettle',30,type=WorkerParameter.T_NUMBER,
description='time to wait in seconds before a conversion is started after detecting a change of a directory (settle time)')
P_FWAITTIME=WorkerParameter('filesettls',10,type=WorkerParameter.T_NUMBER,
description='time to wait in seconds before a conversion is started after detecting a change of a directory (settle time)')
P_SCANINTERVAL=WorkerParameter('scanInterval', 10, type=WorkerParameter.T_NUMBER,
description='seconds between import dir scan',editable=False)
@classmethod
Expand All @@ -287,6 +309,7 @@ def getConfigParam(cls, child=None):
cls.P_IMPORTDIR, #directory below our data dir
cls.P_WORKDIR, #working directory
cls.P_WAITTIME,
cls.P_FWAITTIME,
cls.P_SCANINTERVAL
]
return rt
Expand All @@ -313,7 +336,6 @@ def __init__(self,param):
self.lastTimeStamps={} #a dictionary of timestamps - key is the directory/filename, value the last read timestamp
self.candidateTimes={} #dictionary with candidates for conversion - same layout as lastTimeStamps
self.runningConversion=None # type: Conversion
self.waittime=None
self.chartbase=None
self.importDir=AVNHandlerManager.getDirWithDefault(self.param, self.P_IMPORTDIR.name, 'import')
self.workDir=AVNHandlerManager.getDirWithDefault(self.param, self.P_WORKDIR.name, 'work')
Expand Down Expand Up @@ -379,15 +401,16 @@ def run(self):
self.setInfo("converter","free",WorkerStatus.STARTED)
waitingCandidates={}
while not self.shouldStop():
self.waittime=self.getWParam(self.P_WAITTIME)
waittime=self.getWParam(self.P_WAITTIME)
fwaittime=self.getWParam(self.P_FWAITTIME)
AVNLog.debug("mainloop")
currentCandidates=self.readImportDir()
self.candidates=currentCandidates
if self.runningConversion is None:
self.setInfo("main","scanning",WorkerStatus.NMEA)
now=time.monotonic()
for k in currentCandidates:
if not k.md5changed() or k.score == 0:
if not k.couldConvert():
try:
del waitingCandidates[k.name]
except:
Expand All @@ -399,7 +422,8 @@ def run(self):
if existing.hasChanged(k):
waitingCandidates[k.name]=k
else:
if (existing.timestamp+self.waittime) < now:
wt=waittime if k.isDir else fwaittime
if (existing.timestamp+wt) < now:
startConversion=True
waitingCandidates[k.name]=k
else:
Expand Down Expand Up @@ -433,7 +457,7 @@ def run(self):
#immediately scan again as we could start the next
continue
if self.runningConversion is not None or len(waitingCandidates.keys())> 0:
self.wait(self.waittime/5)
self.wait(min(waittime,fwaittime)/5)
else:
self.wait(self.getWParam(self.P_SCANINTERVAL))
if self.runningConversion is not None:
Expand Down Expand Up @@ -476,6 +500,14 @@ def getLastResult(self, name):
except:
pass
return rt
def deleteLastResult(self,name):
resultname=self.getResultName(name)
try:
os.unlink(resultname)
return True
except:
return False

def getNameFromImport(self,dirOrFile):
path,ext=os.path.splitext(dirOrFile)
path=os.path.basename(path)
Expand Down Expand Up @@ -571,11 +603,7 @@ def deleteImportByCandidate(self,candidate):
except:
pass
fullname=candidate.filename
resultname=self.getResultName(candidate.name)
try:
os.unlink(resultname)
except:
pass
self.deleteLastResult(candidate.name)
workdir=os.path.join(self.workDir,candidate.name)
if os.path.isdir(workdir):
try:
Expand Down Expand Up @@ -657,6 +685,19 @@ def findCandidate(self,key,isName=False):
if c.getInfoKey() == key:
return c

def checkDuplicateName(self,dir,name):
'''
check if there is a file with the same name but different extension
@param dir:
@param name:
@return:
'''
path,ext=os.path.splitext(name)
for f in os.listdir(dir):
fpath,fext=os.path.splitext(f)
if fpath == path and fext != ext:
return f

def handleApiRequest(self, type, subtype, requestparam, **kwargs):
if type == "list":
status=self.getInfo(['main','converter'])
Expand Down Expand Up @@ -696,6 +737,7 @@ def handleApiRequest(self, type, subtype, requestparam, **kwargs):
return AVNUtil.getReturnData(error="item %s not found"%name)
if not self.deleteImportByCandidate(candidate):
return AVNUtil.getReturnData(error="unable to delete")
self.wakeUp()
return AVNUtil.getReturnData()
if type == "download":
name=AVNUtil.getHttpRequestParam(requestparam,'name',True)
Expand Down Expand Up @@ -735,8 +777,12 @@ def handleApiRequest(self, type, subtype, requestparam, **kwargs):
os.makedirs(dir)
if not os.path.isdir(dir):
return AVNUtil.getReturnData(error="unable to create directory %s"%dir)
dupl=self.checkDuplicateName(dir,name)
if dupl is not None:
return AVNUtil.getReturnData(error="name conflict with %s (same base)"%dupl)
fname=os.path.join(dir,name)
AVNDirectoryHandlerBase.writeAtomic(fname,handler.rfile,True,int(kwargs.get('flen')))
self.wakeUp()
return AVNUtil.getReturnData()

if type == "api":
Expand Down Expand Up @@ -773,7 +819,31 @@ def handleApiRequest(self, type, subtype, requestparam, **kwargs):
running=self.runningConversion
if running is None or not running.running:
return AVNUtil.getReturnData(error="no conversion running")
name=AVNUtil.getHttpRequestParam(requestparam,"name",False)
if name is not None and name != (ConversionCandidate.KPRFX+running.candidate.name):
return AVNUtil.getReturnData(error="%s not running any more"%name)
self.stopConversion(running.candidate.name)
self.wakeUp()
return AVNUtil.getReturnData()
if command == 'restart':
name=AVNUtil.getHttpRequestParam(requestparam,"name",True)
candidate=self.findCandidate(name)
if candidate is None:
return AVNUtil.getReturnData(error="%s not found"%name)
if candidate.getState() == ConversionCandidate.State.CONVERTING:
return AVNUtil.getReturnData(error="%s currently converting"%name)
self.deleteLastResult(candidate.name)
self.wakeUp()
return AVNUtil.getReturnData()
if command == 'disable':
name=AVNUtil.getHttpRequestParam(requestparam,"name",True)
candidate=self.findCandidate(name)
if candidate is None:
return AVNUtil.getReturnData(error="%s not found"%name)
if candidate.getState() == ConversionCandidate.State.CONVERTING:
return AVNUtil.getReturnData(error="%s currently converting"%name)
self.saveLastResult(candidate.name,ConversionResult(None,disabled=True))
self.wakeUp()
return AVNUtil.getReturnData()
return AVNUtil.getReturnData(error="unknown command for import")

Expand Down
2 changes: 1 addition & 1 deletion viewer/components/UploadHandler.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class UploadHandler extends React.Component{
if (props.xhdr) props.xhdr.abort();
this.stateHelper.setState({}, true);
if (this.props.errorCallback) {
this.props.errorCallback();
this.props.errorCallback("cancelled");
}
}}
/>
Expand Down
54 changes: 53 additions & 1 deletion viewer/gui/ImporterPage.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const ImporterItem=(props)=>{
};

const ImportStatusDialog=(props)=>{
let isRunning=props.status === 'NMEA';
return <div className="importStatusDialog flexInner">
<h3 className="dialogTitle">{props.name}</h3>
<div className="dialogRow">
Expand All @@ -79,6 +80,54 @@ const ImportStatusDialog=(props)=>{
>
Delete
</DB>
{!isRunning && <DB name="disable"
onClick={() => {
Requests.getJson({
type:'import',
request:'api',
command:'disable',
name: props.name
})
.then((res)=>{
props.closeCallback()
})
.catch((e)=>Toast("unable to disable "+e,5000));
}}
>
Disable
</DB>}
{isRunning && <DB name="stop"
onClick={() => {
Requests.getJson({
type:'import',
request:'api',
command:'cancel',
name: props.name
})
.then((res)=>{
props.closeCallback()
})
.catch((e)=>Toast("unable to stop "+e,5000));
}}
>
Stop
</DB>}
{!isRunning && <DB name="restart"
onClick={() => {
Requests.getJson({
type:'import',
request:'api',
command:'restart',
name: props.name
})
.then((res)=>{
props.closeCallback()
})
.catch((e)=>Toast("unable to restart "+e,5000));
}}
>
Restart
</DB>}
{props.canDownload && <DB name="download"
onClick={() => {
props.download(props.name);
Expand Down Expand Up @@ -322,7 +371,10 @@ class ImporterPage extends React.Component{
doneCallback={()=>{

}}
errorCallback={(err)=>{if (err) Toast(err);}}
errorCallback={
(err)=>{
if (err) Toast(err);
}}
uploadSequence={this.state.uploadSequence}
checkNameCallback={this.checkNameForUpload}
/>
Expand Down
6 changes: 6 additions & 0 deletions viewer/style/buttons.less
Original file line number Diff line number Diff line change
Expand Up @@ -641,4 +641,10 @@ button{
&.stop{
.dialogIconButton('stop_circle.svg')
}
&.restart{
.dialogIconButton('ic_refresh.svg')
}
&.disable{
.dialogIconButton('ic_hide.svg')
}
}

0 comments on commit 5500fe7

Please sign in to comment.