mutex.cc

00001 /* 
00002    EyeDB Object Database Management System
00003    Copyright (C) 1994-2008 SYSRA
00004    
00005    EyeDB is free software; you can redistribute it and/or
00006    modify it under the terms of the GNU Lesser General Public
00007    License as published by the Free Software Foundation; either
00008    version 2.1 of the License, or (at your option) any later version.
00009    
00010    EyeDB is distributed in the hope that it will be useful,
00011    but WITHOUT ANY WARRANTY; without even the implied warranty of
00012    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013    Lesser General Public License for more details.
00014    
00015    You should have received a copy of the GNU Lesser General Public
00016    License along with this library; if not, write to the Free Software
00017    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA 
00018 */
00019 
00020 /*
00021   Author: Eric Viara <viara@sysra.com>
00022 */
00023 
00024 #include <eyedbconfig.h>
00025 
00026 #include <stdio.h>
00027 #include <assert.h>
00028 #include <string.h>
00029 #include <signal.h>
00030 #include <errno.h>
00031 #if TIME_WITH_SYS_TIME
00032 #include <sys/time.h>
00033 #include <time.h>
00034 #else
00035 #if HAVE_SYS_TIME_H
00036 #include <sys/time.h>
00037 #else
00038 #include <time.h>
00039 #endif
00040 #endif
00041 #include <unistd.h>
00042 #include <stdlib.h>
00043 #include "eyedbsm_p.h"
00044 #include "mutex.h"
00045 
00046 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00047 #include <sys/sem.h>
00048 /* SEM_FAST could not work because in this case we need as many semaphores
00049    as mutex! */
00050 #define UT_SEM_FAST
00051 #endif
00052 
00053 #include <eyedblib/performer.h>
00054 
00055 #define NMT 10
00056 
00057 namespace eyedbsm {
00058   Boolean cleanup = False;
00059 
00060   static int g_mutex_idx;
00061 
00062   struct GMutex {
00063     unsigned int xid;
00064     Mutex *mp;
00065   } g_mutex[NMT];
00066 
00067   static Mutex *sleeping_mp;
00068 
00069   //#undef IDB_LOG
00070   //#define IDB_LOG(X, MSG) printf MSG
00071 
00072   void
00073   mutexes_init()
00074   {
00075   }
00076 
00077   void
00078   mutexes_release()
00079   {
00080     int idx = g_mutex_idx;
00081 
00082 #ifdef HAVE_SEMAPHORE_POLICY_POSIX
00083     if (sleeping_mp) {
00084       pthread_mutex_unlock(&sleeping_mp->pmp->u.mp);
00085       IDB_LOG(IDB_LOG_MTX, ("found a sleeping mutex"));
00086     }
00087 #endif
00088 
00089     IDB_LOG(IDB_LOG_MTX, ("mutexes_release start => %d\n", idx));
00090 
00091     for (int i = 0; i < g_mutex_idx; i++) {
00092       GMutex *gm = &g_mutex[i];
00093       if (gm->mp) {
00094         // added the 16/01/99
00095 #ifdef HAVE_SEMAPHORE_POLICY_POSIX
00096         pthread_mutex_unlock((pthread_mutex_t *)&gm->mp->pmp);
00097 #endif
00098         mutexUnlock(gm->mp, gm->xid);
00099         gm->mp = 0;
00100       }
00101     }
00102 
00103     IDB_LOG(IDB_LOG_MTX, ("mutexes_release done => %d\n", idx));
00104   }
00105 
00106   static pthread_mutex_t local_mp = PTHREAD_MUTEX_INITIALIZER;
00107 
00108   static void
00109   appendGMutex(Mutex *mp, unsigned int xid)
00110   {
00111     int i;
00112 
00113     pthread_mutex_lock(&local_mp);
00114     for (i = 0; i < NMT; i++) {
00115       GMutex *gm = &g_mutex[i];
00116       if (!gm->mp) {
00117         gm->mp = mp;
00118         gm->xid = xid;
00119         if (i >= g_mutex_idx)
00120           g_mutex_idx = i+1;
00121         break;
00122       }
00123     }
00124 
00125     pthread_mutex_unlock(&local_mp);
00126   }
00127 
00128   static Status
00129   releaseGMutex(Mutex *mp, unsigned int xid)
00130   {
00131     int i;
00132 
00133     pthread_mutex_lock(&local_mp);
00134     for (i = g_mutex_idx-1; i >= 0; i--) {
00135       GMutex *gm = &g_mutex[i];
00136       if (gm->mp == mp) {
00137         gm->mp = 0;
00138         gm->xid = 0;
00139         if (i == g_mutex_idx-1)
00140           --g_mutex_idx;
00141         pthread_mutex_unlock(&local_mp);
00142         return Success;
00143       }
00144     }
00145 
00146     pthread_mutex_unlock(&local_mp);
00147     ESM_ASSERT(0, 0, xid);
00148     return Success;
00149   }
00150 
00151 #define wakeupInterval 4
00152 #define lockTimeout    8
00153 
00154   static void
00155   dumpMutex(Mutex *mp)
00156   {
00157     int i;
00158     printf("Mutex %s, xid = %d, state = %d {\n", mp->pmp->mtname,
00159            mp->pmp->xid, mp->pmp->locked);
00160     printf("};\n\n");
00161   }
00162 
00163   /*
00164     #define THR_TRACE
00165     #define THR_TRACE_1
00166     #define THR_TRACE_2
00167   */
00168 
00169   void
00170   mutexLightInit(DbDescription *vd, Mutex *mp, MutexP *pmp)
00171   {
00172     assert(pmp);
00173     if (!mp)
00174       return;
00175 
00176     mp->pmp = pmp;
00177     mp->cond.pcond = &pmp->pcond;
00178 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00179     mp->id = (vd ? ut_sem_open(vd->semkeys[0]) : -1);
00180     mp->plocked = &vd->locked;
00181 #endif
00182 
00183 #ifdef THR_TRACE
00184 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00185     IDB_LOG(IDB_LOG_MTX,
00186             ("mutexLightInit(mp=%p, name=%s, id=%d, key=%d)\n", mp,
00187              pmp->mtname, mp->id,
00188              (vd ? vd->semkeys[0] : 0)));
00189 #else
00190     IDB_LOG(IDB_LOG_MTX,
00191             ("mutexLightInit(mp=%p, name=%s)\n", mp, mp->pmp->mtname));
00192 #endif
00193 #endif
00194 
00195     condLightInit(vd, &mp->cond, &mp->pmp->pcond);
00196   }
00197 
00198   int
00199   mutexInit(DbDescription *vd, Mutex *mp, MutexP *pmp,
00200             const char *mtname)
00201   {
00202     mutexLightInit(vd, mp, pmp);
00203 #ifdef THR_TRACE
00204     IDB_LOG(IDB_LOG_MTX,
00205             ("mutexInit(mp=%p, pmp=%p, name=%s)\n", mp, pmp, mtname));
00206 #endif
00207     memset(pmp, 0, sizeof(*pmp));
00208 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00209     pmp->u.key = (vd ? vd->semkeys[0] : 0);
00210 #else
00211     pthread_mutexattr_t mattr;
00212 
00213     assert (!pthread_mutexattr_init(&mattr));
00214 
00215 #ifdef HAVE_PTHREAD_PROCESS_SHARED
00216     assert (!pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED));
00217 #endif
00218 
00219     assert (!pthread_mutex_init(&pmp->u.mp, &mattr));
00220 #endif
00221     strcpy(pmp->mtname, mtname);
00222 
00223     pmp->magic  = MT_MAGIC;
00224     pmp->xid    = 0;
00225 
00226     pmp->locked = 0;
00227 
00228     pmp->wait_cnt = 0;
00229     condInit(vd, (mp ? &mp->cond : 0), &pmp->pcond);
00230 
00231 #ifdef HAVE_SEMAPHORE_POLICY_POSIX
00232     IDB_LOG(IDB_LOG_MTX,
00233             ("mutexInit(%p [mp=%p], \"%s\")\n", mp, &pmp->u.mp,
00234              pmp->mtname));
00235 #endif
00236     return 1;
00237   }
00238 
00239 #define RETURN_ERROR(R, S) \
00240 do { \
00241        IDB_LOG(IDB_LOG_MTX, ("mutex" S " (xid = %d, mp->pmp->xid = %d, locked %d) [mp = 0x%x, \"%s\"], error mutex lock r=%d, errno=%d\n", xid, mp->pmp->xid, mp->pmp->locked, mp, (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"), R, errno)); \
00242        fprintf(stderr, "mutex" S " (xid = %d, mp->pmp->xid = %d, locked %d) [mp = 0x%x, \"%s\"], error mutex lock r=%d, errno=%d\n", xid, mp->pmp->xid, mp->pmp->locked, mp, (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"), R, errno); \
00243   return statusMake(ERROR, "mutexLock (xid = %d, mp->pmp->xid = %d, state %d) [mp = 0x%x, \"%s\"], error mutex lock r=%d, errno=%d\n", xid, mp->pmp->xid, mp->pmp->locked, mp, (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"), R, errno); \
00244 } while(0)      
00245 
00246 #define RETURN_MAGIC_ERROR() \
00247       ESM_ASSERT_ABORT(0, 0, xid); \
00248       return statusMake(ERROR, "invalid magic number for mutex: perharps there is a log format incompatibility.\nRun 'eyedbsmtool shmmem cleanup <dbfile>'")
00249 
00250   // added the 10/01/02 to improve high concurrency programs:
00251   // MIND: currently not fully validated !
00252 #ifdef HAVE_SEMAPHORE_POLICY_POSIX
00253 #define MUTEX_FAST
00254 #endif
00255 
00256   static Status
00257   mutexLock_realize(Mutex *mp, unsigned int xid, Boolean reentrant)
00258   {
00259     int r, r1;
00260 
00261     if (mp->pmp->magic != MT_MAGIC) {
00262       IDB_LOG(IDB_LOG_MTX, ("mutexLock (xid = %d) [mp = 0x%x, \"%s\"], invalid magic 0x%x, expected 0x%x\n",
00263                             xid, mp, (mp->pmp->mtname ? mp->pmp->mtname : "<unknown>"), mp->pmp->magic, MT_MAGIC));
00264 
00265       RETURN_MAGIC_ERROR();
00266     }
00267 
00268 #ifdef THR_TRACE
00269     IDB_LOG(IDB_LOG_MTX, ("trying to lock %s %p reentrant %d\n", mp->pmp->mtname, mp, reentrant));
00270 #endif
00271 
00272     if (cleanup)
00273       return Success;
00274 
00275 #ifdef MUTEX_FAST
00276     if (!reentrant) {
00277       r = pthread_mutex_lock((pthread_mutex_t *)&mp->pmp->u.mp);
00278       if (r)
00279         RETURN_ERROR(r, "Lock");
00280     }
00281     appendGMutex(mp, xid);
00282     return Success;
00283 #endif
00284 
00285 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00286     ESM_ASSERT_ABORT(mp->id>=0, 0, 0);
00287     ESM_ASSERT_ABORT(mp->plocked, 0, 0);
00288     int sid = mp->id;
00289 #else
00290     int sid = 0;
00291 #endif
00292 
00293 #ifdef UT_SEM_FAST
00294     if (!reentrant) {
00295       ESM_ASSERT_ABORT(*mp->plocked >= 0, 0, 0);
00296 
00297       if (!(*mp->plocked)++) {
00298         // NOTE THAT: timedlock take a lot of times!!!
00299         //r = ut_sem_timedlock(sid, lockTimeout*1000);
00300         //if (r < 0)
00301         //return statusMake(FATAL_MUTEX_LOCK_TIMEOUT, "mutex %s", mp->pmp->mtname);
00302         r = ut_sem_lock(sid);
00303         if (r)
00304           RETURN_ERROR(r, "Lock");
00305       }
00306     }
00307 #else
00308 
00309     if (!reentrant) {
00310 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00311       r = ut_sem_lock(sid);
00312 #else
00313       r = pthread_mutex_lock((pthread_mutex_t *)&mp->pmp->u.mp);
00314       sleeping_mp = mp;
00315 #endif
00316       if (r)
00317         RETURN_ERROR(r, "Lock");
00318     }
00319 
00320     for (;;) {
00321       int wait_cnt;
00322       if (!mp->pmp->locked) {
00323         mp->pmp->xid = xid;
00324         mp->pmp->locked = 1;
00325 
00326 #ifdef THR_TRACE
00327         IDB_LOG(IDB_LOG_MTX, ("locking %s %p\n", mp->pmp->mtname, mp));
00328 #endif
00329 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00330         r = ut_sem_unlock(sid);
00331 #else
00332         r = pthread_mutex_unlock((pthread_mutex_t *)&mp->pmp->u.mp);
00333         sleeping_mp = 0;
00334 #endif
00335         if (r)
00336           RETURN_ERROR(r, "Lock");
00337 
00338         break;
00339       }
00340 
00341       wait_cnt = mp->pmp->wait_cnt++;
00342 #ifdef THR_TRACE_1
00343       IDB_LOG(IDB_LOG_MTX, ("waiting for unlocked %s [wait_cnt = %d]...\n",
00344                             mp->pmp->mtname, wait_cnt));
00345 #endif
00346       r = COND_WAIT(&mp->cond, mp, xid, lockTimeout);
00347 #ifdef THR_TRACE_1
00348       IDB_LOG(IDB_LOG_MTX, ("got it %s %p [%d]?\n", mp->pmp->mtname, mp, r));
00349 #endif
00350       mp->pmp->wait_cnt--;
00351 
00352       if (r) {
00353 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00354         r1 = ut_sem_unlock(sid);
00355 #else
00356         r1 = pthread_mutex_unlock((pthread_mutex_t *)&mp->pmp->u.mp);
00357         sleeping_mp = 0;
00358 #endif
00359         if (r1)
00360           RETURN_ERROR(r1, "Lock");
00361 
00362         if (r < 0)
00363           return statusMake(FATAL_MUTEX_LOCK_TIMEOUT, "mutex %s", mp->pmp->mtname);
00364           
00365         RETURN_ERROR(r, "Lock");
00366       }
00367     }
00368 
00369 #endif
00370     appendGMutex(mp, xid);
00371 
00372     return Success;
00373   }
00374 
00375   static Status
00376   mutexUnlock_realize(Mutex *mp, unsigned int xid, Boolean reentrant)
00377   {
00378     int r, mpxid;
00379 
00380     if (mp->pmp->magic != MT_MAGIC) {
00381       IDB_LOG(IDB_LOG_MTX, ("mutexUnlock (xid = %d) [mp = 0x%x, \"%s\"], "
00382                             "invalid magic 0x%x, expected 0x%x\n",
00383                             xid, mp, (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"),
00384                             mp->pmp->magic, MT_MAGIC));
00385       RETURN_MAGIC_ERROR();
00386     }
00387 
00388     if (cleanup)
00389       return Success;
00390 #ifdef THR_TRACE
00391     IDB_LOG(IDB_LOG_MTX, ("trying to unlock %s %p xid %d==%d?\n",
00392                           mp->pmp->mtname, mp, mp->pmp->xid, xid));
00393 #endif
00394 
00395     ESM_ASSERT_ABORT(mp->pmp->xid == xid || !mp->pmp->xid || !xid, 0, 0);
00396 
00397 #ifdef MUTEX_FAST
00398     if (!reentrant) {
00399       r = pthread_mutex_unlock((pthread_mutex_t *)&mp->pmp->u.mp);
00400       if (r)
00401         RETURN_ERROR(r, "UnLock");
00402     }
00403     return releaseGMutex(mp, xid);
00404 #endif
00405 
00406 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00407     //  assert(mp->pmp->u.key || mp->id);
00408     ESM_ASSERT_ABORT(mp->id>=0, 0, 0);
00409     ESM_ASSERT_ABORT(mp->plocked, 0, 0);
00410     int sid = mp->id;
00411 #else
00412     int sid = 0;
00413 #endif
00414 
00415 #ifdef UT_SEM_FAST
00416     if (!reentrant) {
00417       ESM_ASSERT_ABORT(*mp->plocked > 0, 0, 0);
00418       if (!--(*mp->plocked)) {
00419         r = ut_sem_unlock(sid);
00420         if (r)
00421           RETURN_ERROR(r, "Unlock");
00422       }
00423     }
00424 #else
00425 
00426     ESM_ASSERT_ABORT(mp->pmp->locked, 0, 0);
00427 
00428 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00429     r = ut_sem_lock(sid);
00430 #else
00431     r = pthread_mutex_lock((pthread_mutex_t *)&mp->pmp->u.mp);
00432     sleeping_mp = mp;
00433 #endif
00434 
00435     if (r)
00436       RETURN_ERROR(r, "Unlock");
00437 
00438     mp->pmp->xid = 0;
00439     mp->pmp->locked = 0;
00440 
00441     if (mp->pmp->wait_cnt)
00442       COND_SIGNAL(&mp->cond);
00443 
00444 #ifdef THR_TRACE
00445     IDB_LOG(IDB_LOG_MTX, ("unlocking %s %p\n", mp->pmp->mtname, mp));
00446 #endif
00447 
00448     if (!reentrant) {
00449 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00450       r = ut_sem_unlock(sid);
00451 #else
00452       r = pthread_mutex_unlock((pthread_mutex_t *)&mp->pmp->u.mp);
00453       sleeping_mp = 0;
00454 #endif
00455 
00456       if (r)
00457         RETURN_ERROR(r, "Unlock");
00458     }
00459 #endif
00460 
00461     return releaseGMutex(mp, xid);
00462   }
00463 
00464   Status
00465   mutexLock(Mutex *mp, unsigned int xid)
00466   {
00467     return mutexLock_realize(mp, xid, False);
00468   }
00469 
00470   Status
00471   mutexLock_r(Mutex *mp, unsigned int xid)
00472   {
00473     return mutexLock_realize(mp, xid, True);
00474   }
00475 
00476   Status
00477   mutexUnlock(Mutex *mp, unsigned int xid)
00478   {
00479     return mutexUnlock_realize(mp, xid, False);
00480   }
00481 
00482   Status
00483   mutexUnlock_r(Mutex *mp, unsigned int xid)
00484   {
00485     return mutexUnlock_realize(mp, xid, True);
00486   }
00487 
00488   int
00489   mutexCheckNotLock(Mutex *mp, unsigned int xid)
00490   {
00491     if (mp->pmp->xid == xid) {
00492       IDB_LOG(IDB_LOG_MTX, ("WARNING mutex \"%s\" is locked by CURRENT xid = %d\n",
00493                             (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"), xid));
00494       MUTEX_UNLOCK((Mutex *)mp, xid);
00495     }
00496     else if (mp->pmp->xid) {
00497       IDB_LOG(IDB_LOG_MTX, ("mutex \"%s\" is locked by OTHER xid = %d\n",
00498                             (mp->pmp->mtname ?  mp->pmp->mtname : "<unknown>"), mp->pmp->xid));
00499     }
00500 
00501     return 0;
00502   }
00503 
00504   XMOffset
00505   condNew(DbDescription *vd, XMHandle *xmh, CondWait *cond)
00506   {
00507     CondWaitP *pcond;
00508     XMOffset pcond_off;
00509 
00510     pcond = (CondWaitP *)XMAlloc(xmh, sizeof(CondWaitP));
00511     pcond_off = XM_OFFSET(xmh, pcond);
00512 
00513     condInit(vd, cond, pcond);
00514     return pcond_off;
00515   }
00516 
00517   void
00518   condDelete(DbDescription *vd, XMHandle *xmh, XMOffset pcond_off)
00519   {
00520     CondWaitP *pcond = (CondWaitP *)XM_ADDR(xmh, pcond_off);
00521     XMFree(xmh, pcond);
00522 #ifdef HAVE_SEMAPHORE_POLICY_POSIX
00523     pthread_cond_destroy(&pcond->u.cond);
00524 #endif
00525   }
00526 
00527   CondWait *
00528   condMake(DbDescription *vd, XMHandle *xmh, XMOffset pcond_off,
00529            CondWait *cond)
00530   {
00531     CondWaitP *pcond = (CondWaitP *)XM_ADDR(xmh, pcond_off);
00532     condLightInit(vd, cond, pcond);
00533     return cond;
00534   }
00535 
00536   void
00537   condLightInit(DbDescription *vd, CondWait *cond, CondWaitP *pcond)
00538   {
00539     assert(pcond);
00540     if (!cond)
00541       return;
00542 
00543     cond->pcond = pcond;
00544 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00545     cond->id = (vd ? ut_sem_open(vd->semkeys[1]) : -1);
00546 #endif
00547 #ifdef THR_TRACE
00548     IDB_LOG(IDB_LOG_MTX,
00549             ("condLightInit(cond=%p, pcond=%p, cid=%d)\n",
00550              cond, pcond, cond->id));
00551 #endif
00552   }
00553 
00554   int
00555   condInit(DbDescription *vd, CondWait *cond, CondWaitP *pcond)
00556   {
00557     condLightInit(vd, cond, pcond);
00558 
00559 #ifdef THR_TRACE
00560     IDB_LOG(IDB_LOG_MTX,
00561             ("condInit(cond = %p, pcond = %p, cid = %d)\n",
00562              cond, pcond, (cond ? cond->id : -1)));
00563 #endif
00564     memset(pcond, 0, sizeof(*pcond));
00565 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00566     pcond->u.key = (vd ? vd->semkeys[1] : 0);
00567 #else
00568     pthread_condattr_t cattr;
00569 
00570     assert (!pthread_condattr_init(&cattr));
00571 
00572 #ifdef _POSIX_THREAD_PROCESS_SHARED
00573     assert (!pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED));
00574 #endif
00575 
00576     assert (!pthread_cond_init(&pcond->u.cond, &cattr));
00577 #endif
00578 
00579     pcond->magic = MT_MAGIC;
00580 
00581     return 1;
00582   }
00583 
00584   static int
00585   condWait_realize(CondWait *cond, Mutex *mp, unsigned int xid,
00586                    unsigned int timeout, Boolean reentrant)
00587   {
00588     int r;
00589 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00590     ESM_ASSERT_ABORT(mp->id>=0, 0, 0);
00591     int sid = mp->id;
00592     if (cond->id < 0)
00593       cond->id = ut_sem_open(cond->pcond->u.key);
00594     int cid = cond->id;
00595 #endif
00596     struct timespec tm;
00597 
00598     if (cond->pcond->magic != MT_MAGIC) {
00599       IDB_LOG(IDB_LOG_MTX, ("condWait (xid = %d) [cond = 0x%x], invalid magic 0x%x, expected 0x%x\n",
00600                             xid, cond, cond->pcond->magic, MT_MAGIC));
00601       ESM_ASSERT_ABORT(0, 0, xid);
00602       return 1;
00603     }
00604 
00605     if (!timeout)
00606       timeout = wakeupInterval;
00607 
00608     tm.tv_sec  = time(0) + timeout;
00609     tm.tv_nsec = 0;
00610 
00611 #define WK_TIMEDWAIT_BUG
00612 #ifdef WK_TIMEDWAIT_BUG
00613     for (;;) { // 10/01/02: workaround problem false timeout return
00614 #endif
00615 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00616       if (reentrant) {
00617         Status s = mutexUnlock_r(mp, xid);
00618         if (s) {
00619           IDB_LOG(IDB_LOG_MTX, ("condWait [cond = 0x%x], fatal error, errno=%d\n",
00620                                 cond, errno));
00621           return 1;
00622         }
00623       }
00624 
00625       //r = ut_sem_condwait(sid, cid);
00626       r = ut_sem_timedcondwait(sid, cid, timeout*1000);
00627 
00628       if (reentrant) {
00629         Status s = mutexLock_r(mp, xid);
00630         if (s)
00631           {
00632             IDB_LOG(IDB_LOG_MTX, ("condWait [cond = 0x%x], fatal error, errno=%d\n",
00633                                   cond, errno));
00634             return 1;
00635           }
00636       }
00637 #else
00638       if (reentrant) {
00639         Status s = mutexUnlock_r(mp, xid);
00640         if (s)
00641           {
00642             IDB_LOG(IDB_LOG_MTX, ("condWait [cond = 0x%x], fatal error, errno=%d\n",
00643                                   cond, errno));
00644             return 1;
00645           }
00646       }
00647 
00648 
00649 #ifdef THR_TRACE
00650       IDB_LOG(IDB_LOG_MTX, ("condWait [xid = %d, cond = 0x%x] sleeping for %d seconds [%sreentrant] %s\n",
00651                             xid, cond, timeout, (reentrant ? "" : "non "), mp->pmp->mtname));
00652 #endif
00653 
00654       /*
00655         printf("condWait [xid = %d, cond = 0x%x] sleeping for %d seconds [%sreentrant] %s\n",
00656         xid, cond, timeout, (reentrant ? "" : "non "), mp->pmp->mtname);
00657       */
00658 
00659       r = pthread_cond_timedwait(&cond->pcond->u.cond, &mp->pmp->u.mp, &tm);
00660 
00661 #ifdef THR_TRACE
00662       IDB_LOG(IDB_LOG_MTX,
00663               ("condWait [xid = %d, cond = 0x%x] wakingup [return=%d]\n",
00664                xid, cond, r));
00665 #endif
00666 
00667       if (reentrant) {
00668         Status s = mutexLock_r(mp, xid);
00669         if (s)
00670           {
00671             IDB_LOG(IDB_LOG_MTX, ("condWait [cond = 0x%x], fatal error, errno=%d\n",
00672                                   cond, errno));
00673             return 1;
00674           }
00675       }
00676 
00677 #ifdef THR_TRACE_2
00678       IDB_LOG(IDB_LOG_MTX, ("condWait (xid = %d) [cond = 0x%x] wakeup %d OK reentrant %d\n",
00679                             xid, cond, r, reentrant));
00680 #endif
00681 #endif
00682 
00683       if (r == ETIMEDOUT ) {
00684         IDB_LOG(IDB_LOG_MTX, ("condWait timedwait [cond = 0x%x]\n", cond));
00685         time_t t = time(0);
00686         //printf("elapsed %d @%d\n", t - tm.tv_sec, pthread_self());
00687 #ifdef WK_TIMEDWAIT_BUG
00688         if (t - tm.tv_sec >= 0)
00689           return -(timeout);
00690         continue;
00691 #else
00692         return -(timeout);
00693 #endif
00694       }
00695 
00696       if (r) {
00697         IDB_LOG(IDB_LOG_MTX, ("condWait [cond = 0x%x], fatal error, r=%d, errno=%d, reentrant=%d\n", cond, r, errno, reentrant));
00698         perror("condWait");
00699         return r;
00700       }
00701 
00702       return r;
00703 #ifdef WK_TIMEDWAIT_BUG
00704     }
00705 #endif
00706   }
00707 
00708   int
00709   condWait(CondWait *cond, Mutex *mp, unsigned int xid,
00710            unsigned int timeout)
00711   {
00712     return condWait_realize(cond, mp, xid, timeout, False);
00713   }
00714 
00715   int
00716   condWait_r(CondWait *cond, Mutex *mp, unsigned int xid,
00717              unsigned int timeout)
00718   {
00719     return condWait_realize(cond, mp, xid, timeout, True);
00720   }
00721 
00722   int
00723   condSignal(CondWait *cond)
00724   {
00725     int r;
00726 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00727     if (cond->id < 0)
00728       cond->id = ut_sem_open(cond->pcond->u.key);
00729     int cid = cond->id;
00730 #endif
00731 
00732     if (cond->pcond->magic != MT_MAGIC) {
00733       IDB_LOG(IDB_LOG_MTX, ("condSignal [cond = 0x%x], invalid magic 0x%x, expected 0x%x\n",
00734                             cond, cond->pcond->magic, MT_MAGIC));
00735 
00736       return 1;
00737     }
00738 
00739 #ifdef THR_TRACE
00740     IDB_LOG(IDB_LOG_MTX, ("condSignal [cond = 0x%x]\n", cond));
00741 #endif
00742     //printf("condSignal [cond = 0x%x]\n", cond);
00743 
00744 #ifdef HAVE_SEMAPHORE_POLICY_SYSV_IPC
00745     r = ut_sem_signal(cid);
00746 #else
00747     r = pthread_cond_signal(&cond->pcond->u.cond);
00748 #endif
00749 
00750     if (r) {
00751       IDB_LOG(IDB_LOG_MTX, ("condSignal [cond = 0x%x], fatal error, r=%d, errno=%d\n", cond, r, errno));
00752       perror("condSignal");
00753 
00754       /*      ESM_ASSERT(0, 0, 0);*/
00755       return r;
00756     }
00757 
00758     return r;
00759   }
00760 
00761   static eyedblib::ThreadPool *thrpool;
00762   static eyedblib::ThreadPool *(*thrpool_get)();
00763 
00764   void
00765   setThreadPool(eyedblib::ThreadPool *_thrpool)
00766   {
00767     thrpool = _thrpool;
00768   }
00769 
00770   void
00771   setThreadPoolGet(eyedblib::ThreadPool *(*_thrpool_get)())
00772   {
00773     thrpool_get = _thrpool_get;
00774   }
00775 
00776   eyedblib::ThreadPool *
00777   getThreadPool()
00778   {
00779     if (thrpool)
00780       return thrpool;
00781     if (thrpool_get)
00782       thrpool = thrpool_get();
00783 
00784     return thrpool;
00785   }
00786 }

Generated on Mon Dec 22 18:15:58 2008 for eyedb by  doxygen 1.5.3