00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <eyedbconfig.h>
00025
00026 #include <assert.h>
00027 #include <string.h>
00028 #include <errno.h>
00029 #include <fcntl.h>
00030 #include <sys/types.h>
00031 #include <sys/socket.h>
00032 #include <sys/select.h>
00033 #include <time.h>
00034
00035
00036
00037
00038 #include <netinet/in.h>
00039 #include <netinet/tcp.h>
00040
00041 #if TIME_WITH_SYS_TIME
00042 #include <sys/time.h>
00043 #include <time.h>
00044 #else
00045 #if HAVE_SYS_TIME_H
00046 #include <sys/time.h>
00047 #else
00048 #include <time.h>
00049 #endif
00050 #endif
00051
00052 #include <unistd.h>
00053
00054 #include <eyedblib/rpc_lib.h>
00055 #include <eyedblib/xdr.h>
00056 #include <lib/rpc_lib_p.h>
00057
/* Global, externally visible tunable initialised to 128.
 * NOTE(review): presumably a minimum RPC message/buffer size in bytes;
 * it is not used in the visible part of this file -- confirm against users. */
00058 int RPC_MIN_SIZE = 128;
00059
00060
00061 void
00062 print_addr(FILE *fd, struct in_addr *addr)
00063 {
00064 if (!RPC_BYTE1(addr))
00065 fprintf(fd, "+");
00066 else
00067 fprintf(fd, "%d.%d.%d.%d", RPC_BYTE1(addr), RPC_BYTE2(addr), RPC_BYTE3(addr), RPC_BYTE4(addr));
00068 }
00069
00070 int
00071 cmp_addr(const struct in_addr *a1, const struct in_addr *a2)
00072 {
00073 if (RPC_BYTE1(a1) && RPC_BYTE1(a1) != RPC_BYTE1(a2))
00074 return 0;
00075
00076 if (RPC_BYTE2(a1) && RPC_BYTE2(a1) != RPC_BYTE2(a2))
00077 return 0;
00078
00079 if (RPC_BYTE3(a1) && RPC_BYTE3(a1) != RPC_BYTE3(a2))
00080 return 0;
00081
00082 if (RPC_BYTE4(a1) && RPC_BYTE4(a1) != RPC_BYTE4(a2))
00083 return 0;
00084
00085 return 1;
00086 }
00087
/**
 * Resolve a host name to an IPv4 address with gethostbyname().
 *
 * @param name     host name (or dotted-decimal string) to resolve
 * @param address  receives the first address returned by the resolver
 * @return 1 on success, 0 on failure (NULL argument, resolution failure,
 *         or a result that is not an IPv4 address)
 */
int
rpc_hostNameToAddr(const char *name, struct in_addr *address)
{
  struct hostent *hp;

  if (!name || !address)
    return 0;

  if (!(hp = gethostbyname(name)))
    return 0;

  /* Only copy when the result really is an IPv4 address: copying h_length
     bytes blindly would overflow *address for any longer address type. */
  if (hp->h_addrtype != AF_INET ||
      hp->h_length != (int)sizeof(*address) ||
      !hp->h_addr_list || !hp->h_addr_list[0])
    return 0;

  memcpy(address, hp->h_addr_list[0], sizeof(*address));
  return 1;
}
00099
00100 int
00101 hostname2addr(const char *name, struct in_addr *addr)
00102 {
00103 return !rpc_hostNameToAddr(name, addr);
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116 }
00117
/* Signature shared by the low-level transfer routines handed to
   socketRealize(): descriptor, buffer, byte count and one int of user data. */
typedef int (*perform_func)(int fd, void *buf, size_t n, int ud);
00119
/**
 * Read from fd once it becomes readable, waiting at most *tv.
 *
 * @param fd   descriptor to read from; must be in [0, FD_SETSIZE)
 * @param buf  destination buffer
 * @param n    maximum number of bytes to read
 * @param tv   timeout handed to select(); select() may modify it
 * @return the read() result, or -1 on error or timeout.  On timeout errno is
 *         set to ETIMEDOUT so callers never see a stale value (such as an
 *         earlier EINTR); for an out-of-range fd errno is EBADF.
 */
