00001
00006 #include <deque>
00007 #include <iomanip>
00008 #include <sstream>
00009 #include <ctime>
00010 #include <sys/time.h>
00011 #include <cstdlib>
00012 #include <csignal>
00013 #include <climits>
00014 #include <cstring>
00015 #include <pthread.h>
00016
00017 #include <cassert>
00018
00019 #include "GarbageCollector.h"
00020 #include "functions.h"
00021
00025 #define GC_DEBUG 20
00026
00030 #define GC_SYNCHRO_DEBUG 0
00031
00035 #define SPACE_MANAGER 0
00036
00037 GarbageCollector::GarbageCollector(Cache *cache, unsigned sleepInterval) :
00038 cache(cache), sleepInterval(sleepInterval) {
00039
00040 quotaToDistribute = 0;
00041 nextQuotaToDistribute = 0;
00042 maxHardQuota = 0;
00043
00044 reqHistoryLimiter = 10;
00045
00046 pthread_mutex_init(&addMutex, NULL);
00047 pthread_mutex_init(&removeMutex, NULL);
00048
00049 int ret;
00050 if ((ret = pthread_create(&threadID, NULL, GCThread, this))) {
00051 fprintf(stderr, "GC:\tpthread_create: %s\n", strerror(ret));
00052 exit(1);
00053 }
00054
00055 }
00056
00057 GarbageCollector::~GarbageCollector() {
00058
00059 pthread_kill(threadID, SIGUSR1);
00060
00061 for (std::map<Space*, std::deque<unsigned> *>::iterator it = reqHistoryMap.begin(); it
00062 != reqHistoryMap.end(); ++it) {
00063 delete it->second;
00064 }
00065
00066 }
00067
00068 void GarbageCollector::addObservedSpace(Space *space) {
00069 MUTEX_LOCK(addMutex);
00070 spacesToAdd.push_back(space);
00071 cout << "GC:\tadded space " << space->name << endl;
00072 MUTEX_UNLOCK(addMutex);
00073 }
00074
00075 void GarbageCollector::removeObservedSpace(Space * space) {
00076 MUTEX_LOCK(removeMutex);
00077 spacesToRemove.insert(space);
00078 MUTEX_LOCK(addMutex);
00079 for (SpaceList::iterator it = spacesToAdd.begin(); it != spacesToAdd.end();) {
00080 if (*it == space) {
00081 it = spacesToAdd.erase(it);
00082 } else {
00083 it++;
00084 }
00085 }
00086 MUTEX_UNLOCK(addMutex);
00087 cout << "GC:\tremoved space " << space->name << endl;
00088 MUTEX_UNLOCK(removeMutex);
00089 }
00090
00091 void GarbageCollector::setInterval(unsigned interval) {
00092
00093 MUTEX_LOCK(removeMutex);
00094 sleepInterval = interval;
00095 MUTEX_UNLOCK(removeMutex);
00096
00097 }
00098
00099 void GarbageCollector::operator()() {
00100
00101 cout << "GC:\tstarted" << endl;
00102
00103 for (;;) {
00104
00105 MUTEX_LOCK(addMutex);
00106 spaceList.insert(spaceList.end(), spacesToAdd.begin(), spacesToAdd.end());
00107 spacesToAdd.clear();
00108 #if SPACE_MANAGER
00109 for (SpaceList::iterator it = spacesToAdd.begin(); it != spacesToAdd.end(); ++it) {
00110 Space *space = *it;
00111
00112 unsigned long long reqMin;
00113
00114 Stats stats = space->getStats();
00115
00116 if (stats.softQuota > stats.usedQuota)
00117 reqMin = stats.usedQuota;
00118 else
00119 reqMin = stats.softQuota;
00120
00121 nextQuotaToDistribute += stats.hardQuota - reqMin;
00122 maxHardQuota += stats.hardQuota;
00123 requiredMinimum.insert(std::pair<Space *, unsigned long long>(space, reqMin));
00124
00125 reqHistoryMap.insert(std::pair<Space *, std::deque<unsigned> *>(space, new std::deque<unsigned>()));
00126 }
00127 #endif
00128 MUTEX_UNLOCK(addMutex);
00129
00130 #if SPACE_MANAGER
00131
00132 if (reqHistoryTotal.size() == reqHistoryLimiter)
00133 reqHistoryTotal.pop_back();
00134 reqHistoryTotal.push_front(0);
00135
00136 quotaToDistribute = maxHardQuota;
00137 for (std::map<Space *, unsigned long long>::iterator minIt = requiredMinimum.begin(); minIt
00138 != requiredMinimum.end(); ++minIt) {
00139 quotaToDistribute -= minIt->second;
00140 }
00141 #endif
00142
00143 for (SpaceList::iterator it = spaceList.begin(); it != spaceList.end(); ++it) {
00144
00145 MUTEX_LOCK(removeMutex);
00146
00147 Space * space = *it;
00148
00149 if (spacesToRemove.find(space) != spacesToRemove.end() ) {
00150 MUTEX_UNLOCK(removeMutex);
00151 continue;
00152 }
00153
00154 #if SPACE_MANAGER
00155
00156 determineNewQuota(space);
00157 #endif
00158
00159 collect(space, true);
00160
00161 MUTEX_UNLOCK(removeMutex);
00162
00163 }
00164
00165 MUTEX_LOCK(removeMutex);
00166 for (SpaceList::iterator it = spaceList.begin(); it != spaceList.end();) {
00167 if (spacesToRemove.find(*it) != spacesToRemove.end()) {
00168 it = spaceList.erase(it);
00169 } else {
00170 ++it;
00171 }
00172 }
00173 spacesToRemove.clear();
00174
00175 unsigned sleepTime = sleepInterval;
00176
00177 MUTEX_UNLOCK(removeMutex);
00178
00179 usleep(1000 * sleepTime);
00180
00181 }
00182
00183 }
00184
00185 void GarbageCollector::collect(Space * space, bool goBelowQuota) {
00186
00187 time_t deadline = Cache::getCurrentTime();
00188
00189 for (int i = 0; i < space->MUTEXES_COUNT; i++) {
00190
00191 pthread_rwlock_wrlock(space->mapMutexes[i]);
00192
00193 for (Space::SpaceLRUList::iterator it = space->LRUList[i].begin(); it != space->LRUList[i].end(); ){
00194 if (it->expiryTime <= deadline){
00195 Space::SpaceHashTab::iterator hIt = space->hashTab[i].find(it->key);
00196 pthread_mutex_lock(&space->globalMutex);
00197 space->stats.usedQuota -= (hIt->second.value.size + 2* hIt->first.size);
00198 pthread_mutex_unlock(&space->globalMutex);
00199 ++space->fragmentedStats[i].itemsCount;
00200
00201 free(hIt->second.value.memoryPointer);
00202 space->hashTab[i].erase(hIt);
00203 Space::SpaceLRUList::iterator nit = it++;
00204 space->LRUList[i].erase(nit);
00205 } else {
00206 it++;
00207 }
00208 }
00209
00210 pthread_rwlock_unlock(space->mapMutexes[i]);
00211
00212 }
00213
00214 }
00215
#if SPACE_MANAGER
/**
 * Recomputes the hard quota of @p space from its share of recent set
 * requests (only compiled when SPACE_MANAGER is enabled).
 *
 * The space's newest history sample is pushed and added to the newest slot
 * of reqHistoryTotal. Older samples (positions 1..size-1) are then weighted
 * by (size - pos) to produce a per-space score and a global score. When the
 * global score is non-zero the space receives
 * spaceScore * quotaToDistribute / totalScore on top of its previously
 * recorded required minimum, and the result is stored as the new hard quota
 * under the space lock. Finally the required minimum is refreshed to
 * min(softQuota, usedQuota).
 *
 * Fix relative to the previous version: the lookups in reqHistoryMap and
 * requiredMinimum were dereferenced without checking for end(); a space that
 * was never registered now leaves all state untouched and yields 0.
 *
 * NOTE(review): each new sample is computed as setsCount minus the previous
 * *sample* (not the previous setsCount reading); if setsCount is cumulative
 * this is only a true delta for the second sample - confirm intent.
 *
 * @param space space whose quota is recomputed.
 * @return quota added on top of the required minimum, UINT_MAX when the
 *         global score is zero, or 0 when @p space is not registered.
 */
