00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <eyedbconfig.h>
00026
00027 #include <string.h>
00028 #include <pthread.h>
00029
00030 #include <eyedblib/thread.h>
00031
/* When defined, rpc_serverArgsMake() starts each request by reading
   RPC_MIN_SIZE bytes instead of only the RPC header. */
#define USE_RPC_MIN_SIZE

/* Size of that initial read; defined outside this file. */
extern int RPC_MIN_SIZE;


/* Value subtracted from an RPC function code to obtain its index in
   server->funcs (see rpc_makeUserServerFunction). */
#define START_CODE 0x100




/* Set to getpid() in a freshly forked backend (see rpc_makeNewConnection);
   defined outside this file. */
extern pid_t rpc_pid;
00043
00044 #ifdef HAVE_STROPTS
00045 #include <stropts.h>
00046 #endif
00047 #include <grp.h>
00048 #include <pwd.h>
00049 #include <fcntl.h>
00050 #include <sys/resource.h>
00051
00052 #include <sys/wait.h>
00053
00054 #include <time.h>
00055 #include <assert.h>
00056 #include <signal.h>
00057 #include <stdio.h>
00058 #include <string.h>
00059 #include <errno.h>
00060 #include <unistd.h>
00061 #include <stdlib.h>
00062 #include <stdarg.h>
00063 #include <sys/stat.h>
00064 #include <eyedblib/log.h>
00065 #include <eyedblib/xdr.h>
00066
00067 #include "rpc_beP.h"
00068 #include <eyedblib/rpc_lib.h>
00069
/* Per-connection state, indexed by file descriptor (filled by
   rpc_newClientInfo, cleared by rpc_garbRealize / rpc_garbClientInfo). */
static rpc_ClientInfo *clientInfo[256];


/* Forward declarations of helpers defined further down in this file. */
static int rpc_inputHandle(rpc_Server *, int, int);
static void rpc_garbClientInfo(rpc_Server *server, int, int);
static void rpc_garbRealize(rpc_Server *server, rpc_ClientInfo *ci,
			    int force);
/* Set to rpc_True by close_clients(). */
static rpc_Boolean abortProgram;
/* Server passed to the last rpc_serverMainLoop() call. */
static rpc_Server *rpc_mainServer;
/* Program name; replaced by rpc_setProgName(). */
static char *progName = "rpc";
/* Timeout stored by rpc_setTimeOut(). */
static int rpc_timeout;

/* Assigned the connection descriptor by serv_thr(). */
rpc_ClientId rpc_client_id;

/* A socket descriptor is considered valid when it is non-negative. */
#define rpc_isSocketValid(S) ((S) >= 0)

/* Enables the utlog() trace messages guarded by #ifdef TRACE in this file. */
#define TRACE
00087
00088
00089 void rpc_setTimeOut(int _timeout)
00090 {
00091 #ifdef RPC_TIMEOUT
00092 rpc_timeout = (_timeout ? _timeout : RPC_TIMEOUT);
00093 #else
00094 rpc_timeout = _timeout;
00095 #endif
00096 }
00097
00098 void
00099 PERROR(const char *msg)
00100 {
00101 const char *s = strerror(errno);
00102 utlog("%s : %s\n", msg, (s ? s : "<unknown>"));
00103 fprintf(stderr, "%s : %s\n", msg, (s ? s : "<unknown>"));
00104 }
00105
00106
/* Held by msg_make() while it formats into its static buffer. */
eyedblib::Mutex msg_mp;
/* Held by rpc_garbClientInfo() while it releases a client connection. */
eyedblib::Mutex gen_mp;
00109
00110
00111
00112 #include <limits.h>
00113
/* Initialisation hook for the message helpers; currently does nothing. */
static void
msg_init()
{
  /* intentionally empty */
}
00120
00121 static const char *
00122 msg_make(const char *fmt, ...)
00123 {
00124 va_list ap;
00125 static char str[256];
00126 char buf[256];
00127
00128
00129 eyedblib::MutexLocker _(msg_mp);
00130 va_start(ap, fmt);
00131
00132 #if 0
00133 sprintf(str, "\n[thread %d#%d] %s: ", rpc_getpid(), pthread_self(), progName);
00134 vsprintf(buf, fmt, ap);
00135 (void)strcat(str, buf);
00136 #else
00137 vsprintf(str, fmt, ap);
00138 #endif
00139 va_end(ap);
00140
00141
00142 return str;
00143 }
00144
/*
 * Placeholder called from rpc_quit().  The explicit closing of the
 * server's descriptors is compiled out, so this function has no effect.
 */
static void
close_files(void)
{
  /* nothing to do */
}
00160
00161 static void
00162 close_clients(void)
00163 {
00164 int fd;
00165
00166 abortProgram = rpc_True;
00167
00168 for (fd = 0; fd < sizeof(clientInfo)/sizeof(clientInfo[0]); fd++)
00169 if (clientInfo[fd])
00170 rpc_garbClientInfo(rpc_mainServer, 0, fd);
00171
00172
00173
00174
00175
00176 }
00177
/* Name of the AF_UNIX port being served (set in rpc_serverMainLoop);
   removed from the file system by rpc_unlink_socket(). */
static char *rpc_unixPort;

/* Special `rc` value for rpc_quit(): run the cleanup but do not call exit(). */
#define DO_NOT_EXIT -128000

/* Cleanup entry points implemented outside this file; both are called by _QUIT_(). */
namespace eyedbsm {
  extern void mutexes_release();
}

