rpc_lib.cc

00001 /* 
00002    EyeDB Object Database Management System
00003    Copyright (C) 1994-2008 SYSRA
00004    
00005    EyeDB is free software; you can redistribute it and/or
00006    modify it under the terms of the GNU Lesser General Public
00007    License as published by the Free Software Foundation; either
00008    version 2.1 of the License, or (at your option) any later version.
00009    
00010    EyeDB is distributed in the hope that it will be useful,
00011    but WITHOUT ANY WARRANTY; without even the implied warranty of
00012    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013    Lesser General Public License for more details.
00014    
00015    You should have received a copy of the GNU Lesser General Public
00016    License along with this library; if not, write to the Free Software
00017    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 
00018 */
00019 
00020 /*
00021    Author: Eric Viara <viara@sysra.com>
00022 */
00023 
00024 #include <eyedbconfig.h>
00025 
00026 #include <assert.h>
00027 #include <string.h>
00028 #include <errno.h>
00029 #include <fcntl.h>
00030 #include <sys/types.h>
00031 #include <sys/socket.h>
00032 #include <sys/select.h>
00033 #include <time.h>
00034 // // @@@ ???
00035 // #ifdef AIX
00036 // #define _NO_BITFIELDS
00037 // #endif
00038 #include <netinet/in.h>
00039 #include <netinet/tcp.h>
00040 
00041 #if TIME_WITH_SYS_TIME
00042 #include <sys/time.h>
00043 #include <time.h>
00044 #else
00045 #if HAVE_SYS_TIME_H
00046 #include <sys/time.h>
00047 #else
00048 #include <time.h>
00049 #endif
00050 #endif
00051 
00052 #include <unistd.h>
00053 
00054 #include <eyedblib/rpc_lib.h>
00055 #include <eyedblib/xdr.h>
00056 #include <lib/rpc_lib_p.h>
00057 
/* Global, externally visible RPC size setting (128 here; 1024 was tried and
   left commented out below).
   NOTE(review): nothing in this file reads it -- confirm its meaning against
   the code that uses RPC_MIN_SIZE. */
00058 int RPC_MIN_SIZE = 128;
00059 //int RPC_MIN_SIZE = 1024;
00060 
00061 void
00062 print_addr(FILE *fd, struct in_addr *addr)
00063 {
00064   if (!RPC_BYTE1(addr))
00065     fprintf(fd, "+");
00066   else
00067     fprintf(fd, "%d.%d.%d.%d", RPC_BYTE1(addr), RPC_BYTE2(addr), RPC_BYTE3(addr), RPC_BYTE4(addr));
00068 }
00069 
00070 int
00071 cmp_addr(const struct in_addr *a1, const struct in_addr *a2)
00072 {
00073   if (RPC_BYTE1(a1) && RPC_BYTE1(a1) != RPC_BYTE1(a2))
00074     return 0;
00075 
00076   if (RPC_BYTE2(a1) && RPC_BYTE2(a1) != RPC_BYTE2(a2))
00077     return 0;
00078   
00079   if (RPC_BYTE3(a1) && RPC_BYTE3(a1) != RPC_BYTE3(a2))
00080     return 0;
00081   
00082   if (RPC_BYTE4(a1) && RPC_BYTE4(a1) != RPC_BYTE4(a2))
00083     return 0;
00084 
00085   return 1;
00086 }
00087 
/*
 * Resolve a host name to an IPv4 address with gethostbyname().
 *
 * name    : host name (or dotted-quad string) to resolve.
 * address : receives the first address of the resolved entry.
 *
 * Returns 1 on success, 0 on failure. Failure covers: NULL arguments, an
 * unresolvable name, and a resolved entry that is not an AF_INET address of
 * exactly sizeof(struct in_addr) bytes. The last check keeps the copy from
 * overrunning `address` if the resolver hands back a wider address.
 */
