pnfs: store a list of layout segments
pnfs_layout_state now stores a list of layout segments instead of a single pnfs_file_layout entry. When new segments are acquired through LAYOUTGET, they are inserted into the list in order of increasing offset. The functions related to pnfs_layout_state_prepare() now operate on the list to find missing layout ranges and segments that are missing devices.

pattern_init() in pnfs_io.c now allocates and initializes io threads for each layout segment in the range. The new function pattern_join() calls WaitForMultipleObjects() in a loop to support io patterns with more than 64 threads. If pattern_fork() is called with a thread count of 1, the thread function is called directly instead of spawning a new thread.

Signed-off-by: Casey Bodley <cbodley@citi.umich.edu>
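
The ordered insertion described above happens in the layout code rather than in pnfs_io.c, so it does not appear in the diff below. A minimal sketch of the idea, reusing the file_layout_entry() macro and list_for_each() visible in the diff; the helpers list_insert_before() and list_add_tail() are assumed stand-ins and may not match the daemon's actual list_entry API:

    /* hypothetical sketch, not part of this commit's diff: insert a segment
     * returned by LAYOUTGET so that state->layouts stays ordered by offset */
    static void layout_ordered_insert(
        IN pnfs_layout_state *state,
        IN pnfs_file_layout *segment)
    {
        struct list_entry *entry;

        list_for_each(entry, &state->layouts) {
            const pnfs_file_layout *existing = file_layout_entry(entry);

            /* stop at the first segment that starts past the new one */
            if (existing->layout.offset > segment->layout.offset) {
                /* assumed helper: link 'segment' directly before 'entry' */
                list_insert_before(entry, &segment->layout.entry);
                return;
            }
        }
        /* no later segment found; append at the tail (assumed helper) */
        list_add_tail(&state->layouts, &segment->layout.entry);
    }
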
parent 62ed5248bf
commit 5cc317e8a5
4 changed files with 290 additions and 170 deletions
daemon/pnfs_io.c (216 changes)
@@ -29,6 +29,7 @@
 #define IOLVL 2 /* dprintf level for pnfs io logging */
 
+#define file_layout_entry(pos) list_container(pos, pnfs_file_layout, layout.entry)
 
 typedef struct __pnfs_io_pattern {
     struct __pnfs_io_thread *threads;
@@ -63,6 +64,13 @@ typedef struct __pnfs_io_unit {
 typedef uint32_t (WINAPI *pnfs_io_thread_fn)(void*);
 
+static enum pnfs_status stripe_next_unit(
+    IN const pnfs_file_layout *layout,
+    IN uint32_t stripeid,
+    IN uint64_t *position,
+    IN uint64_t offset_end,
+    OUT pnfs_io_unit *io);
+
 /* 13.4.2. Interpreting the File Layout Using Sparse Packing
  * http://tools.ietf.org/html/rfc5661#section-13.4.2 */
@@ -113,22 +121,107 @@ static enum pnfs_status get_dense_fh(
     return status;
 }
 
+static __inline bool_t layout_compatible(
+    IN const pnfs_layout *layout,
+    IN enum pnfs_iomode iomode,
+    IN uint64_t position)
+{
+    return layout->iomode >= iomode
+        && layout->offset <= position
+        && position < layout->offset + layout->length;
+}
+
+/* count stripes for all layout segments that intersect the range
+ * and have not been covered by previous segments */
+static uint32_t thread_count(
+    IN pnfs_layout_state *state,
+    IN enum pnfs_iomode iomode,
+    IN uint64_t offset,
+    IN uint64_t length)
+{
+    uint64_t position = offset;
+    struct list_entry *entry;
+    uint32_t count = 0;
+
+    list_for_each(entry, &state->layouts) {
+        pnfs_file_layout *layout = file_layout_entry(entry);
+
+        if (!layout_compatible(&layout->layout, iomode, position))
+            continue;
+
+        position = layout->layout.offset + layout->layout.length;
+        count += layout->device->stripes.count;
+    }
+    return count;
+}
+
 static enum pnfs_status thread_init(
     IN pnfs_io_pattern *pattern,
     IN pnfs_io_thread *thread,
     IN pnfs_file_layout *layout,
-    IN uint32_t stripeid)
+    IN uint32_t stripeid,
+    IN uint64_t offset)
 {
     thread->pattern = pattern;
     thread->layout = layout;
     thread->stable = FILE_SYNC4;
-    thread->offset = pattern->offset_start;
+    thread->offset = offset;
     thread->id = stripeid;
 
     return is_dense(layout) ? get_dense_fh(layout, stripeid, &thread->file)
         : get_sparse_fh(layout, pattern->meta_file, stripeid, &thread->file);
 }
 
+static enum pnfs_status pattern_threads_init(
+    IN pnfs_io_pattern *pattern,
+    IN enum pnfs_iomode iomode,
+    IN uint64_t offset,
+    IN uint64_t length)
+{
+    pnfs_io_unit io;
+    uint64_t position = offset;
+    struct list_entry *entry;
+    uint32_t s, t = 0;
+    enum pnfs_status status = PNFS_SUCCESS;
+
+    list_for_each(entry, &pattern->state->layouts) {
+        pnfs_file_layout *layout = file_layout_entry(entry);
+
+        if (!layout_compatible(&layout->layout, iomode, position))
+            continue;
+
+        for (s = 0; s < layout->device->stripes.count; s++) {
+            uint64_t off = position;
+
+            /* does the range contain this stripe? */
+            status = stripe_next_unit(layout, s, &off, offset + length, &io);
+            if (status != PNFS_PENDING)
+                continue;
+
+            if (t >= pattern->count) { /* miscounted threads needed? */
+                status = PNFSERR_NO_LAYOUT;
+                goto out;
+            }
+
+            status = thread_init(pattern, &pattern->threads[t++], layout, s, off);
+            if (status)
+                goto out;
+        }
+        position = layout->layout.offset + layout->layout.length;
+    }
+
+    if (position < offset + length) {
+        /* unable to satisfy the entire range */
+        status = PNFSERR_NO_LAYOUT;
+        goto out;
+    }
+
+    /* update the pattern with the actual number of threads used */
+    pattern->count = t;
+out:
+    return status;
+}
+
 static enum pnfs_status pattern_init(
     IN pnfs_io_pattern *pattern,
     IN nfs41_root *root,
@@ -136,20 +229,22 @@ static enum pnfs_status pattern_init(
     IN const stateid_arg *stateid,
     IN pnfs_layout_state *state,
     IN unsigned char *buffer,
+    IN enum pnfs_iomode iomode,
     IN uint64_t offset,
     IN uint64_t length,
     IN uint32_t default_lease)
 {
     uint32_t i;
     enum pnfs_status status;
 
-    pattern->count = state->layout->device->stripes.count;
+    /* calculate an upper bound on the number of threads to allocate */
+    pattern->count = thread_count(state, iomode, offset, length);
     pattern->threads = calloc(pattern->count, sizeof(pnfs_io_thread));
     if (pattern->threads == NULL) {
         status = PNFSERR_RESOURCES;
         goto out;
     }
 
     /* information shared between threads */
     pattern->root = root;
     pattern->meta_file = meta_file;
     pattern->stateid = stateid;
@@ -159,16 +254,13 @@
     pattern->offset_end = offset + length;
     pattern->default_lease = default_lease;
 
-    for (i = 0; i < pattern->count; i++) {
-        status = thread_init(pattern, &pattern->threads[i], state->layout, i);
-        if (status)
-            goto out_err_free;
-    }
-
-    /* take a reference on the layout so we don't return it during io */
-    status = pnfs_layout_io_start(state);
+    /* initialize a thread for every stripe necessary to cover the range */
+    status = pattern_threads_init(pattern, iomode, offset, length);
+    if (status)
+        goto out_err_free;
+
+    /* take a reference on the layout so we don't return it during io */
+    pnfs_layout_io_start(state);
 out:
     return status;
@@ -278,65 +370,77 @@ static enum pnfs_status thread_data_server(
     return PNFS_SUCCESS;
 }
 
+static enum pnfs_status pattern_join(
+    IN HANDLE *threads,
+    IN DWORD count)
+{
+    DWORD status;
+    /* WaitForMultipleObjects() supports a maximum of 64 objects */
+    while (count) {
+        const DWORD n = min(count, MAXIMUM_WAIT_OBJECTS);
+        status = WaitForMultipleObjects(n, threads, TRUE, INFINITE);
+        if (status != WAIT_OBJECT_0)
+            return PNFSERR_RESOURCES;
+
+        count -= n;
+        threads += n;
+    }
+    return PNFS_SUCCESS;
+}
+
 static enum pnfs_status pattern_fork(
     IN pnfs_io_pattern *pattern,
     IN pnfs_io_thread_fn thread_fn)
 {
-    pnfs_io_unit io;
-#ifdef PNFS_THREADING
     HANDLE *threads;
-    uint32_t num_threads;
-#endif
     uint32_t i;
-    DWORD status;
-    enum pnfs_status pnfsstat = PNFS_SUCCESS;
+    enum pnfs_status status = PNFS_SUCCESS;
 
     if (pattern->count == 0)
         goto out;
 
-#ifdef PNFS_THREADING
-    /* create a thread for each unit that has actual io */
-    threads = calloc(pattern->count, sizeof(HANDLE));
-    if (threads == NULL) {
-        pnfsstat = PNFSERR_RESOURCES;
+    if (pattern->count == 1) {
+        /* no need to fork if there's only 1 thread */
+        status = (enum pnfs_status)thread_fn(pattern->threads);
+        goto out;
+    }
+
+    /* create a thread for each unit that has actual io */
+    threads = calloc(pattern->count, sizeof(HANDLE));
+    if (threads == NULL) {
+        status = PNFSERR_RESOURCES;
         goto out;
     }
 
-    num_threads = 0;
     for (i = 0; i < pattern->count; i++) {
-        if (thread_next_unit(&pattern->threads[i], &io) == PNFS_PENDING) {
-            threads[num_threads++] = (HANDLE)_beginthreadex(NULL, 0,
-                thread_fn, &pattern->threads[i], 0, NULL);
+        threads[i] = (HANDLE)_beginthreadex(NULL, 0,
+            thread_fn, &pattern->threads[i], 0, NULL);
+        if (threads[i] == NULL) {
+            eprintf("_beginthreadex() failed with %d\n", GetLastError());
+            pattern->count = i; /* join any threads already started */
+            break;
         }
     }
 
-    if (num_threads) { /* wait on all threads to finish */
-        status = WaitForMultipleObjects(num_threads, threads, TRUE, INFINITE);
-        if (status == WAIT_OBJECT_0)
-            status = NO_ERROR;
-
-        for (i = 0; i < num_threads; i++) {
-            /* keep track of the most severe error returned by a thread */
-            if (GetExitCodeThread(threads[i], &status))
-                pnfsstat = max(pnfsstat, (enum pnfs_status)status);
-
-            CloseHandle(threads[i]);
-        }
-    }
+    /* wait on all threads to finish */
+    status = pattern_join(threads, pattern->count);
+    if (status) {
+        eprintf("pattern_join() failed with %s\n", pnfs_error_string(status));
+        goto out;
+    }
+
+    for (i = 0; i < pattern->count; i++) {
+        /* keep track of the most severe error returned by a thread */
+        DWORD exitcode;
+        if (GetExitCodeThread(threads[i], &exitcode))
+            status = max(status, (enum pnfs_status)exitcode);
+
+        CloseHandle(threads[i]);
+    }
 
     free(threads);
-#else
-    /* process each server that has actual io */
-    for (i = 0; i < pattern->count; i++) {
-        if (thread_next_unit(&pattern->threads[i], &io) == PNFS_PENDING) {
-            /* keep track of the most severe error returned by a thread */
-            status = thread_fn(&pattern->threads[i]);
-            pnfsstat = max(pnfsstat, (enum pnfs_status)status);
-        }
-    }
-#endif
 out:
-    return pnfsstat;
+    return status;
 }
 
 static uint64_t pattern_bytes_transferred(
@@ -376,8 +480,6 @@ static enum pnfs_status map_ds_error(
     AcquireSRWLockExclusive(&state->lock);
     /* flag the layout for return once io is finished */
     state->status |= PNFS_LAYOUT_RECALLED | PNFS_LAYOUT_CHANGED;
-    /* reset GRANTED so we know not to try LAYOUTRETURN */
-    state->status &= ~PNFS_LAYOUT_GRANTED;
     ReleaseSRWLockExclusive(&state->lock);
 
     /* return CHANGED to prevent any further use of the layout */
@@ -423,7 +525,7 @@ static uint32_t WINAPI file_layout_read_thread(void *args)
     stateid.stateid.seqid = 0;
 
     total_read = 0;
-    while ((status = thread_next_unit(thread, &io)) == PNFS_PENDING) {
+    while (thread_next_unit(thread, &io) == PNFS_PENDING) {
         maxreadsize = max_read_size(client->session, &thread->file->fh);
         if (io.length > maxreadsize)
             io.length = maxreadsize;
@@ -498,7 +600,7 @@ retry_write:
     commit_max = 0;
     total_written = 0;
 
-    while ((status = thread_next_unit(thread, &io)) == PNFS_PENDING) {
+    while (thread_next_unit(thread, &io) == PNFS_PENDING) {
         if (io.length > maxwritesize)
             io.length = maxwritesize;
 
@@ -589,8 +691,9 @@ enum pnfs_status pnfs_read(
 
     if (status == PNFS_SUCCESS) {
         /* interpret the layout and set up threads for io */
-        status = pattern_init(&pattern, root, &state->file, stateid, layout,
-            buffer_out, offset, length, state->session->lease_time);
+        status = pattern_init(&pattern, root, &state->file, stateid,
+            layout, buffer_out, PNFS_IOMODE_READ, offset, length,
+            state->session->lease_time);
         if (status)
             eprintf("pattern_init() failed with %s\n",
                 pnfs_error_string(status));
@@ -685,8 +788,9 @@ enum pnfs_status pnfs_write(
 
     if (status == PNFS_SUCCESS) {
         /* interpret the layout and set up threads for io */
-        status = pattern_init(&pattern, root, &state->file, stateid, layout,
-            buffer, offset, length, state->session->lease_time);
+        status = pattern_init(&pattern, root, &state->file, stateid,
+            layout, buffer, PNFS_IOMODE_RW, offset, length,
+            state->session->lease_time);
         if (status)
             eprintf("pattern_init() failed with %s\n",
                 pnfs_error_string(status));