namespace eyedb {
  extern void IDB_releaseConn();
}
00189
00190 static void
00191 _QUIT_(int fromcore)
00192 {
00193 eyedbsm::mutexes_release();
00194 eyedb::IDB_releaseConn();
00195
00196 if (rpc_quit_handler)
00197 rpc_quit_handler(rpc_quit_data, fromcore);
00198 }
00199
00200 void
00201 rpc_quit(int rc, int fromcore)
00202 {
00203 _QUIT_(fromcore);
00204
00205 close_clients();
00206 close_files();
00207
00208 if (rc != DO_NOT_EXIT)
00209 exit(rc);
00210 }
00211
00212 void
00213 rpc_unlink_socket()
00214 {
00215 if (rpc_unixPort)
00216 unlink(rpc_unixPort);
00217 }
00218
00219 static void
00220 signal_handler(int sig)
00221 {
00222 int s;
00223 for (s = 0; s < NSIG; s++)
00224 signal(s, SIG_DFL);
00225 IDB_LOG(IDB_LOG_CONN, ("backend got %s [signal=%d]\n", strsignal(sig), sig));
00226
00227 if (getenv("EYEDBDEBUG_"))
00228 sleep(1000);
00229
00230 if (sig == SIGBUS || sig == SIGSEGV || sig == SIGABRT) {
00231 IDB_LOG(IDB_LOG_CONN, ("backend fatal signal...\n"));
00232
00233 rpc_quit(DO_NOT_EXIT, 1);
00234
00235 if (getenv("EYEDBDBG")) {
00236 for (;;) sleep(1000);
00237 }
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247 }
00248
00249 rpc_quit(0, 0);
00250 raise(sig);
00251 exit(0x80|sig);
00252 }
00253
00254 void rpc_setProgName(const char *name)
00255 {
00256 progName = strdup(name);
00257 }
00258
00259 static void
00260 rpc_ServerInit()
00261 {
00262 static rpc_Boolean init = rpc_False;;
00263
00264 if (!init) {
00265 struct rlimit rlp;
00266
00267 if (!getrlimit(RLIMIT_NOFILE, &rlp)) {
00268 rlp.rlim_cur = rlp.rlim_max;
00269 setrlimit(RLIMIT_NOFILE, &rlp);
00270 }
00271
00272 msg_init();
00273
00274 signal(SIGHUP, signal_handler);
00275 signal(SIGTERM, signal_handler);
00276 signal(SIGINT, signal_handler);
00277 signal(SIGQUIT, signal_handler);
00278 signal(SIGSEGV, signal_handler);
00279 signal(SIGBUS, signal_handler);
00280 signal(SIGABRT, signal_handler);
00281
00282 signal(SIGPIPE, SIG_IGN);
00283
00284 init = rpc_True;
00285 }
00286 }
00287
00288 #define RPC_MAXARGS 32
00289
00290 static rpc_ClientInfo*
00291 rpc_newClientInfo(rpc_Server *server, int fd[], int fd_cnt)
00292 {
00293 rpc_ClientInfo *ci = rpc_new(rpc_ClientInfo);
00294 int i;
00295
00296 for (i = 0; i < fd_cnt; i++)
00297 clientInfo[fd[i]] = ci;
00298
00299 ci->fd_cnt = fd_cnt;
00300 ci->refcnt = fd_cnt;
00301
00302 ci->tid = (pthread_t *)malloc(server->conn_cnt * sizeof(pthread_t));
00303 ci->ua = (rpc_ServerArg *)malloc(server->conn_cnt * sizeof(rpc_ServerArg));
00304 ci->comm_buff = (char **)malloc(server->conn_cnt * sizeof(char *));
00305
00306 for (i = 0; i < server->conn_cnt; i++) {
00307 ci->comm_buff[i] = (char *)calloc(server->comm_size, 1);
00308 ci->ua[i] = (rpc_ServerArg)calloc(server->args_size * RPC_MAXARGS, 1);
00309 }
00310
00311 return ci;
00312 }
00313
00314
00315 rpc_Server *rpc_serverCreate(rpc_ServerMode mode, unsigned long magic,
00316 int conn_cnt, int comm_size,
00317 void (*init)(int *, int, rpc_ConnInfo *),
00318 void (*release)(rpc_ConnInfo *),
00319 void (*begin)(int, void *),
00320 void (*end)(int, void *), void *user_data)
00321 {
00322 if (mode != rpc_MonoProc && mode != rpc_MultiProcs &&
00323 mode != rpc_MultiThreaded && mode != rpc_FrontThreaded)
00324 return 0;
00325 else {
00326 rpc_Server *server = rpc_new(rpc_Server);
00327
00328 server->last_type = rpc_NBaseType;
00329 server->conn_cnt = conn_cnt;
00330 if (!comm_size)
00331 comm_size = RPC_COMM_SIZE;
00332 server->comm_size = comm_size;
00333 server->mode = mode;
00334 server->magic = magic;
00335 server->init = init;
00336 server->release = release;
00337 server->begin = begin;
00338 server->end = end;
00339 server->user_data = user_data;
00340
00341 #ifdef TRACE
00342 utlog(msg_make("serverCreate conn_cnt = %d\n", conn_cnt));
00343 #endif
00344 rpc_ServerInit();
00345
00346 return server;
00347 }
00348 }
00349
00350 rpc_ArgType rpc_makeServerUserType(rpc_Server *server, int size,
00351 rpc_UserArgFunction func)
00352
00353 {
00354 rpc_UserType *utyp = rpc_getUTyp(server, server->last_type);
00355 utyp->size = size;
00356 utyp->func = func;
00357 return server->last_type++;
00358 }
00359
00360 rpc_ServerFunction *
00361 rpc_makeUserServerFunction(rpc_Server *server, rpc_RpcDescription *rd,
00362 rpc_UserServerFunction uf)
00363 {
00364 rpc_ServerFunction *func = rpc_new(rpc_ServerFunction);
00365
00366 rd->args[rd->nargs-1].type = rd->arg_ret;
00367 rd->args[rd->nargs-1].send_rcv = rpc_Rcv;
00368
00369 while (rd->args[rd->nargs-1].type == rpc_VoidType)
00370 rd->nargs--;
00371
00372 func->rd = rd;
00373 func->uf = uf;
00374
00375 #ifdef RPC_FUN_CHAIN
00376 func->next = server->first;
00377 server->first = func;
00378 #else
00379 assert(rd->code - START_CODE < sizeof(server->funcs)/sizeof(server->funcs[0]) && rd->code - START_CODE >= 0);
00380 server->funcs[rd->code - START_CODE] = func;
00381 #endif
00382
00383 return func;
00384 }
00385
00386 rpc_Status
00387 rpc_setServerArgSize(rpc_Server *server, int args_size)
00388 {
00389 server->args_size = args_size;
00390 return rpc_Success;
00391 }
00392
00393 void
00394 rpc_serverOptionsGet(int argc, char *argv[], char **portname, char **unixname)
00395 {
00396 int i;
00397
00398 *portname = 0;
00399 *unixname = 0;
00400
00401 for (i = 1; i < argc; ) {
00402 char *s = argv[i];
00403 if (s[0] == '-') {
00404 if (!strcmp(s, "-inetd")) {
00405 if (i+1 >= argc)
00406 return;
00407
00408 *portname = argv[++i];
00409 i++;
00410 }
00411 else if (!strcmp(s, "-unixd")) {
00412 if (i+1 >= argc)
00413 return;
00414
00415 *unixname = argv[++i];
00416 if (strlen(*unixname) >= sizeof(((struct sockaddr_un *)0)->sun_path)) {
00417 utlog(msg_make("eyedb fatal error: unix filename too long (must be < %d\n"),
00418 sizeof(((struct sockaddr_un *)0)->sun_path));
00419 return;
00420 }
00421 i++;
00422 }
00423 }
00424 else
00425 return;
00426 }
00427 }
00428
00429
00430
/*
 * Print to stderr a hint explaining a failed bind() on `portname`:
 * which kind of port is probably in use, how to check it with eyedbctl
 * and, for a named-pipe port, how to remove the stale file.
 */