int
rpc_hostNameToAddr(const char *name, struct in_addr *address)
{
  struct hostent *hp;

  if (!name || !address)
    return 0;

  hp = gethostbyname(name);
  if (!hp)
    return 0;

  if (hp->h_addrtype != AF_INET ||
      hp->h_length != (int)sizeof(*address) ||
      !hp->h_addr_list || !hp->h_addr_list[0])
    return 0;

  /* h_addr_list[0] is the portable spelling of the legacy h_addr alias. */
  memcpy(address, hp->h_addr_list[0], sizeof(*address));
  return 1;
}
00099 
00100 int
00101 hostname2addr(const char *name, struct in_addr *addr)
00102 {
00103   return !rpc_hostNameToAddr(name, addr);
00104   /*
00105   struct hostent *hp;
00106 
00107   memset(addr, 0, sizeof(*addr));
00108 
00109   if (!(hp = gethostbyname(name)))
00110     return 1;
00111 
00112   memcpy((char *)addr, (char *)hp->h_addr, hp->h_length);
00113 
00114   return 0;
00115   */
00116 }
00117 
/* Signature of the I/O primitive driven by socketRealize(): arguments are
   (fd, buffer, byte count, user datum `ud`). socketRealize() treats a
   positive return as the number of bytes transferred and a value <= 0 as
   end-of-stream or error. */
00118 typedef int (*perform_func)(int, void *, size_t, int ud);
00119 
/*
 * read() from `fd`, waiting at most `tv` for it to become readable.
 *
 * fd  : descriptor to read from; must be in [0, FD_SETSIZE) because it is
 *       placed in an fd_set.
 * buf : destination buffer.
 * n   : maximum number of bytes to read.
 * tv  : timeout handed to select() (select() may modify it).
 *
 * Returns the value of read() when the descriptor becomes readable, or -1
 * when the wait fails. errno is always meaningful on -1:
 *   EBADF     - fd cannot be stored in an fd_set,
 *   ETIMEDOUT - select() timed out,
 *   otherwise - whatever select() set.
 * Setting errno on timeout matters: a caller that retries on EINTR would
 * otherwise see a stale EINTR and keep retrying.
 */
static int
read_timeval(int fd, void *buf, size_t n, struct timeval *tv)
{
  fd_set fds;
  int ready;

  if (fd < 0 || fd >= FD_SETSIZE) {
    errno = EBADF;
    return -1;
  }

  FD_ZERO(&fds);
  FD_SET(fd, &fds);

  ready = select(fd + 1, &fds, NULL, NULL, tv);
  if (ready == 0) {
    errno = ETIMEDOUT;
    return -1;
  }
  if (ready < 0)
    return -1;

  return (int)read(fd, buf, n); //@@@@ the type of return should be ssize_t
}
00134 
00135 static int
00136 read_tm(int fd, void *buf, size_t n, int ud)
00137 {
00138   struct timeval tv;
00139 
00140   if (!ud)
00141     return read(fd, buf, n);
00142 
00143   tv.tv_sec = ud;
00144   tv.tv_usec = 0;
00145   return read_timeval(fd, buf, n, &tv);
00146 }
00147 
00148 static int
00149 socketRealize(int fd, void *data, int sz, perform_func perform, int ud)
00150 { 
00151   char *p = (char *)data;
00152   int n = 0;
00153   errno = 0;
00154 
00155   if (!sz)
00156     return 0;
00157 
00158   for (;;)
00159     {
00160       int rn;
00161 
00162       rn = (*perform)(fd, p, sz - n, ud);
00163 
00164       if (rn < 0 && errno == EINTR)
00165         continue;
00166 
00167       if (rn <= 0)
00168         return rn;
00169 
00170       else if (rn == sz - n)
00171         return sz;
00172 
00173       n += rn;
00174       p += rn;
00175     }
00176 }
00177 
/* Transfer statistics, reported by rpc_getStats().
   total_bytes_*: sum of the sizes REQUESTED by the rpc_socket* calls (added
   before the transfer is attempted, so not the bytes actually moved).
   total_read / total_read_tm / total_write: number of calls to
   rpc_socketRead / rpc_socketReadTimeout / rpc_socketWrite. */
00178 static unsigned int total_bytes_read, total_bytes_write;
00179 static unsigned int total_read, total_read_tm, total_write;
00180 
00181 /* #define SOCKET_PROFILE 1000 */
/* Profiling only: millisecond timestamp taken on the first profiled socket
   call; later trace lines print the time elapsed since it. */
