00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "eyedbconfig.h"
00025
00026 #include <assert.h>
00027 #include <sys/select.h>
00028 #include <stdlib.h>
00029 #include <signal.h>
00030 #include <fcntl.h>
00031 #include <string.h>
00032
00033 #include "rpc_feP.h"
00034 #include <eyedblib/rpc_lib.h>
00035 #include <eyedblib/log.h>
00036 #include <eyedblib/xdr.h>
00037
00038 #define USE_RPC_MIN_SIZE
00039
00040 extern int RPC_MIN_SIZE;
00041
00042 #ifdef AIX
00043 #define _NO_BITFIELDS
00044 #endif
00045
00046 #include <netinet/tcp.h>
00047
00048 void (*rpc_release_all)(void);
00049 extern int rpc_from_core;
00050
00051 static void
00052 signal_handler(int sig)
00053 {
00054 int s;
00055 for (s = 0; s < NSIG; s++)
00056 signal(s, SIG_DFL);
00057
00058 if (sig == SIGBUS || sig == SIGSEGV)
00059 {
00060 rpc_from_core = 1;
00061
00062 if (getenv("EYEDBDBG")) for (;;) sleep(1000);
00063
00064 if (rpc_quit_handler)
00065 rpc_quit_handler(rpc_quit_data, 1);
00066
00067 if (rpc_release_all)
00068 rpc_release_all();
00069
00070 raise(sig);
00071 return;
00072 }
00073
00074 IDB_LOG(IDB_LOG_CONN, ("got %s [signal=%d]\n", strsignal(sig), sig));
00075
00076 if (rpc_quit_handler)
00077 rpc_quit_handler(rpc_quit_data, 0);
00078
00079 if (rpc_release_all)
00080 rpc_release_all();
00081
00082 raise(sig);
00083 exit(0x80|sig);
00084 }
00085
00086 static void
00087 rpc_feInit()
00088 {
00089 signal(SIGINT, signal_handler);
00090 signal(SIGQUIT, signal_handler);
00091 signal(SIGSEGV, signal_handler);
00092 signal(SIGBUS, signal_handler);
00093 signal(SIGABRT, signal_handler);
00094 signal(SIGPIPE, signal_handler);
00095 }
00096
00097 rpc_Client *rpc_clientCreate(void)
00098 {
00099 rpc_Client *client = rpc_new(rpc_Client);
00100
00101 client->last_type = rpc_NBaseType;
00102
00103
00104 #if 0
00105 rpc_feInit();
00106 #endif
00107
00108 return client;
00109 }
00110
00111
00112
00113 rpc_ArgType rpc_makeClientUserType(rpc_Client *client, int size,
00114 rpc_UserArgFunction func,
00115 rpc_Boolean is_pointer)
00116 {
00117 rpc_UserType *utyp = rpc_getUTyp(client, client->last_type);
00118
00119 utyp->size = size;
00120 utyp->func = func;
00121 utyp->is_pointer = is_pointer;
00122
00123 return client->last_type++;
00124 }
00125
00126 rpc_ClientFunction *
00127 rpc_makeUserClientFunction(rpc_Client *client, rpc_RpcDescription *rd)
00128 {
00129 rpc_ClientFunction *func = rpc_new(rpc_ClientFunction);
00130
00131 rd->args[rd->nargs-1].type = rd->arg_ret;
00132 rd->args[rd->nargs-1].send_rcv = rpc_Rcv;
00133
00134 while (rd->args[rd->nargs-1].type == rpc_VoidType)
00135 rd->nargs--;
00136
00137 func->rd = rd;
00138
00139 return func;
00140 }
00141
/* Process id override: while it is zero, rpc_getpid() reports getpid(). */
pid_t rpc_pid;

/*
 * Returns rpc_pid when it has been set to a non-zero value, and the id of
 * the calling process otherwise.
 */
