non-blocking rpc receive

we already drop the lock between sending and receiving the rpc packets. now making it so that receive doesn't block for too long (ie 100ms) before unlocking the socket. this is needed for the callback. original rpc is sent and it triggers a callback from the server. we fork another thread to handle it, ie it needs to send a deleg_return rpc. if original rpc gets control and blocks on trying to receive its reply, it'll timeout and original rpc will return an error. instead we need to not block for long and allow the deleg_return to go thru so that the server can reply successfully to the original rpc.
This commit is contained in:
Olga Kornievskaia 2011-03-08 10:59:13 -05:00 committed by unknown
parent d7e438be5e
commit 741e8bf0bf
5 changed files with 77 additions and 62 deletions

View file

@ -34,7 +34,7 @@
static enum clnt_stat send_null(CLIENT *client) static enum clnt_stat send_null(CLIENT *client)
{ {
struct timeval timeout = {10, 0}; struct timeval timeout = {0, 100};
return clnt_call(client, 0, return clnt_call(client, 0,
(xdrproc_t)xdr_void, NULL, (xdrproc_t)xdr_void, NULL,
@ -305,7 +305,7 @@ int nfs41_send_compound(
IN char *inbuf, IN char *inbuf,
OUT char *outbuf) OUT char *outbuf)
{ {
struct timeval timeout = {10, 0}; struct timeval timeout = {0, 100};
enum clnt_stat rpc_status; enum clnt_stat rpc_status;
int status, count = 0, one = 1, zero = 0; int status, count = 0, one = 1, zero = 0;
uint32_t version; uint32_t version;

View file

@ -913,7 +913,7 @@ void log_hexdump(bool_t on, const u_char *title, const u_char *buf,
if (!on) return; if (!on) return;
fprintf(fd_out, "%s\n", title); fprintf(fd_out, "%04x: %s (len=%d)\n", GetCurrentThreadId(), title, len);
for (i = 0; i < len; i += 0x10) { for (i = 0; i < len; i += 0x10) {
fprintf(fd_out, " %04x: ", (u_int)(i + offset)); fprintf(fd_out, " %04x: ", (u_int)(i + offset));
jm = len - i; jm = len - i;

View file

@ -183,12 +183,12 @@ static unsigned int WINAPI clnt_cb_thread(void *args)
long saved_timeout_sec = ct->ct_wait.tv_sec; long saved_timeout_sec = ct->ct_wait.tv_sec;
long saved_timeout_usec = ct->ct_wait.tv_usec; long saved_timeout_usec = ct->ct_wait.tv_usec;
struct rpc_msg reply_msg; struct rpc_msg reply_msg;
void *res = NULL;
char cred_area[2 * MAX_AUTH_BYTES + RQCRED_SIZE]; char cred_area[2 * MAX_AUTH_BYTES + RQCRED_SIZE];
fprintf(stderr/*stdout*/, "%04x: Creating callback thread\n", GetCurrentThreadId()); fprintf(stderr/*stdout*/, "%04x: Creating callback thread\n", GetCurrentThreadId());
while(1) { while(1) {
cb_req header; cb_req header;
void *res = NULL;
mutex_lock(&clnt_fd_lock); mutex_lock(&clnt_fd_lock);
while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] || while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] ||
!ct->use_stored_reply_msg || !ct->use_stored_reply_msg ||
@ -200,7 +200,7 @@ static unsigned int WINAPI clnt_cb_thread(void *args)
if (!vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)]) if (!vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)])
break; break;
} }
vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = 1; vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = GetCurrentThreadId();
mutex_unlock(&clnt_fd_lock); mutex_unlock(&clnt_fd_lock);
if (cl->shutdown) { if (cl->shutdown) {
@ -211,12 +211,14 @@ static unsigned int WINAPI clnt_cb_thread(void *args)
saved_timeout_sec = ct->ct_wait.tv_sec; saved_timeout_sec = ct->ct_wait.tv_sec;
saved_timeout_usec = ct->ct_wait.tv_usec; saved_timeout_usec = ct->ct_wait.tv_usec;
xdrs->x_op = XDR_DECODE;
if (ct->use_stored_reply_msg && ct->reply_msg.rm_direction == CALL) { if (ct->use_stored_reply_msg && ct->reply_msg.rm_direction == CALL) {
goto process_rpc_call; goto process_rpc_call;
} else if (!ct->use_stored_reply_msg) { } else if (!ct->use_stored_reply_msg) {
xdrs->x_op = XDR_DECODE;
ct->ct_wait.tv_sec = ct->ct_wait.tv_usec = 0; ct->ct_wait.tv_sec = ct->ct_wait.tv_usec = 0;
xdrrec_skiprecord(xdrs); __xdrrec_setnonblock(xdrs, 0);
if (!xdrrec_skiprecord(xdrs))
goto skip_process;
if (!xdr_getxiddir(xdrs, &ct->reply_msg)) { if (!xdr_getxiddir(xdrs, &ct->reply_msg)) {
goto skip_process; goto skip_process;
} }
@ -232,6 +234,7 @@ static unsigned int WINAPI clnt_cb_thread(void *args)
} }
process_rpc_call: process_rpc_call:
//call to get call headers //call to get call headers
ct->use_stored_reply_msg = FALSE;
ct->reply_msg.rm_call.cb_cred.oa_base = cred_area; ct->reply_msg.rm_call.cb_cred.oa_base = cred_area;
ct->reply_msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]); ct->reply_msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
if (!xdr_getcallbody(xdrs, &ct->reply_msg)) { if (!xdr_getcallbody(xdrs, &ct->reply_msg)) {
@ -250,9 +253,12 @@ process_rpc_call:
if (status) { if (status) {
fprintf(stderr, "%04x: callback function failed with %d\n", status); fprintf(stderr, "%04x: callback function failed with %d\n", status);
} }
ct->use_stored_reply_msg = FALSE;
xdrs->x_op = XDR_ENCODE; xdrs->x_op = XDR_ENCODE;
__xdrrec_setblock(xdrs);
reply_msg.rm_xid = ct->reply_msg.rm_xid; reply_msg.rm_xid = ct->reply_msg.rm_xid;
fprintf(stdout, "%04x: cb: replying to xid %d\n", GetCurrentThreadId(),
ct->reply_msg.rm_xid);
ct->reply_msg.rm_xid = 0; ct->reply_msg.rm_xid = 0;
reply_msg.rm_direction = REPLY; reply_msg.rm_direction = REPLY;
reply_msg.rm_reply.rp_stat = MSG_ACCEPTED; reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
@ -495,6 +501,7 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
#else #else
/* XXX Need Windows signal/event stuff XXX */ /* XXX Need Windows signal/event stuff XXX */
#endif #endif
enum clnt_stat status;
assert(cl != NULL); assert(cl != NULL);
@ -516,6 +523,7 @@ clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
&& timeout.tv_usec == 0) ? FALSE : TRUE; && timeout.tv_usec == 0) ? FALSE : TRUE;
call_again: call_again:
__xdrrec_setblock(xdrs);
xdrs->x_op = XDR_ENCODE; xdrs->x_op = XDR_ENCODE;
ct->ct_error.re_status = RPC_SUCCESS; ct->ct_error.re_status = RPC_SUCCESS;
x_id = ntohl(--(*msg_x_id)); x_id = ntohl(--(*msg_x_id));
@ -527,27 +535,17 @@ call_again:
if (ct->ct_error.re_status == RPC_SUCCESS) if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS; ct->ct_error.re_status = RPC_CANTENCODEARGS;
(void)xdrrec_endofrecord(xdrs, TRUE); (void)xdrrec_endofrecord(xdrs, TRUE);
release_fd_lock(ct->ct_fd, mask); goto out;
return (ct->ct_error.re_status);
} }
if (! xdrrec_endofrecord(xdrs, shipnow)) { if (! xdrrec_endofrecord(xdrs, shipnow)) {
release_fd_lock(ct->ct_fd, mask);
ct->ct_error.re_status = RPC_CANTSEND; ct->ct_error.re_status = RPC_CANTSEND;
return (ct->ct_error.re_status); goto out;
} }
if (! shipnow) { if (! shipnow) {
release_fd_lock(ct->ct_fd, mask); release_fd_lock(ct->ct_fd, mask);
return (RPC_SUCCESS); return (RPC_SUCCESS);
} }
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
release_fd_lock(ct->ct_fd, mask);
ct->ct_error.re_status = RPC_TIMEDOUT;
return(ct->ct_error.re_status);
}
#ifdef NO_CB_4_KRB5P #ifdef NO_CB_4_KRB5P
if (cl->cb_thread != INVALID_HANDLE_VALUE) if (cl->cb_thread != INVALID_HANDLE_VALUE)
@ -569,21 +567,23 @@ call_again:
mutex_unlock(&clnt_fd_lock); mutex_unlock(&clnt_fd_lock);
} }
#endif #endif
__xdrrec_setnonblock(xdrs, 0);
xdrs->x_op = XDR_DECODE; xdrs->x_op = XDR_DECODE;
ct->reply_msg.acpted_rply.ar_verf = _null_auth; ct->reply_msg.acpted_rply.ar_verf = _null_auth;
ct->reply_msg.acpted_rply.ar_results.where = NULL; ct->reply_msg.acpted_rply.ar_results.where = NULL;
ct->reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; ct->reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
if (!ct->use_stored_reply_msg) { if (!ct->use_stored_reply_msg) {
if (! xdrrec_skiprecord(xdrs)) { if (!xdrrec_skiprecord(xdrs)) {
release_fd_lock(ct->ct_fd, mask); release_fd_lock(ct->ct_fd, mask);
return (ct->ct_error.re_status); SwitchToThread();
continue;
} }
if (!xdr_getxiddir(xdrs, &ct->reply_msg)) { if (!xdr_getxiddir(xdrs, &ct->reply_msg)) {
release_fd_lock(ct->ct_fd, mask); if (ct->ct_error.re_status == RPC_SUCCESS) {
if (ct->ct_error.re_status == RPC_SUCCESS) release_fd_lock(ct->ct_fd, mask);
continue; continue;
return (ct->ct_error.re_status); }
goto out;
} }
if (ct->reply_msg.rm_direction != REPLY) { if (ct->reply_msg.rm_direction != REPLY) {
@ -593,16 +593,15 @@ call_again:
ct->use_stored_reply_msg = TRUE; ct->use_stored_reply_msg = TRUE;
} }
release_fd_lock(ct->ct_fd, mask); release_fd_lock(ct->ct_fd, mask);
SwitchToThread();
continue; continue;
} }
} }
if (ct->reply_msg.rm_xid == x_id) { if (ct->reply_msg.rm_xid == x_id) {
ct->use_stored_reply_msg = FALSE; ct->use_stored_reply_msg = FALSE;
ct->reply_msg.rm_xid = 0; ct->reply_msg.rm_xid = 0;
if (!xdr_getreplyunion(xdrs, &ct->reply_msg)) { if (!xdr_getreplyunion(xdrs, &ct->reply_msg))
release_fd_lock(ct->ct_fd, mask); goto out;
return (ct->ct_error.re_status);
}
break; break;
} }
else { else {
@ -639,8 +638,10 @@ call_again:
goto call_again; goto call_again;
} /* end of unsuccessful completion */ } /* end of unsuccessful completion */
ct->reply_msg.rm_direction = -1; ct->reply_msg.rm_direction = -1;
out:
status = ct->ct_error.re_status;
release_fd_lock(ct->ct_fd, mask); release_fd_lock(ct->ct_fd, mask);
return (ct->ct_error.re_status); return status;
} }
static void static void
@ -927,7 +928,7 @@ read_vc(ctp, buf, len)
continue; continue;
ct->ct_error.re_status = RPC_CANTRECV; ct->ct_error.re_status = RPC_CANTRECV;
ct->ct_error.re_errno = errno; ct->ct_error.re_errno = errno;
return (-1); return (-2);
} }
break; break;
} }