00182 #ifdef SOCKET_PROFILE
00183 static int socket_ms = 0;
00184 #endif
00185 
00186 int
00187 rpc_socketWrite(int fd, void *data, int sz)
00188 {
00189   total_bytes_write += sz;
00190 #ifdef SOCKET_PROFILE
00191   struct timeval tv;
00192   if (!socket_ms) {
00193     gettimeofday(&tv, 0);
00194     socket_ms = (unsigned long long)tv.tv_sec * 1000ULL +
00195       (unsigned long long)tv.tv_usec / 1000ULL;
00196   }
00197 #endif
00198   total_write++;
00199 #ifdef SOCKET_PROFILE
00200   if (!(total_write % SOCKET_PROFILE)) {
00201     gettimeofday(&tv, 0);
00202     int ms = (unsigned long long)tv.tv_sec * 1000ULL +
00203       (unsigned long long)tv.tv_usec / 1000ULL;
00204     fprintf(stdout, "total_write: %d [%d ms]\n", total_write,
00205             ms - socket_ms);
00206   }
00207 #endif
00208   return socketRealize(fd, data, sz, (perform_func)write, 0);
00209 }
00210 
00211 int
00212 rpc_socketRead(int fd, void *data, int sz)
00213 {
00214   total_bytes_read += sz;
00215 #ifdef SOCKET_PROFILE
00216   struct timeval tv;
00217   if (!socket_ms) {
00218     gettimeofday(&tv, 0);
00219     socket_ms = (unsigned long long)tv.tv_sec * 1000ULL +
00220       (unsigned long long)tv.tv_usec / 1000ULL;
00221   }
00222 #endif
00223   total_read++;
00224 #ifdef SOCKET_PROFILE
00225     gettimeofday(&tv, 0);
00226     int ms = (unsigned long long)tv.tv_sec * 1000ULL +
00227       (unsigned long long)tv.tv_usec / 1000ULL;
00228   if (!(total_read % SOCKET_PROFILE))
00229     fprintf(stdout, "total_read: %d [%d ms]\n", total_read, ms-socket_ms);
00230 #endif
00231   return socketRealize(fd, data, sz, (perform_func)read, 0);
00232 }
00233 
00234 int
00235 rpc_socketReadTimeout(int fd, void *data, int sz, int ud)
00236 {
00237   total_bytes_read += sz;
00238   total_read_tm++;
00239   return socketRealize(fd, data, sz, (perform_func)read_tm, ud);
00240 }
00241 
00242 rpc_RpcDescription *
00243 rpc_newRpcDescription(rpc_RpcCode code, int nargs)
00244 {
00245   rpc_RpcDescription *rd = rpc_new(rpc_RpcDescription);
00246 
00247   rd->code     = code;
00248   rd->nargs    = nargs+1;
00249   rd->args     = (rpc_Arg *)calloc(sizeof(rpc_Arg), rd->nargs);
00250 
00251   return rd;
00252 }
00253 
00254 void
00255 rpc_deleteRpcDescription(rpc_RpcDescription *rd)
00256 {
00257   if (rd->args)
00258     free(rd->args);
00259   free(rd);
00260 }
00261 
00262 #define isnumber(c) ((c) >= '0' && (c) <= '9')
00263 
00264 rpc_Boolean
00265 rpc_portIsAddress(const char *portname)
00266 {
00267   register const char *s = portname;
00268   char c;
00269 
00270   if (!s || !*s)
00271     return rpc_False;
00272 
00273   while (c = *s++)
00274     if (!isnumber(c))
00275       return rpc_False;
00276 
00277   return rpc_True;
00278 }
00279 
00280 const char *rpc_getPortAttr(const char *port, int *domain, int *type)
00281 {
00282   const char *x = strchr(port, ':');
00283 
00284   if (!x)
00285     {
00286       *domain = rpc_portIsAddress(port) ? AF_INET : AF_UNIX;
00287       *type = SOCK_STREAM;
00288       /*
00289       printf("rpc_getPortAttr: '%s' (domain = %d, type = %d)\n",
00290              port, *domain, *type);
00291              */
00292       return port;
00293     }
00294 
00295   if (!strncasecmp(port, "udp:", 4))
00296     *type = SOCK_DGRAM;
00297   else if (!strncasecmp(port, "tcp:", 4))
00298     *type = SOCK_STREAM;
00299   else
00300     return NULL;
00301 
00302   port = &x[1];
00303   *domain = rpc_portIsAddress(port) ? AF_INET : AF_UNIX;
00304   /*
00305   printf("rpc_getPortAttr: '%s' (domain = %d, type = %d)\n",
00306          port, *domain, *type);
00307          */
00308   return port;
00309 }
00310 
00311 #include <sys/stat.h>
00312 
/* Descriptor recorded by rpc_setConnFd() and probed by rpc_checkConn(). */
static int conn_fd;

/* Record `fd` as the descriptor that rpc_checkConn() will probe. */
void rpc_setConnFd(int fd)
{
  conn_fd = fd;
}