static void
bind_error(const char *portname, bool tcpip)
{
  FILE *const out = stderr;

  fprintf(out, "\nPerharps another eyedbd is running on ");
  if (!tcpip)
    fprintf(out, "named pipe port:\n%s\n", portname);
  else
    fprintf(out, "TCP/IP port %s\n", portname);

  fprintf(out, "\nYou may check this by launching:\n");
  fprintf(out, "eyedbctl status --port=%s\n", portname);

  if (tcpip)
    return;

  fprintf(out, "\nIf no, unlink this port as follows:\n");
  fprintf(out, "rm -f %s\n", portname);
  fprintf(out, "and relaunch the server.\n");
}
00448
00449 static void
00450 rpc_socket_reuse_addr(int s)
00451 {
00452 int val;
00453 val = 1;
00454 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&val, sizeof(val)) < 0)
00455 PERROR("setsockopt reuseaddr");
00456 }
00457
00458 rpc_Status
00459 rpc_portOpen(rpc_Server *server, const char *servname, const char *portname,
00460 rpc_PortHandle **pport)
00461 {
00462 char hname[128];
00463 int portnumber;
00464 rpc_PortHandle *port = rpc_new(rpc_PortHandle);
00465 const char *t_portname;
00466
00467 t_portname = rpc_getPortAttr(portname, &port->domain, &port->type);
00468 if (!t_portname) {
00469 fprintf(stderr, "invalid port '%s'", portname);
00470 return rpc_Error;
00471 }
00472 portname = t_portname;
00473 port->server = server;
00474 port->portname = strdup(portname);
00475
00476 *pport = port;
00477
00478 if (port->domain == AF_INET) {
00479 if ((port->u.in.sockin_fd = socket(AF_INET, port->type, 0)) < 0) {
00480 PERROR(msg_make("eyedb fatal error: unable to create inet socket port '%s'", port->portname) );
00481 return rpc_Error;
00482 }
00483
00484 rpc_socket_reuse_addr(port->u.in.sockin_fd);
00485 rpc_socket_nodelay(port->u.in.sockin_fd);
00486
00487 port->u.in.sock_in_name.sin_family = AF_INET;
00488 port->u.in.sock_in_name.sin_port = htons(atoi(portname));
00489
00490
00491 if (servname == 0) {
00492 if (gethostname(hname, sizeof(hname)-1) < 0) {
00493 PERROR(msg_make("eyedb fatal error: gethostname failed") );
00494 return rpc_Error;
00495 }
00496 hname[sizeof(hname)-1] = 0;
00497 }
00498 else
00499 strcpy(hname, servname);
00500
00501 if (!rpc_hostNameToAddr(hname, &port->u.in.sock_in_name.sin_addr)) {
00502 utlog(msg_make("eyedb fatal error: unknown host '%s'\n", hname));
00503 fprintf(stderr, msg_make("unknown host '%s'\n", hname));
00504 return rpc_Error;
00505 }
00506
00507 if (bind(port->u.in.sockin_fd, (struct sockaddr *)&port->u.in.sock_in_name,
00508 sizeof(port->u.in.sock_in_name)) < 0 ) {
00509 PERROR(msg_make("eyedb fatal error: bind (naming the socket) failed port '%s'", port->portname));
00510 bind_error(port->portname, true);
00511 return rpc_Error;
00512 }
00513
00514 if (rpc_isSocketValid(port->u.in.sockin_fd) &&
00515 (port->type == SOCK_STREAM && listen(port->u.in.sockin_fd, 2) < 0)) {
00516 PERROR(msg_make("eyedb fatal error: listen for inet socket port '%s'", port->portname) );
00517 return rpc_Error;
00518 }
00519 }
00520
00521 if (port->domain == AF_UNIX) {
00522 rpc_checkAFUnixPort(portname);
00523
00524 #ifdef HAVE_FATTACH
00525 int pfd[2];
00526 int fd;
00527 int created = 0;
00528
00529 if ((fd = open(portname, O_RDONLY)) < 0) {
00530 if ((fd = creat(portname, 0666)) < 0) {
00531 PERROR(msg_make("eyedb fatal error: cannot create file '%s'",
00532 portname));
00533 return rpc_Error;
00534 }
00535 created = 1;
00536 }
00537
00538 if (fchmod(fd, 0666) < 0) {
00539 if (created) unlink(portname);
00540 PERROR(msg_make("eyedb fatal error: cannot change file '%s' mode",
00541 portname));
00542 return rpc_Error;
00543 }
00544
00545 close(fd);
00546
00547 if (pipe(pfd) < 0) {
00548 if (created) unlink(portname);
00549 PERROR(msg_make("eyedb fatal error: unable to create pipe"));
00550 return rpc_Error;
00551 }
00552
00553
00554 if (ioctl(pfd[0], I_PUSH, "connld") < 0) {
00555 if (created) unlink(portname);
00556 PERROR(msg_make("eyedb fatal error: unable to configure pipe"));
00557 return rpc_Error;
00558 }
00559
00560
00561 if (fattach(pfd[0], port->portname) < 0) {
00562 if (created) unlink(portname);
00563 PERROR(msg_make("eyedb fatal error: unable to attach stream to file '%s'", port->portname) );
00564 return rpc_Error;
00565 }
00566
00567 port->u.un.sockun_fd = pfd[1];
00568 #else
00569 if ((port->u.un.sockun_fd = socket(AF_UNIX, port->type, 0)) < 0) {
00570 PERROR(msg_make("eyedb fatal error: unable to create unix socket port '%s'", port->portname) );
00571 return rpc_Error;
00572 }
00573
00574 port->u.un.sock_un_name.sun_family = AF_UNIX;
00575 strcpy(port->u.un.sock_un_name.sun_path, portname);
00576
00577 if (bind(port->u.un.sockun_fd,
00578 (struct sockaddr *)&port->u.un.sock_un_name,
00579 sizeof(port->u.un.sock_un_name)) < 0 ) {
00580 PERROR(msg_make("eyedb fatal error: bind (naming the socket) failed port '%s'", port->portname));
00581 bind_error(port->portname, false);
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591 return rpc_Error;
00592 }
00593
00594 chmod(portname, 0777);
00595 if (rpc_isSocketValid(port->u.un.sockun_fd) &&
00596 listen(port->u.un.sockun_fd, 2) < 0 ) {
00597 PERROR(msg_make("eyedb fatal error: listen for unix socket port '%s'", port->portname) );
00598 return rpc_Error;
00599 }
00600 #endif
00601 }
00602
00603 return rpc_Success;
00604 }
00605
/* Start-up parameters handed to serv_thr(); the thread copies the fields
   and then frees the structure. */
typedef struct {
  rpc_Server *server;   /* server whose requests the thread handles */
  int fd;               /* connection descriptor served by the thread */
  int which;            /* connection index, forwarded to rpc_inputHandle() */
} rpc_ThreadArg;
00611
00612 static void check_fd(int fd, const char *msg)
00613 {
00614 struct stat stat;
00615 if (fstat(fd, &stat) < 0)
00616 utlog(msg_make("%s: error fd=%d is not a valid file descriptor\n",
00617 msg, fd));
00618 }
00619
/* Not referenced in the visible part of this file. */
static char exiting = 0;

/* Held by serv_thr() while a connection thread shuts the process down. */
static eyedblib::Mutex exit_mp;
/* Not referenced in the visible part of this file. */
static eyedblib::Mutex wait_thr_mp;
/* Created by rpc_makeNewConnection() for each forked backend; wait_thr()
   signals it once it has picked up the child pid. */
static eyedblib::Condition *wait_thr_cond;
00625
00626 static void *wait_thr(void *arg)
00627 {
00628 pid_t pid = *(pid_t *)arg;
00629 free(arg);
00630 #ifdef TRACE2
00631 printf("%d:%d waiting for pid %d\n", rpc_getpid(), pthread_self(), pid);
00632 #endif
00633 int status;
00634 wait_thr_cond->signal();
00635 unsigned max_loop = 0;
00636 do {
00637 if (max_loop > 100)
00638 break;
00639 errno = 0;
00640 waitpid(pid, &status, 0);
00641 max_loop++;
00642 } while(errno);
00643
00644 #ifdef TRACE2
00645 printf("%d:%d done pid %d\n", rpc_getpid(), pthread_self(), pid);
00646 #endif
00647 pthread_detach(pthread_self());
00648 pthread_exit(&status);
00649
00650 return 0;
00651 }
00652
00653 static void *serv_thr(void *arg)
00654 {
00655 rpc_ThreadArg *thr_arg = (rpc_ThreadArg *)arg;
00656 int fd = thr_arg->fd;
00657 int which = thr_arg->which;
00658 rpc_Server *server = thr_arg->server;
00659 free(thr_arg);
00660
00661 rpc_setConnFd(fd);
00662 IDB_LOG(IDB_LOG_CONN, ("new thread %d [fd = %d, which=%d], stack = 0x%x\n", pthread_self(),
00663 fd, which, &server));
00664
00665 rpc_client_id = (rpc_ClientId)fd;
00666
00667 for (;;)
00668 if (!rpc_inputHandle(server, which, fd)) {
00669 if (server->mode == rpc_MultiThreaded || server->conn_cnt > 1) {
00670 eyedblib::MutexLocker _(exit_mp);
00671 void *status = 0;
00672 #ifdef TRACE
00673 utlog(msg_make("%d thread EXIT\n", pthread_self()));
00674 #endif
00675 #ifdef TRACE2
00676 fprintf(stderr, "%d:%d thread EXIT\n", rpc_getpid(), pthread_self());
00677 fflush(stderr);
00678 #endif
00679
00680 rpc_garbClientInfo(server, 0, fd);
00681 exit(0);
00682 pthread_exit(&status);
00683 }
00684 else
00685 break;
00686 }
00687 return 0;
00688 }
00689
00690
/* Numeric TCP port of the AF_INET port being served; set by
   rpc_serverMainLoop(), 0 until then. */