View file

@ -87,6 +87,7 @@ char *_get_next_token(char *, int);
bool_t __svc_clean_idle(fd_set *, int, bool_t); bool_t __svc_clean_idle(fd_set *, int, bool_t);
bool_t __xdrrec_setnonblock(XDR *, int); bool_t __xdrrec_setnonblock(XDR *, int);
bool_t __xdrrec_setblock(XDR *);
bool_t __xdrrec_getrec(XDR *, enum xprt_stat *, bool_t); bool_t __xdrrec_getrec(XDR *, enum xprt_stat *, bool_t);
void __xprt_unregister_unlocked(SVCXPRT *); void __xprt_unregister_unlocked(SVCXPRT *);
void __xprt_set_raddr(SVCXPRT *, const struct sockaddr_storage *); void __xprt_set_raddr(SVCXPRT *, const struct sockaddr_storage *);

View file

@ -480,11 +480,10 @@ xdrrec_skiprecord(xdrs)
enum xprt_stat xstat; enum xprt_stat xstat;
if (rstrm->nonblock) { if (rstrm->nonblock) {
if (__xdrrec_getrec(xdrs, &xstat, FALSE)) { if (__xdrrec_getrec(xdrs, &xstat, FALSE))
rstrm->fbtbc = 0;
return TRUE; return TRUE;
}
if (rstrm->in_finger == rstrm->in_boundry && if (rstrm->in_finger == rstrm->in_boundry &&
xstat == XPRT_MOREREQS) { xstat == XPRT_MOREREQS) {
rstrm->fbtbc = 0; rstrm->fbtbc = 0;
return TRUE; return TRUE;
@ -592,6 +591,7 @@ __xdrrec_getrec(xdrs, statp, expectdata)
*statp = XPRT_DIED; *statp = XPRT_DIED;
return FALSE; return FALSE;
} }
rstrm->fbtbc = rstrm->in_header & (~LAST_FRAG);
rstrm->in_reclen += fraglen; rstrm->in_reclen += fraglen;
if (rstrm->in_reclen > rstrm->recvsize) if (rstrm->in_reclen > rstrm->recvsize)
realloc_stream(rstrm, rstrm->in_reclen); realloc_stream(rstrm, rstrm->in_reclen);
@ -601,35 +601,39 @@ __xdrrec_getrec(xdrs, statp, expectdata)
} }
} }
n = rstrm->readit(rstrm->tcp_handle, do {
rstrm->in_base + rstrm->in_received, n = rstrm->readit(rstrm->tcp_handle,
(rstrm->in_reclen - rstrm->in_received)); rstrm->in_base + rstrm->in_received,
(rstrm->in_reclen - rstrm->in_received));
if (n < 0) { /* this case is needed for non-block as socket returns TIMEDOUT and -1
*statp = XPRT_DIED; * -2 is an error case and covered by the next if() statement */
return FALSE; if (n == -1) continue;
}
if (n == 0) { if (n < 0) {
*statp = expectdata ? XPRT_DIED : XPRT_IDLE; *statp = XPRT_DIED;
return FALSE; return FALSE;
} }
rstrm->in_received += n; if (n == 0) {
*statp = expectdata ? XPRT_DIED : XPRT_IDLE;
return FALSE;
}
if (rstrm->in_received == rstrm->in_reclen) { rstrm->in_received += n;
rstrm->in_haveheader = FALSE; if (rstrm->in_received == rstrm->in_reclen) {
rstrm->in_hdrp = (char *)(void *)&rstrm->in_header; rstrm->in_haveheader = FALSE;
rstrm->in_hdrlen = 0; rstrm->in_hdrp = (char *)(void *)&rstrm->in_header;
if (rstrm->last_frag) { rstrm->in_hdrlen = 0;
rstrm->fbtbc = rstrm->in_reclen; if (rstrm->last_frag) {
rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen; rstrm->in_boundry = rstrm->in_base + rstrm->in_reclen;
rstrm->in_finger = rstrm->in_base; rstrm->in_finger = rstrm->in_base;
rstrm->in_reclen = rstrm->in_received = 0; rstrm->in_reclen = rstrm->in_received = 0;
*statp = XPRT_MOREREQS; *statp = XPRT_MOREREQS;
return TRUE; return TRUE;
} }
} }
} while (1);
*statp = XPRT_MOREREQS; *statp = XPRT_MOREREQS;
return FALSE; return FALSE;
@ -649,6 +653,15 @@ __xdrrec_setnonblock(xdrs, maxrec)
return TRUE; return TRUE;
} }
bool_t
__xdrrec_setblock(xdrs)
XDR *xdrs;
{
RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
rstrm->nonblock = FALSE;
return TRUE;
}
/* /*
* Internal useful routines * Internal useful routines
*/ */