/*
 * Probe the recorded descriptor with fstat().
 *
 * Returns the fstat() result: 0 when the descriptor is valid, -1 otherwise.
 */
int
rpc_checkConn()
{
  struct stat st;
  const int rc = fstat(conn_fd, &st);

  return rc;
}
00326 
/* Quit callback and the opaque datum registered with it; both are set by
   rpc_setQuitHandler(). This file only stores them. */
void (*rpc_quit_handler)(void *, int);
void *rpc_quit_data;

/*
 * Register the quit callback and its associated datum.
 *
 * _rpc_quit_handler : callback taking (void *, int); stored as-is.
 * _rpc_quit_data    : opaque pointer stored alongside the callback.
 */
void
rpc_setQuitHandler(void (*_rpc_quit_handler)(void *, int),
                   void *_rpc_quit_data)
{
  rpc_quit_data    = _rpc_quit_data;
  rpc_quit_handler = _rpc_quit_handler;
}
00337 
00338 void
00339 rpc_getStats(unsigned int *read_cnt,
00340              unsigned int *read_tm_cnt,
00341              unsigned int *write_cnt,
00342              unsigned int *byte_read_cnt,
00343              unsigned int *byte_write_cnt)
00344 {
00345   *read_cnt = total_read;
00346   *read_tm_cnt = total_read_tm;
00347   *write_cnt = total_write;
00348 
00349   *byte_read_cnt = total_bytes_read;
00350   *byte_write_cnt = total_bytes_write;
00351 }
00352 
00353 eyedblib::int16
00354 hton16(eyedblib::int16 x)
00355 {
00356   return htons(x);
00357 }
00358 
00359 eyedblib::int32
00360 hton32(eyedblib::int32 x)
00361 {
00362   return htonl(x);
00363 }
00364 
00365 eyedblib::int64
00366 hton64(eyedblib::int64 x)
00367 {
00368   eyedblib::int32 h = x >> 32;
00369   eyedblib::int32 l = x & 0xffffffff;
00370   return ((unsigned long long)htonl(l)) << 32 | htonl(h);
00371 }
00372 
00373 void x2h_rpc_hd(rpc_RpcHeader *rhd)
00374 {
00375   rhd->magic = x2h_u32(rhd->magic);
00376   rhd->serial = x2h_32(rhd->serial);
00377   rhd->code = x2h_32(rhd->code);
00378   rhd->size = x2h_32(rhd->size);
00379   rhd->ndata = x2h_32(rhd->ndata);
00380   rhd->status = x2h_u32(rhd->status);
00381 }
00382 
00383 void h2x_rpc_hd(rpc_RpcHeader *xrhd, const rpc_RpcHeader *hrhd)
00384 {
00385   xrhd->magic = h2x_u32(hrhd->magic);
00386   xrhd->serial = h2x_32(hrhd->serial);
00387   xrhd->code = h2x_32(hrhd->code);
00388   xrhd->size = h2x_32(hrhd->size);
00389   xrhd->ndata = h2x_32(hrhd->ndata);
00390   xrhd->status = h2x_u32(hrhd->status);
00391 }
00392 
00393 void x2h_rpc_multiconninfo(rpc_MultiConnInfo *info)
00394 {
00395   info->magic = x2h_u32(info->magic);
00396   info->cmd = (rpc_MultiConnCommand)x2h_32(info->cmd);
00397   info->xid = x2h_32(info->xid);
00398 }
00399 
00400 void h2x_rpc_multiconninfo(rpc_MultiConnInfo *xinfo, rpc_MultiConnInfo *hinfo)
00401 {
00402   xinfo->magic = h2x_u32(hinfo->magic);
00403   xinfo->cmd = (rpc_MultiConnCommand)h2x_32(hinfo->cmd);
00404   xinfo->xid = h2x_32(hinfo->xid);
00405 }
00406 
00407 /*
00408 void
00409 terminate__Fv()
00410 {
00411   fprintf(stderr, "terminate__Fv\n");
00412   exit(1);
00413 }
00414 */
00415 
/* Read an int-valued socket option. On failure, report `errmsg` through
   perror() and return `fallback`. The option length is initialised on every
   call because getsockopt() treats it as a value-result argument. */
static int
rpc_getIntSockOpt(int s, int level, int optname, int fallback,
                  const char *errmsg)
{
  int value = fallback;
  socklen_t len = sizeof(value);

  if (getsockopt(s, level, optname, &value, &len) < 0)
    perror(errmsg);

  return value;
}

