Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions ZFSin/zfs/lib/libzfs/libzfs_import.c
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,10 @@ zpool_read_label_win(HANDLE h, off_t offset, uint64_t len, nvlist_t **config, in
drivesize = len;
size = P2ALIGN_TYPED(drivesize, sizeof(vdev_label_t), uint64_t);

if ((label = malloc(sizeof(vdev_label_t))) == NULL)
if ((label = malloc(sizeof(vdev_label_t))) == NULL) {
TraceWrite("func return.malloc() failed ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);
return (-1);
}

for (l = 0; l < VDEV_LABELS; l++) {
uint64_t state, guid, txg;
Expand Down Expand Up @@ -1088,6 +1090,8 @@ zpool_read_label_win(HANDLE h, off_t offset, uint64_t len, nvlist_t **config, in
free(label);
*config = expected_config;

TraceWrite("func return.ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);

return (0);
}

Expand Down Expand Up @@ -1487,7 +1491,10 @@ zpool_open_func_win(void *arg)
*/
if ((strncmp(rn->rn_name, "core", 4) == 0) ||
(strncmp(rn->rn_name, "watchdog", 8) == 0))
{
TraceWrite("func return.rn->rn_name:%s ThreadID:%lu [%s:%d]", rn->rn_name, threadID, __func__, __LINE__);
return;
}

/*
* Ignore failed stats. We only want regular files and block devices.
Expand All @@ -1504,6 +1511,9 @@ zpool_open_func_win(void *arg)
while (end && *end == '#') end++;
len = strtoull(end, &end, 10);
while (end && *end == '#') end++;

TraceWrite("calling createfile() for rn->rn_name:%s ThreadID:%lu [%s:%d]", rn->rn_name, threadID, __func__, __LINE__);

fd = CreateFile(end,
GENERIC_READ,
FILE_SHARE_READ /*| FILE_SHARE_WRITE*/,
Expand All @@ -1512,21 +1522,28 @@ zpool_open_func_win(void *arg)
FILE_ATTRIBUTE_NORMAL /*| FILE_FLAG_OVERLAPPED*/,
NULL);
if (fd == INVALID_HANDLE_VALUE) {
int error = GetLastError();
DWORD error = GetLastError();
TraceWrite("func return.CreateFile() failed with error %lu.rn->rn_name:%s ThreadID:%lu [%s:%d]", error,rn->rn_name, threadID, __func__, __LINE__);
return;
}

LARGE_INTEGER place;
place.QuadPart = offset;
SetFilePointerEx(fd, place, NULL, FILE_BEGIN); // If it fails, we cant read label
drive_len = len;

TraceWrite("Call SetFilePointerEx: offset:%lld,rn->rn_name:%s ThreadID:%lu [%s:%d]", place.QuadPart, rn->rn_name, threadID, __func__, __LINE__);
BOOL retval = SetFilePointerEx(fd, place, NULL, FILE_BEGIN); // If it fails, we cant read label

if(retval)
TraceWrite("SetFilePointerEx:success. rn->rn_name:%s ThreadID:%lu [%s:%d]", rn->rn_name, threadID, __func__, __LINE__);
else
TraceWrite("SetFilePointerEx:failed with error.%lu rn->rn_name:%s ThreadID:%lu [%s:%d]", GetLastError(), rn->rn_name, threadID, __func__, __LINE__);

drive_len = len;
} else {
// We have no openat() - so stich paths togther.
char fullpath[MAX_PATH];
snprintf(fullpath, sizeof(fullpath), "%s%s",
rn->rn_parent ? rn->rn_parent : "", rn->rn_name);
TraceWrite("FullPath :%s ThreadID:%lu [%s:%d]", fullpath, threadID,__func__,__LINE__);
TraceWrite("calling createfile().FullPath :%s ThreadID:%lu [%s:%d]", fullpath, threadID,__func__,__LINE__);
fd = CreateFile(fullpath,
GENERIC_READ,
FILE_SHARE_READ /*| FILE_SHARE_WRITE*/,
Expand All @@ -1535,17 +1552,23 @@ zpool_open_func_win(void *arg)
FILE_ATTRIBUTE_NORMAL /*| FILE_FLAG_OVERLAPPED*/,
NULL);
if (fd == INVALID_HANDLE_VALUE) {
int error = GetLastError();
DWORD error = GetLastError();
TraceWrite("func return.CreateFile() failed with error %lu.FullPath:%s ThreadID:%lu [%s:%d]", error, fullpath, threadID, __func__, __LINE__);
return;
}

TraceWrite("Get GetFileDriveSize().FullPath:%s ThreadID:%lu [%s:%d]", fullpath, threadID, __func__, __LINE__);
drive_len = GetFileDriveSize(fd);
}

TraceWrite("GetFileType() ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);
DWORD type = GetFileType(fd);
TraceWrite("Drive Size %llu , Drive type %d ThreadID:%lu [%s:%d]", drive_len, type, threadID,__func__, __LINE__);
//fprintf(stderr, "device '%s' filetype %d 0x%x\n", rn->rn_name, type, type);


TraceWrite("GetDriveType(),filetype=%d ThreadID:%lu [%s:%d]", type, threadID, __func__, __LINE__);
type = GetDriveType(rn->rn_name);
TraceWrite("Drive Size %llu , Drive type %d ThreadID:%lu [%s:%d]", drive_len, type, threadID, __func__, __LINE__);

//fprintf(stderr, "device '%s' filetype %d 0x%x\n", rn->rn_name, type, type);
//if ((fd = openat64(rn->rn_dfd, rn->rn_name, O_RDONLY)) < 0) {
// /* symlink to a device that's no longer there */
Expand All @@ -1559,6 +1582,7 @@ zpool_open_func_win(void *arg)
if (type == FILE_TYPE_DISK &&
drive_len < SPA_MINDEVSIZE) {
CloseHandle(fd);
TraceWrite("func return.drive_len %llu is less than SPA_MINDEVSIZE ThreadID:%lu [%s:%d]", drive_len, threadID, __func__, __LINE__);
return;
}

Expand All @@ -1579,19 +1603,24 @@ zpool_open_func_win(void *arg)
if ((zpool_read_label_win(fd, offset, drive_len, &config, &num_labels)) != 0) {
CloseHandle(fd);
(void)no_memory(rn->rn_hdl);
TraceWrite("func return.zpool_read_label_win failed. ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);
return;
}
TraceWrite("num_labels %d ThreadID:%lu [%s:%d]", num_labels, threadID,__func__, __LINE__);
if (num_labels == 0) {
CloseHandle(fd);
TraceWrite("Feeing nvlist. num_labels=0 ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);
nvlist_free(config);
TraceWrite("func return.num_labels=0 ThreadID:%lu [%s:%d]", threadID, __func__, __LINE__);
return;
}

CloseHandle(fd);

rn->rn_config = config;
rn->rn_num_labels = num_labels;

TraceWrite("func return.num_labels=%d config=%p ThreadID:%lu [%s:%d]", num_labels, config, threadID, __func__, __LINE__);
}

/*
Expand Down Expand Up @@ -1877,6 +1906,7 @@ zpool_find_import_impl(libzfs_handle_t *hdl, importargs_t *iarg)
* locks in the kernel, so going beyond this doesn't
* buy us much.
*/

t = taskq_create("z_import", 2 * max_ncpus, defclsyspri,
2 * max_ncpus, INT_MAX, TASKQ_PREPOPULATE);
for (slice = avl_first(&slice_cache); slice;
Expand Down Expand Up @@ -2265,17 +2295,19 @@ zpool_find_import_win(libzfs_handle_t *hdl, importargs_t *iarg)
* locks in the kernel, so going beyond this doesn't
* buy us much.
*/
TraceWrite("num of entries in slice_cache:%d. [%s:%d]", slice_cache.avl_numnodes, __func__, __LINE__);
t = taskq_create("z_import", 2 * max_ncpus, defclsyspri,
2 * max_ncpus, INT_MAX, TASKQ_PREPOPULATE);
for (slice = avl_first(&slice_cache); slice;
(slice = avl_walk(&slice_cache, slice,
AVL_AFTER)))
(void) taskq_dispatch(t, zpool_open_func_win, slice,
TQ_SLEEP);
TraceWrite("Going to wait for taskq thread to complete all zpool_open_func_win [%s:%d]", __func__, __LINE__);
TraceWrite("Main thread going to wait for taskq thread to complete all zpool_open_func_win [%s:%d]", __func__, __LINE__);
taskq_wait(t);
TraceWrite("Main thread going to wait for taskq destroy() to complete[%s:%d]", __func__, __LINE__);
taskq_destroy(t);
TraceWrite("All taskq zpool_open_func_win operation completed [%s:%d]", __func__, __LINE__);
TraceWrite("Main thread.All taskq zpool_open_func_win operation completed [%s:%d]", __func__, __LINE__);
cookie = NULL;

while ((slice = avl_destroy_nodes(&slice_cache,
Expand Down Expand Up @@ -2352,6 +2384,7 @@ zpool_find_import_win(libzfs_handle_t *hdl, importargs_t *iarg)
free(ne);
}

TraceWrite("Main thread. zpool_find_import_win return. [%s:%d]", __func__, __LINE__);
return (ret);
}
#endif // WIN32
Expand Down
47 changes: 46 additions & 1 deletion ZFSin/zfs/lib/libzpool/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ int taskq_now;
taskq_t *system_taskq;

#define TASKQ_ACTIVE 0x00010000
extern int TraceWrite(const char* fmt, ...);

static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
Expand Down Expand Up @@ -187,10 +188,13 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
void
taskq_wait(taskq_t *tq)
{
TraceWrite("Main thread. taskq_wait enter.trying to acquire lock. tq->tq_active=%d [%s:%d]", tq->tq_active, __func__, __LINE__);
mutex_enter(&tq->tq_lock);
TraceWrite("Main thread. taskq_wait. Acquired lock.tq->tq_active=%d,Is_task_list_empty:%d [%s:%d]", tq->tq_active, (tq->tq_task.tqent_next == &tq->tq_task), __func__, __LINE__);
while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
mutex_exit(&tq->tq_lock);
TraceWrite("Main thread. taskq_wait exit [%s:%d]", __func__, __LINE__);
}

void
Expand All @@ -205,40 +209,62 @@ taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
taskq_wait(tq);
}

static const DWORD duration = 15 * 1000;//msec
static ULONGLONG currentTime;
static ULONGLONG previousTime;

static void
taskq_thread(void *arg)
{
taskq_t *tq = arg;
taskq_ent_t *t;
boolean_t prealloc;
DWORD threadID = GetCurrentThreadId();

mutex_enter(&tq->tq_lock);
TraceWrite("taskq_thread func enter.acquired lock.tq->tq_active=%d ThreadID:%lu [%s:%d]", tq->tq_active,threadID, __func__, __LINE__);

while (tq->tq_flags & TASKQ_ACTIVE) {
if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
if (--tq->tq_active == 0)
cv_broadcast(&tq->tq_wait_cv);
cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
tq->tq_active++;

//print progress log periodically
currentTime = GetTickCount64();
if (currentTime - previousTime >= duration) {
TraceWrite("In taskq_thread while loop.tq->tq_active=%d tq->tq_nthreads:%d ThreadID:%lu [%s:%d]", tq->tq_active, tq->tq_nthreads,threadID, __func__, __LINE__);
previousTime = currentTime;
}
//
continue;
}
t->tqent_prev->tqent_next = t->tqent_next;
t->tqent_next->tqent_prev = t->tqent_prev;
t->tqent_next = NULL;
t->tqent_prev = NULL;
prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
TraceWrite("taskq Invoking callback func.tq->tq_active=%d ThreadID:%lu [%s:%d]", tq->tq_active, threadID, __func__, __LINE__);
mutex_exit(&tq->tq_lock);

rw_enter(&tq->tq_threadlock, RW_READER);
t->tqent_func(t->tqent_arg);
rw_exit(&tq->tq_threadlock);

mutex_enter(&tq->tq_lock);
TraceWrite("taskq callback func returned. tq->tq_active=%d ThreadID:%lu [%s:%d]", tq->tq_active, threadID, __func__, __LINE__);

if (!prealloc)
task_free(tq, t);
}

tq->tq_nthreads--;
cv_broadcast(&tq->tq_wait_cv);
TraceWrite("taskq_thread exit. tq->tq_nthreads: %d ThreadID:%lu [%s:%d]", tq->tq_nthreads, threadID, __func__, __LINE__);

mutex_exit(&tq->tq_lock);

thread_exit();
}

Expand All @@ -250,6 +276,7 @@ taskq_create(const char *name, int nthreads, pri_t pri,
taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
int t;


if (flags & TASKQ_THREADS_CPU_PCT) {
int pct;
SYSTEM_INFO sys;
Expand Down Expand Up @@ -283,42 +310,59 @@ taskq_create(const char *name, int nthreads, pri_t pri,
tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
KM_SLEEP);

previousTime = GetTickCount64();//msec

if (flags & TASKQ_PREPOPULATE) {
mutex_enter(&tq->tq_lock);
while (minalloc-- > 0)
task_free(tq, task_alloc(tq, KM_SLEEP));
mutex_exit(&tq->tq_lock);
}

TraceWrite("taskq_create(). tq->tq_nthreads:%d [%s:%d]", tq->tq_nthreads, __func__, __LINE__);

for (t = 0; t < nthreads; t++)
VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
taskq_thread, tq, 0, NULL, TS_RUN, pri)) != NULL);

TraceWrite("taskq_create() return [%s:%d]", __func__, __LINE__);

return (tq);
}

void
taskq_destroy(taskq_t *tq)
{
TraceWrite("taskq_destroy() enter. tq->tq_active:%d [%s:%d]", tq->tq_active, __func__, __LINE__);

int nthreads = tq->tq_nthreads;

taskq_wait(tq);

mutex_enter(&tq->tq_lock);

TraceWrite("taskq_destroy.setting ~TASKQ_ACTIVE.tq->tq_active=%d [%s:%d]", tq->tq_active, __func__, __LINE__);

tq->tq_flags &= ~TASKQ_ACTIVE;

cv_broadcast(&tq->tq_dispatch_cv);

TraceWrite("taskq_destroy() waiting for all taskq threads to exit. tq->tq_nthreads:%d tq->tq_active:%d [%s:%d]", tq->tq_nthreads, tq->tq_active, __func__, __LINE__);

while (tq->tq_nthreads != 0)
cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

TraceWrite("taskq_destroy() Freeing the taskq allocs:%d [%s:%d]", tq->tq_nalloc, __func__, __LINE__);

tq->tq_minalloc = 0;
while (tq->tq_nalloc != 0) {
ASSERT(tq->tq_freelist != NULL);
task_free(tq, task_alloc(tq, KM_SLEEP));
}

mutex_exit(&tq->tq_lock);

TraceWrite("taskq_destroy() After freeing all tq entries. [%s:%d]", __func__, __LINE__);

kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

Expand All @@ -329,6 +373,7 @@ taskq_destroy(taskq_t *tq)
cv_destroy(&tq->tq_maxalloc_cv);

kmem_free(tq, sizeof (taskq_t));
TraceWrite("taskq_destroy() exit. [%s:%d]", __func__, __LINE__);
}

int
Expand Down