static int rpc_serverPort;

/* Return the TCP port number recorded in rpc_serverPort. */
int rpc_serverPortGet()
{
  const int current_port = rpc_serverPort;

  return current_port;
}
00697
00698 static void
00699 rpc_serverStart(rpc_Server *server, int *fd, int fd_cnt, rpc_ConnInfo *rpc_ci)
00700 {
00701 if (server->connh)
00702 (*server->connh)(server, (rpc_ClientId)fd[0], rpc_True);
00703
00704 if (server->init)
00705 server->init(fd, fd_cnt, rpc_ci);
00706 }
00707
00708 static void
00709 rpc_serverEnd(rpc_Server *server, rpc_ConnInfo *rpc_ci)
00710 {
00711 if (server->release)
00712 server->release(rpc_ci);
00713 }
00714
00715 static void
00716 rpc_makeThread(rpc_Server *server, int which, int fd, rpc_ClientInfo *ci)
00717 {
00718 rpc_ThreadArg *thr_arg = rpc_new(rpc_ThreadArg);
00719
00720 thr_arg->server = server;
00721 thr_arg->fd = fd;
00722 thr_arg->which = which;
00723
00724 #ifdef TRACE
00725 utlog(msg_make("rpc_makeThread which=%d, fd=%d\n", which, fd));
00726 #endif
00727
00728 pthread_create(&ci->tid[which], NULL, serv_thr, thr_arg);
00729 }
00730
00731 static void
00732 rpc_makeNewConnection(rpc_Server *server, int new_fd[], int fd_cnt,
00733 int *pmax_fd, rpc_ConnInfo *rpc_ci)
00734 {
00735
00736 rpc_ThreadArg *thr_arg;
00737 int main_fd = new_fd[0];
00738 rpc_ClientInfo *ci;
00739 int i;
00740
00741 ci = rpc_newClientInfo(server, new_fd, fd_cnt);
00742
00743 #ifdef TRACE
00744 {
00745 char buf[2048];
00746 strcpy(buf, "new connection : ");
00747 for (i = 0; i < fd_cnt; i++) {
00748 char tok[32];
00749 if (i)
00750 strcat(buf, ", ");
00751
00752 sprintf(tok, "fd = %d", new_fd[i]);
00753 strcat(buf, tok);
00754 }
00755 strcat(buf, "\n");
00756
00757 utlog(buf);
00758 }
00759 #endif
00760
00761 if (server->mode == rpc_MonoProc) {
00762 for (i = 0; i < fd_cnt; i++)
00763 {
00764 if (new_fd[i] > *pmax_fd)
00765 *pmax_fd = new_fd[i];
00766 FD_SET(new_fd[i], &server->fds_used);
00767 }
00768 rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00769 rpc_serverEnd(server, rpc_ci);
00770 }
00771 else {
00772 if (server->mode == rpc_MultiThreaded) {
00773 rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00774 for (i = 0; i < fd_cnt; i++)
00775 rpc_makeThread(server, i, new_fd[i], ci);
00776 rpc_serverEnd(server, rpc_ci);
00777 }
00778 else if (server->mode == rpc_MultiProcs) {
00779 pid_t child_pid;
00780
00781 if ((child_pid = fork()) == 0) {
00782 rpc_pid = getpid();
00783 const char *w;
00784 if ((w = getenv("EYEDBWAIT"))) {
00785 int sec = atoi(w);
00786 if (!sec)
00787 sec = 30;
00788 printf("Pid %d waiting for %d seconds\n", rpc_getpid(), sec);
00789 sleep(sec);
00790 printf("Continuing...\n");
00791 }
00792
00793 rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00794 if (fd_cnt > 1) {
00795 for (i = 0; i < fd_cnt; i++)
00796 rpc_makeThread(server, i, new_fd[i], ci);
00797
00798 for (i = 0; i < fd_cnt; i++) {
00799 void *status;
00800 pthread_join(ci->tid[i], &status);
00801 }
00802
00803 free(ci->tid);
00804 free(ci);
00805 #ifdef TRACE
00806 utlog(msg_make("all threads terminated\n"));
00807 #endif
00808 }
00809 else {
00810 thr_arg = rpc_new(rpc_ThreadArg);
00811 thr_arg->server = server;
00812 thr_arg->fd = new_fd[0];
00813 thr_arg->which = 0;
00814 serv_thr(thr_arg);
00815 }
00816
00817 rpc_serverEnd(server, rpc_ci);
00818 rpc_quit(0, 0);
00819 }
00820
00821 wait_thr_cond = new eyedblib::Condition();
00822 pthread_t wait_thr_p;
00823 pid_t *pid = new pid_t(child_pid);
00824 errno = 0;
00825 if (!pthread_create(&wait_thr_p, 0, wait_thr, pid))
00826 wait_thr_cond->wait();
00827 else
00828 IDB_LOG(IDB_LOG_CONN, ("cannot create waiting thread\n"));
00829
00830 delete wait_thr_cond;
00831
00832 rpc_garbRealize(server, ci, 1);
00833 free(rpc_ci);
00834 }
00835 }
00836 }
00837
/* A client whose connections are still being collected (multi-connection
   servers only). */
typedef struct {
  time_t begin;   /* time the entry was created (see rpc_addRealize) */
  int *fd;        /* descriptors gathered so far; allocated with server->conn_cnt slots */
  int fd_cnt;     /* number of descriptors stored in fd; 0 marks a free entry */
} rpc_MultiConnEntry;

/* Number of clients that can be in the collection phase at once. */
#define MAX_MULTICONN_ENTRIES 64