static int
read_timeval(int fd, void *buf, size_t n, struct timeval *tv)
{
  fd_set fds;
  int ready;

  /* FD_SET on a descriptor outside the fd_set range is undefined. */
  if (fd < 0 || fd >= FD_SETSIZE) {
    errno = EBADF;
    return -1;
  }

  FD_ZERO(&fds);
  FD_SET(fd, &fds);

  ready = select(fd + 1, &fds, 0, 0, tv);

  /* select() does not touch errno when it merely times out. */
  if (ready == 0)
    errno = ETIMEDOUT;

  if (ready <= 0)
    return -1;

  return read(fd, buf, n);
}
00134
00135 static int
00136 read_tm(int fd, void *buf, size_t n, int ud)
00137 {
00138 struct timeval tv;
00139
00140 if (!ud)
00141 return read(fd, buf, n);
00142
00143 tv.tv_sec = ud;
00144 tv.tv_usec = 0;
00145 return read_timeval(fd, buf, n, &tv);
00146 }
00147
00148 static int
00149 socketRealize(int fd, void *data, int sz, perform_func perform, int ud)
00150 {
00151 char *p = (char *)data;
00152 int n = 0;
00153 errno = 0;
00154
00155 if (!sz)
00156 return 0;
00157
00158 for (;;)
00159 {
00160 int rn;
00161
00162 rn = (*perform)(fd, p, sz - n, ud);
00163
00164 if (rn < 0 && errno == EINTR)
00165 continue;
00166
00167 if (rn <= 0)
00168 return rn;
00169
00170 else if (rn == sz - n)
00171 return sz;
00172
00173 n += rn;
00174 p += rn;
00175 }
00176 }
00177
/* Transfer statistics, updated by the rpc_socket* functions and reported
   by rpc_getStats(). */
static unsigned int total_bytes_read;
static unsigned int total_bytes_write;
static unsigned int total_read;
static unsigned int total_read_tm;
static unsigned int total_write;

