00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <eyedbconfig.h>
00025
00026 #include <eyedblib/thread.h>
00027 #include <eyedblib/rpc_lib.h>
00028
00029 #include <assert.h>
00030 #include <stdio.h>
00031 #include <unistd.h>
00032 #include <stdlib.h>
00033 #include <errno.h>
00034 #include <string.h>
00035 #include <sys/time.h>
00036
00037 using namespace std;
00038
00039
00040
00041 namespace eyedblib {
00042
00043 pthread_key_t Thread::self_key;
00044
00045 struct Thread::Initializer {
00046 Initializer() {
00047 assert(!pthread_key_create(&Thread::self_key, 0));
00048 }
00049 };
00050
00051 static Thread::Initializer initializer;
00052
00053
00054
00055
00056
00057 Mutex::Mutex(Type _type, bool _lock)
00058 {
00059 type = _type;
00060 init(_lock);
00061 }
00062
00063 Mutex::Mutex(bool _lock)
00064 {
00065 type = PROCESS_PRIVATE;
00066 init(_lock);
00067 }
00068
00069 int Mutex::init(Type _type, bool _lock)
00070 {
00071 type = _type;
00072 return init(_lock);
00073 }
00074
00075 int Mutex::init(bool _lock)
00076 {
00077 pthread_mutexattr_t mattr;
00078 int r = pthread_mutexattr_init(&mattr);
00079 if (r) return r;
00080
00081 #ifdef HAVE_PTHREAD_PROCESS_SHARED
00082 if (type == PROCESS_SHARED) {
00083 r = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
00084 if (r) return r;
00085 }
00086 else {
00087 r = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_PRIVATE);
00088 if (r) return r;
00089 }
00090 #endif
00091 r = pthread_mutex_init(&mut, &mattr);
00092 if (r) return r;
00093
00094 locked = false;
00095 if (_lock)
00096 lock();
00097
00098 return pthread_mutexattr_destroy(&mattr);
00099 }
00100
00101 int
00102 Mutex::lock()
00103 {
00104 int r = pthread_mutex_lock(&mut);
00105 locked = true;
00106 return r;
00107 }
00108
00109 bool
00110 Mutex::trylock()
00111 {
00112 return !pthread_mutex_trylock(&mut);
00113 }
00114
00115 int
00116 Mutex::unlock()
00117 {
00118 #if 1
00119 if (!locked) {
00120 fprintf(stderr, "eyedblib::Mutex::unlock(): Assertion `locked' failed\n");
00121 fprintf(stderr, "dbgserv %d\n", rpc_getpid());
00122 fflush(stderr);
00123 sleep(1000);
00124 abort();
00125 }
00126 #endif
00127 assert (locked);
00128 locked = false;
00129 return pthread_mutex_unlock(&mut);
00130 }
00131
00132 Mutex::~Mutex()
00133 {
00134 }
00135
00136
00137
00138
00139
00140 Condition::Condition(Type _type) : profile(this)
00141 {
00142 type = _type;
00143 init();
00144 }
00145
00146 int
00147 Condition::init()
00148 {
00149 pthread_condattr_t cattr;
00150 int r = pthread_condattr_init(&cattr);
00151 if (r) return r;
00152
00153 #ifdef HAVE_PTHREAD_PROCESS_SHARED
00154 if (type == PROCESS_SHARED) {
00155 r = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
00156 if (r) return r;
00157 }
00158 else {
00159 r = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_PRIVATE);
00160 if (r) return r;
00161 }
00162 #endif
00163 r = pthread_cond_init(&cnd, &cattr);
00164 if (r) return r;
00165 cond = 0;
00166 wait_cnt = 0;
00167 profiled = false;
00168 return 0;
00169 }
00170
00171 int
00172 Condition::wait()
00173 {
00174 int r = mut.lock();
00175 if (r) return r;
00176 while (!cond) {
00177 #ifdef TRACE
00178 Thread *thr = Thread::getCallingThread();
00179 if (thr)
00180 printf("Condition::wait(@%d:%d, %p) -> sleep\n", thr->get_thread(),
00181 thr->get_pid(), this);
00182 else
00183 printf("Condition::wait(@%d, %p) -> sleep\n", pthread_self(), this);
00184 #endif
00185 struct timeval tp_start_wait, tp_end_wait;
00186 if (profiled) {
00187 gettimeofday(&tp_start_wait, 0);
00188 profile.wait_cnt++;
00189 }
00190 wait_cnt++;
00191 r = pthread_cond_wait(&cnd, mut.forCondWait());
00192 if (r) {mut.unlock(); return r;}
00193 mut.condWakeup();
00194 if (profiled) {
00195 gettimeofday(&tp_end_wait, 0);
00196 profile.sigwakeup.set(tp_end_wait, profile.tpsig);
00197 profile.wait.set(tp_end_wait, tp_start_wait);
00198 profile.wakeup_cnt++;
00199 }
00200 --wait_cnt;
00201 #ifdef TRACE
00202 if (thr)
00203 printf("Condition::wait(@%d:%d, %p) <- wakeup\n", thr->get_thread(),
00204 thr->get_pid(), this);
00205 else
00206 printf("Condition::wait(@%d, %p) <- wakeup\n", pthread_self(), this);
00207 #endif
00208 }
00209 --cond;
00210 assert(cond >= 0);
00211 return mut.unlock();
00212 }
00213
00214 #define USECS_PER_SEC 1000000
00215
00216 bool
00217 Condition::timedWait(unsigned long usec)
00218 {
00219 bool ret = true;
00220 mut.lock();
00221 struct timespec tm;
00222 if (!cond) {
00223 struct timeval tv;
00224 gettimeofday(&tv, NULL);
00225 unsigned long sec = usec / USECS_PER_SEC;
00226 tm.tv_sec = tv.tv_sec + sec;
00227 tm.tv_nsec = 1000 * (usec - sec * USECS_PER_SEC);
00228 }
00229 while (!cond) {
00230 #ifdef TRACE
00231 Thread *thr = Thread::getCallingThread();
00232 if (thr)
00233 printf("Condition::timedwait(@%d:%d, %p) -> sleep\n", thr->get_thread(),
00234 thr->get_pid(), this);
00235 else
00236 printf("Condition::timedwait(@%d, %p) -> sleep\n", pthread_self());
00237 #endif
00238 wait_cnt++;
00239 int r = pthread_cond_timedwait(&cnd, mut.forCondWait(), &tm);
00240 mut.condWakeup();
00241 --wait_cnt;
00242 #ifdef TRACE
00243 if (thr)
00244 printf("Condition::timedwait(@%d:%d, %p) <- wakeup\n", thr->get_thread(),
00245 thr->get_pid(), this);
00246 else
00247 printf("Condition::timedwait(@%d, %p) <- wakeup\n", pthread_self(), this);
00248 #endif
00249 if (r == ETIMEDOUT) {
00250 ret = false;
00251 break;
00252 }
00253 assert(!r);
00254 }
00255 if (ret)
00256 --cond;
00257 mut.unlock();
00258 return ret;
00259 }
00260
00261 int
00262 Condition::signal()
00263 {
00264 int r = mut.lock();
00265 if (r) return r;
00266 cond++;
00267 if (wait_cnt) {
00268 #ifdef TRACE
00269 Thread *thr = Thread::getCallingThread();
00270 if (thr)
00271 printf("Condition::signal(@%d:%d, %p)\n", thr->get_thread(),
00272 thr->get_pid(), this);
00273 else
00274 printf("Condition::signal(@%d, %p)\n", pthread_self(), this);
00275 #endif
00276 if (profiled) {
00277 gettimeofday(&profile.tpsig, 0);
00278 profile.signal_cnt++;
00279 }
00280 r = pthread_cond_signal(&cnd);
00281 if (r) return r;
00282 }
00283 return mut.unlock();
00284 }
00285
00286 int
00287 Condition::reset()
00288 {
00289 int r = mut.lock();
00290 if (r) return r;
00291
00292 if (wait_cnt)
00293 return -1;
00294
00295 cond = 0;
00296 return mut.unlock();
00297 }
00298
00299 void
00300 Condition::resetProfile()
00301 {
00302 profile.reset();
00303 }
00304
00305 Condition::Profile::Profile(Condition *_cond)
00306 {
00307 cond = _cond;
00308 reset();
00309 }
00310
00311 void
00312 Condition::Profile::reset()
00313 {
00314 wait_cnt = 0;
00315 wakeup_cnt = 0;
00316 signal_cnt = 0;
00317 tpsig.tv_sec = 0;
00318 tpsig.tv_usec = 0;
00319 nowait.reset();
00320 wait.reset();
00321 }
00322
00323 Condition::~Condition()
00324 {
00325 }
00326
00327
00328
00329
00330
00331 Thread::Thread(Type type, void (*_init)(Thread *, void *), void *_init_arg)
00332 : profile(this)
00333 {
00334 init_thr("", type, _init, _init_arg);
00335 }
00336
00337 Thread::Thread(const char *_name, Type type,
00338 void (*_init)(Thread *, void *), void *_init_arg)
00339 : profile(this)
00340 {
00341 init_thr(_name, type, _init, _init_arg);
00342 }
00343
00344 void
00345 Thread::init_thr(const char *_name, Type type,
00346 void (*_init)(Thread *, void *), void *_init_arg)
00347 {
00348 pthread_attr_t attr;
00349 start_exec = 0;
00350 start_exec_data = 0;
00351 end_exec = 0;
00352 end_exec_data = 0;
00353 init = _init;
00354 init_arg = _init_arg;
00355 profiled = false;
00356 name = strdup(_name);
00357 user_data = 0;
00358 assert (!pthread_attr_init(&attr));
00359
00360 if (type == SCOPE_SYSTEM)
00361 assert (!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM));
00362 else
00363 assert (!pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS));
00364
00365 assert (!pthread_create(&tid, &attr, run, this));
00366 pid = rpc_getpid();
00367 idle = true;
00368 }
00369
00370 Thread::Thread(const char *_name, bool) : profile(this)
00371 {
00372 user_data = 0;
00373 init = 0;
00374 init_arg = 0;
00375 pid = rpc_getpid();
00376 name = strdup(_name);
00377 profiled = false;
00378 }
00379
00380 void
00381 Thread::resetProfile()
00382 {
00383 profile.reset();
00384 }
00385
00386 Thread::Profile::Profile(Thread *_thr)
00387 {
00388 thr = _thr;
00389 reset();
00390 }
00391
00392 void
00393 Thread::Profile::reset()
00394 {
00395 run_cnt = 0;
00396 run.reset();
00397 wait.reset();
00398 }
00399
00400
00401
00402
00403
00404 void
00405 ProfileStats::reset()
00406 {
00407 usec = 0;
00408 max_usec = 0;
00409 min_usec = ~0LL;
00410 }
00411
00412 ProfileStats::ProfileStats()
00413 {
00414 reset();
00415 }
00416
00417 void
00418 ProfileStats::set(struct timeval &tp1, struct timeval &tp0)
00419 {
00420 unsigned long long u = (tp1.tv_sec - tp0.tv_sec) * 1000000 +
00421 (tp1.tv_usec - tp0.tv_usec);
00422 usec += u;
00423 if (u < min_usec)
00424 min_usec = u;
00425 if (u > max_usec)
00426 max_usec = u;
00427 }
00428
00429
00430 void ProfileStats::display(ostream &os, unsigned int cnt) const
00431 {
00432 if (!cnt) {
00433 os << " <nil>\n";
00434 return;
00435 }
00436
00437 os << " Total time: ";
00438 display_time(os, usec);
00439 os << "\n Min time: ";
00440 display_time(os, min_usec);
00441 os << "\n Max time: ";
00442 display_time(os, max_usec);
00443 os << "\n Average: ";
00444 display_time(os, (double)usec / cnt);
00445
00446 os <<"\n";
00447 }
00448
00449
00450 ostream &operator<<(ostream &os, const Condition::Profile &profile)
00451 {
00452 os << "Condition " << profile.cond << " { \n";
00453 if (!profile.wait_cnt) {
00454 os << " <nil>\n}\n";
00455 return os;
00456 }
00457 os << " Wait count: " << profile.wait_cnt << "\n";
00458 os << " Wakeup count: " << profile.wakeup_cnt << "\n";
00459 os << " Signal count: " << profile.signal_cnt << "\n";
00460 os << " Signal/Wakeup statistics:\n";
00461 profile.sigwakeup.display(os, profile.wakeup_cnt);
00462 os << " Wait statistics:\n";
00463 profile.wait.display(os, profile.wait_cnt);
00464
00465
00466
00467
00468 os << "}" << endl;
00469 return os;
00470 }
00471
00472 ostream &operator<<(ostream &os, const Thread::Profile &profile)
00473 {
00474 os << "Thread @" << profile.thr->get_thread() << ":" << profile.thr->get_pid() <<
00475 " { \n";
00476 if (!profile.run_cnt) {
00477 os << " <nil>\n}\n";
00478 return os;
00479 }
00480 if (profile.thr->getName() && *profile.thr->getName())
00481 os << " Name: " << profile.thr->getName() << "\n";
00482
00483 os << " Run count: " << profile.run_cnt << "\n";
00484 os << " Run statistics:\n";
00485 profile.run.display(os, profile.run_cnt);
00486 os << " Wait statistics:\n";
00487 profile.wait.display(os, profile.run_cnt);
00488 os << profile.thr->sync.cnd_start.getProfile();
00489 os << profile.thr->sync.cnd_end.getProfile();
00490 os << "}" << endl;
00491 return os;
00492 }
00493
00494 void* Thread::run(void* xthr)
00495 {
00496 Thread *thr = (Thread *)xthr;
00497
00498 thr->tid = pthread_self();
00499 assert(!pthread_setspecific(self_key, thr));
00500 if (thr->init)
00501 thr->init(thr, thr->init_arg);
00502
00503 struct timeval tp_start_wait, tp_start_run, tp_end_run;
00504 #ifdef TRACE
00505 printf("Thread::run(starting @%d:%d)\n", thr->get_thread(), thr->get_pid());
00506 #endif
00507
00508 for (;;) {
00509 #ifdef TRACE
00510 printf("Thread::run(waiting for condition @%d:%d)\n", thr->get_thread(), thr->get_pid());
00511 #endif
00512 if (thr->profiled)
00513 gettimeofday(&tp_start_wait, 0);
00514
00515 assert(!thr->sync.cnd_start.wait());
00516 #ifdef TRACE
00517 printf("Thread::run(resuming @%d:%d -> %p)\n", thr->get_thread(),
00518 thr->get_pid(), thr->exec.fun);
00519 #endif
00520 if (thr->profiled) {
00521 gettimeofday(&tp_start_run, 0);
00522 thr->profile.wait.set(tp_start_run, tp_start_wait);
00523 thr->profile.run_cnt++;
00524 }
00525
00526 if (thr->exec.fun) {
00527 thr->exec.return_arg = thr->exec.fun(thr->exec.input_arg);
00528 }
00529
00530 if (thr->profiled) {
00531 gettimeofday(&tp_end_run, 0);
00532 thr->profile.run.set(tp_end_run, tp_start_run);
00533 }
00534
00535 #ifdef TRACE
00536 printf("Thread::run(ending @%d:%d)\n", thr->get_thread(), thr->get_pid());
00537 #endif
00538 thr->idle = true;
00539 assert(!thr->sync.cnd_end.signal());
00540 if (thr->end_exec)
00541 thr->end_exec(thr, thr->end_exec_data);
00542 }
00543
00544 return (void *)0;
00545 }
00546
00547 Thread *
00548 Thread::getCallingThread()
00549 {
00550 Thread *thr = (Thread *)pthread_getspecific(self_key);
00551 if (thr)
00552 assert(thr->get_thread() == pthread_self());
00553 return thr;
00554 }
00555
00556 Thread *
00557 Thread::initCallingThread()
00558 {
00559 Thread *thr = getCallingThread();
00560 if (thr) return thr;
00561 thr = new Thread("#CallingThread", true);
00562 thr->tid = pthread_self();
00563 assert(!pthread_setspecific(self_key, thr));
00564 return thr;
00565 }
00566
00567 void *
00568 Thread::setUserData(void *_user_data)
00569 {
00570 void *o_user_data = user_data;
00571 user_data = _user_data;
00572 return o_user_data;
00573 }
00574
00575 void
00576 Thread::execute(void *(*fun)(void *), void *input_arg)
00577 {
00578 if (!idle) wait();
00579
00580 exec.fun = fun;
00581 exec.input_arg = input_arg;
00582 idle = false;
00583 #ifdef TRACE
00584 printf("Thread::execute(cond_signal @%d:%d)\n", tid, pid);
00585 #endif
00586 if (start_exec)
00587 start_exec(this, start_exec_data);
00588 assert(!sync.cnd_end.reset());
00589 assert(!sync.cnd_start.signal());
00590 #ifdef TRACE
00591 printf("Thread::execute(after cond_signal @%d:%d)\n", tid, pid);
00592 #endif
00593 }
00594
00595 void *
00596 Thread::wait()
00597 {
00598 #ifdef TRACE
00599 printf("Thread::wait(@%d:%d)\n", tid, pid);
00600 #endif
00601 assert(!sync.cnd_end.wait());
00602 #ifdef TRACE
00603 printf("Thread::wait(after @%d:%d)\n", tid, pid);
00604 #endif
00605 return exec.return_arg;
00606 }
00607
00608 void
00609 Thread::join()
00610 {
00611 void *r;
00612 assert (!pthread_join(tid, &r));
00613 }
00614
00615 Thread::~Thread()
00616 {
00617 free(name);
00618
00619
00620 }
00621
00622 }
00623