/* Entries are addressed by the `xid` exchanged with the client. */
static rpc_MultiConnEntry multiConnEntry[MAX_MULTICONN_ENTRIES];
/* Connection information of a pending descriptor, indexed by descriptor. */
static rpc_ConnInfo *multiConnInfo[256];
/* rpc_True while a descriptor is still in the collection phase, indexed by descriptor. */
static rpc_Boolean multiConn[256];
00849
00850 static void
00851 rpc_multiConnClear(rpc_Server *server, int fd, const char *msg)
00852 {
00853 int r;
00854 multiConn[fd] = rpc_False;
00855
00856 multiConnInfo[fd] = 0;
00857 FD_CLR(fd, &server->fds_used);
00858 r = close(fd);
00859 utlog(msg_make("%s: multi close fd=%d r = %d\n", msg, fd, r));
00860 }
00861
00862 static void
00863 rpc_multiConnSuppress(rpc_Server *server, rpc_MultiConnEntry *mce,
00864 rpc_Boolean do_close)
00865 {
00866 int i;
00867
00868 for (i = 0; i < mce->fd_cnt; i++) {
00869 int fd = mce->fd[i];
00870 if (do_close) {
00871 close(fd);
00872 }
00873 multiConn[fd] = rpc_False;
00874 FD_CLR(fd, &server->fds_used);
00875 }
00876
00877 free(mce->fd);
00878 mce->fd = (int *)0;
00879 mce->fd_cnt = 0;
00880 }
00881
00882 static void
00883 rpc_addRealize(rpc_MultiConnEntry *mce, int fd_cnt, int new_fd)
00884 {
00885 time(&mce->begin);
00886 mce->fd = (int *)calloc(sizeof(int), fd_cnt);
00887 mce->fd[0] = new_fd;
00888 mce->fd_cnt = 1;
00889 }
00890
00891 static int
00892 rpc_multiConnAddEntry(rpc_Server *server, int new_fd)
00893 {
00894 rpc_MultiConnEntry *mce = multiConnEntry;
00895 int fd_cnt = server->conn_cnt;
00896 int i;
00897 time_t now;
00898
00899 for (i = 0; i < MAX_MULTICONN_ENTRIES; i++, mce++)
00900 if (!mce->fd_cnt) {
00901 rpc_addRealize(mce, fd_cnt, new_fd);
00902 return i;
00903 }
00904
00905 #ifdef TRACE
00906 utlog(msg_make("multiConnAddEntry: TRIES GARBAGE!\n"));
00907 #endif
00908
00909
00910 time(&now);
00911 mce = multiConnEntry;
00912 for (i = 0; i < MAX_MULTICONN_ENTRIES; i++, mce++)
00913 if ((now - mce->begin) > 10) {
00914 rpc_multiConnSuppress(server, mce, rpc_True);
00915 rpc_addRealize(mce, fd_cnt, new_fd);
00916 return i;
00917 }
00918
00919 return -1;
00920 }
00921
00922 static rpc_MultiConnEntry *
00923 rpc_multiConnGetEntry(int xid)
00924 {
00925 rpc_MultiConnEntry *mce;
00926
00927 if (xid < 0 || xid >= MAX_MULTICONN_ENTRIES)
00928 return 0;
00929
00930 mce = &multiConnEntry[xid];
00931 if (!mce->fd_cnt)
00932 return 0;
00933
00934 return mce;
00935 }
00936
00937 static void
00938 rpc_multiConnManage(rpc_Server *server, int fd, int *pmaxfd)
00939 {
00940 rpc_MultiConnInfo info;
00941 rpc_MultiConnInfo xinfo;
00942
00943 #ifdef TRACE
00944 utlog(msg_make("rpc_multiConnManage %d\n", fd));
00945 #endif
00946
00947 if (rpc_socketReadTimeout(fd, &info, sizeof(info), 10) != sizeof(info)) {
00948 #ifdef TRACE
00949 utlog(msg_make("timeout for fd=%d\n", fd));
00950 #endif
00951 rpc_multiConnClear(server, fd, "timeout");
00952 return;
00953 }
00954 x2h_rpc_multiconninfo(&info);
00955
00956 #ifdef TRACE2
00957 utlog(msg_make("get magic %x, cmd = %x, xid = %d\n", info.magic, info.cmd,
00958 info.xid));
00959 #endif
00960 if (info.magic != MM(server->magic)) {
00961 fprintf(stderr, "bad magic: %x vs %x\n", info.magic, MM(server->magic));
00962 rpc_multiConnClear(server, fd, "bad magic");
00963 return;
00964 }
00965
00966 if (info.cmd == rpc_NewConnection) {
00967 info.xid = rpc_multiConnAddEntry(server, fd);
00968
00969 info.cmd = rpc_ReplyNewConnection;
00970
00971 h2x_rpc_multiconninfo(&xinfo, &info);
00972 if (rpc_socketWrite(fd, &xinfo, sizeof(xinfo)) != sizeof(xinfo)) {
00973 rpc_multiConnClear(server, fd, "bad new connection");
00974 return;
00975 }
00976 }
00977 else if (info.cmd == rpc_AssociatedConnection) {
00978 rpc_MultiConnEntry *mce;
00979 mce = rpc_multiConnGetEntry(info.xid);
00980 if (!mce) {
00981 rpc_multiConnClear(server, fd, "no mce");
00982 return;
00983 }
00984
00985 #ifdef TRACE2
00986 utlog(msg_make("rpc_AssociatedConnection\n"));
00987 #endif
00988 mce->fd[mce->fd_cnt++] = fd;
00989
00990 info.cmd = rpc_ReplyNewConnection;
00991 h2x_rpc_multiconninfo(&xinfo, &info);
00992 if (rpc_socketWrite(fd, &xinfo, sizeof(xinfo)) != sizeof(xinfo)) {
00993 rpc_multiConnClear(server, fd, "bad reply connection");
00994 return;
00995 }
00996 if (mce->fd_cnt == server->conn_cnt) {
00997 rpc_makeNewConnection(server, mce->fd, mce->fd_cnt, pmaxfd,
00998 multiConnInfo[fd]);
00999
01000 rpc_multiConnSuppress(server, mce, rpc_True);
01001 }
01002 }
01003 else
01004 rpc_multiConnClear(server, fd, "command not understood");
01005 }
01006
01007 rpc_Status
01008 rpc_serverMainLoop(rpc_Server *server, rpc_PortHandle **ports, int nports)
01009 {
01010 int max_fd = 0;
01011 int n, fd, new_fd;
01012 fd_set fds_ready_to_read;
01013 rpc_PortHandle *portdb[sizeof(fd_set)*8];
01014
01015 rpc_mainServer = server;
01016
01017 memset(portdb, 0, sizeof(portdb));
01018
01019 FD_ZERO(&server->fds_used);
01020
01021 for (n = 0; n < nports; n++) {
01022 rpc_PortHandle *port = ports[n];
01023
01024 if (port->domain == AF_INET) {
01025 fd = port->u.in.sockin_fd;
01026 rpc_serverPort = atoi(port->portname);
01027 }
01028 else if (port->domain == AF_UNIX) {
01029 fd = port->u.un.sockun_fd;
01030 rpc_unixPort = port->portname;
01031 }
01032
01033 if (fd > max_fd)
01034 max_fd = fd;
01035
01036 portdb[fd] = port;
01037
01038 FD_SET(fd, &server->fds_used);
01039 }
01040
01041 for (;;) {
01042 #ifdef HAVE_FATTACH
01043 struct strrecvfd info;
01044 #endif
01045 fds_ready_to_read = server->fds_used;
01046
01047
01048 n = select (max_fd+1, &fds_ready_to_read, 0, 0, 0);
01049
01050 if (n < 0) {
01051 if (errno == EINTR) {
01052 continue;
01053 }
01054 else {
01055 PERROR(msg_make("error in select"));
01056
01057
01058
01059
01060
01061 {
01062 int ifd;
01063 struct stat stat;
01064 for (ifd = 0; ifd <= max_fd; ifd++)
01065 if (FD_ISSET(ifd, &server->fds_used) && fstat(ifd, &stat)<0) {
01066 utlog("warning, fd is invalid %d\n", ifd);
01067 FD_CLR(ifd, &server->fds_used);
01068 }
01069 continue;
01070 }
01071
01072 }
01073 }
01074
01075 for (fd = 0; fd <= max_fd; fd++)
01076 if (FD_ISSET(fd, &fds_ready_to_read)) {
01077 rpc_PortHandle *port;
01078 if (port = portdb[fd]) {
01079
01080 struct sockaddr *sock_addr;
01081 socklen_t length;
01082
01083 if (port->domain == AF_INET) {
01084 sock_addr = (struct sockaddr *)&port->u.in.sock_in_name;
01085 length = sizeof(port->u.in.sock_in_name);
01086 }
01087 else {
01088 sock_addr = (struct sockaddr *)&port->u.un.sock_un_name;
01089 length = sizeof(port->u.un.sock_un_name);
01090 }
01091
01092 #ifdef HAVE_FATTACH
01093 if (port->domain == AF_UNIX) {
01094 if (ioctl(fd, I_RECVFD, &info) < 0) {
01095 PERROR("ioctl");
01096 continue;
01097 }
01098
01099 IDB_LOG(IDB_LOG_CONN,
01100 ("connection from %s %s\n",
01101 getpwuid(info.uid)->pw_name,
01102 (getgrgid(info.gid) ?
01103 getgrgid(info.gid)->gr_name : "")));
01104
01105 new_fd = info.fd;
01106 }
01107 else
01108 #endif
01109 if ((new_fd = accept(fd, sock_addr, &length)) < 0)
01110 PERROR("accept connection");
01111
01112 if (new_fd >= 0) {
01113 rpc_ConnInfo *ci;
01114 if (port->domain == AF_UNIX) {
01115 #ifdef HAVE_FATTACH
01116 ci = rpc_make_stream_conninfo(new_fd, &info);
01117 #else
01118 ci = rpc_make_unix_conninfo(new_fd);
01119 #endif
01120 }
01121 else {
01122 rpc_socket_nodelay(new_fd);
01123 ci = rpc_make_tcpip_conninfo(new_fd);
01124 }
01125
01126 if (!ci) {
01127 close(new_fd);
01128 continue;
01129 }
01130
01131 if (server->conn_cnt > 1) {
01132 FD_SET(new_fd, &server->fds_used);
01133 if (new_fd > max_fd)
01134 max_fd = new_fd;
01135 multiConn[new_fd] = rpc_True;
01136 multiConnInfo[new_fd] = ci;
01137 }
01138 else
01139 rpc_makeNewConnection(server, &new_fd, 1, &max_fd, ci);
01140 }
01141 }
01142 else if (multiConn[fd])
01143 rpc_multiConnManage(server, fd, &max_fd);
01144 else {
01145 if (!rpc_inputHandle(server, 0, fd)) {
01146 rpc_garbClientInfo(server, 0, fd);
01147 }
01148 }
01149 }
01150 }
01151
01152 return rpc_Success;
01153 }
01154
01155 rpc_Server *
01156 rpc_getMainServer()
01157 {
01158 return rpc_mainServer;
01159 }
01160
01161 rpc_ConnectionHandlerFunction
01162 rpc_setConnectionHandler(rpc_Server *server,
01163 rpc_ConnectionHandlerFunction connh)
01164 {
01165 rpc_ConnectionHandlerFunction oconnh = server->connh;
01166
01167 server->connh = connh;
01168
01169 return oconnh;
01170 }
01171
01172
/* Forward declaration: the definition follows rpc_clientInfoGet() and is
   used by rpc_inputHandle() for both directions of a call. */