#ifdef SOCKET_PROFILE
/* Millisecond timestamp of the first profiled socket call; 0 until set. */
static int socket_ms = 0;
#endif
00185
00186 int
00187 rpc_socketWrite(int fd, void *data, int sz)
00188 {
00189 total_bytes_write += sz;
00190 #ifdef SOCKET_PROFILE
00191 struct timeval tv;
00192 if (!socket_ms) {
00193 gettimeofday(&tv, 0);
00194 socket_ms = (unsigned long long)tv.tv_sec * 1000ULL +
00195 (unsigned long long)tv.tv_usec / 1000ULL;
00196 }
00197 #endif
00198 total_write++;
00199 #ifdef SOCKET_PROFILE
00200 if (!(total_write % SOCKET_PROFILE)) {
00201 gettimeofday(&tv, 0);
00202 int ms = (unsigned long long)tv.tv_sec * 1000ULL +
00203 (unsigned long long)tv.tv_usec / 1000ULL;
00204 fprintf(stdout, "total_write: %d [%d ms]\n", total_write,
00205 ms - socket_ms);
00206 }
00207 #endif
00208 return socketRealize(fd, data, sz, (perform_func)write, 0);
00209 }
00210
00211 int
00212 rpc_socketRead(int fd, void *data, int sz)
00213 {
00214 total_bytes_read += sz;
00215 #ifdef SOCKET_PROFILE
00216 struct timeval tv;
00217 if (!socket_ms) {
00218 gettimeofday(&tv, 0);
00219 socket_ms = (unsigned long long)tv.tv_sec * 1000ULL +
00220 (unsigned long long)tv.tv_usec / 1000ULL;
00221 }
00222 #endif
00223 total_read++;
00224 #ifdef SOCKET_PROFILE
00225 gettimeofday(&tv, 0);
00226 int ms = (unsigned long long)tv.tv_sec * 1000ULL +
00227 (unsigned long long)tv.tv_usec / 1000ULL;
00228 if (!(total_read % SOCKET_PROFILE))
00229 fprintf(stdout, "total_read: %d [%d ms]\n", total_read, ms-socket_ms);
00230 #endif
00231 return socketRealize(fd, data, sz, (perform_func)read, 0);
00232 }
00233
00234 int
00235 rpc_socketReadTimeout(int fd, void *data, int sz, int ud)
00236 {
00237 total_bytes_read += sz;
00238 total_read_tm++;
00239 return socketRealize(fd, data, sz, (perform_func)read_tm, ud);
00240 }
00241
00242 rpc_RpcDescription *
00243 rpc_newRpcDescription(rpc_RpcCode code, int nargs)
00244 {
00245 rpc_RpcDescription *rd = rpc_new(rpc_RpcDescription);
00246
00247 rd->code = code;
00248 rd->nargs = nargs+1;
00249 rd->args = (rpc_Arg *)calloc(sizeof(rpc_Arg), rd->nargs);
00250
00251 return rd;
00252 }
00253
00254 void
00255 rpc_deleteRpcDescription(rpc_RpcDescription *rd)
00256 {
00257 if (rd->args)
00258 free(rd->args);
00259 free(rd);
00260 }
00261
00262 #define isnumber(c) ((c) >= '0' && (c) <= '9')
00263
00264 rpc_Boolean
00265 rpc_portIsAddress(const char *portname)
00266 {
00267 register const char *s = portname;
00268 char c;
00269
00270 if (!s || !*s)
00271 return rpc_False;
00272
00273 while (c = *s++)
00274 if (!isnumber(c))
00275 return rpc_False;
00276
00277 return rpc_True;
00278 }
00279
00280 const char *rpc_getPortAttr(const char *port, int *domain, int *type)
00281 {
00282 const char *x = strchr(port, ':');
00283
00284 if (!x)
00285 {
00286 *domain = rpc_portIsAddress(port) ? AF_INET : AF_UNIX;
00287 *type = SOCK_STREAM;
00288
00289
00290
00291
00292 return port;
00293 }
00294
00295 if (!strncasecmp(port, "udp:", 4))
00296 *type = SOCK_DGRAM;
00297 else if (!strncasecmp(port, "tcp:", 4))
00298 *type = SOCK_STREAM;
00299 else
00300 return NULL;
00301
00302 port = &x[1];
00303 *domain = rpc_portIsAddress(port) ? AF_INET : AF_UNIX;
00304
00305
00306
00307
00308 return port;
00309 }
00310
00311 #include <sys/stat.h>
00312
/* Descriptor checked by rpc_checkConn(); zero until rpc_setConnFd() is
   called. */
static int conn_fd;

/**
 * Record the connection descriptor used by rpc_checkConn().
 */
void
rpc_setConnFd(int fd)
{
  conn_fd = fd;
}
00319
00320 int
00321 rpc_checkConn()
00322 {
00323 struct stat stat;
00324 return fstat(conn_fd, &stat);
00325 }
00326
/* Quit callback and the opaque pointer registered with it; both are set by
   rpc_setQuitHandler(). */
void (*rpc_quit_handler)(void *, int);
void *rpc_quit_data;

/**
 * Register the quit callback and its associated data pointer.
 *
 * @param _rpc_quit_handler  callback stored in rpc_quit_handler
 * @param _rpc_quit_data     pointer stored in rpc_quit_data
 */
