thread.cc

00001 /* 
00002    EyeDB Object Database Management System
00003    Copyright (C) 1994-2008 SYSRA
00004    
00005    EyeDB is free software; you can redistribute it and/or
00006    modify it under the terms of the GNU Lesser General Public
00007    License as published by the Free Software Foundation; either
00008    version 2.1 of the License, or (at your option) any later version.
00009    
00010    EyeDB is distributed in the hope that it will be useful,
00011    but WITHOUT ANY WARRANTY; without even the implied warranty of
00012    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013    Lesser General Public License for more details.
00014    
00015    You should have received a copy of the GNU Lesser General Public
00016    License along with this library; if not, write to the Free Software
00017    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 
00018 */
00019 
00020 /*
00021    Author: Eric Viara <viara@sysra.com>
00022 */
00023 
00024 #include <eyedbconfig.h>
00025 
00026 #include <eyedblib/thread.h>
00027 #include <eyedblib/rpc_lib.h>
00028 
00029 #include <assert.h>
00030 #include <stdio.h>
00031 #include <unistd.h>
00032 #include <stdlib.h>
00033 #include <errno.h>
00034 #include <string.h>
00035 #include <sys/time.h>
00036 
00037 using namespace std;
00038 
00039 //#define TRACE
00040 
00041 namespace eyedblib {
00042 
00043   pthread_key_t Thread::self_key;
00044 
00045   struct Thread::Initializer {
00046     Initializer() {
00047       assert(!pthread_key_create(&Thread::self_key, 0));
00048     }
00049   };
00050 
00051   static Thread::Initializer initializer;
00052 
00053   //
00054   // Mutex methods
00055   //
00056 
00057   Mutex::Mutex(Type _type, bool _lock)
00058   {
00059     type = _type;
00060     init(_lock);
00061   }
00062 
00063   Mutex::Mutex(bool _lock)
00064   {
00065     type = PROCESS_PRIVATE;
00066     init(_lock);
00067   }
00068 
00069   int Mutex::init(Type _type, bool _lock)
00070   {
00071     type = _type;
00072     return init(_lock);
00073   }
00074 
00075   int Mutex::init(bool _lock)
00076   {
00077     pthread_mutexattr_t mattr;
00078     int r = pthread_mutexattr_init(&mattr);
00079     if (r) return r;
00080 
00081 #ifdef HAVE_PTHREAD_PROCESS_SHARED
00082     if (type == PROCESS_SHARED) {
00083       r = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
00084       if (r) return r;
00085     }
00086     else {
00087       r = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_PRIVATE);
00088       if (r) return r;
00089     }
00090 #endif
00091     r = pthread_mutex_init(&mut, &mattr);
00092     if (r) return r;
00093   
00094     locked = false;
00095     if (_lock)
00096       lock();
00097 
00098     return pthread_mutexattr_destroy(&mattr);
00099   }
00100 
00101   int
00102   Mutex::lock()
00103   {
00104     int r = pthread_mutex_lock(&mut);
00105     locked = true;
00106     return r;
00107   }
00108 
00109   bool
00110   Mutex::trylock()
00111   {
00112     return !pthread_mutex_trylock(&mut);
00113   }
00114 
00115   int
00116   Mutex::unlock()
00117   {
00118 #if 1
00119     if (!locked) {
00120       fprintf(stderr, "eyedblib::Mutex::unlock(): Assertion `locked' failed\n");
00121       fprintf(stderr, "dbgserv %d\n", rpc_getpid());
00122       fflush(stderr);
00123       sleep(1000);
00124       abort();
00125     }
00126 #endif
00127     assert (locked);
00128     locked = false;
00129     return pthread_mutex_unlock(&mut);
00130   }
00131 
00132   Mutex::~Mutex()
00133   {
00134   }
00135 
00136   //
00137   // Condition methods
00138   //
00139 
00140   Condition::Condition(Type _type) : profile(this)
00141   {
00142     type = _type;
00143     init();
00144   }
00145 
00146   int
00147   Condition::init()
00148   {
00149     pthread_condattr_t cattr;
00150     int r = pthread_condattr_init(&cattr);
00151     if (r) return r;
00152 
00153 #ifdef HAVE_PTHREAD_PROCESS_SHARED
00154     if (type == PROCESS_SHARED) {
00155       r = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
00156       if (r) return r;
00157     }
00158     else {
00159       r = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_PRIVATE);
00160       if (r) return r;
00161     }
00162 #endif
00163     r = pthread_cond_init(&cnd, &cattr);
00164     if (r) return r;
00165     cond = 0;
00166     wait_cnt = 0;
00167     profiled = false;
00168     return 0;
00169   }
00170 
00171   int
00172   Condition::wait()
00173   {
00174     int r = mut.lock();
00175     if (r) return r;
00176     while (!cond) {
00177 #ifdef TRACE
00178       Thread *thr = Thread::getCallingThread();
00179       if (thr)
00180         printf("Condition::wait(@%d:%d, %p) -> sleep\n", thr->get_thread(),
00181                thr->get_pid(), this);
00182       else
00183         printf("Condition::wait(@%d, %p) -> sleep\n", pthread_self(), this);
00184 #endif
00185       struct timeval tp_start_wait, tp_end_wait;
00186       if (profiled) {
00187         gettimeofday(&tp_start_wait, 0);
00188         profile.wait_cnt++;
00189       }
00190       wait_cnt++;
00191       r = pthread_cond_wait(&cnd, mut.forCondWait());
00192       if (r) {mut.unlock(); return r;}
00193       mut.condWakeup();
00194       if (profiled) {
00195         gettimeofday(&tp_end_wait, 0);
00196         profile.sigwakeup.set(tp_end_wait, profile.tpsig);
00197         profile.wait.set(tp_end_wait, tp_start_wait);
00198         profile.wakeup_cnt++;
00199       }
00200       --wait_cnt;
00201 #ifdef TRACE
00202       if (thr)
00203         printf("Condition::wait(@%d:%d, %p) <- wakeup\n", thr->get_thread(),
00204                thr->get_pid(), this);
00205       else
00206         printf("Condition::wait(@%d, %p) <- wakeup\n", pthread_self(), this);
00207 #endif
00208     }
00209     --cond;
00210     assert(cond >= 0);
00211     return mut.unlock();
00212   }
00213 
00214 #define USECS_PER_SEC 1000000
00215 
00216   bool
00217   Condition::timedWait(unsigned long usec)
00218   {
00219     bool ret = true;
00220     mut.lock();
00221     struct timespec tm;
00222     if (!cond) {
00223       struct timeval tv;
00224       gettimeofday(&tv, NULL);
00225       unsigned long sec = usec / USECS_PER_SEC;
00226       tm.tv_sec  = tv.tv_sec + sec;
00227       tm.tv_nsec = 1000 * (usec - sec * USECS_PER_SEC);
00228     }
00229     while (!cond) {
00230 #ifdef TRACE
00231       Thread *thr = Thread::getCallingThread();
00232       if (thr)
00233         printf("Condition::timedwait(@%d:%d, %p) -> sleep\n", thr->get_thread(),
00234                thr->get_pid(), this);
00235       else
00236         printf("Condition::timedwait(@%d, %p) -> sleep\n", pthread_self());
00237 #endif
00238       wait_cnt++;
00239       int r = pthread_cond_timedwait(&cnd, mut.forCondWait(), &tm);
00240       mut.condWakeup();
00241       --wait_cnt;
00242 #ifdef TRACE
00243       if (thr)
00244         printf("Condition::timedwait(@%d:%d, %p) <- wakeup\n", thr->get_thread(),
00245                thr->get_pid(), this);
00246       else
00247         printf("Condition::timedwait(@%d, %p) <- wakeup\n", pthread_self(), this);
00248 #endif
00249       if (r == ETIMEDOUT) {
00250         ret = false;
00251         break;
00252       }
00253       assert(!r);
00254     }
00255     if (ret)
00256       --cond;
00257     mut.unlock();
00258     return ret;
00259   }
00260 
00261   int
00262   Condition::signal()
00263   {
00264     int r = mut.lock();
00265     if (r) return r;
00266     cond++;
00267     if (wait_cnt) {
00268 #ifdef TRACE
00269       Thread *thr = Thread::getCallingThread();
00270       if (thr)
00271         printf("Condition::signal(@%d:%d, %p)\n", thr->get_thread(),
00272                thr->get_pid(), this);
00273       else
00274         printf("Condition::signal(@%d, %p)\n", pthread_self(), this);
00275 #endif
00276       if (profiled) {
00277         gettimeofday(&profile.tpsig, 0);
00278         profile.signal_cnt++;
00279       }
00280       r = pthread_cond_signal(&cnd);
00281       if (r) return r;
00282     }
00283     return mut.unlock();
00284   }
00285 
00286   int
00287   Condition::reset()
00288   {
00289     int r = mut.lock();
00290     if (r) return r;
00291 
00292     if (wait_cnt)
00293       return -1;
00294 
00295     cond = 0;
00296     return mut.unlock();
00297   }
00298 
00299   void
00300   Condition::resetProfile()
00301   {
00302     profile.reset();
00303   }
00304 
00305   Condition::Profile::Profile(Condition *_cond)
00306   {
00307     cond = _cond;
00308     reset();
00309   }
00310 
00311   void
00312   Condition::Profile::reset()
00313   {
00314     wait_cnt = 0;
00315     wakeup_cnt = 0;
00316     signal_cnt = 0;
00317     tpsig.tv_sec = 0;
00318     tpsig.tv_usec = 0;
00319     nowait.reset();
00320     wait.reset();
00321   }
00322 
00323   Condition::~Condition()
00324   {
00325   }
00326 
00327   //
00328   // Thread methods
00329   //
00330 
00331   Thread::Thread(Type type, void (*_init)(Thread *, void *), void *_init_arg)
00332     : profile(this)
00333   {
00334     init_thr("", type, _init, _init_arg);
00335   }
00336 
00337   Thread::Thread(const char *_name, Type type,
00338                  void (*_init)(Thread *, void *), void *_init_arg)
00339     : profile(this)
00340   {
00341     init_thr(_name, type, _init, _init_arg);
00342   }
00343 
00344   void
00345   Thread::init_thr(const char *_name, Type type,
00346                    void (*_init)(Thread *, void *), void *_init_arg)
00347   {
00348     pthread_attr_t attr;
00349     start_exec = 0;
00350     start_exec_data = 0;
00351     end_exec = 0;
00352     end_exec_data = 0;
00353     init = _init;
00354     init_arg = _init_arg;
00355     profiled = false;
00356     name = strdup(_name);
00357     user_data = 0;
00358     assert (!pthread_attr_init(&attr));
00359 
00360     if (type == SCOPE_SYSTEM)
00361       assert (!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM));
00362     else
00363       assert (!pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS));
00364 
00365     assert (!pthread_create(&tid, &attr, run, this));
00366     pid = rpc_getpid();
00367     idle = true;
00368   }
00369 
00370   Thread::Thread(const char *_name, bool) : profile(this)
00371   {
00372     user_data = 0;
00373     init = 0;
00374     init_arg = 0;
00375     pid = rpc_getpid();
00376     name = strdup(_name);
00377     profiled = false;
00378   }
00379 
00380   void
00381   Thread::resetProfile()
00382   {
00383     profile.reset();
00384   }
00385 
00386   Thread::Profile::Profile(Thread *_thr)
00387   {
00388     thr = _thr;
00389     reset();
00390   }
00391 
00392   void
00393   Thread::Profile::reset()
00394   {
00395     run_cnt = 0;
00396     run.reset();
00397     wait.reset();
00398   }
00399 
00400   //
00401   // ProfileStats methods
00402   //
00403 
00404   void
00405   ProfileStats::reset()
00406   {
00407     usec = 0;
00408     max_usec = 0;
00409     min_usec = ~0LL;
00410   }
00411 
00412   ProfileStats::ProfileStats()
00413   {
00414     reset();
00415   }
00416 
00417   void
00418   ProfileStats::set(struct timeval &tp1, struct timeval &tp0)
00419   {
00420     unsigned long long u = (tp1.tv_sec - tp0.tv_sec) * 1000000 +
00421       (tp1.tv_usec - tp0.tv_usec);
00422     usec += u;
00423     if (u < min_usec)
00424       min_usec = u;
00425     if (u > max_usec)
00426       max_usec = u;
00427   }
00428 
00429 
00430   void ProfileStats::display(ostream &os, unsigned int cnt) const
00431   {
00432     if (!cnt) {
00433       os << "     <nil>\n";
00434       return;
00435     }
00436 
00437     os << "     Total time: ";
00438     display_time(os, usec);
00439     os << "\n     Min time:  ";
00440     display_time(os, min_usec);
00441     os << "\n     Max time:  ";
00442     display_time(os, max_usec);
00443     os << "\n     Average:  ";
00444     display_time(os, (double)usec / cnt);
00445   
00446     os <<"\n";
00447   }
00448 
00449 
00450   ostream &operator<<(ostream &os, const Condition::Profile &profile)
00451   {
00452     os << "Condition " << profile.cond << " { \n";
00453     if (!profile.wait_cnt) {
00454       os << "  <nil>\n}\n";
00455       return os;
00456     }
00457     os << "  Wait count: " << profile.wait_cnt << "\n";
00458     os << "  Wakeup count: " << profile.wakeup_cnt << "\n";
00459     os << "  Signal count: " << profile.signal_cnt << "\n";
00460     os << "  Signal/Wakeup statistics:\n";
00461     profile.sigwakeup.display(os, profile.wakeup_cnt);
00462     os << "  Wait statistics:\n";
00463     profile.wait.display(os, profile.wait_cnt);
00464     /*
00465       os << "  No Wait statistics:\n";
00466       profile.nowait.display(os, profile.wait_cnt);
00467     */
00468     os << "}" << endl;
00469     return os;
00470   }
00471 
00472   ostream &operator<<(ostream &os, const Thread::Profile &profile)
00473   {
00474     os << "Thread @" << profile.thr->get_thread() << ":" << profile.thr->get_pid() <<
00475       " { \n";
00476     if (!profile.run_cnt) {
00477       os << "  <nil>\n}\n";
00478       return os;
00479     }
00480     if (profile.thr->getName() && *profile.thr->getName())
00481       os << "  Name: " << profile.thr->getName() << "\n";
00482 
00483     os << "  Run count: " << profile.run_cnt << "\n";
00484     os << "  Run statistics:\n";
00485     profile.run.display(os, profile.run_cnt);
00486     os << "  Wait statistics:\n";
00487     profile.wait.display(os, profile.run_cnt);
00488     os << profile.thr->sync.cnd_start.getProfile();
00489     os << profile.thr->sync.cnd_end.getProfile();
00490     os << "}" << endl;
00491     return os;
00492   }
00493 
00494   void* Thread::run(void* xthr)
00495   {
00496     Thread *thr = (Thread *)xthr;
00497 
00498     thr->tid = pthread_self();
00499     assert(!pthread_setspecific(self_key, thr));
00500     if (thr->init)
00501       thr->init(thr, thr->init_arg);
00502 
00503     struct timeval tp_start_wait, tp_start_run, tp_end_run;
00504 #ifdef TRACE
00505     printf("Thread::run(starting @%d:%d)\n", thr->get_thread(), thr->get_pid());
00506 #endif
00507 
00508     for (;;) {
00509 #ifdef TRACE
00510       printf("Thread::run(waiting for condition @%d:%d)\n", thr->get_thread(), thr->get_pid());
00511 #endif
00512       if (thr->profiled)
00513         gettimeofday(&tp_start_wait, 0);
00514 
00515       assert(!thr->sync.cnd_start.wait());
00516 #ifdef TRACE
00517       printf("Thread::run(resuming @%d:%d -> %p)\n", thr->get_thread(),
00518              thr->get_pid(),  thr->exec.fun);
00519 #endif
00520       if (thr->profiled) {
00521         gettimeofday(&tp_start_run, 0);
00522         thr->profile.wait.set(tp_start_run, tp_start_wait);
00523         thr->profile.run_cnt++;
00524       }
00525 
00526       if (thr->exec.fun) {
00527         thr->exec.return_arg = thr->exec.fun(thr->exec.input_arg);
00528       }
00529 
00530       if (thr->profiled) {
00531         gettimeofday(&tp_end_run, 0);
00532         thr->profile.run.set(tp_end_run, tp_start_run);
00533       }
00534 
00535 #ifdef TRACE
00536       printf("Thread::run(ending @%d:%d)\n", thr->get_thread(), thr->get_pid());
00537 #endif
00538       thr->idle = true;
00539       assert(!thr->sync.cnd_end.signal());
00540       if (thr->end_exec)
00541         thr->end_exec(thr, thr->end_exec_data);
00542     }
00543 
00544     return (void *)0;
00545   }
00546 
00547   Thread *
00548   Thread::getCallingThread()
00549   {
00550     Thread *thr = (Thread *)pthread_getspecific(self_key);
00551     if (thr)
00552       assert(thr->get_thread() == pthread_self());
00553     return thr;
00554   }
00555 
00556   Thread *
00557   Thread::initCallingThread()
00558   {
00559     Thread *thr = getCallingThread();
00560     if (thr) return thr;
00561     thr = new Thread("#CallingThread", true);
00562     thr->tid = pthread_self();
00563     assert(!pthread_setspecific(self_key, thr));
00564     return thr;
00565   }
00566 
00567   void *
00568   Thread::setUserData(void *_user_data)
00569   {
00570     void *o_user_data = user_data;
00571     user_data = _user_data;
00572     return o_user_data;
00573   }
00574 
00575   void
00576   Thread::execute(void *(*fun)(void *), void *input_arg)
00577   {
00578     if (!idle) wait();
00579 
00580     exec.fun = fun;
00581     exec.input_arg = input_arg;
00582     idle = false;
00583 #ifdef TRACE
00584     printf("Thread::execute(cond_signal @%d:%d)\n", tid, pid);
00585 #endif
00586     if (start_exec)
00587       start_exec(this, start_exec_data);
00588     assert(!sync.cnd_end.reset());
00589     assert(!sync.cnd_start.signal());
00590 #ifdef TRACE
00591     printf("Thread::execute(after cond_signal @%d:%d)\n", tid, pid);
00592 #endif
00593   }
00594 
00595   void *
00596   Thread::wait()
00597   {
00598 #ifdef TRACE
00599     printf("Thread::wait(@%d:%d)\n", tid, pid);
00600 #endif
00601     assert(!sync.cnd_end.wait());
00602 #ifdef TRACE
00603     printf("Thread::wait(after @%d:%d)\n", tid, pid);
00604 #endif
00605     return exec.return_arg;
00606   }
00607 
00608   void
00609   Thread::join()
00610   {
00611     void *r;
00612     assert (!pthread_join(tid, &r));
00613   }
00614 
00615   Thread::~Thread()
00616   {
00617     free(name);
00618     // ??
00619     //pthread_exit((void *)0); // ??
00620   }
00621 
00622 }
00623 

Generated on Mon Dec 22 18:16:11 2008 for eyedb by  doxygen 1.5.3