pid_t rpc_getpid()
{
  return rpc_pid ? rpc_pid : getpid();
}
00150
00151 rpc_Status
00152 rpc_connOpen(rpc_Client *client, const char *hostname, const char *portname,
00153 rpc_ConnHandle **pconn, unsigned long magic,
00154 int conn_cnt, int comm_size, std::string &errmsg)
00155 {
00156 int domain, sock_fd, length;
00157 struct sockaddr_in sock_in_name;
00158 struct sockaddr_un sock_un_name;
00159 struct sockaddr *sock_addr;
00160 rpc_ConnHandle *conn;
00161 int i;
00162 int xid = 0;
00163
00164 errmsg = "";
00165
00166 const char *t_portname;
00167 int type;
00168
00169 t_portname = rpc_getPortAttr(portname, &domain, &type);
00170 if (!t_portname) {
00171 errmsg = std::string("invalid port: " ) + hostname;
00172 return rpc_ConnectionFailure;
00173 }
00174
00175 portname = t_portname;
00176
00177 *pconn = (rpc_ConnHandle *)0;
00178
00179 if (domain == AF_INET) {
00180 char hname[64];
00181
00182 sock_in_name.sin_family = domain;
00183 sock_in_name.sin_port = htons(atoi(portname));
00184
00185 if (hostname)
00186 strcpy(hname, hostname);
00187 else
00188 gethostname(hname, sizeof(hname)-1);
00189
00190 if (!rpc_hostNameToAddr(hname, &sock_in_name.sin_addr)) {
00191 errmsg = std::string("unknown host: " ) + hostname;
00192 return rpc_ConnectionFailure;
00193 }
00194 sock_addr = (struct sockaddr *)&sock_in_name;
00195 length = sizeof(sock_in_name);
00196 }
00197 else {
00198
00199 if (hostname) {
00200 if (!rpc_hostNameToAddr(hostname, &sock_in_name.sin_addr)) {
00201 errmsg = std::string("unknown host: " ) + hostname;
00202 return rpc_ConnectionFailure;
00203 }
00204
00205 if (strcmp(hostname, "localhost")) {
00206 errmsg = std::string("localhost expected (got ") +
00207 hostname + ") for named pipe " + portname;
00208 return rpc_ConnectionFailure;
00209 }
00210 }
00211
00212 sock_un_name.sun_family = domain;
00213 strcpy(sock_un_name.sun_path, portname);
00214 sock_addr = (struct sockaddr *)&sock_un_name;
00215 length = sizeof(sock_un_name);
00216 }
00217
00218 conn = rpc_new(rpc_ConnHandle);
00219 conn->fd = (int *)malloc(conn_cnt * sizeof(int));
00220
00221 for (i = 0; i < conn_cnt; i++) {
00222 rpc_MultiConnInfo info;
00223 rpc_MultiConnInfo xinfo;
00224 #ifdef HAVE_FATTACH
00225 if (domain == AF_UNIX) {
00226 sock_fd = open(portname, O_RDWR);
00227 if (sock_fd < 0) {
00228 errmsg = std::string("server unreachable: ") +
00229 "host " + hostname + ", port " + portname;
00230 goto failure;
00231 }
00232 }
00233 else {
00234 #endif
00235 if ((sock_fd = socket(domain, type, 0)) < 0) {
00236 errmsg = std::string("server unreachable: ") +
00237 "host " + hostname + ", port " + portname;
00238 goto failure;
00239 }
00240 #if 0
00241 if (domain == AF_INET)
00242 rpc_socket_nodelay(sock_fd);
00243 #endif
00244
00245 #ifdef TRACE
00246 utlog("opening sock_fd=%d\n", sock_fd);
00247 #endif
00248
00249 if (connect(sock_fd, sock_addr, length) < 0) {
00250 errmsg = std::string("server unreachable: ") +
00251 "host " + hostname + ", port " + portname;
00252 goto failure;
00253 }
00254
00255 #ifdef HAVE_FATTACH
00256 }
00257 #endif
00258 conn->fd[i] = sock_fd;
00259
00260 if (conn_cnt == 1)
00261 break;
00262
00263 info.magic = MM(magic);
00264
00265 if (!i) {
00266 info.cmd = rpc_NewConnection;
00267 info.xid = 0;
00268 }
00269 else {
00270 info.cmd = rpc_AssociatedConnection;
00271 info.xid = xid;
00272 }
00273
00274 h2x_rpc_multiconninfo(&xinfo, &info);
00275 if (rpc_socketWrite(sock_fd, &xinfo, sizeof(xinfo)) != sizeof(xinfo)) {
00276 errmsg = std::string("cannot write on socket: ") +
00277 "host " + hostname + ", port " + portname;
00278 goto failure;
00279 }
00280
00281 if (rpc_socketRead(sock_fd, &info, sizeof(info)) != sizeof(info)) {
00282 errmsg = std::string("client connection not granted by server: ") +
00283 "host " + hostname + ", port " + portname;
00284 goto failure;
00285 }
00286
00287 x2h_rpc_multiconninfo(&info);
00288
00289 if (info.magic != MM(magic) || info.cmd != rpc_ReplyNewConnection) {
00290 errmsg = std::string("protocol error: ") +
00291 "host " + hostname + ", port " + portname;
00292 goto failure;
00293 }
00294
00295
00296 if (!i)
00297 xid = info.xid;
00298 }
00299
00300 conn->client = client;
00301 conn->magic = magic;
00302 conn->conn_cnt = conn_cnt;
00303
00304 if (!comm_size)
00305 comm_size = RPC_COMM_SIZE;
00306
00307 conn->comm_size = comm_size;
00308 conn->comm_buff = (char **)malloc(conn_cnt * sizeof(char *));
00309
00310 for (i = 0; i < conn_cnt; i++) {
00311 conn->comm_buff[i] = (char *)calloc(comm_size, 1);
00312 }
00313
00314 *pconn = conn;
00315
00316 return rpc_Success;
00317
00318 failure:
00319 free(conn->fd);
00320 free(conn);
00321 return rpc_ConnectionFailure;
00322 }
00323
00324 rpc_Status
00325 rpc_connClose(rpc_ConnHandle *conn)
00326 {
00327 int i;
00328 #ifdef TRACE
00329 utlog("rpc_connClose(0x%x, 0x%x)\n", conn, conn->fd);
00330 #endif
00331
00332 if (conn && conn->fd)
00333 {
00334 for (i = 0; i < conn->conn_cnt; i++)
00335 {
00336 #ifdef TRACE
00337 utlog("closing %d\n", conn->fd[i]);
00338 #endif
00339 if (close(conn->fd[i]))
00340 perror("rpc_ConnClose");
00341 }
00342
00343 free(conn->fd);
00344 conn->fd = 0;
00345 }
00346
00347 for (i = 0; i < conn->conn_cnt; i++)
00348 free(conn->comm_buff[i]);
00349
00350 free(conn->comm_buff);
00351
00352 conn->comm_buff = NULL;
00353
00354 free(conn);
00355
00356 return rpc_Success;
00357 }
00358
00359 rpc_Status
00360 rpc_setClientArgSize(rpc_Client *client, int args_size)
00361 {
00362 client->args_size = args_size;
00363 return rpc_Success;
00364 }
00365
00366 static int
00367 rpc_clientArgsMake(rpc_ConnHandle *conn, int which, rpc_RpcDescription **prd,
00368 rpc_SendRcv send_rcv, rpc_FromTo fromto,
00369 rpc_ClientArg ua)
00370 {
00371 int i, fd = conn->fd[which], commsz;
00372 char *buff, *commb;
00373 register rpc_Arg *arg;
00374 rpc_ClientArg pua;
00375 rpc_RpcHeader rhd;
00376 rpc_RpcHeader xrhd;
00377 int ndata = 0;
00378 rpc_ClientData *p_data[RPC_NDATA];
00379 rpc_RpcDescription *rd;
00380 static long serial = 1000;
00381 char *comm_buff = conn->comm_buff[which];
00382 int args_size = conn->client->args_size;
00383 int buff_size = 0;
00384 rpc_UserType *utyp;
00385 rpc_StatusRec *rstatus = 0;
00386 commb = comm_buff;
00387 commsz = conn->comm_size;
00388
00389
00390 #if 1
00391 if (fromto == rpc_From)
00392 buff = commb;
00393 else
00394 buff = commb + sizeof(rhd);
00395 #else
00396 buff = commb + sizeof(rhd);
00397 #endif
00398
00399 #ifdef TRACE
00400 utlog("clientArgsMake code #%d, %s [which %d]\n",
00401 (*prd)->code, (fromto == rpc_From ? "FROM" : "TO"), which);
00402 #endif
00403
00404 if (fromto == rpc_From)
00405 {
00406 int sz;
00407 #ifdef USE_RPC_MIN_SIZE
00408 if ((sz = rpc_socketRead(fd, buff, RPC_MIN_SIZE)) != RPC_MIN_SIZE)
00409 {
00410 IDB_LOG_FX(("Client Protocol Error #1: size=%d, expected=%d\n",
00411 sz, RPC_MIN_SIZE));
00412 return 0;
00413 }
00414 memcpy(&rhd, buff, sizeof(rhd));
00415 x2h_rpc_hd(&rhd);
00416 buff += sizeof(rhd);
00417 #else
00418 if ((sz = rpc_socketRead(fd, buff, sizeof(rhd))) != sizeof(rhd))
00419 {
00420 IDB_LOG_FX(("Client Protocol Error #1: size=%d, expected=%d\n",
00421 sz, sizeof(rhd)));
00422 return 0;
00423 }
00424 memcpy(&rhd, buff, sizeof(rhd));
00425 x2h_rpc_hd(&rhd);
00426 buff += sizeof(rhd);
00427 #endif
00428
00429 #ifdef TRACE
00430 utlog(".... code #%d FROM\n", rhd.code);
00431 #endif
00432
00433 if (rhd.magic != conn->magic)
00434 {
00435 IDB_LOG_FX(("Client Protocol Error #2: invalid magic=%p, "
00436 "expected=%p\n", rhd.magic, conn->magic));
00437 return 0;
00438 }
00439
00440 if (rhd.code != (*prd)->code)
00441 {
00442 IDB_LOG_FX(("Client Protocol Error #3: invalid code=%p, expected=%p\n",
00443 rhd.code, (*prd)->code));
00444 return 0;
00445 }
00446
00447 #ifdef USE_RPC_MIN_SIZE
00448 if (rhd.size-RPC_MIN_SIZE > 0) {
00449 if (rpc_socketRead(fd, buff+RPC_MIN_SIZE-sizeof(rhd),
00450 rhd.size-RPC_MIN_SIZE) != rhd.size-RPC_MIN_SIZE)
00451 {
00452 IDB_LOG_FX(("Client Protocol Error #4: read failed for %d bytes\n",
00453 rhd.size-sizeof(rhd)));
00454 return 0;
00455 }
00456 }
00457 #else
00458 if (rhd.size-sizeof(rhd))
00459 if (rpc_socketRead(fd, buff, rhd.size-sizeof(rhd)) !=
00460 rhd.size-sizeof(rhd))
00461 {
00462 IDB_LOG_FX(("Client Protocol Error #4: read failed for %d bytes\n",
00463 rhd.size-sizeof(rhd)));
00464 return 0;
00465 }
00466 #endif
00467
00468 rd = *prd;
00469
00470 if (rhd.size > commsz)
00471 {
00472 IDB_LOG_FX(("Client Protocol Error #5: size exceeded=%d, max=%d\n",
00473 rhd.size, commsz));
00474 return 0;
00475 }
00476 }
00477 else
00478 rd = *prd;
00479
00480 for (i = 0, arg = rd->args, pua = ua; i < rd->nargs; i++, arg++, pua += args_size)
00481 switch(arg->type)
00482 {
00483 case rpc_Int16Type:
00484 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int16), send_rcv, fromto, x2h_16_cpy, h2x_16_cpy);
00485 break;
00486
00487 case rpc_Int32Type:
00488 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
00489 break;
00490
00491 case rpc_Int64Type:
00492 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int64), send_rcv, fromto, x2h_64_cpy, h2x_64_cpy);
00493 break;
00494
00495 case rpc_StatusType:
00496 if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_From)
00497 {
00498 int err;
00499 x2h_32_cpy(&err, buff);
00500 if (err)
00501 rstatus = (rpc_StatusRec *)pua;
00502 }
00503 rpc_copy_fast_xdr(arg, buff, pua, sizeof(eyedblib::int32), send_rcv, fromto, x2h_32_cpy, h2x_32_cpy);
00504 break;
00505
00506 case rpc_StringType:
00507 if ((arg->send_rcv & rpc_Send) && fromto == rpc_To)
00508 {
00509 int len = strlen(*(char **)pua)+1;
00510 h2x_32_cpy(buff, &len);
00511 buff += sizeof(len);
00512 if (len)
00513 {
00514 memcpy(buff, *(char **)pua, len);
00515 buff += len;
00516 }
00517 }
00518 else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_From)
00519 {
00520 int len;
00521 x2h_32_cpy(&len, buff);
00522 buff += sizeof(len);
00523 if (len)
00524 *(char **)pua = buff;
00525 else
00526 *(char **)pua = "";
00527 buff += len;
00528 }
00529 break;
00530
00531 case rpc_DataType:
00532 if ((arg->send_rcv & rpc_Send) && fromto == rpc_To)
00533 {
00534 int status;
00535 rpc_ClientData *a_data = (rpc_ClientData *)pua;
00536
00537
00538 h2x_32_cpy(buff, &a_data->data);
00539 buff += 8;
00540
00541 h2x_32_cpy(buff, &a_data->size);
00542 buff += sizeof(a_data->size);
00543
00544 if (!a_data->data || a_data->data == rpc_ObjectNone)
00545 a_data = 0;
00546 else if (a_data->size < rpc_buff_size(commsz, commb, buff))
00547 {
00548 status = rpc_SyncData;
00549 h2x_32_cpy(buff, &status);
00550 buff += sizeof(status);
00551 memcpy(buff, a_data->data, a_data->size);
00552 buff += a_data->size;
00553 }
00554 else
00555 {
00556 status = rpc_ASyncData;
00557 h2x_32_cpy(buff, &status);
00558 buff += sizeof(status);
00559 p_data[ndata++] = a_data;
00560 }
00561 }
00562 else if ((arg->send_rcv & rpc_Rcv) && fromto == rpc_From)
00563 {
00564 int status, offset;
00565 rpc_ClientData *a_data = (rpc_ClientData *)pua;
00566
00567 x2h_32_cpy(&a_data->size, buff);
00568 buff += sizeof(a_data->size);
00569
00570 x2h_32_cpy(&status, buff);
00571 buff += sizeof(status);
00572
00573 if (!a_data->data)
00574 a_data->data = (void *)malloc(a_data->size);
00575
00576 if (status == rpc_SyncData)
00577 {
00578 x2h_32_cpy(&offset, buff);
00579 memcpy(a_data->data, buff+offset, a_data->size);
00580 buff += sizeof(offset);
00581 buff_size += a_data->size;
00582 }
00583 else
00584 {
00585 memset(buff, 0, sizeof(offset));
00586 buff += sizeof(offset);
00587 p_data[ndata++] = a_data;
00588 }
00589 }
00590 break;
00591
00592 case rpc_VoidType:
00593 break;
00594
00595 default:
00596 rpc_assert(arg->type >= rpc_NBaseType && arg->type < conn->client->last_type);
00597 utyp = rpc_getUTyp(conn->client, arg->type);
00598
00599 if (utyp->func)
00600 utyp->func(arg, &buff, pua, send_rcv, fromto);
00601 else if (utyp->is_pointer)
00602 rpc_copy(arg, buff, *(char **)pua, utyp->size, send_rcv, fromto);
00603 else
00604 rpc_copy(arg, buff, pua, utyp->size, send_rcv, fromto);
00605 }
00606
00607 if (fromto == rpc_To)
00608 {
00609
00610 rhd.code = rd->code;
00611
00612 rhd.magic = conn->magic;
00613 rhd.serial = ++serial;
00614 rhd.ndata = ndata;
00615 rhd.status = 0;
00616
00617 rhd.size = (int)(buff-commb) + buff_size;
00618
00619 if (rhd.size > commsz)
00620 {
00621 IDB_LOG_FX(("Client Protocol Error #6: size exceeded=%d, max=%d, "
00622 "buff_size=%d\n",
00623 rhd.size, commsz, buff_size));
00624 return 0;
00625 }
00626
00627
00628
00629
00630
00631
00632
00633
00634 h2x_rpc_hd(&xrhd, &rhd);
00635 memcpy(commb, &xrhd, sizeof(xrhd));
00636
00637 #ifdef USE_RPC_MIN_SIZE
00638 if (rhd.size < RPC_MIN_SIZE)
00639 rhd.size = RPC_MIN_SIZE;
00640 #endif
00641
00642 if (rpc_socketWrite(fd, commb, rhd.size) != rhd.size)
00643 {
00644 IDB_LOG_FX(("Client Protocol Error #7: write failed for %d bytes\n",
00645 rhd.size));
00646 return 0;
00647 }
00648
00649 if (ndata)
00650 {
00651 int d[RPC_NDATA], i;
00652 for (i = 0; i < ndata; i++)
00653 d[i] = h2x_32(p_data[i]->size);
00654
00655 if (rpc_socketWrite(fd, d, ndata*sizeof(int)) != ndata*sizeof(int))
00656 {
00657 IDB_LOG_FX(("Client Protocol Error #8: write failed for %d "
00658 "bytes\n", ndata*sizeof(int)));
00659 return 0;
00660 }
00661 for (i = 0; i < ndata; i++) {
00662 if (rpc_socketWrite(fd, p_data[i]->data, p_data[i]->size) !=
00663 p_data[i]->size)
00664 {
00665 IDB_LOG_FX(("Client Protocol Error #9: write failed for "
00666 "%d bytes\n",
00667 p_data[i]->size));
00668 return 0;
00669 }
00670 }
00671 }
00672 }
00673
00674 if (fromto == rpc_From && (send_rcv & rpc_Rcv) && ndata)
00675 {
00676 int d[RPC_NDATA], i;
00677
00678 if (rpc_socketRead(fd, d, ndata*sizeof(int)) != ndata*sizeof(int))
00679 {
00680 IDB_LOG_FX(("Client Protocol Error #10: write failed for "
00681 "%d bytes\n",
00682 ndata*sizeof(int)));
00683 return 0;
00684 }
00685
00686 for (i = 0; i < ndata; i++) {
00687 d[i] = x2h_32(d[i]);
00688 if (d[i] > 0)
00689 {
00690 if (rpc_socketRead(fd, p_data[i]->data, d[i]) != d[i])
00691 {
00692 IDB_LOG_FX(("Client Protocol Error #11: write failed for "
00693 "%d bytes\n", d[i]));
00694 return 0;
00695 }
00696 }
00697 }
00698 }
00699
00700 if (rstatus)
00701 {
00702 int len;
00703 int tmp;
00704 if (rpc_socketRead(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
00705 return 0;
00706 rstatus->err = x2h_32(tmp);
00707 if (rpc_socketRead(fd, &tmp, sizeof(tmp)) != sizeof(tmp))
00708 return 0;
00709 len = x2h_32(tmp);
00710 if (rpc_socketRead(fd, rstatus->err_msg, len+1) != len+1)
00711 return 0;
00712 }
00713
00714 #ifdef TRACE
00715 utlog("clientArgsMake code #%d, %s [which %d] done!\n",
00716 (*prd)->code, (fromto == rpc_From ? "FROM" : "TO"), which);
00717 #endif
00718 return 1;
00719 }
00720
00721 rpc_Status
00722 rpc_rpcMake(rpc_ConnHandle *conn, int which, rpc_ClientFunction *func, void *ua)
00723 {
00724 if (conn && conn->fd)
00725 {
00726 rpc_RpcDescription *rd;
00727
00728 if (which < 0 || which >= conn->conn_cnt)
00729 return rpc_Error;
00730
00731 rd = func->rd;
00732
00733 rpc_assert(rd);
00734
00735 if (!rpc_clientArgsMake(conn, which, &rd, rpc_Send, rpc_To,
00736 (rpc_ClientArg)ua))
00737 return rpc_ServerFailure;
00738
00739 if (!rpc_clientArgsMake(conn, which, &rd, rpc_Rcv, rpc_From,
00740 (rpc_ClientArg)ua))
00741 return rpc_ServerFailure;
00742
00743 return rpc_Success;
00744 }
00745
00746 return rpc_Error;
00747 }