void
rpc_setQuitHandler(void (*_rpc_quit_handler)(void *, int),
                   void *_rpc_quit_data)
{
  rpc_quit_data = _rpc_quit_data;
  rpc_quit_handler = _rpc_quit_handler;
}
00337
00338 void
00339 rpc_getStats(unsigned int *read_cnt,
00340 unsigned int *read_tm_cnt,
00341 unsigned int *write_cnt,
00342 unsigned int *byte_read_cnt,
00343 unsigned int *byte_write_cnt)
00344 {
00345 *read_cnt = total_read;
00346 *read_tm_cnt = total_read_tm;
00347 *write_cnt = total_write;
00348
00349 *byte_read_cnt = total_bytes_read;
00350 *byte_write_cnt = total_bytes_write;
00351 }
00352
00353 eyedblib::int16
00354 hton16(eyedblib::int16 x)
00355 {
00356 return htons(x);
00357 }
00358
00359 eyedblib::int32
00360 hton32(eyedblib::int32 x)
00361 {
00362 return htonl(x);
00363 }
00364
00365 eyedblib::int64
00366 hton64(eyedblib::int64 x)
00367 {
00368 eyedblib::int32 h = x >> 32;
00369 eyedblib::int32 l = x & 0xffffffff;
00370 return ((unsigned long long)htonl(l)) << 32 | htonl(h);
00371 }
00372
00373 void x2h_rpc_hd(rpc_RpcHeader *rhd)
00374 {
00375 rhd->magic = x2h_u32(rhd->magic);
00376 rhd->serial = x2h_32(rhd->serial);
00377 rhd->code = x2h_32(rhd->code);
00378 rhd->size = x2h_32(rhd->size);
00379 rhd->ndata = x2h_32(rhd->ndata);
00380 rhd->status = x2h_u32(rhd->status);
00381 }
00382
00383 void h2x_rpc_hd(rpc_RpcHeader *xrhd, const rpc_RpcHeader *hrhd)
00384 {
00385 xrhd->magic = h2x_u32(hrhd->magic);
00386 xrhd->serial = h2x_32(hrhd->serial);
00387 xrhd->code = h2x_32(hrhd->code);
00388 xrhd->size = h2x_32(hrhd->size);
00389 xrhd->ndata = h2x_32(hrhd->ndata);
00390 xrhd->status = h2x_u32(hrhd->status);
00391 }
00392
00393 void x2h_rpc_multiconninfo(rpc_MultiConnInfo *info)
00394 {
00395 info->magic = x2h_u32(info->magic);
00396 info->cmd = (rpc_MultiConnCommand)x2h_32(info->cmd);
00397 info->xid = x2h_32(info->xid);
00398 }
00399
00400 void h2x_rpc_multiconninfo(rpc_MultiConnInfo *xinfo, rpc_MultiConnInfo *hinfo)
00401 {
00402 xinfo->magic = h2x_u32(hinfo->magic);
00403 xinfo->cmd = (rpc_MultiConnCommand)h2x_32(hinfo->cmd);
00404 xinfo->xid = h2x_32(hinfo->xid);
00405 }
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
/* Read an int-valued socket option into *value.  *len is reset to
   sizeof(int) before the call (getsockopt() treats it as value-result) and
   holds the returned length afterwards.  A failure is reported with
   perror(what) and otherwise ignored. */
static void
rpc_getIntSockOpt(int s, int level, int optname, int *value,
                  socklen_t *len, const char *what)
{
  *len = sizeof(*value);
  if (getsockopt(s, level, optname, (char *)value, len) < 0)
    perror(what);
}

/* Set an int-valued socket option.  A failure is reported with perror(what)
   and otherwise ignored. */
static void
rpc_setIntSockOpt(int s, int level, int optname, int value, const char *what)
{
  if (setsockopt(s, level, optname, (char *)&value, sizeof(value)) < 0)
    perror(what);
}

