00001
00006 #include <cstring>
00007 #include <list>
00008 #include <utility>
00009 #include <iomanip>
00010 #include <cassert>
00011 #include <cmath>
00012 #include <cerrno>
00013 #include <climits>
00014
00015 #include "Space.h"
00016 #include "Value.h"
00017 #include "Cache.h"
00018 #include "functions.h"
00019 #include "exceptions.h"
00020 #include "LightString.h"
00021
00025 #define SPACE_DEBUG 0
00026
00027 unsigned int Space::NUMERIC_OBJECT_SIZE = UINT_MAX / 2;
00028 unsigned int Space::INCR_MAX_STORAGE_LEN = 24;
00029 const int Space::MUTEXES_COUNT = 32;
00030
00031 Space::Space(const std::string & name, unsigned long long softQuota, unsigned long long hardQuota,
00032 Cache *memcache, User *user) :
00033 name(name), stats(hardQuota, softQuota), hashTab(MUTEXES_COUNT), LRUList(MUTEXES_COUNT),
00034 memcache(memcache), user(user) {
00035
00036 pthread_mutex_init(&globalMutex, NULL);
00037
00038 for (int i = 0; i < MUTEXES_COUNT; ++i) {
00039 mapMutexes.push_back((pthread_rwlock_t *) malloc(sizeof(pthread_rwlock_t)));
00040
00041 if (pthread_rwlock_init(mapMutexes[i], NULL)) {
00042 perror("pthread_rwlock_init");
00043 exit(1);
00044 }
00045 }
00046
00047 fragmentedStats = new Stats[MUTEXES_COUNT];
00048
00049 current_cas_id = 1;
00050
00051 }
00052
00053 Space::~Space() {
00054 cleanSpace();
00055
00056 pthread_mutex_destroy(&globalMutex);
00057
00058 for (int i = 0; i < MUTEXES_COUNT; ++i) {
00059 pthread_rwlock_destroy(mapMutexes[i]);
00060
00061 free(mapMutexes[i]);
00062 }
00063
00064 delete[] fragmentedStats;
00065 }
00066
00067 bool Space::get(const LightString &key, Value& value, unsigned timeout) {
00068
00069 int tableCookie = hash(key);
00070 pthread_rwlock_wrlock(mapMutexes[tableCookie]);
00071
00072 SpaceHashTab::iterator it = hashTab[tableCookie].find(key);
00073 if ((it == hashTab[tableCookie].end()) || (Cache::getCurrentTime()
00074 > it->second.iteratorLRUList->expiryTime)) {
00075
00076 ++fragmentedStats[tableCookie].missCount;
00077 ++fragmentedStats[tableCookie].getsCount;
00078 pthread_rwlock_unlock(mapMutexes[tableCookie]);
00079
00080 return false;
00081 }
00082
00083 if (it->second.value.size != NUMERIC_OBJECT_SIZE) {
00084 fragmentedStats[tableCookie].bytesRead += it->second.value.size;
00085 } else {
00086 fragmentedStats[tableCookie].bytesRead += sizeof(unsigned long long);
00087 }
00088 ++fragmentedStats[tableCookie].hitCount;
00089 ++fragmentedStats[tableCookie].getsCount;
00090
00091 if (timeout != 0) {
00092 it->second.iteratorLRUList->expiryTime = computeExpiryTime(timeout);
00093 }
00094
00095
00096 LRUListElement LRUe = *it->second.iteratorLRUList;
00097 LRUList[tableCookie].erase(it->second.iteratorLRUList);
00098
00099 LRUList[tableCookie].push_front(LRUe);
00100
00101 it->second.iteratorLRUList = LRUList[tableCookie].begin();
00102
00103 if (!value.memoryPointer) {
00104 if (!(value.memoryPointer = malloc(it->second.value.size))) {
00105 pthread_rwlock_unlock(mapMutexes[tableCookie]);
00106 return false;
00107 }
00108 } else if (value.size < it->second.value.size && it->second.value.size != NUMERIC_OBJECT_SIZE) {
00109 pthread_rwlock_unlock(mapMutexes[tableCookie]);
00110 return false;
00111 }
00112
00113 value.cas_id = it->second.value.cas_id;
00114 value.flag = it->second.value.flag;
00115 value.size = it->second.value.size;
00116
00117 if (value.size != NUMERIC_OBJECT_SIZE) {
00118 memcpy(value.memoryPointer, it->second.value.memoryPointer, value.size);
00119 pthread_rwlock_unlock(mapMutexes[tableCookie]);
00120 return true;
00121 }
00122
00123
00124
00125
00126 unsigned long long numVal = *((unsigned long long*) it->second.value.memoryPointer);
00127 pthread_rwlock_unlock(mapMutexes[tableCookie]);
00128 value.size = NUM_LENGTH(numVal);
00129 char * end = (char*) value.memoryPointer + value.size - 1;
00130
00131 while (numVal) {
00132 *end-- = numVal % 10 + '0';
00133 numVal /= 10;
00134 };
00135
00136 return true;
00137 }
00138
00139 Value Space::get(const LightString &key, unsigned timeout) {
00140
00141 Value v;
00142 get(key, v, timeout);
00143 return v;
00144
00145 }
00146
00147 SpaceResult Space::doSet(const LightString &key, const Value &value, unsigned timeout,
00148 int tableCookie, SpaceHashTab::iterator found, uint64_t cas_id, uint64_t * new_cas_id) {
00149
00150 unsigned expiryTime = computeExpiryTime(timeout);
00151
00152 unsigned int valueSize = (value.size == NUMERIC_OBJECT_SIZE ? sizeof(uint64_t) : value.size);
00153
00154 int64_t spaceTaken = 0;
00155
00156 if (found != hashTab[tableCookie].end()) {
00157
00158
00159
00160 assert(value.size != NUMERIC_OBJECT_SIZE);
00161
00162 if (cas_id) {
00163 if (found->second.value.cas_id != cas_id) {
00164 return RESULT_CAS_FAIL;
00165 }
00166 }
00167
00168 if (found->second.value.size != value.size) {
00169 found->second.value.memoryPointer = realloc(found->second.value.memoryPointer,
00170 value.size);
00171 if (found->second.value.memoryPointer == NULL) {
00172 return RESULT_ERROR;
00173 }
00174 }
00175
00176 memcpy(found->second.value.memoryPointer, value.memoryPointer, value.size);
00177 found->second.value.flag = value.flag;
00178 found->second.iteratorLRUList->expiryTime = expiryTime;
00179
00180
00181
00182 if (found->second.value.size != NUMERIC_OBJECT_SIZE) {
00183 spaceTaken = (long long) value.size - (long long) found->second.value.size;
00184 } else {
00185 spaceTaken = (long long) value.size - (long long) sizeof(unsigned long long);
00186 }
00187
00188 found->second.value.size = value.size;
00189
00190
00191 LRUListElement LRUe = *found->second.iteratorLRUList;
00192 LRUList[tableCookie].erase(found->second.iteratorLRUList);
00193
00194 LRUList[tableCookie].push_front(LRUe);
00195
00196 found->second.iteratorLRUList = LRUList[tableCookie].begin();
00197
00198 } else {
00199
00200
00201 void *memoryPointer = malloc(valueSize);
00202
00203 if (!memoryPointer) {
00204 return RESULT_ERROR;
00205 }
00206
00207
00208 memcpy(memoryPointer, value.memoryPointer, valueSize);
00209
00210
00211 ++fragmentedStats[tableCookie].itemsCount;
00212
00213 spaceTaken = valueSize + 2 * key.size;
00214
00215 SpaceLRUList::iterator newHead = LRUList[tableCookie].insert(LRUList[tableCookie].begin(),
00216 LRUListElement(tableCookie, key, expiryTime));
00217
00218 found = hashTab[tableCookie].insert(std::make_pair(key, HashTabElement(Value(memoryPointer,
00219 value.size, value.flag, cas_id), newHead))).first;
00220
00221 }
00222
00223 ++fragmentedStats[tableCookie].setsCount;
00224 fragmentedStats[tableCookie].bytesWritten += valueSize;
00225
00226 pthread_mutex_lock(&globalMutex);
00227
00228 stats.usedQuota += spaceTaken;
00229 cas_id = current_cas_id++;
00230
00231 found->second.value.cas_id = cas_id;
00232
00233 if (stats.usedQuota > stats.hardQuota && spaceTaken > 0) {
00234 pthread_mutex_unlock(&globalMutex);
00235 unsigned removed = 0;
00236 while (removed < valueSize + 2 * key.size) {
00237
00238 unsigned which = tableCookie;
00239
00240 LRUListElement LRUe = LRUList[which].back();
00241 SpaceHashTab::iterator it = hashTab[which].find(LRUe.key);
00242 free(it->second.value.memoryPointer);
00243 removed += (it->second.value.size + 2 * it->first.size);
00244 hashTab[which].erase(it);
00245 LRUList[which].pop_back();
00246
00247 }
00248 pthread_mutex_lock(&globalMutex);
00249 stats.usedQuota -= removed;
00250 }
00251 pthread_mutex_unlock(&globalMutex);
00252
00253 if (new_cas_id) {
00254 *new_cas_id = cas_id;
00255 }
00256
00257 return RESULT_SUCCESS;
00258 }
00259
00260 SpaceResult Space::set(const LightString &key, const Value &value, unsigned timeout,
00261 uint64_t * new_cas_id) {
00262
00263
00264 if (value.size == 0 || value.size > Cache::MAX_OBJECT_SIZE || value.size > stats.hardQuota) {
00265 return RESULT_BAD_SIZE;
00266 }
00267
00268 int cookie = hash(key);
00269 pthread_rwlock_wrlock(mapMutexes[cookie]);
00270
00271 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00272
00273 SpaceResult output = doSet(key, value, timeout, cookie, it, 0, new_cas_id);
00274
00275 pthread_rwlock_unlock(mapMutexes[cookie]);
00276
00277 return output;
00278 }
00279
00280 SpaceResult Space::cas(const LightString &key, const Value &value, unsigned timeout,
00281 uint64_t cas_id, uint64_t * new_cas_id) {
00282
00283
00284 if (value.size == 0 || value.size > Cache::MAX_OBJECT_SIZE || value.size > stats.hardQuota) {
00285 return RESULT_BAD_SIZE;
00286 }
00287
00288 int cookie = hash(key);
00289 pthread_rwlock_wrlock(mapMutexes[cookie]);
00290
00291 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00292
00293 SpaceResult output = doSet(key, value, timeout, cookie, it, cas_id, new_cas_id);
00294
00295 pthread_rwlock_unlock(mapMutexes[cookie]);
00296
00297 return output;
00298
00299 }
00300
00301 SpaceResult Space::add(const LightString &key, const Value &value, unsigned timeout,
00302 uint64_t cas_id, uint64_t * new_cas_id) {
00303
00304
00305 if (value.size == 0 || value.size > Cache::MAX_OBJECT_SIZE || value.size > stats.hardQuota) {
00306 return RESULT_BAD_SIZE;
00307 }
00308
00309 int cookie = hash(key);
00310 pthread_rwlock_wrlock(mapMutexes[cookie]);
00311
00312 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00313 SpaceResult ret;
00314 if (it != hashTab[cookie].end()) {
00315 ret = RESULT_EXISTS;
00316 } else {
00317 ret = doSet(key, value, timeout, cookie, it, cas_id, new_cas_id);
00318 }
00319
00320 pthread_rwlock_unlock(mapMutexes[cookie]);
00321
00322 return ret;
00323 }
00324
00325 SpaceResult Space::replace(const LightString &key, const Value &value, unsigned timeout,
00326 uint64_t cas_id, uint64_t * new_cas_id) {
00327
00328
00329 if (value.size == 0 || value.size > Cache::MAX_OBJECT_SIZE || value.size > stats.hardQuota) {
00330 return RESULT_BAD_SIZE;
00331 }
00332
00333 int cookie = hash(key);
00334 pthread_rwlock_wrlock(mapMutexes[cookie]);
00335
00336 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00337 SpaceResult ret;
00338 if (it != hashTab[cookie].end()) {
00339 ret = doSet(key, value, timeout, cookie, it, cas_id, new_cas_id);
00340 } else {
00341 ret = RESULT_MISS;
00342 }
00343 pthread_rwlock_unlock(mapMutexes[cookie]);
00344
00345 return ret;
00346 }
00347
00348 SpaceResult Space::append(const LightString &key, const Value &value, uint64_t * new_cas_id) {
00349 return pend(key, value, new_cas_id, true);
00350 }
00351
00352 SpaceResult Space::prepend(const LightString &key, const Value &value, uint64_t * new_cas_id) {
00353 return pend(key, value, new_cas_id, false);
00354 }
00355
00356 SpaceResult Space::pend(const LightString &key, const Value &value, uint64_t * new_cas_id,
00357 bool append) {
00358
00359
00360
00361
00362 if (value.size > stats.hardQuota || value.size == 0) {
00363 return RESULT_BAD_SIZE;
00364 }
00365
00366
00367 int cookie = hash(key);
00368 pthread_rwlock_wrlock(mapMutexes[cookie]);
00369 SpaceResult res;
00370
00371 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00372 if (it == hashTab[cookie].end()) {
00373 res = RESULT_MISS;
00374 } else {
00375
00376 unsigned realSize;
00377 char tmpbuf[INCR_MAX_STORAGE_LEN];
00378 if (it->second.value.size == NUMERIC_OBJECT_SIZE) {
00379 sprintf(tmpbuf, "%llu", (*((unsigned long long*) it->second.value.memoryPointer)));
00380 realSize = strlen(tmpbuf);
00381 } else {
00382 realSize = it->second.value.size;
00383 }
00384
00385 if (value.size + realSize > stats.hardQuota) {
00386 res = RESULT_ERROR;
00387 } else {
00388
00389 fragmentedStats[cookie].setsCount++;
00390 fragmentedStats[cookie].bytesWritten += value.size;
00391
00392
00393 void *memoryPointer = malloc(value.size + realSize);
00394
00395 if (!memoryPointer) {
00396 pthread_rwlock_unlock(mapMutexes[cookie]);
00397 return RESULT_ERROR;
00398 }
00399
00400
00401
00402 if (append) {
00403
00404 if (it->second.value.size == NUMERIC_OBJECT_SIZE) {
00405 memcpy(memoryPointer, tmpbuf, realSize);
00406 it->second.value.size = sizeof(unsigned long long);
00407 } else {
00408 memcpy(memoryPointer, it->second.value.memoryPointer, realSize);
00409 }
00410
00411
00412 memcpy((char*) memoryPointer + realSize, value.memoryPointer, value.size);
00413 } else {
00414
00415 if (it->second.value.size == NUMERIC_OBJECT_SIZE) {
00416 memcpy((char*) memoryPointer + value.size, tmpbuf, realSize);
00417 it->second.value.size = sizeof(unsigned long long);
00418 } else {
00419 memcpy((char*) memoryPointer + value.size, it->second.value.memoryPointer,
00420 realSize);
00421 }
00422
00423
00424 memcpy(memoryPointer, value.memoryPointer, value.size);
00425 }
00426
00427
00428 free(it->second.value.memoryPointer);
00429
00430
00431 pthread_mutex_lock(&globalMutex);
00432 stats.usedQuota += value.size;
00433 uint64_t cas_id = current_cas_id++;
00434 pthread_mutex_unlock(&globalMutex);
00435
00436 it->second.value.memoryPointer = memoryPointer;
00437 it->second.value.size = value.size + realSize;
00438 it->second.value.flag = value.flag;
00439 it->second.value.cas_id = cas_id;
00440
00441 if (new_cas_id) {
00442 *new_cas_id = cas_id;
00443 }
00444 res = RESULT_SUCCESS;
00445 }
00446 }
00447 pthread_rwlock_unlock(mapMutexes[cookie]);
00448 return res;
00449 }
00450
00451 SpaceResult Space::increment(const LightString &key, uint64_t step, uint64_t * out,
00452 unsigned timeout, bool force, uint64_t initial) {
00453
00454 return crement(key, step, out, timeout, force, initial);
00455
00456 }
00457
00458 SpaceResult Space::decrement(const LightString &key, uint64_t step, uint64_t * out,
00459 unsigned timeout, bool force, uint64_t initial) {
00460
00461 return crement(key, ~step + 1, out, timeout, force, initial);
00462
00463 }
00464
00465 SpaceResult Space::crement(const LightString &key, uint64_t step, uint64_t * out, unsigned timeout,
00466 bool force, uint64_t initial) {
00467
00468
00469
00470 int cookie = hash(key);
00471 pthread_rwlock_wrlock(mapMutexes[cookie]);
00472
00473 SpaceHashTab::iterator it = hashTab[cookie].find(key);
00474
00475 SpaceResult res;
00476 void * memoryPointer;
00477
00478 if (it != hashTab[cookie].end()) {
00479
00480
00481
00482 if (it->second.value.size == NUMERIC_OBJECT_SIZE) {
00483 *out = ((*((uint64_t *) it->second.value.memoryPointer)) += step);
00484 res = RESULT_SUCCESS;
00485 } else {
00486 if (!safe_strtoull((string((char*) (it->second.value.memoryPointer),
00487 it->second.value.size).substr(0, it->second.value.size)).c_str(),
00488 (uint64_t*) out)) {
00489
00490 res = RESULT_ERROR;
00491
00492 } else if (!(memoryPointer = malloc(sizeof(initial)))) {
00493
00494 res = RESULT_ERROR;
00495
00496 } else {
00497
00498 *out += step;
00499 memcpy(memoryPointer, (void*) out, sizeof(initial));
00500 free(it->second.value.memoryPointer);
00501 it->second.value.memoryPointer = memoryPointer;
00502
00503 pthread_mutex_lock(&globalMutex);
00504 stats.usedQuota += sizeof(initial) - it->second.value.size;
00505 pthread_mutex_unlock(&globalMutex);
00506
00507 it->second.value.size = NUMERIC_OBJECT_SIZE;
00508 res = RESULT_SUCCESS;
00509 }
00510 }
00511 } else {
00512
00513 if (force) {
00514 res = doSet(key, Value(&initial, NUMERIC_OBJECT_SIZE), timeout, cookie, it);
00515 *out = initial;
00516 } else {
00517 res = RESULT_MISS;
00518 }
00519 }
00520 pthread_rwlock_unlock(mapMutexes[cookie]);
00521 return res;
00522 }
00523
00524 void Space::setStatus(unsigned nUsedQuota, unsigned nGetsCount, unsigned nSetsCount,
00525 unsigned nBytesWritten, unsigned nBytesRead, unsigned nMissCount, unsigned nHitCount,
00526 unsigned long long nItemsCount) {
00527
00528
00529
00530 stats.getsCount = nGetsCount;
00531 stats.setsCount = nSetsCount;
00532
00533 stats.bytesRead = nBytesRead;
00534 stats.missCount = nMissCount;
00535 stats.hitCount = nHitCount;
00536 stats.itemsCount = nItemsCount;
00537 }
00538
00539 bool Space::saveSpace(const char *fileName, unsigned minTimeout) {
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587 return true;
00588 }
00589
00590
00591 void Space::cleanSpace() {
00592
00593
00594
00595
00596
00597
00598
00599 }
00600
00601 void Space::destroySpace() {
00602
00603 cleanSpace();
00604 }
00605
00606 Stats Space::getStats() {
00607 pthread_mutex_lock(&globalMutex);
00608 stats.getsCount = 0;
00609 stats.missCount = 0;
00610 stats.hitCount = 0;
00611 stats.bytesRead = 0;
00612 stats.itemsCount = 0;
00613 stats.bytesWritten = 0;
00614 stats.setsCount = 0;
00615 for (int i = 0; i < MUTEXES_COUNT; i++) {
00616 stats.getsCount += fragmentedStats[i].getsCount;
00617 stats.missCount += fragmentedStats[i].missCount;
00618 stats.hitCount += fragmentedStats[i].hitCount;
00619 stats.bytesRead += fragmentedStats[i].bytesRead;
00620 stats.bytesWritten += fragmentedStats[i].bytesWritten;
00621 stats.itemsCount += fragmentedStats[i].itemsCount;
00622 stats.setsCount += fragmentedStats[i].setsCount;
00623 }
00624 Stats tmp = stats;
00625 pthread_mutex_unlock(&globalMutex);
00626 return tmp;
00627 }
00628
00629 void Space::clearStats() {
00630
00631 stats.bytesRead = 0;
00632 stats.bytesWritten = 0;
00633 stats.getsCount = 0;
00634 stats.setsCount = 0;
00635 stats.hitCount = 0;
00636 stats.missCount = 0;
00637 }
00638
00639 size_t Space::hash(const LightString & key) {
00640 return HASH(key) % MUTEXES_COUNT;
00641 }
00642
00643 unsigned Space::computeExpiryTime(unsigned timeout) {
00644
00645 if (timeout == 0) {
00646 return INT_MAX;
00647 } else if (timeout > 30 * 24 * 60 * 60) {
00648 return timeout;
00649 } else {
00650 return timeout + Cache::getCurrentTime();
00651 }
00652 }
00653