/* Set an int-valued socket option; report `errmsg` through perror() on
   failure. */
static void
rpc_setIntSockOpt(int s, int level, int optname, int value,
                  const char *errmsg)
{
  if (setsockopt(s, level, optname, &value, sizeof(value)) < 0)
    perror(errmsg);
}

/*
 * Enable TCP_NODELAY on socket `s` and, optionally, shrink its buffers.
 *
 * Environment:
 *   NO_TCP_NODELAY - when set (any value), the function does nothing.
 *   TCP_BUFSZ      - when set (any value; the value itself is not read),
 *                    SO_SNDBUF and SO_RCVBUF are both set to 2048 bytes.
 *
 * Failures of getsockopt()/setsockopt() are reported with perror() and are
 * otherwise ignored. Define TCP_TRACE to print the option values on stderr.
 */
void rpc_socket_nodelay(int s)
{
  const int bufsz = 2048;   /* previously tried: 8192, 16384, 32766 */
  int flag;
  int sz;

  if (getenv("NO_TCP_NODELAY"))
    return;

  /* --- TCP level: disable Nagle's algorithm --- */
  flag = rpc_getIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, 1,
                           "getsockopt nodelay");

  //#define TCP_TRACE

#ifdef TCP_TRACE
  fprintf(stderr, "NODELAY: %d [flag=%d, size=%d]\n", s, flag,
          (int)sizeof(flag));
#endif

  rpc_setIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, 1, "setsockopt nodelay");

  flag = rpc_getIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, 1,
                           "getsockopt nodelay");

#ifdef TCP_TRACE
  fprintf(stderr, "after NODELAY: %d [flag=%d]\n", s, flag);
#endif
  (void)flag;   /* only consumed by the trace output */

  if (!getenv("TCP_BUFSZ"))
    return;

  /* --- socket level: report, then resize, the send/receive buffers --- */
  sz = rpc_getIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, 0, "getsockopt sndbuf");
#ifdef TCP_TRACE
  fprintf(stderr, "snd buf: %d\n", sz);
#endif

  sz = rpc_getIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, sz, "getsockopt rcvbuf");
#ifdef TCP_TRACE
  fprintf(stderr, "rcv buf: %d\n", sz);
#endif

  rpc_setIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, bufsz, "setsockopt sndbuf");
  /* This failure used to be reported as "setsockopt sndbuf". */
  rpc_setIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, bufsz, "setsockopt rcvbuf");

  sz = rpc_getIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, bufsz,
                         "getsockopt sndbuf");
#ifdef TCP_TRACE
  fprintf(stderr, "snd buf 2: %d\n", sz);
#endif

  sz = rpc_getIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, sz, "getsockopt rcvbuf");
#ifdef TCP_TRACE
  fprintf(stderr, "rcv buf 2: %d\n", sz);
#endif
  (void)sz;     /* only consumed by the trace output */

  fflush(stderr);
}
00535 
/*
 * Remove a stale AF_UNIX socket file.
 *
 * If `portname` exists in the file system, try to connect() to it as an
 * AF_UNIX stream socket. When the connection attempt fails, the path is
 * unlink()ed so that a new server can bind to it. Nothing happens when the
 * path does not exist or when a socket cannot be created.
 *
 * A NULL `portname`, or one that does not fit in sockaddr_un.sun_path
 * (terminator included), is ignored: such a path cannot be connected to and
 * copying it would overflow the address structure.
 */
void rpc_checkAFUnixPort(const char *portname)
{
  struct sockaddr_un sock_un_name;
  size_t len;
  int sock_fd;

  if (!portname)
    return;

  len = strlen(portname);
  if (len >= sizeof(sock_un_name.sun_path))
    return;

  /* F_OK: existence test only. */
  if (access(portname, F_OK) < 0)
    return;

  sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sock_fd < 0)
    return;

  memset(&sock_un_name, 0, sizeof(sock_un_name));
  sock_un_name.sun_family = AF_UNIX;
  memcpy(sock_un_name.sun_path, portname, len + 1);

  if (connect(sock_fd, (struct sockaddr *)&sock_un_name,
              sizeof(sock_un_name)) < 0) {
    //fprintf(stderr, "eyedb notice: unlinking unix socket %s\n", portname);
    unlink(portname);
  }

  close(sock_fd);
}

Generated on Mon Dec 22 18:16:07 2008 for eyedb by  doxygen 1.5.3