performer.cc

00001 /* 
00002    EyeDB Object Database Management System
00003    Copyright (C) 1994-2008 SYSRA
00004    
00005    EyeDB is free software; you can redistribute it and/or
00006    modify it under the terms of the GNU Lesser General Public
00007    License as published by the Free Software Foundation; either
00008    version 2.1 of the License, or (at your option) any later version.
00009    
00010    EyeDB is distributed in the hope that it will be useful,
00011    but WITHOUT ANY WARRANTY; without even the implied warranty of
00012    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013    Lesser General Public License for more details.
00014    
00015    You should have received a copy of the GNU Lesser General Public
00016    License along with this library; if not, write to the Free Software
00017    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 
00018 */
00019 
00020 /*
00021    Author: Eric Viara <viara@sysra.com>
00022 */
00023 
00024 #include "eyedbconfig.h"
00025 
00026 #include <iostream>
00027 #include <streambuf>
00028 #include <assert.h>
00029 #include <stdio.h>
00030 #include <unistd.h>
00031 #include <stdlib.h>
00032 #include <iomanip>
00033 
00034 #include <eyedblib/performer.h>
00035 
00036 using namespace std;
00037 
00038 //#define TRACE
00039 
00040 //
00041 // ThreadPerformer methods
00042 //
00043 
00044 namespace eyedblib {
00045 
00046   void *
00047   ThreadPerformer::thread_wrapper(void *xarg)
00048   {
00049     WrapperArg *arg = (WrapperArg *)xarg;
00050     return arg->exec(arg->input_arg);
00051   }
00052 
00053   ThreadPerformer::ThreadPerformer(void (*init)(ThreadPerformer *, void *),
00054                                    void *init_arg)
00055   {
00056     //thr = new Thread();
00057     thr = 0;
00058     user_data = 0;
00059     wait_prev = wait_next = 0;
00060     run_prev = run_next = 0;
00061     free = true;
00062     if (init)
00063       init(this, init_arg);
00064   }
00065 
00066   void
00067   ThreadPerformer::start(Thread *_thr, ThreadPerformerFunction exec, ThreadPerformerArg input_arg)
00068   {
00069     thr = _thr;
00070     wrap_arg.exec = exec;
00071     wrap_arg.input_arg = input_arg;
00072     if (thr)
00073       thr->execute(thread_wrapper, &wrap_arg);
00074   }
00075 
00076   void
00077   ThreadPerformer::resume(Thread *_thr)
00078   {
00079     thr = _thr;
00080     thr->execute(thread_wrapper, &wrap_arg);
00081   }
00082 
00083   ThreadPerformerArg
00084   ThreadPerformer::wait()
00085   {
00086     void *x = thr->wait();
00087     return x;
00088   }
00089 
00090   void *
00091   ThreadPerformer::setUserData(void *_user_data)
00092   {
00093     void *o_user_data = user_data;
00094     user_data = _user_data;
00095     return o_user_data;
00096   }
00097 
00098   ThreadPerformer::~ThreadPerformer()
00099   {
00100     delete thr;
00101   }
00102 
00103   //
00104   // ThreadPool definitions and methods
00105   //
00106 
00107   ThreadPool::ThreadPool(void (*init_p)(ThreadPerformer *, void *),
00108                          void *init_p_arg,
00109                          unsigned int _thread_cnt)
00110   {
00111     init_performer = init_p;
00112     init_performer_arg = init_p_arg;
00113     init(_thread_cnt);
00114   }
00115 
00116   ThreadPool::ThreadPool(unsigned int _thread_cnt)
00117   {
00118     init_performer = 0;
00119     init_performer_arg = 0;
00120     init(_thread_cnt);
00121   }
00122 
00123   void
00124   ThreadPool::init(unsigned int _thread_cnt)
00125   {
00126     profiled = false;
00127     thread_cnt = _thread_cnt;
00128     current_thread_performer_cnt = 0;
00129     thrs = new Thread*[thread_cnt];
00130     for (int n = 0; n < thread_cnt; n++)
00131       thrs[n] = new Thread();
00132 
00133     end_cond = new Condition();
00134     performers = 0;
00135     wait_first = run_first = 0;
00136   }
00137 
00138   void
00139   ThreadPool::endExecWrapper(Thread *thr, void *xthrpool)
00140   {
00141     ThreadPool *thrpool = (ThreadPool *)xthrpool;
00142 #ifdef TRACE
00143     printf("#%d endExecWrapper\n", pthread_self());
00144 #endif
00145     ThreadPerformer *p = (ThreadPerformer *)thr->getUserData();
00146     p->return_arg = thr->wait();
00147     p->thr = 0;
00148     thrpool->release(p);
00149 
00150     p = thrpool->peekFromWaitQueue();
00151     if (p) {
00152       thrpool->beforeStart(p, thr);
00153       p->resume(thr);
00154     }
00155 
00156     thrpool->end_cond->signal();
00157   }
00158 
00159   void
00160   ThreadPool::beforeStart(ThreadPerformer *p, Thread *thr)
00161   {
00162     p->return_arg = 0;
00163     thr->onEndExec(endExecWrapper, this);
00164     thr->setUserData(p);
00165     addToRunQueue(p);
00166   }
00167 
00168   Thread *
00169   ThreadPool::getOneThread()
00170   {
00171     for (int n = 0; n < thread_cnt; n++)
00172       if (thrs[n]->isIdle())
00173         return thrs[n];
00174     return 0;
00175   }
00176 
00177   void
00178   ThreadPool::addToWaitQueue(ThreadPerformer *p)
00179   {
00180     MutexLocker _(mut);
00181 
00182     p->wait_next = wait_first;
00183     if (wait_first)
00184       wait_first->wait_prev = p;
00185 
00186     wait_first = p;
00187     p->wait_prev = 0;
00188   }
00189 
00190   ThreadPerformer *
00191   ThreadPool::peekFromWaitQueue()
00192   {
00193     MutexLocker _(mut);
00194     if (!wait_first)
00195       return 0;
00196     ThreadPerformer *p = wait_first;
00197     if (p->wait_next)
00198       p->wait_next->wait_prev = 0;
00199     wait_first = p->wait_next;
00200     p->wait_next = 0;
00201     return p;
00202   }
00203 
00204   void
00205   ThreadPool::addToRunQueue(ThreadPerformer *p)
00206   {
00207     MutexLocker _(mut);
00208 
00209     ThreadPerformer *pp = run_first;
00210     while (pp) {
00211       if (p == pp) {
00212         return;
00213       }
00214       pp = pp->run_next;
00215     }
00216 
00217     p->run_next = run_first;
00218     if (run_first)
00219       run_first->run_prev = p;
00220 
00221     run_first = p;
00222     p->run_prev = 0;
00223   }
00224 
00225   ThreadPerformer *
00226   ThreadPool::peekFromRunQueue()
00227   {
00228     MutexLocker _(mut);
00229     if (!run_first)
00230       return 0;
00231     ThreadPerformer *p = run_first;
00232     while (p) {
00233       if (p->free) {
00234         if (p->run_next)
00235           p->run_next->run_prev = p->run_prev;
00236         if (p->run_prev)
00237           p->run_prev->run_next = p->run_next;
00238         if (p == run_first)
00239           run_first = p->run_next;
00240         p->run_next = p->run_prev = 0;
00241         return p;
00242       }
00243       p = p->run_next;
00244     }
00245 
00246     return 0;
00247   }
00248 
00249   ThreadPerformer *
00250   ThreadPool::start(ThreadPerformerFunction exec, ThreadPerformerArg arg)
00251   {
00252     ThreadPerformer *performer = getOne();
00253 #ifdef TRACE
00254     printf("ThreadPool: getting %p\n", performer);
00255 #endif
00256     if (performer) {
00257       Thread *thr = getOneThread();
00258 #ifdef TRACE
00259       printf("Getting thread %p\n", thr);
00260       if (thr)
00261         printf(" -> thread @%d:%d\n",
00262                thr->get_thread(),
00263                thr->get_pid());
00264 #endif
00265       if (thr)
00266         beforeStart(performer, thr);
00267       else
00268         addToWaitQueue(performer);
00269 
00270       performer->start(thr, exec, arg);
00271     }
00272     return performer;
00273   }
00274 
00275   ThreadPerformerArg
00276   ThreadPool::wait(ThreadPerformer *&p)
00277   {
00278     end_cond->wait();
00279     p = peekFromRunQueue();
00280     if (p) return p->return_arg;
00281     return 0;
00282   }
00283 
00284   void
00285   ThreadPool::reset()
00286   {
00287     waitAll();
00288     while (peekFromRunQueue())
00289       ;
00290     end_cond->reset();
00291   }
00292 
00293   void
00294   ThreadPool::print(FILE *fd)
00295   {
00296     fprintf(fd, "%d Threads\n", thread_cnt);
00297     int busy_cnt = 0, free_cnt = 0;
00298     for (int i = 0; i < current_thread_performer_cnt; i++) {
00299       if (performers[i]->free)
00300         free_cnt++;
00301       else
00302         busy_cnt++;
00303     }
00304 
00305     fprintf(fd, "%d Thread Performers\n", current_thread_performer_cnt);
00306     fprintf(fd, "%d Free Thread Performers\n", free_cnt);
00307     fprintf(fd, "%d Busy Thread Performers\n", busy_cnt);
00308     ThreadPerformer *p = wait_first;
00309     int wait_cnt = 0;
00310     while (p) {
00311       wait_cnt++;
00312       p = p->wait_next;
00313     }
00314     fprintf(fd, "%d Thread Performers in wait queue\n", wait_cnt);
00315     p = run_first;
00316     int run_cnt = 0;
00317     while (p) {
00318       run_cnt++;
00319       p = p->run_next;
00320     }
00321     fprintf(fd, "%d Thread Performers in run queue\n", run_cnt);
00322   }
00323 
00324   void
00325   ThreadPool::waitAll()
00326   {
00327     /*
00328       for (int i = 0; i < current_thread_performer_cnt; i++) {
00329       if (!performers[i]->free)
00330       (void)wait(performers[i]);
00331       }
00332     */
00333     while (run_first) {
00334       ThreadPerformer *perf;
00335       wait(perf);
00336     }
00337   }
00338 
00339   void
00340   ThreadPool::setProfile(bool _profiled)
00341   {
00342     profiled = true;
00343     unsigned int cnt = thread_cnt;
00344     if (current_thread_performer_cnt < cnt)
00345       cnt = current_thread_performer_cnt;
00346 
00347     for (int i = 0; i < cnt; i++) {
00348       ThreadPerformer *thrperf = performers[i];
00349       thrperf->getThread()->setProfile(profiled);
00350     }
00351   }
00352 
00353   void
00354   ThreadPool::profileReset()
00355   {
00356     unsigned int cnt = thread_cnt;
00357     if (current_thread_performer_cnt < cnt)
00358       cnt = current_thread_performer_cnt;
00359 
00360     for (int i = 0; i < cnt; i++) {
00361       ThreadPerformer *thrperf = performers[i];
00362       thrperf->getThread()->resetProfile();
00363     }
00364   }
00365 
00366   Thread::Profile **
00367   ThreadPool::getProfiles(unsigned int &cnt) const
00368   {
00369     cnt = thread_cnt;
00370     if (current_thread_performer_cnt < cnt)
00371       cnt = current_thread_performer_cnt;
00372   
00373     Thread::Profile **profiles = new Thread::Profile*[cnt+1];
00374     for (int i = 0; i < cnt; i++) {
00375       ThreadPerformer *thrperf = performers[i];
00376       profiles[i] = new Thread::Profile(thrperf->getThread());
00377       *profiles[i] = thrperf->getThread()->getProfile();
00378     }
00379 
00380     profiles[cnt] = 0;
00381     return profiles;
00382   }
00383 
00384   void ProfileStats::display_time(ostream &os, double usec)
00385   {
00386     char buf[512];
00387     bool prev = false;
00388     if (usec < 1000) {
00389       sprintf(buf, "%.2f", usec);
00390       os << buf << "us";
00391       prev = true;
00392     }
00393 
00394     usec /= 1000;
00395     if (usec >= 0.01) {
00396       if (usec < 1000) {
00397         sprintf(buf, "%s%.2f", (prev ? " " : ""), usec);
00398         os << buf << "ms";
00399         prev = true;
00400       }
00401       usec /= 1000;
00402       if (usec >= 0.01) {
00403         sprintf(buf, "%s%.2f", (prev ? " " : ""), usec);
00404         os << buf << "s";
00405       }
00406     }
00407   }
00408 
00409   ostream &operator<<(ostream &os, Thread::Profile **profiles)
00410   {
00411     int n = 0;
00412     unsigned long long run_cnt = 0;
00413     unsigned long long wait_usec = 0;
00414     unsigned long long run_usec = 0;
00415     double run_avg_usec = 0.;
00416     double wait_avg_usec = 0.;
00417     for (; profiles[n]; n++) {
00418       run_cnt += profiles[n]->run_cnt;
00419       wait_usec += profiles[n]->wait.usec;
00420       run_usec += profiles[n]->run.usec;
00421       os << *profiles[n];
00422     }
00423 
00424     os << "\nTotal threads:             " << n << "\n";
00425     os << "Total runs:                " << run_cnt << "\n";
00426     os << "Total run time:            ";
00427     ProfileStats::display_time(os, run_usec);
00428     os << "\nTotal wait time:           ";
00429     ProfileStats::display_time(os, wait_usec);
00430     os << "\nAverage total run/thread:  ";
00431     ProfileStats::display_time(os, (double)run_usec/n);
00432     os << "\nAverage one run/thread:    ";
00433     ProfileStats::display_time(os, (double)run_usec/run_cnt);
00434     os << "\nAverage total wait/thread: ";
00435     ProfileStats::display_time(os, (double)wait_usec/n);
00436     os << "\nAverage one wait/thread:   ";
00437     ProfileStats::display_time(os, (double)wait_usec/run_cnt);
00438     os << endl;
00439     return os;
00440   }
00441 
00442   ThreadPerformer *
00443   ThreadPool::getOne()
00444   {
00445     MutexLocker locker(mut);
00446     return getOneRealize();
00447   }
00448 
00449   ThreadPerformer *
00450   ThreadPool::getOneRealize()
00451   {
00452     for (int i = 0; i < current_thread_performer_cnt; i++) {
00453       ThreadPerformer *p = performers[i];
00454       if (p->free) {
00455         p->free = false;
00456         p->run_next = p->run_prev = p->wait_next = p->wait_prev = 0;
00457         return p;
00458       }
00459     }
00460 
00461     int o_current_thread_performer_cnt = current_thread_performer_cnt;
00462 
00463     current_thread_performer_cnt += 8;
00464 
00465     performers = (ThreadPerformer **)realloc(performers,
00466                                              sizeof(ThreadPerformer *) *
00467                                              current_thread_performer_cnt);
00468 
00469     for (int i = o_current_thread_performer_cnt; i < current_thread_performer_cnt; i++) {
00470       performers[i] = new ThreadPerformer(init_performer, init_performer_arg);
00471       //performers[i]->performer->getThread()->setProfile(profiled);
00472     }
00473 
00474     return getOneRealize();
00475   }
00476 
00477   void
00478   ThreadPool::release(ThreadPerformer *performer)
00479   {
00480     MutexLocker _(mut);
00481     assert(!performer->free);
00482 #ifdef TRACE
00483     printf("#%d setting free -> %p\n", pthread_self(), performer);
00484 #endif
00485     performer->free = true;
00486   }
00487 
00488   ThreadPool::~ThreadPool()
00489   {
00490     for (int i = 0; i < current_thread_performer_cnt; i++)
00491       delete performers[i];
00492     free(performers);
00493   }
00494 
00495 }

Generated on Mon Dec 22 18:16:07 2008 for eyedb by  doxygen 1.5.3