Skip to content

Commit caee050

Browse files
committed
Remove a number of potential crashers, some macro/token logic errors
and fix the issue where the main thread would try to exit before all workers had finished. Also, force the actual compressed file rewrite to be exclusive, so that a crash in a parallel non-crucial operation is less likely to lead to data loss.
1 parent 5ca24b2 commit caee050

File tree

5 files changed

+68
-25
lines changed

5 files changed

+68
-25
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ find_package(ZLIB 1.2.8 REQUIRED)
88
pkg_check_modules(SPARSEHASH libsparsehash)
99
include_directories(${ZLIB_INCLUDE_DIRS} ${SPARSEHASH_INCLUDEDIR})
1010
link_directories(${SPARSEHASH_LIBRARY_DIRS})
11+
add_definitions(-DSUPPORT_PARALLEL)
1112

1213
add_executable(afsctool
1314
afsctool.c
@@ -19,7 +20,7 @@ add_executable(afsctool
1920
CritSectEx/timing.c
2021
)
2122
target_include_directories(afsctool PRIVATE ${CMAKE_SOURCE_DIR})
22-
set_target_properties(afsctool PROPERTIES COMPILE_FLAGS -DSUPPORT_PARALLEL)
23+
# set_target_properties(afsctool PROPERTIES COMPILE_FLAGS -DSUPPORT_PARALLEL)
2324
target_link_libraries(afsctool ${ZLIB_LIBRARIES} ${SPARSEHASH_LIBRARIES} "-framework CoreServices")
2425

2526
install(TARGETS afsctool DESTINATION ${CMAKE_INSTALL_PREFIX}/bin)

ParallelProcess.cpp

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ ParallelFileProcessor::ParallelFileProcessor(const int n, const int verbose)
9292
{
9393
threadPool.clear();
9494
nJobs = n;
95+
nProcessing = 0;
9596
nProcessed = 0;
9697
allDoneEvent = NULL;
9798
ioLock = new CRITSECTLOCK(4000);
@@ -157,7 +158,11 @@ int ParallelFileProcessor::run()
157158
}
158159
if( allDoneEvent ){
159160
DWORD waitResult = ~WAIT_OBJECT_0;
160-
while( nJobs >= 1 && !quitRequested() && size() > 0 && waitResult != WAIT_OBJECT_0 ){
161+
// contrary to what one might expect, we should NOT use size()==0 as a stopping criterium.
162+
// The queue size is decreased when a worker picks a new file to process, not when it's
163+
// finished. Using size()==0 as a stopping criterium caused the processing interrupts
164+
// that were observed with large files.
165+
while( nJobs >= 1 && !quitRequested() && waitResult != WAIT_OBJECT_0 ){
161166
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
162167
if( nJobs ){
163168
double perc = 100.0 * nProcessed / N;
@@ -176,10 +181,15 @@ int ParallelFileProcessor::run()
176181
prevPerc = perc;
177182
}
178183
}
179-
if( quitRequested() && !threadPool.empty() ){
180-
// the WaitForSingleObject() call above was interrupted by the signal that
181-
// led to quitRequested() being set and as a result the workers haven't yet
182-
// had the chance to exit cleanly. Give them that chance now.
184+
}
185+
if( (quitRequested() && !threadPool.empty()) || nProcessing > 0 ){
186+
// the WaitForSingleObject() call above was interrupted by the signal that
187+
// led to quitRequested() being set and as a result the workers haven't yet
188+
// had the chance to exit cleanly. Give them that chance now.
189+
fprintf( stderr, " quitting [%ld]...", nProcessing ); fflush(stderr);
190+
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
191+
for( i = 0 ; i < 4 && waitResult == WAIT_TIMEOUT ; ++i ){
192+
fprintf( stderr, " [%ld]...", nProcessing) ; fflush(stderr);
183193
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
184194
}
185195
}
@@ -191,7 +201,11 @@ int ParallelFileProcessor::run()
191201
while( !threadPool.empty() ){
192202
FileProcessor *thread = threadPool.front();
193203
if( thread->GetExitCode() == (THREAD_RETURN)STILL_ACTIVE ){
194-
fprintf( stderr, "Stopping worker thread #%d that is still active!\n", i );
204+
fprintf( stderr, "Stopping worker thread #%d that is still %s!\n", i, (thread->scope)? "processing" : "active" );
205+
std::string currentFileName = thread->currentFileName();
206+
if( currentFileName.c_str()[0] ){
207+
fprintf( stderr, "\tcurrent file: %s\n", currentFileName.c_str() );
208+
}
195209
thread->Stop(true);
196210
}
197211
if( thread->nProcessed ){
@@ -229,10 +243,10 @@ int ParallelFileProcessor::run()
229243

230244
int ParallelFileProcessor::workerDone(FileProcessor *worker)
231245
{ CRITSECTLOCK::Scope scope(threadLock);
232-
char name[17];
246+
// char name[17];
247+
// pthread_getname_np( (pthread_t) GetThreadId(worker->GetThread()), name, sizeof(name) );
248+
// fprintf( stderr, "workerDone(): worker \"%s\" is done; %ld workers left\n", name, nJobs - 1 );
233249
nJobs -= 1;
234-
pthread_getname_np( (pthread_t) GetThreadId(worker->GetThread()), name, sizeof(name) );
235-
// fprintf( stderr, "workerDone(): worker \"%s\" is done\n", name );
236250
if( nJobs <= 0 ){
237251
if( allDoneEvent ){
238252
SetEvent(allDoneEvent);
@@ -252,8 +266,15 @@ DWORD FileProcessor::Run(LPVOID arg)
252266
// create a scoped lock without closing it immediately
253267
CRITSECTLOCK::Scope scp(PP->ioLock, 0);
254268
scope = &scp;
269+
currentEntry = &entry;
270+
_InterlockedIncrement(&PP->nProcessing);
255271
entry.compress( this, PP );
272+
_InterlockedDecrement(&PP->nProcessing);
256273
_InterlockedIncrement(&PP->nProcessed);
274+
currentEntry = NULL;
275+
nProcessed += 1;
276+
scope = NULL;
277+
257278
runningTotalRaw += entry.fileInfo.st_size;
258279
runningTotalCompressed += (entry.compressedSize > 0)? entry.compressedSize : entry.fileInfo.st_size;
259280
if( PP->verbose() > 1 ){
@@ -264,8 +285,6 @@ DWORD FileProcessor::Run(LPVOID arg)
264285
cpuUsage += info.cpu_usage/10.0;
265286
}
266287
}
267-
nProcessed += 1;
268-
scope = NULL;
269288
}
270289
}
271290
return DWORD(nProcessed);
@@ -281,19 +300,25 @@ void FileProcessor::InitThread()
281300

282301
inline bool FileProcessor::lockScope()
283302
{
284-
if( scope ){
285-
PP->ioLockedFlag = scope->Lock();
303+
if( PP ){
304+
if( scope ){
305+
PP->ioLockedFlag = scope->Lock();
306+
}
307+
return PP->ioLockedFlag;
286308
}
287-
return PP->ioLockedFlag;
309+
return false;
288310
}
289311

290312
inline bool FileProcessor::unLockScope()
291313
{
292-
if( scope ){
293-
scope->Unlock();
294-
PP->ioLockedFlag = *scope;
314+
if( PP ){
315+
if( scope ){
316+
scope->Unlock();
317+
PP->ioLockedFlag = *scope;
318+
}
319+
return PP->ioLockedFlag;
295320
}
296-
return PP->ioLockedFlag;
321+
return false;
297322
}
298323

299324
// ================================= C interface functions =================================

ParallelProcess_p.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// kate: auto-insert-doxygen true; backspace-indents true; indent-width 4; keep-extra-spaces true; replace-tabs true; tab-indents true; tab-width 4;
1+
// kate: auto-insert-doxygen true; backspace-indents true; indent-width 4; keep-extra-spaces true; replace-tabs false; tab-indents true; tab-width 4;
22

33
/*
44
* @file ParallelProcess_p.h
@@ -157,6 +157,8 @@ class ParallelFileProcessor : public ParallelProcessor<FileEntry>
157157
int workerDone(FileProcessor *worker);
158158
// the number of configured or active worker threads
159159
volatile long nJobs;
160+
// the number of processing threads
161+
volatile long nProcessing;
160162
// the number of processed items
161163
volatile long nProcessed;
162164
// a pool containing pointers to the worker threads
@@ -183,7 +185,15 @@ class FileProcessor : public Thread
183185
, runningTotalCompressed(0)
184186
, runningTotalRaw(0)
185187
, cpuUsage(0.0)
188+
, currentEntry(NULL)
186189
{}
190+
~FileProcessor()
191+
{
192+
// better be safe than sorry
193+
PP = NULL;
194+
scope = NULL;
195+
currentEntry = NULL;
196+
}
187197
bool lockScope();
188198
bool unLockScope();
189199

@@ -192,6 +202,10 @@ class FileProcessor : public Thread
192202
return procID;
193203
}
194204

205+
inline std::string currentFileName() const
206+
{
207+
return (currentEntry)? currentEntry->fileName : "";
208+
}
195209
protected:
196210
DWORD Run(LPVOID arg);
197211
void InitThread();
@@ -210,6 +224,8 @@ class FileProcessor : public Thread
210224
const int procID;
211225
CRITSECTLOCK::Scope *scope;
212226
friend class ParallelFileProcessor;
227+
private:
228+
FileEntry *currentEntry;
213229
};
214230

215231

afsctool.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ static const char *lbasename(const char *url)
117117
return c;
118118
}
119119

120-
#if SUPPORT_PARALLEL
120+
#ifdef SUPPORT_PARALLEL
121121
void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker )
122122
#else
123123
void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo)
@@ -404,7 +404,8 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
404404
}
405405

406406
#ifdef SUPPORT_PARALLEL
407-
if( exclusive_io && worker ){
407+
// 20160928: the actual rewrite of the file is never done in parallel
408+
if( worker ){
408409
locked = lockParallelProcessorIO(worker);
409410
}
410411
#else
@@ -512,7 +513,7 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
512513
chmod(inFile, orig_mode);
513514
}
514515
#ifdef SUPPORT_PARALLEL
515-
if( exclusive_io && worker ){
516+
if( worker ){
516517
locked = unLockParallelProcessorIO(worker);
517518
}
518519
#endif
@@ -1669,7 +1670,7 @@ void printUsage()
16691670
"-b make a backup of files before compressing them\n"
16701671
#ifdef SUPPORT_PARALLEL
16711672
"-jN compress (only compressable) files using <N> threads (compression is concurrent, disk IO is exclusive)\n"
1672-
"-JN read, compress and write files (only compressable ones) using <N> threads (everything including disk IO is concurrent)\n"
1673+
"-JN read, compress and write files (only compressable ones) using <N> threads (everything is concurrent except writing the compressed file)\n"
16731674
#endif
16741675
"-<level> Compression level to use when compressing (ranging from 1 to 9, with 1 being the fastest and 9 being the best - default is 5)\n");
16751676
}

afsctool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ struct filetype_info
9999
long long int num_hard_link_files;
100100
};
101101

102-
#if SUPPORT_PARALLEL
102+
#ifdef SUPPORT_PARALLEL
103103
# ifdef __cplusplus
104104
extern void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker=NULL);
105105
# else

0 commit comments

Comments
 (0)