00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "eyedbconfig.h"
00025
00026 #include <iostream>
00027 #include <streambuf>
00028 #include <assert.h>
00029 #include <stdio.h>
00030 #include <unistd.h>
00031 #include <stdlib.h>
00032 #include <iomanip>
00033
00034 #include <eyedblib/performer.h>
00035
00036 using namespace std;
00037
00038
00039
00040
00041
00042
00043
00044 namespace eyedblib {
00045
00046 void *
00047 ThreadPerformer::thread_wrapper(void *xarg)
00048 {
00049 WrapperArg *arg = (WrapperArg *)xarg;
00050 return arg->exec(arg->input_arg);
00051 }
00052
00053 ThreadPerformer::ThreadPerformer(void (*init)(ThreadPerformer *, void *),
00054 void *init_arg)
00055 {
00056
00057 thr = 0;
00058 user_data = 0;
00059 wait_prev = wait_next = 0;
00060 run_prev = run_next = 0;
00061 free = true;
00062 if (init)
00063 init(this, init_arg);
00064 }
00065
00066 void
00067 ThreadPerformer::start(Thread *_thr, ThreadPerformerFunction exec, ThreadPerformerArg input_arg)
00068 {
00069 thr = _thr;
00070 wrap_arg.exec = exec;
00071 wrap_arg.input_arg = input_arg;
00072 if (thr)
00073 thr->execute(thread_wrapper, &wrap_arg);
00074 }
00075
00076 void
00077 ThreadPerformer::resume(Thread *_thr)
00078 {
00079 thr = _thr;
00080 thr->execute(thread_wrapper, &wrap_arg);
00081 }
00082
00083 ThreadPerformerArg
00084 ThreadPerformer::wait()
00085 {
00086 void *x = thr->wait();
00087 return x;
00088 }
00089
00090 void *
00091 ThreadPerformer::setUserData(void *_user_data)
00092 {
00093 void *o_user_data = user_data;
00094 user_data = _user_data;
00095 return o_user_data;
00096 }
00097
00098 ThreadPerformer::~ThreadPerformer()
00099 {
00100 delete thr;
00101 }
00102
00103
00104
00105
00106
00107 ThreadPool::ThreadPool(void (*init_p)(ThreadPerformer *, void *),
00108 void *init_p_arg,
00109 unsigned int _thread_cnt)
00110 {
00111 init_performer = init_p;
00112 init_performer_arg = init_p_arg;
00113 init(_thread_cnt);
00114 }
00115
00116 ThreadPool::ThreadPool(unsigned int _thread_cnt)
00117 {
00118 init_performer = 0;
00119 init_performer_arg = 0;
00120 init(_thread_cnt);
00121 }
00122
00123 void
00124 ThreadPool::init(unsigned int _thread_cnt)
00125 {
00126 profiled = false;
00127 thread_cnt = _thread_cnt;
00128 current_thread_performer_cnt = 0;
00129 thrs = new Thread*[thread_cnt];
00130 for (int n = 0; n < thread_cnt; n++)
00131 thrs[n] = new Thread();
00132
00133 end_cond = new Condition();
00134 performers = 0;
00135 wait_first = run_first = 0;
00136 }
00137
00138 void
00139 ThreadPool::endExecWrapper(Thread *thr, void *xthrpool)
00140 {
00141 ThreadPool *thrpool = (ThreadPool *)xthrpool;
00142 #ifdef TRACE
00143 printf("#%d endExecWrapper\n", pthread_self());
00144 #endif
00145 ThreadPerformer *p = (ThreadPerformer *)thr->getUserData();
00146 p->return_arg = thr->wait();
00147 p->thr = 0;
00148 thrpool->release(p);
00149
00150 p = thrpool->peekFromWaitQueue();
00151 if (p) {
00152 thrpool->beforeStart(p, thr);
00153 p->resume(thr);
00154 }
00155
00156 thrpool->end_cond->signal();
00157 }
00158
00159 void
00160 ThreadPool::beforeStart(ThreadPerformer *p, Thread *thr)
00161 {
00162 p->return_arg = 0;
00163 thr->onEndExec(endExecWrapper, this);
00164 thr->setUserData(p);
00165 addToRunQueue(p);
00166 }
00167
00168 Thread *
00169 ThreadPool::getOneThread()
00170 {
00171 for (int n = 0; n < thread_cnt; n++)
00172 if (thrs[n]->isIdle())
00173 return thrs[n];
00174 return 0;
00175 }
00176
00177 void
00178 ThreadPool::addToWaitQueue(ThreadPerformer *p)
00179 {
00180 MutexLocker _(mut);
00181
00182 p->wait_next = wait_first;
00183 if (wait_first)
00184 wait_first->wait_prev = p;
00185
00186 wait_first = p;
00187 p->wait_prev = 0;
00188 }
00189
00190 ThreadPerformer *
00191 ThreadPool::peekFromWaitQueue()
00192 {
00193 MutexLocker _(mut);
00194 if (!wait_first)
00195 return 0;
00196 ThreadPerformer *p = wait_first;
00197 if (p->wait_next)
00198 p->wait_next->wait_prev = 0;
00199 wait_first = p->wait_next;
00200 p->wait_next = 0;
00201 return p;
00202 }
00203
00204 void
00205 ThreadPool::addToRunQueue(ThreadPerformer *p)
00206 {
00207 MutexLocker _(mut);
00208
00209 ThreadPerformer *pp = run_first;
00210 while (pp) {
00211 if (p == pp) {
00212 return;
00213 }
00214 pp = pp->run_next;
00215 }
00216
00217 p->run_next = run_first;
00218 if (run_first)
00219 run_first->run_prev = p;
00220
00221 run_first = p;
00222 p->run_prev = 0;
00223 }
00224
00225 ThreadPerformer *
00226 ThreadPool::peekFromRunQueue()
00227 {
00228 MutexLocker _(mut);
00229 if (!run_first)
00230 return 0;
00231 ThreadPerformer *p = run_first;
00232 while (p) {
00233 if (p->free) {
00234 if (p->run_next)
00235 p->run_next->run_prev = p->run_prev;
00236 if (p->run_prev)
00237 p->run_prev->run_next = p->run_next;
00238 if (p == run_first)
00239 run_first = p->run_next;
00240 p->run_next = p->run_prev = 0;
00241 return p;
00242 }
00243 p = p->run_next;
00244 }
00245
00246 return 0;
00247 }
00248
00249 ThreadPerformer *
00250 ThreadPool::start(ThreadPerformerFunction exec, ThreadPerformerArg arg)
00251 {
00252 ThreadPerformer *performer = getOne();
00253 #ifdef TRACE
00254 printf("ThreadPool: getting %p\n", performer);
00255 #endif
00256 if (performer) {
00257 Thread *thr = getOneThread();
00258 #ifdef TRACE
00259 printf("Getting thread %p\n", thr);
00260 if (thr)
00261 printf(" -> thread @%d:%d\n",
00262 thr->get_thread(),
00263 thr->get_pid());
00264 #endif
00265 if (thr)
00266 beforeStart(performer, thr);
00267 else
00268 addToWaitQueue(performer);
00269
00270 performer->start(thr, exec, arg);
00271 }
00272 return performer;
00273 }
00274
00275 ThreadPerformerArg
00276 ThreadPool::wait(ThreadPerformer *&p)
00277 {
00278 end_cond->wait();
00279 p = peekFromRunQueue();
00280 if (p) return p->return_arg;
00281 return 0;
00282 }
00283
00284 void
00285 ThreadPool::reset()
00286 {
00287 waitAll();
00288 while (peekFromRunQueue())
00289 ;
00290 end_cond->reset();
00291 }
00292
00293 void
00294 ThreadPool::print(FILE *fd)
00295 {
00296 fprintf(fd, "%d Threads\n", thread_cnt);
00297 int busy_cnt = 0, free_cnt = 0;
00298 for (int i = 0; i < current_thread_performer_cnt; i++) {
00299 if (performers[i]->free)
00300 free_cnt++;
00301 else
00302 busy_cnt++;
00303 }
00304
00305 fprintf(fd, "%d Thread Performers\n", current_thread_performer_cnt);
00306 fprintf(fd, "%d Free Thread Performers\n", free_cnt);
00307 fprintf(fd, "%d Busy Thread Performers\n", busy_cnt);
00308 ThreadPerformer *p = wait_first;
00309 int wait_cnt = 0;
00310 while (p) {
00311 wait_cnt++;
00312 p = p->wait_next;
00313 }
00314 fprintf(fd, "%d Thread Performers in wait queue\n", wait_cnt);
00315 p = run_first;
00316 int run_cnt = 0;
00317 while (p) {
00318 run_cnt++;
00319 p = p->run_next;
00320 }
00321 fprintf(fd, "%d Thread Performers in run queue\n", run_cnt);
00322 }
00323
00324 void
00325 ThreadPool::waitAll()
00326 {
00327
00328
00329
00330
00331
00332
00333 while (run_first) {
00334 ThreadPerformer *perf;
00335 wait(perf);
00336 }
00337 }
00338
00339 void
00340 ThreadPool::setProfile(bool _profiled)
00341 {
00342 profiled = true;
00343 unsigned int cnt = thread_cnt;
00344 if (current_thread_performer_cnt < cnt)
00345 cnt = current_thread_performer_cnt;
00346
00347 for (int i = 0; i < cnt; i++) {
00348 ThreadPerformer *thrperf = performers[i];
00349 thrperf->getThread()->setProfile(profiled);
00350 }
00351 }
00352
00353 void
00354 ThreadPool::profileReset()
00355 {
00356 unsigned int cnt = thread_cnt;
00357 if (current_thread_performer_cnt < cnt)
00358 cnt = current_thread_performer_cnt;
00359
00360 for (int i = 0; i < cnt; i++) {
00361 ThreadPerformer *thrperf = performers[i];
00362 thrperf->getThread()->resetProfile();
00363 }
00364 }
00365
00366 Thread::Profile **
00367 ThreadPool::getProfiles(unsigned int &cnt) const
00368 {
00369 cnt = thread_cnt;
00370 if (current_thread_performer_cnt < cnt)
00371 cnt = current_thread_performer_cnt;
00372
00373 Thread::Profile **profiles = new Thread::Profile*[cnt+1];
00374 for (int i = 0; i < cnt; i++) {
00375 ThreadPerformer *thrperf = performers[i];
00376 profiles[i] = new Thread::Profile(thrperf->getThread());
00377 *profiles[i] = thrperf->getThread()->getProfile();
00378 }
00379
00380 profiles[cnt] = 0;
00381 return profiles;
00382 }
00383
00384 void ProfileStats::display_time(ostream &os, double usec)
00385 {
00386 char buf[512];
00387 bool prev = false;
00388 if (usec < 1000) {
00389 sprintf(buf, "%.2f", usec);
00390 os << buf << "us";
00391 prev = true;
00392 }
00393
00394 usec /= 1000;
00395 if (usec >= 0.01) {
00396 if (usec < 1000) {
00397 sprintf(buf, "%s%.2f", (prev ? " " : ""), usec);
00398 os << buf << "ms";
00399 prev = true;
00400 }
00401 usec /= 1000;
00402 if (usec >= 0.01) {
00403 sprintf(buf, "%s%.2f", (prev ? " " : ""), usec);
00404 os << buf << "s";
00405 }
00406 }
00407 }
00408
00409 ostream &operator<<(ostream &os, Thread::Profile **profiles)
00410 {
00411 int n = 0;
00412 unsigned long long run_cnt = 0;
00413 unsigned long long wait_usec = 0;
00414 unsigned long long run_usec = 0;
00415 double run_avg_usec = 0.;
00416 double wait_avg_usec = 0.;
00417 for (; profiles[n]; n++) {
00418 run_cnt += profiles[n]->run_cnt;
00419 wait_usec += profiles[n]->wait.usec;
00420 run_usec += profiles[n]->run.usec;
00421 os << *profiles[n];
00422 }
00423
00424 os << "\nTotal threads: " << n << "\n";
00425 os << "Total runs: " << run_cnt << "\n";
00426 os << "Total run time: ";
00427 ProfileStats::display_time(os, run_usec);
00428 os << "\nTotal wait time: ";
00429 ProfileStats::display_time(os, wait_usec);
00430 os << "\nAverage total run/thread: ";
00431 ProfileStats::display_time(os, (double)run_usec/n);
00432 os << "\nAverage one run/thread: ";
00433 ProfileStats::display_time(os, (double)run_usec/run_cnt);
00434 os << "\nAverage total wait/thread: ";
00435 ProfileStats::display_time(os, (double)wait_usec/n);
00436 os << "\nAverage one wait/thread: ";
00437 ProfileStats::display_time(os, (double)wait_usec/run_cnt);
00438 os << endl;
00439 return os;
00440 }
00441
00442 ThreadPerformer *
00443 ThreadPool::getOne()
00444 {
00445 MutexLocker locker(mut);
00446 return getOneRealize();
00447 }
00448
00449 ThreadPerformer *
00450 ThreadPool::getOneRealize()
00451 {
00452 for (int i = 0; i < current_thread_performer_cnt; i++) {
00453 ThreadPerformer *p = performers[i];
00454 if (p->free) {
00455 p->free = false;
00456 p->run_next = p->run_prev = p->wait_next = p->wait_prev = 0;
00457 return p;
00458 }
00459 }
00460
00461 int o_current_thread_performer_cnt = current_thread_performer_cnt;
00462
00463 current_thread_performer_cnt += 8;
00464
00465 performers = (ThreadPerformer **)realloc(performers,
00466 sizeof(ThreadPerformer *) *
00467 current_thread_performer_cnt);
00468
00469 for (int i = o_current_thread_performer_cnt; i < current_thread_performer_cnt; i++) {
00470 performers[i] = new ThreadPerformer(init_performer, init_performer_arg);
00471
00472 }
00473
00474 return getOneRealize();
00475 }
00476
00477 void
00478 ThreadPool::release(ThreadPerformer *performer)
00479 {
00480 MutexLocker _(mut);
00481 assert(!performer->free);
00482 #ifdef TRACE
00483 printf("#%d setting free -> %p\n", pthread_self(), performer);
00484 #endif
00485 performer->free = true;
00486 }
00487
00488 ThreadPool::~ThreadPool()
00489 {
00490 for (int i = 0; i < current_thread_performer_cnt; i++)
00491 delete performers[i];
00492 free(performers);
00493 }
00494
00495 }