unsigned long long GarbageCollector::determineNewQuota(Space * space) {

    unsigned long long addedQuota = 0;

    std::map<Space*, std::deque<unsigned> *>::iterator it = reqHistoryMap.find(space);
    std::map<Space *, unsigned long long>::iterator minIt = requiredMinimum.find(space);

    // Nothing to do for a space without bookkeeping entries.
    if (it == reqHistoryMap.end() || minIt == requiredMinimum.end())
        return addedQuota;

    std::deque<unsigned> * reqHistory = it->second;

    // Keep at most reqHistoryLimiter samples per space.
    if (reqHistory->size() == reqHistoryLimiter) {
        reqHistory->pop_back();
    }

    Stats stats = space->getStats();

    if (reqHistory->size()) {
        reqHistoryTotal.front() += stats.setsCount - reqHistory->front();
        reqHistory->push_front(stats.setsCount - reqHistory->front());
    } else {
        reqHistoryTotal.front() += stats.setsCount;
        reqHistory->push_front(stats.setsCount);
    }

    unsigned spaceScore = 0;
    unsigned totalScore = 0;

    // Position 0 (the sample just pushed) is excluded; lower positions get
    // larger weights.
    for (unsigned pos = 1; pos < reqHistory->size(); pos++) {
        totalScore += (reqHistory->size() - pos) * reqHistoryTotal[pos];
        spaceScore += (reqHistory->size() - pos) * ((*reqHistory)[pos]);
    }

    unsigned long long reqMin;

    if (stats.softQuota > stats.usedQuota)
        reqMin = stats.usedQuota;
    else
        reqMin = stats.softQuota;

    if (totalScore) {
        addedQuota = (spaceScore * quotaToDistribute) / totalScore;

        // Uses the minimum recorded on the previous pass, before it is refreshed.
        unsigned long long newHardQuota = minIt->second + addedQuota;

        space->lock();
        space->stats.hardQuota = newHardQuota;
        space->unlock();

    } else
        addedQuota = UINT_MAX;

    minIt->second = reqMin;

    return addedQuota;
}
#endif
00278
00279 void * GCThread(void * arg) {
00280
00281 GarbageCollector * gc = (GarbageCollector *) arg;
00282
00283 gc->operator ()();
00284
00285 return NULL;
00286
00287 }