rpc_be.cc

00001 /* 
00002    EyeDB Object Database Management System
00003    Copyright (C) 1994-2008 SYSRA
00004    
00005    EyeDB is free software; you can redistribute it and/or
00006    modify it under the terms of the GNU Lesser General Public
00007    License as published by the Free Software Foundation; either
00008    version 2.1 of the License, or (at your option) any later version.
00009    
00010    EyeDB is distributed in the hope that it will be useful,
00011    but WITHOUT ANY WARRANTY; without even the implied warranty of
00012    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013    Lesser General Public License for more details.
00014    
00015    You should have received a copy of the GNU Lesser General Public
00016    License along with this library; if not, write to the Free Software
00017    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 
00018 */
00019 
00020 /*
00021   Author: Eric Viara <viara@sysra.com>
00022 */
00023 
00024 
00025 #include <eyedbconfig.h>
00026 
00027 #include <string.h>
00028 #include <pthread.h>
00029 
00030 #include <eyedblib/thread.h>
00031 
00032 #define USE_RPC_MIN_SIZE
00033 
00034 extern int RPC_MIN_SIZE;
00035 
00036 /* MIND: this constant is used in idbrpclib.h: should be factorized */
00037 #define START_CODE 0x100
00038 
00039 /* disconnected the 21/08/01 */
00040 /*#define RPC_TIMEOUT 7200*/
00041 
00042 extern pid_t rpc_pid;
00043 
00044 #ifdef HAVE_STROPTS
00045 #include <stropts.h>
00046 #endif
00047 #include <grp.h>
00048 #include <pwd.h>
00049 #include <fcntl.h>
00050 #include <sys/resource.h>
00051 
00052 #include <sys/wait.h>
00053 
00054 #include <time.h>
00055 #include <assert.h>
00056 #include <signal.h>
00057 #include <stdio.h>
00058 #include <string.h>
00059 #include <errno.h>
00060 #include <unistd.h>
00061 #include <stdlib.h>
00062 #include <stdarg.h>
00063 #include <sys/stat.h>
00064 #include <eyedblib/log.h>
00065 #include <eyedblib/xdr.h>
00066 
00067 #include "rpc_beP.h"
00068 #include <eyedblib/rpc_lib.h>
00069 
00070 static  rpc_ClientInfo *clientInfo[256];
00071 
00072 /* static functions */
00073 static int rpc_inputHandle(rpc_Server *, int, int);
00074 static void rpc_garbClientInfo(rpc_Server *server, int, int);
00075 static void rpc_garbRealize(rpc_Server *server, rpc_ClientInfo *ci,
00076                             int force);
00077 static rpc_Boolean abortProgram;
00078 static rpc_Server *rpc_mainServer;
00079 static char *progName = "rpc";
00080 static int rpc_timeout;
00081 
00082 rpc_ClientId rpc_client_id;
00083 
00084 #define rpc_isSocketValid(S) ((S) >= 0)
00085 
00086 #define TRACE
00087 /*#define TRACE2*/
00088 
00089 void rpc_setTimeOut(int _timeout)
00090 {
00091 #ifdef RPC_TIMEOUT
00092   rpc_timeout = (_timeout ? _timeout : RPC_TIMEOUT);
00093 #else
00094   rpc_timeout = _timeout;
00095 #endif
00096 }
00097 
00098 void
00099 PERROR(const char *msg)
00100 {
00101   const char *s = strerror(errno);
00102   utlog("%s : %s\n", msg, (s ? s : "<unknown>"));
00103   fprintf(stderr, "%s : %s\n", msg, (s ? s : "<unknown>"));
00104 }
00105 
00106 //static pthread_mutex_t msg_mp, gen_mp;
00107 eyedblib::Mutex msg_mp;
00108 eyedblib::Mutex gen_mp;
00109 
00110 /* MIND: I THINK THAT THIS MODULE IS NOT *QUITE* THREAD SAFE */
00111 
00112 #include <limits.h>
00113 
00114 static void
00115 msg_init()
00116 {
00117   //pthread_mutex_init(&msg_mp, NULL);
00118   //pthread_mutex_init(&gen_mp, NULL);
00119 }
00120 
00121 static const char *
00122 msg_make(const char *fmt, ...)
00123 {
00124   va_list ap;
00125   static char str[256];
00126   char buf[256];
00127 
00128   //pthread_mutex_lock(&msg_mp);
00129   eyedblib::MutexLocker _(msg_mp);
00130   va_start(ap, fmt);
00131 
00132 #if 0
00133   sprintf(str, "\n[thread %d#%d] %s: ", rpc_getpid(), pthread_self(), progName);
00134   vsprintf(buf, fmt, ap);
00135   (void)strcat(str, buf);
00136 #else
00137   vsprintf(str, fmt, ap);
00138 #endif
00139   va_end(ap);
00140 
00141   //pthread_mutex_unlock(&msg_mp);
00142   return str;
00143 }
00144 
00145 static void
00146 close_files(void)
00147 {
00148 #if 0
00149   int fd;
00150   int maxfd = sizeof(fd_mask) * 8;
00151   
00152   for (fd = 1; fd <= maxfd; fd++)
00153     if (FD_ISSET(fd, &rpc_mainServer->fds_used))
00154       {
00155         if (close(fd) < 0)
00156           PERROR (msg_make("error closing file %d", fd));
00157       }
00158 #endif
00159 }
00160 
00161 static void
00162 close_clients(void)
00163 {
00164   int fd;
00165 
00166   abortProgram = rpc_True;
00167 
00168   for (fd = 0; fd < sizeof(clientInfo)/sizeof(clientInfo[0]); fd++)
00169     if (clientInfo[fd])
00170       rpc_garbClientInfo(rpc_mainServer, 0, fd);
00171 
00172   /*
00173     if (unixname)
00174     unlink(unixname);
00175   */
00176 }
00177 
00178 static char *rpc_unixPort;
00179 
00180 #define DO_NOT_EXIT -128000
00181 
00182 namespace eyedbsm {
00183   extern void mutexes_release();
00184 }
00185 
00186 namespace eyedb {
00187   extern void IDB_releaseConn();
00188 }
00189 
00190 static void
00191 _QUIT_(int fromcore)
00192 {
00193   eyedbsm::mutexes_release();
00194   eyedb::IDB_releaseConn();
00195 
00196   if (rpc_quit_handler)
00197     rpc_quit_handler(rpc_quit_data, fromcore);
00198 }
00199 
00200 void
00201 rpc_quit(int rc, int fromcore)
00202 {
00203   _QUIT_(fromcore);
00204 
00205   close_clients();
00206   close_files();
00207 
00208   if (rc != DO_NOT_EXIT)
00209     exit(rc);
00210 }
00211 
00212 void
00213 rpc_unlink_socket()
00214 {
00215   if (rpc_unixPort)
00216     unlink(rpc_unixPort);
00217 }
00218 
00219 static void
00220 signal_handler(int sig) 
00221 {
00222   int s;
00223   for (s = 0; s < NSIG; s++)
00224     signal(s, SIG_DFL);
00225   IDB_LOG(IDB_LOG_CONN, ("backend got %s [signal=%d]\n", strsignal(sig), sig));
00226 
00227   if (getenv("EYEDBDEBUG_"))
00228     sleep(1000);
00229 
00230   if (sig == SIGBUS || sig == SIGSEGV || sig == SIGABRT) {
00231     IDB_LOG(IDB_LOG_CONN, ("backend fatal signal...\n"));
00232 
00233     rpc_quit(DO_NOT_EXIT, 1);
00234 
00235     if (getenv("EYEDBDBG")) {
00236       for (;;) sleep(1000);
00237     }
00238 
00239     /* the following code implies sometimes a loop! */
00240     /* disconnected it the 27/10/00 */
00241     /*
00242       IDB_LOG(IDB_LOG_CONN, ("backend tries to abort and core dumped\n"));
00243 
00244       raise(sig);
00245       return;
00246     */
00247   }
00248 
00249   rpc_quit(0, 0);
00250   raise(sig);
00251   exit(0x80|sig);
00252 }
00253 
00254 void rpc_setProgName(const char *name)
00255 {
00256   progName = strdup(name);
00257 }
00258 
00259 static void
00260 rpc_ServerInit()
00261 {
00262   static rpc_Boolean init = rpc_False;;
00263 
00264   if (!init) {
00265     struct rlimit rlp;
00266 
00267     if (!getrlimit(RLIMIT_NOFILE, &rlp)) {
00268       rlp.rlim_cur = rlp.rlim_max;
00269       setrlimit(RLIMIT_NOFILE, &rlp);
00270     }
00271 
00272     msg_init();
00273 
00274     signal(SIGHUP,  signal_handler);
00275     signal(SIGTERM, signal_handler);
00276     signal(SIGINT,  signal_handler);
00277     signal(SIGQUIT, signal_handler);
00278     signal(SIGSEGV, signal_handler);
00279     signal(SIGBUS,  signal_handler);
00280     signal(SIGABRT,  signal_handler);
00281 
00282     signal(SIGPIPE, SIG_IGN);
00283 
00284     init = rpc_True;
00285   }
00286 }
00287 
00288 #define RPC_MAXARGS 32
00289 
00290 static rpc_ClientInfo*
00291 rpc_newClientInfo(rpc_Server *server, int fd[], int fd_cnt)
00292 {
00293   rpc_ClientInfo *ci = rpc_new(rpc_ClientInfo);
00294   int i;
00295 
00296   for (i = 0; i < fd_cnt; i++)
00297     clientInfo[fd[i]] = ci;
00298 
00299   ci->fd_cnt = fd_cnt;
00300   ci->refcnt = fd_cnt;
00301 
00302   ci->tid       = (pthread_t *)malloc(server->conn_cnt * sizeof(pthread_t));
00303   ci->ua        = (rpc_ServerArg *)malloc(server->conn_cnt * sizeof(rpc_ServerArg));
00304   ci->comm_buff = (char **)malloc(server->conn_cnt * sizeof(char *));
00305 
00306   for (i = 0; i < server->conn_cnt; i++) {
00307     ci->comm_buff[i] = (char *)calloc(server->comm_size, 1);
00308     ci->ua[i]        = (rpc_ServerArg)calloc(server->args_size * RPC_MAXARGS, 1);
00309   }
00310 
00311   return ci;
00312 }
00313 
00314 /* API functions */
00315 rpc_Server *rpc_serverCreate(rpc_ServerMode mode, unsigned long magic,
00316                              int conn_cnt, int comm_size,
00317                              void (*init)(int *, int, rpc_ConnInfo *),
00318                              void (*release)(rpc_ConnInfo *),
00319                              void (*begin)(int, void *),
00320                              void (*end)(int, void *), void *user_data)
00321 {
00322   if (mode != rpc_MonoProc && mode != rpc_MultiProcs &&
00323       mode != rpc_MultiThreaded && mode != rpc_FrontThreaded)
00324     return 0;
00325   else {
00326     rpc_Server *server = rpc_new(rpc_Server);
00327 
00328     server->last_type = rpc_NBaseType;
00329     server->conn_cnt  = conn_cnt;
00330     if (!comm_size)
00331       comm_size = RPC_COMM_SIZE;
00332     server->comm_size = comm_size;
00333     server->mode      = mode;
00334     server->magic     = magic;
00335     server->init      = init;
00336     server->release   = release;
00337     server->begin     = begin;
00338     server->end       = end;
00339     server->user_data = user_data;
00340 
00341 #ifdef TRACE
00342     utlog(msg_make("serverCreate conn_cnt = %d\n", conn_cnt));
00343 #endif
00344     rpc_ServerInit();
00345       
00346     return server;
00347   }
00348 }
00349 
00350 rpc_ArgType rpc_makeServerUserType(rpc_Server *server, int size,
00351                                    rpc_UserArgFunction func)
00352                                    
00353 {
00354   rpc_UserType *utyp = rpc_getUTyp(server, server->last_type);
00355   utyp->size = size;
00356   utyp->func = func;
00357   return server->last_type++;
00358 }
00359 
00360 rpc_ServerFunction *
00361 rpc_makeUserServerFunction(rpc_Server *server, rpc_RpcDescription *rd,
00362                            rpc_UserServerFunction uf)
00363 {
00364   rpc_ServerFunction *func = rpc_new(rpc_ServerFunction);
00365 
00366   rd->args[rd->nargs-1].type     = rd->arg_ret;
00367   rd->args[rd->nargs-1].send_rcv = rpc_Rcv;
00368 
00369   while (rd->args[rd->nargs-1].type == rpc_VoidType)
00370     rd->nargs--;
00371 
00372   func->rd = rd;
00373   func->uf = uf;
00374 
00375 #ifdef RPC_FUN_CHAIN
00376   func->next = server->first;
00377   server->first = func;
00378 #else
00379   assert(rd->code - START_CODE < sizeof(server->funcs)/sizeof(server->funcs[0]) && rd->code - START_CODE >= 0);
00380   server->funcs[rd->code - START_CODE] = func;
00381 #endif
00382 
00383   return func;
00384 }
00385 
00386 rpc_Status
00387 rpc_setServerArgSize(rpc_Server *server, int args_size)
00388 {
00389   server->args_size = args_size;
00390   return rpc_Success;
00391 }
00392 
00393 void
00394 rpc_serverOptionsGet(int argc, char *argv[], char **portname, char **unixname)
00395 {
00396   int i;
00397 
00398   *portname = 0;
00399   *unixname = 0;
00400 
00401   for (i = 1; i < argc; ) {
00402     char *s = argv[i];
00403     if (s[0] == '-') {
00404       if (!strcmp(s, "-inetd")) {
00405         if (i+1 >= argc)
00406           return;
00407 
00408         *portname = argv[++i];
00409         i++;
00410       }
00411       else if (!strcmp(s, "-unixd")) {
00412         if (i+1 >= argc)
00413           return;
00414 
00415         *unixname = argv[++i];
00416         if (strlen(*unixname) >= sizeof(((struct sockaddr_un *)0)->sun_path)) {
00417           utlog(msg_make("eyedb fatal error: unix filename too long (must be < %d\n"),
00418                 sizeof(((struct sockaddr_un *)0)->sun_path));
00419           return;
00420         }
00421         i++;
00422       }
00423     }
00424     else
00425       return;
00426   }
00427 }
00428 
00429 /*extern int gethostname(const char *, int);*/
00430 
00431 static void
00432 bind_error(const char *portname, bool tcpip)
00433 {
00434   fprintf(stderr, "\nPerharps another eyedbd is running on ");
00435   if (tcpip)
00436     fprintf(stderr, "TCP/IP port %s\n", portname);
00437   else
00438     fprintf(stderr, "named pipe port:\n%s\n", portname);
00439 
00440   fprintf(stderr, "\nYou may check this by launching:\n");
00441   fprintf(stderr, "eyedbctl status --port=%s\n", portname);
00442   if (!tcpip) {
00443     fprintf(stderr, "\nIf no, unlink this port as follows:\n");
00444     fprintf(stderr, "rm -f %s\n", portname);
00445     fprintf(stderr, "and relaunch the server.\n");
00446   }
00447 }
00448 
00449 static void
00450 rpc_socket_reuse_addr(int s)
00451 {
00452   int val;
00453   val = 1;
00454   if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&val, sizeof(val)) < 0)
00455     PERROR("setsockopt reuseaddr");
00456 }
00457 
00458 rpc_Status
00459 rpc_portOpen(rpc_Server *server, const char *servname, const char *portname,
00460              rpc_PortHandle **pport)
00461 {
00462   char hname[128];
00463   int portnumber;
00464   rpc_PortHandle *port = rpc_new(rpc_PortHandle);
00465   const char *t_portname;
00466 
00467   t_portname = rpc_getPortAttr(portname, &port->domain, &port->type);
00468   if (!t_portname) {
00469     fprintf(stderr, "invalid port '%s'", portname);
00470     return rpc_Error;
00471   }
00472   portname = t_portname;
00473   port->server = server;
00474   port->portname = strdup(portname);
00475 
00476   *pport = port;
00477 
00478   if (port->domain == AF_INET) {
00479     if ((port->u.in.sockin_fd = socket(AF_INET, port->type, 0)) < 0) {
00480       PERROR(msg_make("eyedb fatal error: unable to create inet socket port '%s'", port->portname) );
00481       return rpc_Error;
00482     }
00483 
00484     rpc_socket_reuse_addr(port->u.in.sockin_fd);
00485     rpc_socket_nodelay(port->u.in.sockin_fd);
00486       
00487     port->u.in.sock_in_name.sin_family = AF_INET;
00488     port->u.in.sock_in_name.sin_port   = htons(atoi(portname));
00489       
00490     /* get host name */
00491     if (servname == 0) {
00492       if (gethostname(hname, sizeof(hname)-1) < 0) {
00493         PERROR(msg_make("eyedb fatal error: gethostname failed") );
00494         return rpc_Error;
00495       }
00496       hname[sizeof(hname)-1] = 0;
00497     }
00498     else
00499       strcpy(hname, servname);
00500 
00501     if (!rpc_hostNameToAddr(hname, &port->u.in.sock_in_name.sin_addr)) {
00502       utlog(msg_make("eyedb fatal error: unknown host '%s'\n", hname));
00503       fprintf(stderr, msg_make("unknown host '%s'\n", hname));
00504       return rpc_Error;
00505     }
00506       
00507     if (bind(port->u.in.sockin_fd, (struct sockaddr *)&port->u.in.sock_in_name,
00508              sizeof(port->u.in.sock_in_name)) < 0 ) {
00509       PERROR(msg_make("eyedb fatal error: bind (naming the socket) failed port '%s'", port->portname));
00510       bind_error(port->portname, true);
00511       return rpc_Error;
00512     }
00513 
00514     if (rpc_isSocketValid(port->u.in.sockin_fd) &&
00515         (port->type == SOCK_STREAM && listen(port->u.in.sockin_fd, 2) < 0)) {
00516       PERROR(msg_make("eyedb fatal error: listen for inet socket port '%s'", port->portname) );
00517       return rpc_Error;
00518     }
00519   }
00520 
00521   if (port->domain == AF_UNIX) {
00522     rpc_checkAFUnixPort(portname);
00523 
00524 #ifdef HAVE_FATTACH
00525     int pfd[2];
00526     int fd;
00527     int created = 0;
00528 
00529     if ((fd = open(portname, O_RDONLY)) < 0) {
00530       if ((fd = creat(portname, 0666)) < 0) {
00531         PERROR(msg_make("eyedb fatal error: cannot create file '%s'",
00532                         portname));
00533         return rpc_Error;
00534       }
00535       created = 1;
00536     }
00537       
00538     if (fchmod(fd, 0666) < 0)   {
00539       if (created) unlink(portname);
00540       PERROR(msg_make("eyedb fatal error: cannot change file '%s' mode",
00541                       portname));
00542       return rpc_Error;
00543     }
00544       
00545     close(fd);
00546 
00547     if (pipe(pfd) < 0) {
00548       if (created) unlink(portname);
00549       PERROR(msg_make("eyedb fatal error: unable to create pipe"));
00550       return rpc_Error;
00551     }
00552 
00553     // solaris platform only ?
00554     if (ioctl(pfd[0], I_PUSH, "connld") < 0) {
00555       if (created) unlink(portname);
00556       PERROR(msg_make("eyedb fatal error: unable to configure pipe"));
00557       return rpc_Error;
00558     }
00559     //
00560 
00561     if (fattach(pfd[0], port->portname) < 0) {
00562       if (created) unlink(portname);
00563       PERROR(msg_make("eyedb fatal error: unable to attach stream to file '%s'", port->portname) );
00564       return rpc_Error;
00565     }
00566 
00567     port->u.un.sockun_fd = pfd[1];
00568 #else
00569     if ((port->u.un.sockun_fd = socket(AF_UNIX, port->type, 0)) < 0) {
00570       PERROR(msg_make("eyedb fatal error: unable to create unix socket port '%s'", port->portname) );
00571       return rpc_Error;
00572     }
00573 
00574     port->u.un.sock_un_name.sun_family = AF_UNIX;
00575     strcpy(port->u.un.sock_un_name.sun_path, portname);
00576 
00577     if (bind(port->u.un.sockun_fd,
00578              (struct sockaddr *)&port->u.un.sock_un_name,
00579              sizeof(port->u.un.sock_un_name)) < 0 ) {
00580       PERROR(msg_make("eyedb fatal error: bind (naming the socket) failed port '%s'", port->portname));
00581       bind_error(port->portname, false);
00582       /*
00583         fprintf(stderr, "\nPerharps another eyedbd is running on port:\n%s\n",
00584         port->portname);
00585         fprintf(stderr, "\nYou may check this by launching:\n");
00586         fprintf(stderr, "eyedbctl status --port=%s\n", port->portname);
00587         fprintf(stderr, "\nIf no, unlink this port as follows:\n");
00588         fprintf(stderr, "rm -f %s\n", port->portname);
00589         fprintf(stderr, "and relaunch the server.\n");
00590       */
00591       return rpc_Error;
00592     }
00593 
00594     chmod(portname, 0777);
00595     if (rpc_isSocketValid(port->u.un.sockun_fd) &&
00596         listen(port->u.un.sockun_fd, 2) < 0 ) {
00597       PERROR(msg_make("eyedb fatal error: listen for unix socket port '%s'", port->portname) );
00598       return rpc_Error;
00599     }
00600 #endif
00601   }
00602 
00603   return rpc_Success;
00604 }
00605 
00606 typedef struct {
00607   rpc_Server *server;
00608   int fd;
00609   int which;
00610 } rpc_ThreadArg;
00611 
00612 static void check_fd(int fd, const char *msg)
00613 {
00614   struct stat stat;
00615   if (fstat(fd, &stat) < 0)
00616     utlog(msg_make("%s: error fd=%d is not a valid file descriptor\n",
00617                    msg, fd));
00618 }
00619 
00620 static char exiting = 0;
00621 
00622 static eyedblib::Mutex exit_mp;
00623 static eyedblib::Mutex wait_thr_mp;
00624 static eyedblib::Condition *wait_thr_cond;
00625 
00626 static void *wait_thr(void *arg)
00627 {
00628   pid_t pid = *(pid_t *)arg;
00629   free(arg);
00630 #ifdef TRACE2
00631   printf("%d:%d waiting for pid %d\n", rpc_getpid(), pthread_self(), pid);
00632 #endif
00633   int status;
00634   wait_thr_cond->signal();
00635   unsigned max_loop = 0;
00636   do {
00637     if (max_loop > 100) // to avoid infinite loop
00638       break;
00639     errno = 0;
00640     waitpid(pid, &status, 0);
00641     max_loop++;
00642   } while(errno);
00643 
00644 #ifdef TRACE2
00645   printf("%d:%d done pid %d\n", rpc_getpid(), pthread_self(), pid);
00646 #endif
00647   pthread_detach(pthread_self());
00648   pthread_exit(&status);
00649 
00650   return 0;
00651 }
00652 
00653 static void *serv_thr(void *arg)
00654 {
00655   rpc_ThreadArg *thr_arg = (rpc_ThreadArg *)arg;
00656   int fd = thr_arg->fd;
00657   int which = thr_arg->which;
00658   rpc_Server *server = thr_arg->server;
00659   free(thr_arg);
00660 
00661   rpc_setConnFd(fd);
00662   IDB_LOG(IDB_LOG_CONN, ("new thread %d [fd = %d, which=%d], stack = 0x%x\n", pthread_self(),
00663                          fd, which, &server));
00664 
00665   rpc_client_id = (rpc_ClientId)fd;
00666 
00667   for (;;)
00668     if (!rpc_inputHandle(server, which, fd)) /* for now */ {
00669       if (server->mode == rpc_MultiThreaded || server->conn_cnt > 1) {
00670         eyedblib::MutexLocker _(exit_mp);
00671         void *status = 0;
00672 #ifdef TRACE
00673         utlog(msg_make("%d thread EXIT\n", pthread_self()));
00674 #endif
00675 #ifdef TRACE2
00676         fprintf(stderr, "%d:%d thread EXIT\n", rpc_getpid(), pthread_self());
00677         fflush(stderr);
00678 #endif
00679         //          rpc_garbClientInfo(server, which, fd);
00680         rpc_garbClientInfo(server, 0, fd); // force which
00681         exit(0);
00682         pthread_exit(&status);
00683       }
00684       else
00685         break;
00686     }
00687   return 0;
00688 }
00689 
00690 /* needed by sekern.c but chelou! */
00691 static int rpc_serverPort;
00692 
00693 int rpc_serverPortGet()
00694 {
00695   return rpc_serverPort;
00696 }
00697 
00698 static void
00699 rpc_serverStart(rpc_Server *server, int *fd, int fd_cnt, rpc_ConnInfo *rpc_ci)
00700 {
00701   if (server->connh)
00702     (*server->connh)(server, (rpc_ClientId)fd[0], rpc_True);
00703   
00704   if (server->init)
00705     server->init(fd, fd_cnt, rpc_ci);
00706 }
00707 
00708 static void
00709 rpc_serverEnd(rpc_Server *server, rpc_ConnInfo *rpc_ci)
00710 {
00711   if (server->release)
00712     server->release(rpc_ci);
00713 }
00714 
00715 static void
00716 rpc_makeThread(rpc_Server *server, int which, int fd, rpc_ClientInfo *ci)
00717 {
00718   rpc_ThreadArg *thr_arg = rpc_new(rpc_ThreadArg);
00719 
00720   thr_arg->server = server;
00721   thr_arg->fd     = fd;
00722   thr_arg->which  = which;
00723 
00724 #ifdef TRACE
00725   utlog(msg_make("rpc_makeThread which=%d, fd=%d\n", which, fd));
00726 #endif
00727 
00728   pthread_create(&ci->tid[which], NULL, serv_thr, thr_arg);
00729 }
00730 
00731 static void
00732 rpc_makeNewConnection(rpc_Server *server, int new_fd[], int fd_cnt,
00733                       int *pmax_fd, rpc_ConnInfo *rpc_ci)
00734 {
00735   /* We have a new connection. */
00736   rpc_ThreadArg *thr_arg;
00737   int main_fd = new_fd[0];
00738   rpc_ClientInfo *ci;
00739   int i;
00740 
00741   ci = rpc_newClientInfo(server, new_fd, fd_cnt);
00742 
00743 #ifdef TRACE
00744   {
00745     char buf[2048];
00746     strcpy(buf, "new connection : ");
00747     for (i = 0; i < fd_cnt; i++) {
00748       char tok[32];
00749       if (i)
00750         strcat(buf, ", ");
00751 
00752       sprintf(tok, "fd = %d", new_fd[i]);
00753       strcat(buf, tok);
00754     }
00755     strcat(buf, "\n");
00756 
00757     utlog(buf);
00758   }
00759 #endif
00760 
00761   if (server->mode == rpc_MonoProc) {
00762     for (i = 0; i < fd_cnt; i++)
00763       {
00764         if (new_fd[i] > *pmax_fd)
00765           *pmax_fd = new_fd[i];
00766         FD_SET(new_fd[i], &server->fds_used);
00767       }
00768     rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00769     rpc_serverEnd(server, rpc_ci);
00770   }
00771   else {
00772     if (server->mode == rpc_MultiThreaded) {
00773       rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00774       for (i = 0; i < fd_cnt; i++)
00775         rpc_makeThread(server, i, new_fd[i], ci);
00776       rpc_serverEnd(server, rpc_ci);
00777     }
00778     else if (server->mode == rpc_MultiProcs) {
00779       pid_t child_pid;
00780 
00781       if ((child_pid = fork()) == 0) {
00782         rpc_pid = getpid();
00783         const char *w;
00784         if ((w = getenv("EYEDBWAIT"))) {
00785           int sec = atoi(w);
00786           if (!sec)
00787             sec = 30;
00788           printf("Pid %d waiting for %d seconds\n", rpc_getpid(), sec);
00789           sleep(sec);
00790           printf("Continuing...\n");
00791         }
00792 
00793         rpc_serverStart(server, new_fd, fd_cnt, rpc_ci);
00794         if (fd_cnt > 1) {
00795           for (i = 0; i < fd_cnt; i++)
00796             rpc_makeThread(server, i, new_fd[i], ci);
00797 
00798           for (i = 0; i < fd_cnt; i++) {
00799             void *status;
00800             pthread_join(ci->tid[i], &status);
00801           }
00802 
00803           free(ci->tid);
00804           free(ci);
00805 #ifdef TRACE
00806           utlog(msg_make("all threads terminated\n"));
00807 #endif
00808         }
00809         else {
00810           thr_arg = rpc_new(rpc_ThreadArg);
00811           thr_arg->server = server;
00812           thr_arg->fd     = new_fd[0];
00813           thr_arg->which  = 0;
00814           serv_thr(thr_arg);
00815         }
00816 
00817         rpc_serverEnd(server, rpc_ci);
00818         rpc_quit(0, 0);
00819       }
00820 
00821       wait_thr_cond = new eyedblib::Condition();
00822       pthread_t wait_thr_p;
00823       pid_t *pid = new pid_t(child_pid);
00824       errno = 0;
00825       if (!pthread_create(&wait_thr_p, 0, wait_thr, pid))
00826         wait_thr_cond->wait();
00827       else
00828         IDB_LOG(IDB_LOG_CONN, ("cannot create waiting thread\n"));
00829 
00830       delete wait_thr_cond;
00831 
00832       rpc_garbRealize(server, ci, 1);
00833       free(rpc_ci);
00834     }
00835   }
00836 }
00837 
00838 typedef struct {
00839   time_t begin;
00840   int *fd;
00841   int fd_cnt;
00842 } rpc_MultiConnEntry;
00843 
00844 #define MAX_MULTICONN_ENTRIES 64 /* should be 16 */
00845 
00846 static rpc_MultiConnEntry multiConnEntry[MAX_MULTICONN_ENTRIES];
00847 static rpc_ConnInfo *multiConnInfo[256];
00848 static rpc_Boolean multiConn[256];
00849 
00850 static void
00851 rpc_multiConnClear(rpc_Server *server, int fd, const char *msg)
00852 {
00853   int r;
00854   multiConn[fd] = rpc_False;
00855   /*free(multiConnInfo[fd]);*/ /* shure ? */
00856   multiConnInfo[fd] = 0;
00857   FD_CLR(fd, &server->fds_used);
00858   r = close(fd);
00859   utlog(msg_make("%s: multi close fd=%d r = %d\n", msg, fd, r));
00860 }
00861 
00862 static void
00863 rpc_multiConnSuppress(rpc_Server *server, rpc_MultiConnEntry *mce,
00864                       rpc_Boolean do_close)
00865 {
00866   int i;
00867 
00868   for (i = 0; i < mce->fd_cnt; i++) {
00869     int fd = mce->fd[i];
00870     if (do_close) {
00871       close(fd);
00872     }
00873     multiConn[fd] = rpc_False;
00874     FD_CLR(fd, &server->fds_used);
00875   }
00876 
00877   free(mce->fd);
00878   mce->fd = (int *)0;
00879   mce->fd_cnt = 0;
00880 }
00881 
00882 static void
00883 rpc_addRealize(rpc_MultiConnEntry *mce, int fd_cnt, int new_fd)
00884 {
00885   time(&mce->begin);
00886   mce->fd = (int *)calloc(sizeof(int), fd_cnt);
00887   mce->fd[0] = new_fd;
00888   mce->fd_cnt = 1;
00889 }
00890 
00891 static int
00892 rpc_multiConnAddEntry(rpc_Server *server, int new_fd)
00893 {
00894   rpc_MultiConnEntry *mce = multiConnEntry;
00895   int fd_cnt = server->conn_cnt;
00896   int i;
00897   time_t now;
00898 
00899   for (i = 0; i < MAX_MULTICONN_ENTRIES; i++, mce++)
00900     if (!mce->fd_cnt) {
00901       rpc_addRealize(mce, fd_cnt, new_fd);
00902       return i;
00903     }
00904 
00905 #ifdef TRACE
00906   utlog(msg_make("multiConnAddEntry: TRIES GARBAGE!\n"));
00907 #endif
00908 
00909   /* no found, tries to garbage according to begin time */
00910   time(&now);
00911   mce = multiConnEntry;
00912   for (i = 0; i < MAX_MULTICONN_ENTRIES; i++, mce++)
00913     if ((now - mce->begin) > 10) {
00914       rpc_multiConnSuppress(server, mce, rpc_True);
00915       rpc_addRealize(mce, fd_cnt, new_fd);
00916       return i;
00917     }
00918 
00919   return -1;
00920 }
00921 
00922 static rpc_MultiConnEntry *
00923 rpc_multiConnGetEntry(int xid)
00924 {
00925   rpc_MultiConnEntry *mce;
00926 
00927   if (xid < 0 || xid >= MAX_MULTICONN_ENTRIES)
00928     return 0;
00929 
00930   mce = &multiConnEntry[xid];
00931   if (!mce->fd_cnt)
00932     return 0;
00933 
00934   return mce;
00935 }
00936 
00937 static void
00938 rpc_multiConnManage(rpc_Server *server, int fd, int *pmaxfd)
00939 {
00940   rpc_MultiConnInfo info;
00941   rpc_MultiConnInfo xinfo;
00942 
00943 #ifdef TRACE
00944   utlog(msg_make("rpc_multiConnManage %d\n", fd));
00945 #endif
00946 
00947   if (rpc_socketReadTimeout(fd, &info, sizeof(info), 10) != sizeof(info)) {
00948 #ifdef TRACE
00949     utlog(msg_make("timeout for fd=%d\n", fd));
00950 #endif
00951     rpc_multiConnClear(server, fd, "timeout");
00952     return;
00953   }
00954   x2h_rpc_multiconninfo(&info);
00955 
00956 #ifdef TRACE2
00957   utlog(msg_make("get magic %x, cmd = %x, xid = %d\n", info.magic, info.cmd,
00958                  info.xid));
00959 #endif
00960   if (info.magic != MM(server->magic)) {
00961     fprintf(stderr, "bad magic: %x vs %x\n", info.magic, MM(server->magic));
00962     rpc_multiConnClear(server, fd, "bad magic");
00963     return;
00964   }
00965 
00966   if (info.cmd == rpc_NewConnection) {
00967     info.xid = rpc_multiConnAddEntry(server, fd);
00968 
00969     info.cmd = rpc_ReplyNewConnection;
00970 
00971     h2x_rpc_multiconninfo(&xinfo, &info);
00972     if (rpc_socketWrite(fd, &xinfo, sizeof(xinfo)) != sizeof(xinfo)) {
00973       rpc_multiConnClear(server, fd, "bad new connection");
00974       return;
00975     }
00976   }
00977   else if (info.cmd == rpc_AssociatedConnection) {
00978     rpc_MultiConnEntry *mce;
00979     mce = rpc_multiConnGetEntry(info.xid);
00980     if (!mce) {
00981       rpc_multiConnClear(server, fd, "no mce");
00982       return;
00983     }
00984       
00985 #ifdef TRACE2
00986     utlog(msg_make("rpc_AssociatedConnection\n"));
00987 #endif
00988     mce->fd[mce->fd_cnt++] = fd;
00989 
00990     info.cmd = rpc_ReplyNewConnection;
00991     h2x_rpc_multiconninfo(&xinfo, &info);
00992     if (rpc_socketWrite(fd, &xinfo, sizeof(xinfo)) != sizeof(xinfo)) {
00993       rpc_multiConnClear(server, fd, "bad reply connection");
00994       return;
00995     }
00996     if (mce->fd_cnt == server->conn_cnt) {
00997       rpc_makeNewConnection(server, mce->fd, mce->fd_cnt, pmaxfd,
00998                             multiConnInfo[fd]);
00999 
01000       rpc_multiConnSuppress(server, mce, rpc_True);
01001     }
01002   }
01003   else
01004     rpc_multiConnClear(server, fd, "command not understood");
01005 }
01006 
01007 rpc_Status
01008 rpc_serverMainLoop(rpc_Server *server, rpc_PortHandle **ports, int nports)
01009 {
01010   int max_fd = 0;
01011   int n, fd, new_fd;
01012   fd_set fds_ready_to_read;
01013   rpc_PortHandle *portdb[sizeof(fd_set)*8];
01014 
01015   rpc_mainServer = server;
01016 
01017   memset(portdb, 0, sizeof(portdb));
01018 
01019   FD_ZERO(&server->fds_used);
01020 
01021   for (n = 0; n < nports; n++) {
01022     rpc_PortHandle *port = ports[n];
01023 
01024     if (port->domain == AF_INET) {
01025       fd = port->u.in.sockin_fd;
01026       rpc_serverPort = atoi(port->portname);
01027     }
01028     else if (port->domain == AF_UNIX) {
01029       fd = port->u.un.sockun_fd;
01030       rpc_unixPort = port->portname;
01031     }
01032 
01033     if (fd > max_fd)
01034       max_fd = fd;
01035 
01036     portdb[fd] = port;
01037 
01038     FD_SET(fd, &server->fds_used);
01039   }
01040 
01041   for (;;) {
01042 #ifdef HAVE_FATTACH
01043     struct strrecvfd info;
01044 #endif
01045     fds_ready_to_read = server->fds_used;
01046     
01047     /* select sets those which are ready to read */
01048     n = select (max_fd+1, &fds_ready_to_read, 0, 0, 0);
01049 
01050     if (n < 0) {
01051       if (errno == EINTR) {
01052         continue;
01053       }
01054       else {
01055         PERROR(msg_make("error in select"));
01056         /*
01057           utlog("fds_ready_to_read %p, max_fd = %d\n",
01058           fds_ready_to_read, max_fd);
01059         */
01060         /* workaround! */
01061         {
01062           int ifd;
01063           struct stat stat;
01064           for (ifd = 0; ifd <= max_fd; ifd++)
01065             if (FD_ISSET(ifd, &server->fds_used) && fstat(ifd, &stat)<0) {
01066               utlog("warning, fd is invalid %d\n", ifd);
01067               FD_CLR(ifd, &server->fds_used);
01068             }
01069           continue;
01070         }
01071         /*return rpc_Error;*/
01072       }
01073     }
01074 
01075     for (fd = 0; fd <= max_fd; fd++)
01076       if (FD_ISSET(fd, &fds_ready_to_read)) {
01077         rpc_PortHandle *port;
01078         if (port = portdb[fd]) {
01079           /* we have a new connection */
01080           struct sockaddr *sock_addr;
01081           socklen_t length;
01082 
01083           if (port->domain == AF_INET) {
01084             sock_addr = (struct sockaddr *)&port->u.in.sock_in_name;
01085             length = sizeof(port->u.in.sock_in_name);
01086           }
01087           else {
01088             sock_addr = (struct sockaddr *)&port->u.un.sock_un_name;
01089             length = sizeof(port->u.un.sock_un_name);
01090           }
01091 
01092 #ifdef HAVE_FATTACH
01093           if (port->domain == AF_UNIX) {
01094             if (ioctl(fd, I_RECVFD, &info) < 0) {
01095               PERROR("ioctl");
01096               continue;
01097             }
01098 
01099             IDB_LOG(IDB_LOG_CONN,
01100                     ("connection from %s %s\n",
01101                      getpwuid(info.uid)->pw_name,
01102                      (getgrgid(info.gid) ?
01103                       getgrgid(info.gid)->gr_name : "")));
01104 
01105             new_fd = info.fd;
01106           }
01107           else
01108 #endif
01109             if ((new_fd = accept(fd, sock_addr, &length)) < 0)
01110               PERROR("accept connection");
01111 
01112           if (new_fd >= 0) {
01113             rpc_ConnInfo *ci;
01114             if (port->domain == AF_UNIX) {
01115 #ifdef HAVE_FATTACH
01116               ci = rpc_make_stream_conninfo(new_fd, &info);
01117 #else
01118               ci = rpc_make_unix_conninfo(new_fd);
01119 #endif
01120             }
01121             else {
01122               rpc_socket_nodelay(new_fd);
01123               ci = rpc_make_tcpip_conninfo(new_fd);
01124             }
01125 
01126             if (!ci) {
01127               close(new_fd);
01128               continue;
01129             }
01130 
01131             if (server->conn_cnt > 1) {
01132               FD_SET(new_fd, &server->fds_used);
01133               if (new_fd > max_fd)
01134                 max_fd = new_fd;
01135               multiConn[new_fd] = rpc_True;
01136               multiConnInfo[new_fd] = ci;
01137             }
01138             else
01139               rpc_makeNewConnection(server, &new_fd, 1, &max_fd, ci);
01140           }
01141         }
01142         else if (multiConn[fd])
01143           rpc_multiConnManage(server, fd, &max_fd);
01144         else {
01145           if (!rpc_inputHandle(server, 0, fd)) {
01146             rpc_garbClientInfo(server, 0, fd);
01147           }
01148         }
01149       }
01150   }
01151 
01152   return rpc_Success;
01153 }
01154 
01155 rpc_Server *
01156 rpc_getMainServer()
01157 {
01158   return rpc_mainServer;
01159 }
01160 
01161 rpc_ConnectionHandlerFunction
01162 rpc_setConnectionHandler(rpc_Server *server,
01163                          rpc_ConnectionHandlerFunction connh)
01164 {
01165   rpc_ConnectionHandlerFunction oconnh = server->connh;
01166 
01167   server->connh = connh;
01168 
01169   return oconnh;
01170 }
01171 
01172 
01173 static int rpc_serverArgsMake(rpc_Server *, int, int, rpc_ServerFunction **,
01174                               rpc_SendRcv, rpc_FromTo);
01175 
01176 static int
01177 rpc_inputHandle(rpc_Server *server, int which, int fd)
01178 {
01179   rpc_ServerFunction *func;
01180   rpc_Status status;
01181   rpc_ClientInfo *ci = clientInfo[fd];
01182 
01183   if (server->begin)
01184     server->begin(which, server->user_data);
01185 
01186   if (which < 0 || which >= server->conn_cnt)
01187     return 0;
01188 
01189   if (!rpc_serverArgsMake(server, which, fd, &func, rpc_Send,
01190                           rpc_From))
01191     return 0;
01192             
01193   (*func->uf)((rpc_ClientId)fd, ci->ua[which]);
01194             
01195   if (!rpc_serverArgsMake(server, which, fd, &func, rpc_Rcv,
01196                           rpc_To))
01197     return 0;
01198   
01199   if (server->end)
01200     server->end(which, server->user_data);
01201   
01202   return 1;
01203 }
01204 
01205 static void
01206 rpc_garbRealize(rpc_Server *server, rpc_ClientInfo *ci, int force)
01207 {
01208   int i;
01209   
01210   for (i = 0; i < server->conn_cnt; i++) {
01211     free(ci->comm_buff[i]);
01212     free(ci->ua[i]);
01213   }
01214   
01215   free(ci->comm_buff);
01216   free(ci->ua);
01217   if (force) {
01218     free(ci->tid);
01219     free(ci);
01220   }
01221 
01222   for (i = 0; i < sizeof(clientInfo)/sizeof(clientInfo[0]); i++)
01223     if (clientInfo[i] == ci)
01224       clientInfo[i] = 0;
01225 }
01226 
01227 static void
01228 rpc_garbClientInfo(rpc_Server *server, int which, int fd)
01229 {
01230   rpc_ClientInfo *ci = clientInfo[fd];
01231 
01232 #ifdef TRACE2
01233   printf("%d:%d garbClientInfo...\n", rpc_getpid(), pthread_self());
01234 #endif
01235 #ifdef TRACE
01236   utlog(msg_make("rpc_garbClientInfo(which = %d, fd = %d, ci = %p)\n",
01237                  which, fd, ci));
01238 #endif
01239   if (!ci)
01240     return;
01241 
01242   eyedblib::MutexLocker _(gen_mp);
01243   //pthread_mutex_lock(&gen_mp);
01244 #ifdef TRACE
01245   utlog(msg_make("refcnt = %d, fd_cnt = %d\n",
01246                  ci->refcnt, ci->fd_cnt));
01247 #endif
01248 
01249   if (!which && server->connh)
01250     (*server->connh)(server, (rpc_ClientId)fd, rpc_False);
01251 
01252   if (!--ci->refcnt)
01253     rpc_garbRealize(server, ci, 0);
01254 
01255   clientInfo[fd] = 0;
01256 
01257   FD_CLR(fd, &server->fds_used);
01258 #ifdef TRACE
01259   //utlog("rpc_garbClientInfo: close(%d)\n", fd);
01260 #endif
01261   close(fd);
01262 
01263 #if 0
01264   if (ci->refcnt) {
01265     _.unlock();
01266     int i = 0;
01267     for (i = 0; i < ci->fd_cnt; i++) {
01268       if (ci->tid[i] == pthread_self())
01269         break;
01270     }
01271 
01272     for (++i; i < ci->fd_cnt; i++) {
01273       printf("%d:%d killing thread %d:%d\n", rpc_getpid(),
01274              pthread_self(), rpc_getpid(), ci->tid[i]);
01275       pthread_kill(ci->tid[i], SIGTERM);
01276     }
01277   }
01278 #endif
01279 
01280 #ifdef TRACE
01281   utlog(msg_make("close connection fd=%d\n", fd));
01282 #endif
01283   //pthread_mutex_unlock(&gen_mp);
01284 }
01285 
01286 /* send_rcv: select interesting arguments : rpc_Send || rpc_Rcv
01287    fromto  : rpc_From or rpc_To socket
01288 */
01289 
01290 rpc_ClientInfo *
01291 rpc_clientInfoGet(int fd)
01292 {
01293   return clientInfo[fd];
01294 }
01295 
01296 int rpc_serverArgsMake(rpc_Server *server, int which, int fd,
01297                        rpc_ServerFunction **pfunc,
01298                        rpc_SendRcv send_rcv, rpc_FromTo fromto)
01299 {
01300   int i, commsz;
01301   char *buff, *commb;
01302   register rpc_Arg *arg;
01303   rpc_ServerArg pua;
01304   rpc_RpcHeader rhd;
01305   rpc_RpcHeader xrhd;
01306   int ndata = 0;
01307   rpc_ServerData *p_data[RPC_NDATA];
01308   rpc_RpcDescription *rd;
01309   register rpc_ClientInfo *ci = clientInfo[fd];
01310   char *comm_buff = ci->comm_buff[which];
01311   rpc_ServerArg ua = ci->ua[which];
01312   int comm_size = server->comm_size;
01313   int args_size = server->args_size;
01314   rpc_ServerFunction *func;
01315   int buff_size = 0;
01316   rpc_UserType *utyp;
01317   rpc_StatusRec rstatus;
01318 
01319   rstatus.err = 0;
01320 
01321   int read_cnt = 0;
01322   int write_cnt = 0;
01323   commb = comm_buff;
01324   commsz = comm_size;
01325   /*#ifdef RPC_MIN_SIZE*/
01326 #if 1
01327   if (fromto == rpc_From)
01328     buff = commb;
01329   else
01330     buff = commb + sizeof(rhd);
01331 #else
01332   buff = commb + sizeof(rhd);
01333 #endif
01334 
01335   if (fromto == rpc_From) {
01336     /* lecture de l'header */
01337 #ifdef USE_RPC_MIN_SIZE
01338     read_cnt++;
01339 #ifdef RPC_TIMEOUT
01340     if (rpc_socketReadTimeout(fd, buff, RPC_MIN_SIZE, rpc_timeout) !=
01341         RPC_MIN_SIZE)
01342       return 0;
01343 #else
01344     if (rpc_socketRead(fd, buff, RPC_MIN_SIZE) != RPC_MIN_SIZE)
01345       return 0;
01346 #endif
01347 
01348     memcpy(&rhd, buff, sizeof(rhd));
01349     x2h_rpc_hd(&rhd);
01350     buff += sizeof(rhd);
01351 #else
01352 
01353 #ifdef RPC_TIMEOUT
01354     read_cnt++;
01355     if (rpc_socketReadTimeout(fd, buff, sizeof(rhd), rpc_timeout) !=
01356         sizeof(rhd))
01357       return 0;
01358 #else
01359     if (rpc_socketRead(fd, buff, sizeof(rhd)) != sizeof(rhd))
01360       return 0;
01361 #endif
01362 
01363     memcpy(&rhd, buff, sizeof(rhd));
01364     x2h_rpc_hd(&rhd);
01365     buff += sizeof(rhd);
01366 
01367 #endif
01368     if (rhd.magic != server->magic) {
01369       IDB_LOG_FX(("Server Error #1: invalid magic=%p, expected=%d, "
01370                   "serial=%d\n", rhd.magic, server->magic, rhd.serial));
01371       /*rpc_quit(1, 0);*/
01372       return 0;
01373     }
01374 
01375     func = rpc_rpcGet(server, rhd.code);
01376 
01377     if (!func) {
01378       IDB_LOG_FX(("Server Error #2: invalid function code=%d\n", rhd.code));
01379       /*rpc_quit(1, 0);*/
01380       return 0;
01381     }
01382 
01383 #ifdef USE_RPC_MIN_SIZE
01384     if (rhd.size-RPC_MIN_SIZE > 0) {
01385       read_cnt++;
01386       if (rpc_socketRead(fd, buff+RPC_MIN_SIZE-sizeof(rhd),
01387                          rhd.size-RPC_MIN_SIZE) != rhd.size-RPC_MIN_SIZE) {
01388         IDB_LOG_FX(("Server Error #3: read failed for %d bytes\n",
01389                     rhd.size-sizeof(rhd)));
01390         /*rpc_quit(1, 0);*/
01391         return 0;
01392       }
01393     }
01394 #else
01395     if (rhd.size-sizeof(rhd))
01396       if (rpc_socketRead(fd, buff, rhd.size-sizeof(rhd)) !=
01397           rhd.size-sizeof(rhd)) {
01398         IDB_LOG_FX(("Server Error #3: read failed for %d bytes\n",
01399                     rhd.size-sizeof(rhd)));
01400         /*rpc_quit(1, 0);*/
01401         return 0;
01402       }
01403 #endif
01404     *pfunc = func;
01405   }
01406   else {
01407     func = *pfunc;
01408 #ifdef TRACE2
01409     utlog(msg_make("[%d] serverArgsMake code #%d, TO\n", pthread_self(), func->rd->code));
01410 #endif
01411   }
01412 
01413   rd = func->rd;
01414 
01415   for (i = 0, arg = rd->args, pua = ua; i < rd->nargs; i++, arg++, pua += args_size)
01416     switch(arg->type) {
01417     case rpc_Int16Type:
01418       rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int16), send_rcv, fromto, x2h_16_cpy, h2x_16_cpy);
01419       break;
01420         
01421     case rpc_Int32Type:
01422       rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
01423       break;
01424         
01425     case rpc_Int64Type:
01426       rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int64), send_rcv, fromto, x2h_64_cpy, h2x_64_cpy);
01427       break;
01428         
01429     case rpc_StatusType:
01430       if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01431         eyedblib_mcp(&rstatus, pua, sizeof(rstatus));
01432       }
01433 
01434       /* assuming that the 'err' field is of sizeof(eyedblib::int32) and that
01435          it is the first field in the rpc_StatusRec structure */
01436       rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
01437 
01438       break;
01439 
01440     case rpc_StringType:
01441       if ((arg->send_rcv & rpc_Send) && fromto == rpc_From) {
01442         int len;
01443         x2h_32_cpy(&len, buff);
01444 
01445         buff += sizeof(len);
01446         if (len)
01447           *(char **)pua = buff; /* oups? */
01448         else
01449           *(char **)pua = "";
01450         buff += len;
01451       }
01452       else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01453         int len;
01454 
01455         if (*(char **)pua)
01456           len = strlen(*(char **)pua)+1;
01457         else
01458           len = 0;
01459         h2x_32_cpy(buff, &len);
01460         buff += sizeof(len);
01461         if (len)
01462           memcpy(buff, *(char **)pua, len);
01463         buff += len;
01464       }
01465       break;
01466         
01467     case rpc_DataType:
01468       if ((arg->send_rcv & rpc_Send) && fromto == rpc_From) {
01469         int status;
01470         rpc_ServerData *a_data = (rpc_ServerData *)pua;
01471 
01472         a_data->garbage_fun = 0;
01473         a_data->garbage_data = 0;
01474 
01475         x2h_32_cpy(&a_data->data, buff);
01476 
01477         buff += 8;
01478         x2h_32_cpy(&a_data->size, buff);
01479         buff += sizeof(a_data->size);
01480         x2h_32_cpy(&status, buff);
01481         buff += sizeof(status);
01482         if (status == rpc_SyncData) {
01483           a_data->data = buff;
01484           buff += a_data->size;
01485           a_data->fd = -1;
01486         }
01487         else
01488           a_data->fd = fd;
01489       }
01490       else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_To) {
01491         int status, offset;
01492         rpc_ServerData *a_data = (rpc_ServerData *)pua;
01493 
01494         h2x_32_cpy(buff, &a_data->size);
01495         buff += sizeof(a_data->size);
01496 
01497         if (a_data->status == rpc_BuffUsed) {
01498           status = rpc_SyncData;
01499           h2x_32_cpy(buff, &status);
01500           buff += sizeof(status);
01501           offset = (char *)a_data->data - buff;
01502           h2x_32_cpy(buff, &offset);
01503           buff += sizeof(offset);
01504           buff_size += a_data->size;
01505         }
01506         else if (a_data->status == rpc_TempDataUsed ||
01507                  a_data->status == rpc_PermDataUsed) {
01508           status = rpc_ASyncData;
01509           h2x_32_cpy(buff, &status);
01510           buff += sizeof(status);
01511           offset = 0;
01512           h2x_32_cpy(buff, &offset);
01513           buff += sizeof(offset);
01514           p_data[ndata++] = a_data;
01515         }
01516       }
01517       else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_From) {
01518         char *pbuff = commb + sizeof(rhd);
01519         int j;
01520         rpc_Arg *parg;
01521         rpc_Boolean cant = rpc_False;
01522         rpc_ServerData *a_data = (rpc_ServerData *)pua;
01523 
01524         a_data->garbage_fun = 0;
01525         a_data->garbage_data = 0;
01526 
01527         for (j = 0, parg = rd->args; j < rd->nargs; j++, parg++)
01528           switch(parg->type) {
01529           case rpc_ByteType:
01530             if (parg->send_rcv & rpc_Rcv)
01531               pbuff += sizeof(char);
01532             break;
01533 
01534           case rpc_Int16Type:
01535             if (parg->send_rcv & rpc_Rcv)
01536               pbuff += sizeof(eyedblib::int16);
01537             break;
01538         
01539           case rpc_Int32Type:
01540             if (parg->send_rcv & rpc_Rcv)
01541               pbuff += sizeof(eyedblib::int32);
01542             break;
01543         
01544           case rpc_Int64Type:
01545             if (parg->send_rcv & rpc_Rcv)
01546               pbuff += sizeof(eyedblib::int64);
01547             break;
01548         
01549           case rpc_StatusType:
01550             if (parg->send_rcv & rpc_Rcv)
01551               pbuff += sizeof(eyedblib::int32);
01552             break;
01553 
01554           case rpc_StringType:
01555             if (parg->send_rcv & rpc_Rcv)
01556               cant = rpc_True;
01557             break; 
01558 
01559           case rpc_DataType:
01560             if (parg->send_rcv & rpc_Rcv)
01561               if (j != i)
01562                 cant = rpc_True;
01563             break;
01564                   
01565           case rpc_VoidType:
01566             break;
01567 
01568           default:
01569             rpc_assert(parg->type >= rpc_NBaseType &&
01570                        parg->type < server->last_type);
01571             if (parg->send_rcv & rpc_Rcv) {
01572               utyp = rpc_getUTyp(server, parg->type);
01573 
01574               if (utyp->size == rpc_SizeVariable)
01575                 cant = rpc_True;
01576               else
01577                 pbuff += utyp->size;
01578               break;
01579             }
01580           }
01581 
01582         a_data->fd = fd;
01583 
01584         if (cant) {
01585           a_data->data = 0;
01586           a_data->buff_size = 0;
01587         }
01588         else {
01589           /* data size + status + offset */
01590           a_data->data = pbuff + 3*sizeof(int);
01591           a_data->buff_size = rpc_buff_size(commsz, commb, pbuff);
01592         }
01593         buff += 3*sizeof(int);
01594       }
01595       break;
01596         
01597     case rpc_VoidType:
01598       break;
01599 
01600     default:
01601       rpc_assert(arg->type >= rpc_NBaseType && arg->type < server->last_type);
01602       utyp = rpc_getUTyp(server, arg->type);
01603 
01604       if (utyp->func)
01605         utyp->func(arg, &buff, pua, send_rcv, fromto);
01606       else
01607         rpc_copy(arg, buff, pua, utyp->size, send_rcv, fromto);
01608     }
01609   
01610   if (fromto == rpc_From) {
01611     if (rhd.ndata)
01612       {
01613         int d[RPC_NDATA], i;
01614 
01615         read_cnt++;
01616         if (rpc_socketRead(fd, d, rhd.ndata*sizeof(int)) != rhd.ndata*sizeof(int))
01617           return 0;
01618       }
01619   }
01620   else if (fromto == rpc_To) {
01621     rhd.code = rd->code;
01622 
01623     rhd.magic = server->magic;
01624     rhd.serial = 0;
01625     rhd.ndata = ndata;
01626     rhd.status = 0;
01627 
01628     rhd.size = (int)(buff-commb) + buff_size;
01629 
01630     /*
01631 #ifdef USE_RPC_MIN_SIZE
01632     if (rhd.size < RPC_MIN_SIZE)
01633       rhd.size = RPC_MIN_SIZE;
01634 #endif
01635     */
01636     h2x_rpc_hd(&xrhd, &rhd);
01637     memcpy(commb, &xrhd, sizeof(xrhd));
01638       
01639 #ifdef USE_RPC_MIN_SIZE
01640     if (rhd.size < RPC_MIN_SIZE)
01641       rhd.size = RPC_MIN_SIZE;
01642 #endif
01643 
01644     write_cnt++;
01645     if (rpc_socketWrite(fd, commb, rhd.size) <= 0)
01646       return 0;
01647       
01648     if (ndata) {
01649       int d[RPC_NDATA], i;
01650       for (i = 0; i < ndata; i++)
01651         d[i] = h2x_32(p_data[i]->size);
01652       write_cnt++;
01653       if (rpc_socketWrite(fd, d, ndata*sizeof(int)) <= 0)
01654         return 0;
01655 
01656       for (i = 0; i < ndata; i++) {
01657         if (p_data[i]->size) {
01658           int error = 0;
01659           write_cnt++;
01660           if (rpc_socketWrite(fd, p_data[i]->data, p_data[i]->size) <= 0)
01661             error = 1;
01662           if (p_data[i]->status == rpc_TempDataUsed) {
01663             /*
01664               if (p_data[i]->garbage_fun)
01665               p_data[i]->garbage_fun(p_data[i]->data,
01666               p_data[i]->garbage_data);
01667               else */
01668             free((char *)p_data[i]->data);
01669           }
01670           if (error)
01671             return 0;
01672         }
01673       }
01674     }
01675 
01676     if (rstatus.err) {
01677       int len = strlen(rstatus.err_msg);
01678       int tmp = h2x_32(rstatus.err);
01679       write_cnt++;
01680       if (rpc_socketWrite(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
01681         return 0;
01682       tmp = h2x_32(len);
01683       write_cnt++;
01684       if (rpc_socketWrite(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
01685         return 0;
01686       write_cnt++;
01687       if (rpc_socketWrite(fd, rstatus.err_msg, len+1) != len+1)
01688         return 0;
01689     }
01690   }
01691 
01692   return 1;
01693 }
01694 
01695 rpc_Boolean
01696 rpc_serverCheck(int port)
01697 {
01698   int sock_fd, length;
01699   struct sockaddr_in sock_in_name;
01700   struct sockaddr *sock_addr;
01701   char hname[128];
01702 
01703   sock_in_name.sin_family = AF_INET;
01704   sock_in_name.sin_port = htons(port);
01705   /* get host name */
01706   if (gethostname(hname, sizeof(hname)-1) < 0) {
01707     PERROR(msg_make("gethostname failed") );
01708     return rpc_False;
01709   }
01710   hname[sizeof(hname)-1] = 0;
01711   if (!rpc_hostNameToAddr(hname, &sock_in_name.sin_addr))
01712     return rpc_False;
01713   sock_addr = (struct sockaddr *)&sock_in_name;
01714   length = sizeof(sock_in_name);
01715 
01716   if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0))  < 0) {
01717     PERROR(msg_make("unable to create socket"));
01718     return rpc_False;
01719   }
01720 
01721   if (connect(sock_fd, sock_addr, length) < 0) {
01722     PERROR(msg_make("unable to connect socket"));
01723     return rpc_False;
01724   }
01725 
01726   close(sock_fd);
01727   return rpc_True;
01728 }
01729 
01730 rpc_ServerFunction *rpc_rpcGet(rpc_Server *server, rpc_RpcCode code)
01731 {
01732 #ifdef RPC_FUN_CHAIN
01733   register rpc_ServerFunction *func = server->first;
01734 
01735   while (func) {
01736     if (func->rd->code == code)
01737       return func;
01738     func = func->next;
01739   }
01740 
01741   return 0;
01742 #else
01743   return server->funcs[code - START_CODE];
01744 #endif
01745 }
01746 
01747 void
01748 eyedblib_abort()
01749 {
01750   time_t t;
01751   static int reentrant = 0;
01752   char msg[256];
01753   time(&t);
01754   
01755   if (reentrant)
01756     exit(1);
01757 
01758   reentrant = 1;
01759 
01760   sprintf(msg, "EyeDB aborting [pid = %d]\n", rpc_getpid());
01761   write(2, msg, strlen(msg));
01762 
01763   utlog("EyeDB aborting [pid = %d]\n", rpc_getpid());
01764   if (getenv("EYEDBDBG")) for (;;) sleep(1000);
01765 
01766   _QUIT_(0);
01767 
01768   kill(SIGABRT, rpc_getpid());
01769   exit(2);
01770 }
01771 
01772 void
01773 abort()
01774 {
01775   eyedblib_abort();
01776 }

Generated on Mon Dec 22 18:16:07 2008 for eyedb by  doxygen 1.5.3