static int rpc_serverArgsMake(rpc_Server *, int, int, rpc_ServerFunction **,
			      rpc_SendRcv, rpc_FromTo);
01175
01176 static int
01177 rpc_inputHandle(rpc_Server *server, int which, int fd)
01178 {
01179 rpc_ServerFunction *func;
01180 rpc_Status status;
01181 rpc_ClientInfo *ci = clientInfo[fd];
01182
01183 if (server->begin)
01184 server->begin(which, server->user_data);
01185
01186 if (which < 0 || which >= server->conn_cnt)
01187 return 0;
01188
01189 if (!rpc_serverArgsMake(server, which, fd, &func, rpc_Send,
01190 rpc_From))
01191 return 0;
01192
01193 (*func->uf)((rpc_ClientId)fd, ci->ua[which]);
01194
01195 if (!rpc_serverArgsMake(server, which, fd, &func, rpc_Rcv,
01196 rpc_To))
01197 return 0;
01198
01199 if (server->end)
01200 server->end(which, server->user_data);
01201
01202 return 1;
01203 }
01204
01205 static void
01206 rpc_garbRealize(rpc_Server *server, rpc_ClientInfo *ci, int force)
01207 {
01208 int i;
01209
01210 for (i = 0; i < server->conn_cnt; i++) {
01211 free(ci->comm_buff[i]);
01212 free(ci->ua[i]);
01213 }
01214
01215 free(ci->comm_buff);
01216 free(ci->ua);
01217 if (force) {
01218 free(ci->tid);
01219 free(ci);
01220 }
01221
01222 for (i = 0; i < sizeof(clientInfo)/sizeof(clientInfo[0]); i++)
01223 if (clientInfo[i] == ci)
01224 clientInfo[i] = 0;
01225 }
01226
01227 static void
01228 rpc_garbClientInfo(rpc_Server *server, int which, int fd)
01229 {
01230 rpc_ClientInfo *ci = clientInfo[fd];
01231
01232 #ifdef TRACE2
01233 printf("%d:%d garbClientInfo...\n", rpc_getpid(), pthread_self());
01234 #endif
01235 #ifdef TRACE
01236 utlog(msg_make("rpc_garbClientInfo(which = %d, fd = %d, ci = %p)\n",
01237 which, fd, ci));
01238 #endif
01239 if (!ci)
01240 return;
01241
01242 eyedblib::MutexLocker _(gen_mp);
01243
01244 #ifdef TRACE
01245 utlog(msg_make("refcnt = %d, fd_cnt = %d\n",
01246 ci->refcnt, ci->fd_cnt));
01247 #endif
01248
01249 if (!which && server->connh)
01250 (*server->connh)(server, (rpc_ClientId)fd, rpc_False);
01251
01252 if (!--ci->refcnt)
01253 rpc_garbRealize(server, ci, 0);
01254
01255 clientInfo[fd] = 0;
01256
01257 FD_CLR(fd, &server->fds_used);
01258 #ifdef TRACE
01259
01260 #endif
01261 close(fd);
01262
01263 #if 0
01264 if (ci->refcnt) {
01265 _.unlock();
01266 int i = 0;
01267 for (i = 0; i < ci->fd_cnt; i++) {
01268 if (ci->tid[i] == pthread_self())
01269 break;
01270 }
01271
01272 for (++i; i < ci->fd_cnt; i++) {
01273 printf("%d:%d killing thread %d:%d\n", rpc_getpid(),
01274 pthread_self(), rpc_getpid(), ci->tid[i]);
01275 pthread_kill(ci->tid[i], SIGTERM);
01276 }
01277 }
01278 #endif
01279
01280 #ifdef TRACE
01281 utlog(msg_make("close connection fd=%d\n", fd));
01282 #endif
01283
01284 }
01285
01286
01287
01288
01289
01290 rpc_ClientInfo *
01291 rpc_clientInfoGet(int fd)
01292 {
01293 return clientInfo[fd];
01294 }
01295
01296 int rpc_serverArgsMake(rpc_Server *server, int which, int fd,
01297 rpc_ServerFunction **pfunc,
01298 rpc_SendRcv send_rcv, rpc_FromTo fromto)
01299 {
01300 int i, commsz;
01301 char *buff, *commb;
01302 register rpc_Arg *arg;
01303 rpc_ServerArg pua;
01304 rpc_RpcHeader rhd;
01305 rpc_RpcHeader xrhd;
01306 int ndata = 0;
01307 rpc_ServerData *p_data[RPC_NDATA];
01308 rpc_RpcDescription *rd;
01309 register rpc_ClientInfo *ci = clientInfo[fd];
01310 char *comm_buff = ci->comm_buff[which];
01311 rpc_ServerArg ua = ci->ua[which];
01312 int comm_size = server->comm_size;
01313 int args_size = server->args_size;
01314 rpc_ServerFunction *func;
01315 int buff_size = 0;
01316 rpc_UserType *utyp;
01317 rpc_StatusRec rstatus;
01318
01319 rstatus.err = 0;
01320
01321 int read_cnt = 0;
01322 int write_cnt = 0;
01323 commb = comm_buff;
01324 commsz = comm_size;
01325
01326 #if 1
01327 if (fromto == rpc_From)
01328 buff = commb;
01329 else
01330 buff = commb + sizeof(rhd);
01331 #else
01332 buff = commb + sizeof(rhd);
01333 #endif
01334
01335 if (fromto == rpc_From) {
01336
01337 #ifdef USE_RPC_MIN_SIZE
01338 read_cnt++;
01339 #ifdef RPC_TIMEOUT
01340 if (rpc_socketReadTimeout(fd, buff, RPC_MIN_SIZE, rpc_timeout) !=
01341 RPC_MIN_SIZE)
01342 return 0;
01343 #else
01344 if (rpc_socketRead(fd, buff, RPC_MIN_SIZE) != RPC_MIN_SIZE)
01345 return 0;
01346 #endif
01347
01348 memcpy(&rhd, buff, sizeof(rhd));
01349 x2h_rpc_hd(&rhd);
01350 buff += sizeof(rhd);
01351 #else
01352
01353 #ifdef RPC_TIMEOUT
01354 read_cnt++;
01355 if (rpc_socketReadTimeout(fd, buff, sizeof(rhd), rpc_timeout) !=
01356 sizeof(rhd))
01357 return 0;
01358 #else
01359 if (rpc_socketRead(fd, buff, sizeof(rhd)) != sizeof(rhd))
01360 return 0;
01361 #endif
01362
01363 memcpy(&rhd, buff, sizeof(rhd));
01364 x2h_rpc_hd(&rhd);
01365 buff += sizeof(rhd);
01366
01367 #endif
01368 if (rhd.magic != server->magic) {
01369 IDB_LOG_FX(("Server Error #1: invalid magic=%p, expected=%d, "
01370 "serial=%d\n", rhd.magic, server->magic, rhd.serial));
01371
01372 return 0;
01373 }
01374
01375 func = rpc_rpcGet(server, rhd.code);
01376
01377 if (!func) {
01378 IDB_LOG_FX(("Server Error #2: invalid function code=%d\n", rhd.code));
01379
01380 return 0;
01381 }
01382
01383 #ifdef USE_RPC_MIN_SIZE
01384 if (rhd.size-RPC_MIN_SIZE > 0) {
01385 read_cnt++;
01386 if (rpc_socketRead(fd, buff+RPC_MIN_SIZE-sizeof(rhd),
01387 rhd.size-RPC_MIN_SIZE) != rhd.size-RPC_MIN_SIZE) {
01388 IDB_LOG_FX(("Server Error #3: read failed for %d bytes\n",
01389 rhd.size-sizeof(rhd)));
01390
01391 return 0;
01392 }
01393 }
01394 #else
01395 if (rhd.size-sizeof(rhd))
01396 if (rpc_socketRead(fd, buff, rhd.size-sizeof(rhd)) !=
01397 rhd.size-sizeof(rhd)) {
01398 IDB_LOG_FX(("Server Error #3: read failed for %d bytes\n",
01399 rhd.size-sizeof(rhd)));
01400
01401 return 0;
01402 }
01403 #endif
01404 *pfunc = func;
01405 }
01406 else {
01407 func = *pfunc;
01408 #ifdef TRACE2
01409 utlog(msg_make("[%d] serverArgsMake code #%d, TO\n", pthread_self(), func->rd->code));
01410 #endif
01411 }
01412
01413 rd = func->rd;
01414
01415 for (i = 0, arg = rd->args, pua = ua; i < rd->nargs; i++, arg++, pua += args_size)
01416 switch(arg->type) {
01417 case rpc_Int16Type:
01418 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int16), send_rcv, fromto, x2h_16_cpy, h2x_16_cpy);
01419 break;
01420
01421 case rpc_Int32Type:
01422 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
01423 break;
01424
01425 case rpc_Int64Type:
01426 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int64), send_rcv, fromto, x2h_64_cpy, h2x_64_cpy);
01427 break;
01428
01429 case rpc_StatusType:
01430 if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01431 eyedblib_mcp(&rstatus, pua, sizeof(rstatus));
01432 }
01433
01434
01435
01436 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
01437
01438 break;
01439
01440 case rpc_StringType:
01441 if ((arg->send_rcv & rpc_Send) && fromto == rpc_From) {
01442 int len;
01443 x2h_32_cpy(&len, buff);
01444
01445 buff += sizeof(len);
01446 if (len)
01447 *(char **)pua = buff;
01448 else
01449 *(char **)pua = "";
01450 buff += len;
01451 }
01452 else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01453 int len;
01454
01455 if (*(char **)pua)
01456 len = strlen(*(char **)pua)+1;
01457 else
01458 len = 0;
01459 h2x_32_cpy(buff, &len);
01460 buff += sizeof(len);
01461 if (len)
01462 memcpy(buff, *(char **)pua, len);
01463 buff += len;
01464 }
01465 break;
01466
01467 case rpc_DataType:
01468 if ((arg->send_rcv & rpc_Send) && fromto == rpc_From) {
01469 int status;
01470 rpc_ServerData *a_data = (rpc_ServerData *)pua;
01471
01472 a_data->garbage_fun = 0;
01473 a_data->garbage_data = 0;
01474
01475 x2h_32_cpy(&a_data->data, buff);
01476
01477 buff += 8;
01478 x2h_32_cpy(&a_data->size, buff);
01479 buff += sizeof(a_data->size);
01480 x2h_32_cpy(&status, buff);
01481 buff += sizeof(status);
01482 if (status == rpc_SyncData) {
01483 a_data->data = buff;
01484 buff += a_data->size;
01485 a_data->fd = -1;
01486 }
01487 else
01488 a_data->fd = fd;
01489 }
01490 else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01491 int status, offset;
01492 rpc_ServerData *a_data = (rpc_ServerData *)pua;
01493
01494 h2x_32_cpy(buff, &a_data->size);
01495 buff += sizeof(a_data->size);
01496
01497 if (a_data->status == rpc_BuffUsed) {
01498 status = rpc_SyncData;
01499 h2x_32_cpy(buff, &status);
01500 buff += sizeof(status);
01501 offset = (char *)a_data->data - buff;
01502 h2x_32_cpy(buff, &offset);
01503 buff += sizeof(offset);
01504 buff_size += a_data->size;
01505 }
01506 else if (a_data->status == rpc_TempDataUsed ||
01507 a_data->status == rpc_PermDataUsed) {
01508 status = rpc_ASyncData;
01509 h2x_32_cpy(buff, &status);
01510 buff += sizeof(status);
01511 offset = 0;
01512 h2x_32_cpy(buff, &offset);
01513 buff += sizeof(offset);
01514 p_data[ndata++] = a_data;
01515 }
01516 }
01517 else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_From) {
01518 char *pbuff = commb + sizeof(rhd);
01519 int j;
01520 rpc_Arg *parg;
01521 rpc_Boolean cant = rpc_False;
01522 rpc_ServerData *a_data = (rpc_ServerData *)pua;
01523
01524 a_data->garbage_fun = 0;
01525 a_data->garbage_data = 0;
01526
01527 for (j = 0, parg = rd->args; j < rd->nargs; j++, parg++)
01528 switch(parg->type) {
01529 case rpc_ByteType:
01530 if (parg->send_rcv & rpc_Rcv)
01531 pbuff += sizeof(char);
01532 break;
01533
01534 case rpc_Int16Type:
01535 if (parg->send_rcv & rpc_Rcv)
01536 pbuff += sizeof(eyedblib::int16);
01537 break;
01538
01539 case rpc_Int32Type:
01540 if (parg->send_rcv & rpc_Rcv)
01541 pbuff += sizeof(eyedblib::int32);
01542 break;
01543
01544 case rpc_Int64Type:
01545 if (parg->send_rcv & rpc_Rcv)
01546 pbuff += sizeof(eyedblib::int64);
01547 break;
01548
01549 case rpc_StatusType:
01550 if (parg->send_rcv & rpc_Rcv)
01551 pbuff += sizeof(eyedblib::int32);
01552 break;
01553
01554 case rpc_StringType:
01555 if (parg->send_rcv & rpc_Rcv)
01556 cant = rpc_True;
01557 break;
01558
01559 case rpc_DataType:
01560 if (parg->send_rcv & rpc_Rcv)
01561 if (j != i)
01562 cant = rpc_True;
01563 break;
01564
01565 case rpc_VoidType:
01566 break;
01567
01568 default:
01569 rpc_assert(parg->type >= rpc_NBaseType &&
01570 parg->type < server->last_type);
01571 if (parg->send_rcv & rpc_Rcv) {
01572 utyp = rpc_getUTyp(server, parg->type);
01573
01574 if (utyp->size == rpc_SizeVariable)
01575 cant = rpc_True;
01576 else
01577 pbuff += utyp->size;
01578 break;
01579 }
01580 }
01581
01582 a_data->fd = fd;
01583
01584 if (cant) {
01585 a_data->data = 0;
01586 a_data->buff_size = 0;
01587 }
01588 else {
01589
01590 a_data->data = pbuff + 3*sizeof(int);
01591 a_data->buff_size = rpc_buff_size(commsz, commb, pbuff);
01592 }
01593 buff += 3*sizeof(int);
01594 }
01595 break;
01596
01597 case rpc_VoidType:
01598 break;
01599
01600 default:
01601 rpc_assert(arg->type >= rpc_NBaseType && arg->type < server->last_type);
01602 utyp = rpc_getUTyp(server, arg->type);
01603
01604 if (utyp->func)
01605 utyp->func(arg, &buff, pua, send_rcv, fromto);
01606 else
01607 rpc_copy(arg, buff, pua, utyp->size, send_rcv, fromto);
01608 }
01609
01610 if (fromto == rpc_From) {
01611 if (rhd.ndata)
01612 {
01613 int d[RPC_NDATA], i;
01614
01615 read_cnt++;
01616 if (rpc_socketRead(fd, d, rhd.ndata*sizeof(int)) != rhd.ndata*sizeof(int))
01617 return 0;
01618 }
01619 }
01620 else if (fromto == rpc_To) {
01621 rhd.code = rd->code;
01622
01623 rhd.magic = server->magic;
01624 rhd.serial = 0;
01625 rhd.ndata = ndata;
01626 rhd.status = 0;
01627
01628 rhd.size = (int)(buff-commb) + buff_size;
01629
01630
01631
01632
01633
01634
01635
01636 h2x_rpc_hd(&xrhd, &rhd);
01637 memcpy(commb, &xrhd, sizeof(xrhd));
01638
01639 #ifdef USE_RPC_MIN_SIZE
01640 if (rhd.size < RPC_MIN_SIZE)
01641 rhd.size = RPC_MIN_SIZE;
01642 #endif
01643
01644 write_cnt++;
01645 if (rpc_socketWrite(fd, commb, rhd.size) <= 0)
01646 return 0;
01647
01648 if (ndata) {
01649 int d[RPC_NDATA], i;
01650 for (i = 0; i < ndata; i++)
01651 d[i] = h2x_32(p_data[i]->size);
01652 write_cnt++;
01653 if (rpc_socketWrite(fd, d, ndata*sizeof(int)) <= 0)
01654 return 0;
01655
01656 for (i = 0; i < ndata; i++) {
01657 if (p_data[i]->size) {
01658 int error = 0;
01659 write_cnt++;
01660 if (rpc_socketWrite(fd, p_data[i]->data, p_data[i]->size) <= 0)
01661 error = 1;
01662 if (p_data[i]->status == rpc_TempDataUsed) {
01663
01664
01665
01666
01667
01668 free((char *)p_data[i]->data);
01669 }
01670 if (error)
01671 return 0;
01672 }
01673 }
01674 }
01675
01676 if (rstatus.err) {
01677 int len = strlen(rstatus.err_msg);
01678 int tmp = h2x_32(rstatus.err);
01679 write_cnt++;
01680 if (rpc_socketWrite(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
01681 return 0;
01682 tmp = h2x_32(len);
01683 write_cnt++;
01684 if (rpc_socketWrite(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
01685 return 0;
01686 write_cnt++;
01687 if (rpc_socketWrite(fd, rstatus.err_msg, len+1) != len+1)
01688 return 0;
01689 }
01690 }
01691
01692 return 1;
01693 }
01694
01695 rpc_Boolean
01696 rpc_serverCheck(int port)
01697 {
01698 int sock_fd, length;
01699 struct sockaddr_in sock_in_name;
01700 struct sockaddr *sock_addr;
01701 char hname[128];
01702
01703 sock_in_name.sin_family = AF_INET;
01704 sock_in_name.sin_port = htons(port);
01705
01706 if (gethostname(hname, sizeof(hname)-1) < 0) {
01707 PERROR(msg_make("gethostname failed") );
01708 return rpc_False;
01709 }
01710 hname[sizeof(hname)-1] = 0;
01711 if (!rpc_hostNameToAddr(hname, &sock_in_name.sin_addr))
01712 return rpc_False;
01713 sock_addr = (struct sockaddr *)&sock_in_name;
01714 length = sizeof(sock_in_name);
01715
01716 if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
01717 PERROR(msg_make("unable to create socket"));
01718 return rpc_False;
01719 }
01720
01721 if (connect(sock_fd, sock_addr, length) < 0) {
01722 PERROR(msg_make("unable to connect socket"));
01723 return rpc_False;
01724 }
01725
01726 close(sock_fd);
01727 return rpc_True;
01728 }
01729
01730 rpc_ServerFunction *rpc_rpcGet(rpc_Server *server, rpc_RpcCode code)
01731 {
01732 #ifdef RPC_FUN_CHAIN
01733 register rpc_ServerFunction *func = server->first;
01734
01735 while (func) {
01736 if (func->rd->code == code)
01737 return func;
01738 func = func->next;
01739 }
01740
01741 return 0;
01742 #else
01743 return server->funcs[code - START_CODE];
01744 #endif
01745 }
01746
01747 void
01748 eyedblib_abort()
01749 {
01750 time_t t;
01751 static int reentrant = 0;
01752 char msg[256];
01753 time(&t);
01754
01755 if (reentrant)
01756 exit(1);
01757
01758 reentrant = 1;
01759
01760 sprintf(msg, "EyeDB aborting [pid = %d]\n", rpc_getpid());
01761 write(2, msg, strlen(msg));
01762
01763 utlog("EyeDB aborting [pid = %d]\n", rpc_getpid());
01764 if (getenv("EYEDBDBG")) for (;;) sleep(1000);
01765
01766 _QUIT_(0);
01767
01768 kill(SIGABRT, rpc_getpid());
01769 exit(2);
01770 }
01771
01772 void
01773 abort()
01774 {
01775 eyedblib_abort();
01776 }