From 741e8bf0bf38bdd74ce89eb7559af2e703ac44d6 Mon Sep 17 00:00:00 2001 From: Olga Kornievskaia Date: Tue, 8 Mar 2011 10:59:13 -0500 Subject: [PATCH] non-blocking rpc receive we already drop the lock between sending and receiving the rpc packets. now making it so that receive doesn't block for too long (ie 100ms) before unlocking the socket. this is needed for the callback. original rpc is sent and it triggers a callback from the server. we fork another thread to handle it, ie it needs to send a deleg_return rpc. if original rpc gets control and blocks on trying to receive its reply, it'll timeout and original rpc will return an error. instead we need to not block for long and allow the deleg_return to go thru so that the server can reply successfully to the original rpc. --- daemon/nfs41_rpc.c | 4 +-- libtirpc/src/auth_sspi.c | 2 +- libtirpc/src/clnt_vc.c | 61 +++++++++++++++++----------------- libtirpc/src/rpc_com.h | 1 + libtirpc/src/xdr_rec.c | 71 ++++++++++++++++++++++++---------------- 5 files changed, 77 insertions(+), 62 deletions(-) diff --git a/daemon/nfs41_rpc.c b/daemon/nfs41_rpc.c index 7379a05..1effaa1 100644 --- a/daemon/nfs41_rpc.c +++ b/daemon/nfs41_rpc.c @@ -34,7 +34,7 @@ static enum clnt_stat send_null(CLIENT *client) { - struct timeval timeout = {10, 0}; + struct timeval timeout = {0, 100}; return clnt_call(client, 0, (xdrproc_t)xdr_void, NULL, @@ -305,7 +305,7 @@ int nfs41_send_compound( IN char *inbuf, OUT char *outbuf) { - struct timeval timeout = {10, 0}; + struct timeval timeout = {0, 100}; enum clnt_stat rpc_status; int status, count = 0, one = 1, zero = 0; uint32_t version; diff --git a/libtirpc/src/auth_sspi.c b/libtirpc/src/auth_sspi.c index 6574db1..2456c2e 100644 --- a/libtirpc/src/auth_sspi.c +++ b/libtirpc/src/auth_sspi.c @@ -913,7 +913,7 @@ void log_hexdump(bool_t on, const u_char *title, const u_char *buf, if (!on) return; - fprintf(fd_out, "%s\n", title); + fprintf(fd_out, "%04x: %s (len=%d)\n", GetCurrentThreadId(), title, len); for (i = 0; i < len; i += 0x10) { fprintf(fd_out, " %04x: ", (u_int)(i + offset)); jm = len - i; diff --git a/libtirpc/src/clnt_vc.c b/libtirpc/src/clnt_vc.c index 488be66..b546f49 100644 --- a/libtirpc/src/clnt_vc.c +++ b/libtirpc/src/clnt_vc.c @@ -182,13 +182,13 @@ static unsigned int WINAPI clnt_cb_thread(void *args) XDR *xdrs = &(ct->ct_xdrs); long saved_timeout_sec = ct->ct_wait.tv_sec; long saved_timeout_usec = ct->ct_wait.tv_usec; - struct rpc_msg reply_msg; - void *res = NULL; + struct rpc_msg reply_msg; char cred_area[2 * MAX_AUTH_BYTES + RQCRED_SIZE]; fprintf(stderr/*stdout*/, "%04x: Creating callback thread\n", GetCurrentThreadId()); while(1) { cb_req header; + void *res = NULL; mutex_lock(&clnt_fd_lock); while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] || !ct->use_stored_reply_msg || @@ -200,7 +200,7 @@ static unsigned int WINAPI clnt_cb_thread(void *args) if (!vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)]) break; } - vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = 1; + vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = GetCurrentThreadId(); mutex_unlock(&clnt_fd_lock); if (cl->shutdown) { @@ -211,12 +211,14 @@ static unsigned int WINAPI clnt_cb_thread(void *args) saved_timeout_sec = ct->ct_wait.tv_sec; saved_timeout_usec = ct->ct_wait.tv_usec; + xdrs->x_op = XDR_DECODE; if (ct->use_stored_reply_msg && ct->reply_msg.rm_direction == CALL) { goto process_rpc_call; } else if (!ct->use_stored_reply_msg) { - xdrs->x_op = XDR_DECODE; ct->ct_wait.tv_sec = ct->ct_wait.tv_usec = 0; - xdrrec_skiprecord(xdrs); + __xdrrec_setnonblock(xdrs, 0); + if (!xdrrec_skiprecord(xdrs)) + goto skip_process; if (!xdr_getxiddir(xdrs, &ct->reply_msg)) { goto skip_process; } @@ -232,6 +234,7 @@ static unsigned int WINAPI clnt_cb_thread(void *args) } process_rpc_call: //call to get call headers + ct->use_stored_reply_msg = FALSE; ct->reply_msg.rm_call.cb_cred.oa_base = cred_area; ct->reply_msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]); if (!xdr_getcallbody(xdrs, &ct->reply_msg)) { @@ -250,9 +253,12 @@ process_rpc_call: if (status) { fprintf(stderr, "%04x: callback function failed with %d\n", status); } - ct->use_stored_reply_msg = FALSE; + xdrs->x_op = XDR_ENCODE; + __xdrrec_setblock(xdrs); reply_msg.rm_xid = ct->reply_msg.rm_xid; + fprintf(stdout, "%04x: cb: replying to xid %d\n", GetCurrentThreadId(), + ct->reply_msg.rm_xid); ct->reply_msg.rm_xid = 0; reply_msg.rm_direction = REPLY; reply_msg.rm_reply.rp_stat = MSG_ACCEPTED; @@ -495,6 +501,7 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout) #else /* XXX Need Windows signal/event stuff XXX */ #endif + enum clnt_stat status; assert(cl != NULL); @@ -516,6 +523,7 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout) && timeout.tv_usec == 0) ? FALSE : TRUE; call_again: + __xdrrec_setblock(xdrs); xdrs->x_op = XDR_ENCODE; ct->ct_error.re_status = RPC_SUCCESS; x_id = ntohl(--(*msg_x_id)); @@ -527,27 +535,17 @@ call_again: if (ct->ct_error.re_status == RPC_SUCCESS) ct->ct_error.re_status = RPC_CANTENCODEARGS; (void)xdrrec_endofrecord(xdrs, TRUE); - release_fd_lock(ct->ct_fd, mask); - return (ct->ct_error.re_status); + goto out; } if (! xdrrec_endofrecord(xdrs, shipnow)) { - release_fd_lock(ct->ct_fd, mask); ct->ct_error.re_status = RPC_CANTSEND; - return (ct->ct_error.re_status); + goto out; } if (! shipnow) { release_fd_lock(ct->ct_fd, mask); return (RPC_SUCCESS); } - /* - * Hack to provide rpc-based message passing - */ - if (timeout.tv_sec == 0 && timeout.tv_usec == 0) { - release_fd_lock(ct->ct_fd, mask); - ct->ct_error.re_status = RPC_TIMEDOUT; - return(ct->ct_error.re_status); - } #ifdef NO_CB_4_KRB5P if (cl->cb_thread != INVALID_HANDLE_VALUE) @@ -569,21 +567,23 @@ call_again: mutex_unlock(&clnt_fd_lock); } #endif - + __xdrrec_setnonblock(xdrs, 0); xdrs->x_op = XDR_DECODE; ct->reply_msg.acpted_rply.ar_verf = _null_auth; ct->reply_msg.acpted_rply.ar_results.where = NULL; ct->reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; if (!ct->use_stored_reply_msg) { - if (! xdrrec_skiprecord(xdrs)) { + if (!xdrrec_skiprecord(xdrs)) { release_fd_lock(ct->ct_fd, mask); - return (ct->ct_error.re_status); + SwitchToThread(); + continue; } if (!xdr_getxiddir(xdrs, &ct->reply_msg)) { - release_fd_lock(ct->ct_fd, mask); - if (ct->ct_error.re_status == RPC_SUCCESS) + if (ct->ct_error.re_status == RPC_SUCCESS) { + release_fd_lock(ct->ct_fd, mask); continue; - return (ct->ct_error.re_status); + } + goto out; } if (ct->reply_msg.rm_direction != REPLY) { @@ -593,16 +593,15 @@ call_again: ct->use_stored_reply_msg = TRUE; } release_fd_lock(ct->ct_fd, mask); + SwitchToThread(); continue; } } if (ct->reply_msg.rm_xid == x_id) { ct->use_stored_reply_msg = FALSE; ct->reply_msg.rm_xid = 0; - if (!xdr_getreplyunion(xdrs, &ct->reply_msg)) { - release_fd_lock(ct->ct_fd, mask); - return (ct->ct_error.re_status); - } + if (!xdr_getreplyunion(xdrs, &ct->reply_msg)) + goto out; break; } else { @@ -639,8 +638,10 @@ call_again: goto call_again; } /* end of unsuccessful completion */ ct->reply_msg.rm_direction = -1; +out: + status = ct->ct_error.re_status; release_fd_lock(ct->ct_fd, mask); - return (ct->ct_error.re_status); + return status; } static void @@ -927,7 +928,7 @@ read_vc(ctp, buf, len) continue; ct->ct_error.re_status = RPC_CANTRECV; ct->ct_error.re_errno = errno; - return (-1); + return (-2); } break; } diff --git a/libtirpc/src/rpc_com.h b/libtirpc/src/rpc_com.h index 9a7fff0..0658438 100644 --- a/libtirpc/src/rpc_com.h +++ b/libtirpc/src/rpc_com.h @@ -87,6 +87,7 @@ char *_get_next_token(char *, int); bool_t __svc_clean_idle(fd_set *, int, bool_t); bool_t __xdrrec_setnonblock(XDR *, int); +bool_t __xdrrec_setblock(XDR *); bool_t __xdrrec_getrec(XDR *, enum xprt_stat *, bool_t); void __xprt_unregister_unlocked(SVCXPRT *); void __xprt_set_raddr(SVCXPRT *, const struct sockaddr_storage *); diff --git a/libtirpc/src/xdr_rec.c b/libtirpc/src/xdr_rec.c index 9e2dee8..920835c 100644 --- a/libtirpc/src/xdr_rec.c +++ b/libtirpc/src/xdr_rec.c @@ -480,11 +480,10 @@ xdrrec_skiprecord(xdrs) enum xprt_stat xstat; if (rstrm->nonblock) { - if (__xdrrec_getrec(xdrs, &xstat, FALSE)) { - rstrm->fbtbc = 0; + if (__xdrrec_getrec(xdrs, &xstat, FALSE)) return TRUE; - } - if (rstrm->in_finger == rstrm->in_boundry && + + if (rstrm->in_finger == rstrm->in_boundry && xstat == XPRT_MOREREQS) { rstrm->fbtbc = 0; return TRUE; @@ -592,6 +591,7 @@ __xdrrec_getrec(xdrs, statp, expectdata) *statp = XPRT_DIED; return FALSE; } + rstrm->fbtbc = rstrm->in_header & (~LAST_FRAG); rstrm->in_reclen += fraglen; if (rstrm->in_reclen > rstrm->recvsize) realloc_stream(rstrm, rstrm->in_reclen); @@ -601,35 +601,39 @@ __xdrrec_getrec(xdrs, statp, expectdata) } } - n = rstrm->readit(rstrm->tcp_handle, - rstrm->in_base + rstrm->in_received, - (rstrm->in_reclen - rstrm->in_received)); + do { + n = rstrm->readit(rstrm->tcp_handle, + rstrm->in_base + rstrm->in_received, + (rstrm->in_reclen - rstrm->in_received)); - if (n < 0) { - *statp = XPRT_DIED; - return FALSE; - } + /* this case is needed for non-block as socket returns TIMEDOUT and -1 + * -2 is an error case and covered by the next if() statement */ + if (n == -1) continue; - if (n == 0) { - *statp = expectdata ? XPRT_DIED : XPRT_IDLE; - return FALSE; - } + if (n < 0) { + *statp = XPRT_DIED; + return FALSE; + } - rstrm->in_received += n; + if (n == 0) { + *statp = expectdata ? XPRT_DIED : XPRT_IDLE; + return FALSE; + } - if (rstrm->in_received == rstrm->in_reclen) { - rstrm->in_haveheader = FALSE; - rstrm->in_hdrp = (char *)(void *)&rstrm->in_header; - rstrm->in_hdrlen = 0; - if (rstrm->last_frag) { - rstrm->fbtbc = rstrm->in_reclen; - rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen; - rstrm->in_finger = rstrm->in_base; - rstrm->in_reclen = rstrm->in_received = 0; - *statp = XPRT_MOREREQS; - return TRUE; - } - } + rstrm->in_received += n; + if (rstrm->in_received == rstrm->in_reclen) { + rstrm->in_haveheader = FALSE; + rstrm->in_hdrp = (char *)(void *)&rstrm->in_header; + rstrm->in_hdrlen = 0; + if (rstrm->last_frag) { + rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen; + rstrm->in_finger = rstrm->in_base; + rstrm->in_reclen = rstrm->in_received = 0; + *statp = XPRT_MOREREQS; + return TRUE; + } + } + } while (1); *statp = XPRT_MOREREQS; return FALSE; @@ -649,6 +653,15 @@ __xdrrec_setnonblock(xdrs, maxrec) return TRUE; } +bool_t +__xdrrec_setblock(xdrs) + XDR *xdrs; +{ + RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private); + + rstrm->nonblock = FALSE; + return TRUE; +} /* * Internal useful routines */