/**
 * Enable TCP_NODELAY on socket s and optionally shrink its buffers.
 *
 * - Nothing is done when the environment variable NO_TCP_NODELAY is set.
 * - When the environment variable TCP_BUFSZ is set (its value is ignored),
 *   SO_SNDBUF and SO_RCVBUF are both set to 2048 bytes.
 * - When TCP_TRACE is defined, option values are traced on stderr.
 *
 * Every getsockopt()/setsockopt() failure is reported with perror() and
 * processing continues.
 */
void rpc_socket_nodelay(int s)
{
  enum { RPC_TCP_BUFSZ = 2048 };
  int flag = 1;
  socklen_t size = sizeof(int);
  int sz = 0;

  if (getenv("NO_TCP_NODELAY"))
    return;

  rpc_getIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, &flag, &size,
                    "getsockopt nodelay");

#ifdef TCP_TRACE
  fprintf(stderr, "NODELAY: %d [flag=%d, size=%d]\n", s, flag, (int)size);
#endif

  rpc_setIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, 1, "setsockopt nodelay");

  rpc_getIntSockOpt(s, IPPROTO_TCP, TCP_NODELAY, &flag, &size,
                    "getsockopt nodelay");

#ifdef TCP_TRACE
  fprintf(stderr, "after NODELAY: %d [flag=%d]\n", s, flag);
#endif

  if (!getenv("TCP_BUFSZ"))
    return;

  rpc_getIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, &sz, &size,
                    "getsockopt sndbuf");

#ifdef TCP_TRACE
  fprintf(stderr, "snd buf: %d\n", sz);
#endif

  rpc_getIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, &sz, &size,
                    "getsockopt rcvbuf");

#ifdef TCP_TRACE
  fprintf(stderr, "rcv buf: %d\n", sz);
#endif

  rpc_setIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, RPC_TCP_BUFSZ,
                    "setsockopt sndbuf");

  /* This failure used to be reported as "setsockopt sndbuf". */
  rpc_setIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, RPC_TCP_BUFSZ,
                    "setsockopt rcvbuf");

  rpc_getIntSockOpt(s, SOL_SOCKET, SO_SNDBUF, &sz, &size,
                    "getsockopt sndbuf");

#ifdef TCP_TRACE
  fprintf(stderr, "snd buf 2: %d\n", sz);
#endif

  rpc_getIntSockOpt(s, SOL_SOCKET, SO_RCVBUF, &sz, &size,
                    "getsockopt rcvbuf");

#ifdef TCP_TRACE
  fprintf(stderr, "rcv buf 2: %d\n", sz);
#endif

  fflush(stderr);
}
00535
/**
 * Remove a stale AF_UNIX socket file.
 *
 * If portname exists but a connect() to it fails, the file is unlinked so
 * that the path can be bound again.  Nothing is done when the path does not
 * exist, when no socket can be created, or when portname is NULL or too long
 * to fit in sockaddr_un.sun_path (such a path cannot name a bound socket).
 *
 * NOTE(review): any connect() failure triggers the unlink, not only
 * ECONNREFUSED -- confirm this is intended.
 */
void rpc_checkAFUnixPort(const char *portname)
{
  struct sockaddr_un sock_un_name;
  size_t len;
  int sock_fd;

  if (!portname)
    return;

  /* Guard the copy into the fixed-size sun_path buffer. */
  len = strlen(portname);
  if (len >= sizeof(sock_un_name.sun_path))
    return;

  /* Existence check: access() takes F_OK, not an open() flag. */
  if (access(portname, F_OK) < 0)
    return;

  sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sock_fd < 0)
    return;

  memset(&sock_un_name, 0, sizeof(sock_un_name));
  sock_un_name.sun_family = AF_UNIX;
  memcpy(sock_un_name.sun_path, portname, len + 1);

  if (connect(sock_fd, (struct sockaddr *)&sock_un_name,
              sizeof(sock_un_name)) < 0) {
    /* Nobody is listening on this path: remove the leftover file. */
    unlink(portname);
  }

  close(sock_fd);
}