00001
00006 #include "EventServer.h"
00007 #include "LightString.h"
00008
00012 int main(int argc, char ** argv) {
00013
00014 #if DEBUG_BINARY >= 20
00015 f = fopen("test.in", "wb");
00016 #endif
00017
00018 startTime = Cache::getCurrentTime();
00019
00020 try {
00021
00022 if (signal(SIGINT, signalHandler)) {
00023 perror("signal");
00024 exit(1);
00025 }
00026
00027 int c;
00028 int daemon = 0, argport = 0, optok = 1;
00029
00030 while ((c = getopt(argc, argv, "C:dp:")) != -1) {
00031 switch (c) {
00032 case 'C':
00033 configurationDirectory(optarg);
00034 break;
00035 case 'd':
00036 daemon = 1;
00037 break;
00038 case 'p':
00039 errno = 0;
00040 argport = strtoul(optarg, NULL, 10);
00041 if (errno) {
00042 fprintf(stderr, "Wrong port number!\n");
00043 exit(1);
00044 }
00045 break;
00046 default:
00047 optok = 0;
00048 }
00049 }
00050
00051 if (!optok || optind < argc) {
00052 printf("Usage: %s [-d] [-C directory] [-p port]\n"
00053 " -d run as a daemon\n"
00054 " -p port set server listening port\n"
00055 " -C directory specify configuration directory (default: %s)\n\n", argv[0],
00056 configurationDirectory().c_str());
00057 exit(1);
00058 }
00059
00060 if (daemon) {
00061 daemonize();
00062 }
00063
00064 cout << endl;
00065 cout << "Server:\t" << PACKAGE << "-" << VERSION << " is now starting...\n";
00066
00067 loadConfFile(string(configurationDirectory()) + "/server.cfg");
00068
00069 if (argport) {
00070 port = argport;
00071 }
00072
00073 cout << "Server:\tinitializing cache..." << endl;
00074
00075 cache = new Cache(configurationDirectory());
00076
00077 if (logSwitch) {
00078 serverLogger = new ServerLogger(serverLoggerInterval, serverLogsDirectory);
00079 }
00080
00081 User * user = cache->login(userName);
00082 if (!user) {
00083 cout << "No such user: \"" << userName << "\"" << endl;
00084 delete cache;
00085 return 1;
00086 }
00087
00088 space = user->createSpace(spaceName, softQuota * 1024 * 1024, hardQuota * 1024 * 1024);
00089 if (!space) {
00090 cout << "ERROR: could not create space: \"" << spaceName << "\"\n";
00091 return 1;
00092 }
00093
00094 sockaddr_in serverAddress;
00095 bzero((void *) &serverAddress, sizeof(serverAddress));
00096 serverAddress.sin_family = AF_INET;
00097 serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
00098 serverAddress.sin_port = htons(port);
00099
00100 int serverSocket;
00101
00102 if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
00103 perror("socket");
00104 return 1;
00105 }
00106
00107 int flag = 1;
00108 if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) < 0) {
00109 perror("setsockopt(SO_REUSEADDR)");
00110 }
00111
00112 if (bind(serverSocket, (sockaddr *) &serverAddress, sizeof(serverAddress)) == -1) {
00113 perror("bind");
00114 cout << "Server:\tFailed to listen on port " << port << "!\n";
00115 return 1;
00116 }
00117
00118 if (listen(serverSocket, 0) == -1) {
00119 perror("listen");
00120 return 1;
00121 }
00122
00123 cout << "Server:\tListening on port " << port << "...\n";
00124
00125 threads = (Thread *) calloc(threadCount, sizeof(Thread));
00126
00127 for (int i = 0; i < threadCount; i++) {
00128
00129 Thread * t = &threads[i];
00130
00131 int fds[2];
00132
00133 if (pipe(fds)) {
00134 perror("pipe");
00135 exit(1);
00136 }
00137
00138 t->in = fds[0];
00139 t->out = fds[1];
00140
00141 t->eventBase = (event_base *) event_init();
00142
00143 t->index = i;
00144
00145 event_set(&t->pipeEvent, t->in, EV_READ | EV_PERSIST, pipeCallback, t);
00146 event_base_set(t->eventBase, &t->pipeEvent);
00147 event_add(&t->pipeEvent, NULL);
00148 }
00149
00150 int cpuCount = 0;
00151 for (int i = 0; i < threadCount; i++) {
00152
00153 cpu_set_t cpuset;
00154 pthread_attr_t attr;
00155
00156 CPU_ZERO(&cpuset);
00157 int cpu = cpuCount ? i % cpuCount : i;
00158 CPU_SET(cpu, &cpuset);
00159 pthread_attr_init(&attr);
00160
00161 if (pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset)) {
00162 perror("pthread_attr_setaffinity_np");
00163 exit(1);
00164 }
00165
00166 int ret;
00167 if ((ret = pthread_create(&threads[i].threadID, &attr, threadWorker, &threads[i]))) {
00168
00169 if (ret == EINVAL && cpuCount == 0) {
00170 cpuCount = i--;
00171 cout << "Server:\tWARNING: Number of detected cores (" << cpuCount
00172 << ") is lower "
00173 "than number of threads!" << endl;
00174 continue;
00175 }
00176
00177 cerr << "pthread_create: " << strerror(ret) << endl;
00178 cout << (ret == EINVAL) << endl;
00179 exit(1);
00180 } else {
00181 cout << "Server:\tThread " << i + 1 << " started using core #" << cpu << endl;
00182 }
00183 }
00184
00185 int flags;
00186 struct event acceptEvent;
00187
00188 if ((flags = fcntl(serverSocket, F_GETFL)) == -1 || fcntl(serverSocket, F_SETFL, flags
00189 | O_NONBLOCK) == -1) {
00190 perror("fcntl");
00191 exit(1);
00192 }
00193
00194 struct event_base *acceptEventBase;
00195 acceptEventBase = (struct event_base*) event_init();
00196
00197 event_set(&acceptEvent, serverSocket, EV_READ | EV_PERSIST, acceptCallback, &acceptEvent);
00198 event_base_set(acceptEventBase, &acceptEvent);
00199
00200 event_add(&acceptEvent, NULL);
00201
00202 event_base_loop(acceptEventBase, 0);
00203
00204 void * p;
00205 for (int i = 0; i < threadCount; i++) {
00206 pthread_join(threads[i].threadID, &p);
00207 cout << "thread joined " << i << endl;
00208 }
00209
00210 printf("Evacuate the dancefloor! ;)\n");
00211
00212 free(threads);
00213
00214 } catch (Exception & e) {
00215 cout << "Server:\tERROR: " << e.getMessage() << endl;
00216 }
00217
00218 }
00219
00220 inline void acceptCallback(int serverSocket, short event, void *arg) {
00221
00222 struct workerData dat;
00223 static int lastThreadNo = -1;
00224
00225 if ((dat.clientSocket = accept(serverSocket, NULL, NULL)) == -1) {
00226 fprintf(stderr, "errno %d\n", errno);
00227 perror("accept");
00228
00229 return;
00230 }
00231
00232
00233
00234
00235
00236 int threadNo = ++lastThreadNo % threadCount;
00237
00238 dat.threadNo = threadNo;
00239 threads[threadNo].connections++;
00240 threads[threadNo].totalConnections++;
00241
00242 int ret = write(threads[threadNo].out, (void*) &dat, sizeof(dat));
00243
00244 if (ret != sizeof(dat)) {
00245 perror("write");
00246 }
00247
00248 }
00249
00250 inline void pipeCallback(int fd, short event, void * arg) {
00251
00252 struct workerData dat;
00253
00254 int ret = read(fd, (void*) &dat, sizeof(dat));
00255
00256 if (ret != sizeof(dat)) {
00257 perror("read");
00258 }
00259
00260 Info * info = (Info *) malloc(sizeof(Info));
00261
00262 info->clientSocket = dat.clientSocket;
00263 info->t = &threads[dat.threadNo];
00264
00265 info->offset = 0;
00266 info->cursor = 0;
00267 info->continueFlag = 0;
00268 info->protocol = 0;
00269 info->state = UNDEFINED;
00270 info->active = true;
00271 info->queueBuffer = NULL;
00272 info->queueLength = 0;
00273 info->queueCapacity = 0;
00274 info->quit = 0;
00275
00276 info->value.memoryPointer = info->sendBuffer + MAX_HEADER_LENGTH;
00277
00278 event_set(&info->readEvent, info->clientSocket, EV_READ | EV_PERSIST, readCallback, info);
00279 event_base_set(info->t->eventBase, &info->readEvent);
00280 event_add(&info->readEvent, &connectionTimeout);
00281
00282 event_set(&info->writeEvent, info->clientSocket, EV_WRITE | EV_PERSIST, writeCallback, info);
00283 event_base_set(info->t->eventBase, &info->writeEvent);
00284
00285 }
00286
00290 #define SHORTEST_CMD "get "
00291
00292 inline void readCallback(int socket, short event, void * arg) {
00293
00294 Info * i = (Info *) arg;
00295
00296 if (event == EV_TIMEOUT && !i->continueFlag) {
00297
00298 if (i->active) {
00299 i->active = false;
00300 event_del(&i->readEvent);
00301 event_add(&i->readEvent, &connectionTimeout);
00302 return;
00303 }
00304
00305
00306 if (logSwitch) {
00307 serverLogger->logConnectionTimeout();
00308 }
00309 sweepConnection(i);
00310 return;
00311 }
00312
00313 i->active = true;
00314
00315 int len = recvfrom(socket, i->buffer + i->offset, sizeof(i->buffer) - i->offset, MSG_DONTWAIT,
00316 NULL, NULL);
00317
00318 if (len > 0) {
00319 i->offset += len;
00320 }
00321
00322
00323
00324
00325 if (len == 0 ) {
00326
00327 sweepConnection(i);
00328 return;
00329
00330 } else if (len == -1 && errno != EAGAIN) {
00331
00332
00333 if (logSwitch) {
00334 serverLogger->logSocketError();
00335 }
00336
00337 sweepConnection(i);
00338 return;
00339 }
00340
00341 i->continueFlag = false;
00342
00343 if (!i->protocol) {
00344
00345 if ((unsigned char) i->buffer[0] == 0x80) {
00346 i->protocol = 2;
00347 } else {
00348 i->protocol = 1;
00349 }
00350 }
00351
00352 if (i->protocol == 1) {
00353 asciiHandler(i);
00354 } else {
00355 binaryHandler(i);
00356 }
00357
00358 }
00359
00364 inline void binaryHandler(Info * i) {
00365
00366 #if ENDIAN_LITTLE
00367 #define BSWAP_64(x) \
00368 src = (struct trick *) &x; \
00369 tmp = src->l; \
00370 src->l = htonl(src->r); \
00371 src->r = htonl(tmp);
00372 #define INVERT_HEADER(header) \
00373 header->header.bodylen = ntohl(header->header.bodylen); \
00374 header->header.keylen = ntohs(header->header.keylen); \
00375 BSWAP_64(header->header.cas);
00376 #define INVERT_SET_HEADER_PART(header) \
00377 header->set_header.expiration = ntohl(header->set_header.expiration); \
00378 header->set_header.flags = ntohl(header->set_header.flags);
00379 #else
00380 #define BSWAP_64(x)
00381 #define INVERT_HEADER(header)
00382 #define INVERT_SET_HEADER_PART(header)
00383 #endif
00384
00385 # if ENDIAN_LITTLE
00386 struct trick {
00387 uint32_t l;
00388 uint32_t r;
00389 }* src;
00390 uint32_t tmp;
00391 # endif
00392
00393 void * value;
00394 unsigned int value_len;
00395 char * key;
00396 raw_response_header * response;
00397 response_header * get_response;
00398 increment_response_header * increment_response;
00399 bool quiet;
00400 SpaceResult res;
00401 uint16_t actual_keylen;
00402
00403 while (true) {
00404
00405 value_len = 0;
00406
00407 request_header * header = (request_header *) (i->buffer + i->cursor);
00408
00409 if (i->offset - i->cursor < sizeof(raw_request_header) || i->offset - i->cursor
00410 < sizeof(raw_request_header) + ntohl(header->header.bodylen)) {
00411
00412 if (DEBUG_BINARY && i->offset - i->cursor) {
00413 printf("waiting...%d %d %d\n", i->offset - i->cursor < sizeof(raw_request_header),
00414 i->offset - i->cursor < sizeof(raw_request_header) + ntohl(
00415 header->header.bodylen), ntohl(header->header.bodylen));
00416 }
00417 return;
00418 }
00419
00420 printBinaryCommand(header->header.opcode);
00421 printBinaryBuffer(i->buffer + i->cursor, sizeof(raw_request_header) + ntohl(
00422 header->header.bodylen), "");
00423
00424 #if DEBUG_BINARY >= 20
00425 fwrite(i->buffer + i->cursor, sizeof(char), sizeof(raw_request_header) + ntohl(
00426 header->header.bodylen), f);
00427 fflush(f);
00428 #endif
00429
00430 switch (header->header.opcode) {
00431 case 0x0d:
00432 case 0x09:
00433 case 0x00:
00434 case 0x0c:
00435
00436
00437 INVERT_HEADER(header)
00438
00439 key = i->buffer + i->cursor + sizeof(raw_request_header);
00440
00441 actual_keylen = (header->header.opcode ? header->header.keylen : 0);
00442
00443 i->value.size = Cache::MAX_OBJECT_SIZE;
00444 if (space->get(LightString(key, header->header.keylen), i->value)) {
00445
00446 get_response = (response_header *) ((char *) i->value.memoryPointer
00447 - sizeof(hit_response_header) - actual_keylen);
00448 memset(get_response, 0, sizeof(hit_response_header));
00449
00450 get_response->header.extlen = 0x04;
00451 get_response->hit_header.flags = htonl(i->value.flag);
00452 get_response->header.cas = i->value.cas_id;
00453 get_response->header.bodylen = htonl(0x04 + actual_keylen + i->value.size);
00454 BSWAP_64(get_response->header.cas);
00455
00456 i->length = sizeof(hit_response_header) + i->value.size + actual_keylen;
00457 } else {
00458 get_response = (response_header *) ((char *) i->value.memoryPointer
00459 - sizeof(raw_response_header) - actual_keylen);
00460 memset(get_response, 0, sizeof(raw_response_header));
00461
00462 get_response->header.status = htons(0x0001);
00463 get_response->header.bodylen = htonl(actual_keylen);
00464
00465 i->length = sizeof(raw_response_header) + actual_keylen;
00466 }
00467
00468 get_response->header.magic = 0x81;
00469 get_response->header.opcode = header->header.opcode;
00470 get_response->header.opaque = header->header.opaque;
00471
00472
00473 if (header->header.opcode) {
00474 get_response->header.keylen = htons(header->header.keylen);
00475 memcpy((char *) get_response + sizeof(raw_response_header)
00476 + get_response->header.extlen, key, header->header.keylen);
00477 }
00478
00479 i->sendIt = (char *) get_response;
00480
00481 break;
00482
00483 case 0x11:
00484 case 0x12:
00485 case 0x13:
00486 case 0x01:
00487 case 0x02:
00488 case 0x03:
00489
00490 INVERT_SET_HEADER_PART(header)
00491
00492 value_len = -8;
00493
00494 case 0x0e:
00495 case 0x19:
00496 case 0x0f:
00497 case 0x1a:
00498
00499
00500 INVERT_HEADER(header)
00501
00502 key = i->buffer + i->cursor + sizeof(raw_request_header) + header->header.extlen;
00503 value = key + header->header.keylen;
00504
00505
00506 value_len += header->header.bodylen - header->header.keylen;
00507
00508 response = (raw_response_header *) i->sendBuffer;
00509 i->sendIt = i->sendBuffer;
00510 memset(response, 0, sizeof(raw_response_header));
00511
00512 if ((header->header.opcode & 0x0f) == 0x01) {
00513 res = space->cas(LightString(key, header->header.keylen), Value(value,
00514 value_len, header->set_header.flags), header->set_header.expiration,
00515 header->header.cas, &response->cas);
00516 } else if ((header->header.opcode & 0x0f) == 0x02) {
00517 res = space->add(LightString(key, header->header.keylen), Value(value,
00518 value_len, header->set_header.flags), header->set_header.expiration,
00519 header->header.cas, &response->cas);
00520 } else if ((header->header.opcode & 0x0f) == 0x03) {
00521 res = space->replace(LightString(key, header->header.keylen), Value(value,
00522 value_len, header->set_header.flags), header->set_header.expiration,
00523 header->header.cas, &response->cas);
00524 } else if (header->header.opcode == 0x0e || header->header.opcode == 0x19) {
00525 res = space->append(LightString(key, header->header.keylen), Value(value,
00526 value_len), &response->cas);
00527 } else if (header->header.opcode == 0x0f || header->header.opcode == 0x1a) {
00528 res = space->prepend(LightString(key, header->header.keylen), Value(value,
00529 value_len), &response->cas);
00530 } else {
00531 assert(0);
00532 }
00533
00534 response->magic = 0x81;
00535 response->opcode = header->header.opcode;
00536 response->opaque = header->header.opaque;
00537
00538 if (res) {
00539 if (res == RESULT_EXISTS) {
00540 response->status = htons(0x0002);
00541 } else if (res == RESULT_MISS) {
00542 response->status = htons(0x0001);
00543 } else if (res == RESULT_BAD_SIZE) {
00544 response->status = htons(0x0004);
00545 } else if (res == RESULT_ERROR) {
00546 response->status = htons(0x0082);
00547 }
00548 }
00549
00550 BSWAP_64(response->cas)
00551
00552 i->length = sizeof(raw_response_header);
00553
00554 break;
00555
00556 case 0x05:
00557 case 0x06:
00558 INVERT_HEADER(header)
00559 BSWAP_64(header->increment_header.delta)
00560 BSWAP_64(header->increment_header.initial_value)
00561 header->increment_header.expiration = ntohl(header->increment_header.expiration);
00562
00563 uint64_t out;
00564
00565 key = i->buffer + i->cursor + sizeof(raw_request_header) + header->header.extlen;
00566
00567 if (header->header.opcode == 0x05) {
00568 res = space->increment(LightString(key, header->header.keylen),
00569 header->increment_header.delta, &out,
00570 header->increment_header.expiration,
00571 header->increment_header.expiration != 0xffffffff,
00572 header->increment_header.initial_value);
00573 } else {
00574 res = space->decrement(LightString(key, header->header.keylen),
00575 header->increment_header.delta, &out,
00576 header->increment_header.expiration,
00577 header->increment_header.expiration != 0xffffffff,
00578 header->increment_header.initial_value);
00579 }
00580
00581 increment_response = (increment_response_header *) i->sendBuffer;
00582 response = (raw_response_header *) i->sendBuffer;
00583 i->sendIt = i->sendBuffer;
00584 memset(increment_response, 0, sizeof(increment_response_header));
00585
00586 response->magic = 0x81;
00587 response->opcode = header->header.opcode;
00588 response->opaque = header->header.opaque;
00589
00590 if (res == RESULT_SUCCESS) {
00591 increment_response->value = out;
00592 BSWAP_64(increment_response->value)
00593 response->bodylen = htonl(sizeof(uint64_t));
00594 i->length = sizeof(increment_response_header);
00595 } else {
00596 if (res == RESULT_ERROR) {
00597 response->status = htons(0x0006);
00598 } else {
00599 response->status = htons(0x0001);
00600 }
00601 i->length = sizeof(raw_response_header);
00602 }
00603
00604 break;
00605
00606 case 0x0b:
00607 response = (raw_response_header *) i->sendBuffer;
00608 i->sendIt = i->sendBuffer;
00609 memset(response, 0, sizeof(raw_response_header));
00610
00611 response->magic = 0x81;
00612 response->opcode = header->header.opcode;
00613 response->opaque = header->header.opaque;
00614
00615
00616
00617
00618
00619
00620
00621 response->bodylen = htonl(snprintf(i->sendBuffer + sizeof(raw_response_header),
00622 sizeof(i->sendBuffer) - sizeof(raw_response_header), "%s-%s", VERSION,
00623 PACKAGE));
00624
00625 i->length = sizeof(raw_response_header) + ntohl(response->bodylen);
00626
00627 break;
00628
00629 case 0x10:
00630 INVERT_HEADER(header)
00631 prepareStatBinaryResponse(header, i);
00632
00633 break;
00634
00635 case 0x07:
00636 case 0x17:
00637 i->quit = 1;
00638 case 0x0a:
00639
00640 INVERT_HEADER(header)
00641
00642 response = (raw_response_header *) i->sendBuffer;
00643 i->sendIt = (char *) response;
00644 i->length = sizeof(raw_response_header);
00645 memset(response, 0, sizeof(raw_response_header));
00646
00647 response->magic = 0x81;
00648 response->opcode = header->header.opcode;
00649 response->opaque = header->header.opaque;
00650
00651 break;
00652
00653 default:
00654
00655 assert("Not implemented yet!");
00656
00657 char NOT_IMPL_MSG[] = "Not implemented yet!";
00658
00659 INVERT_HEADER(header)
00660
00661 response = (raw_response_header *) i->sendBuffer;
00662 i->sendIt = (char *) response;
00663 i->length = sizeof(raw_response_header) + sizeof(NOT_IMPL_MSG) - 1;
00664 memset(response, 0, sizeof(raw_response_header));
00665
00666 response->magic = 0x81;
00667 response->opcode = header->header.opcode;
00668 response->status = htonl(0x0081);
00669 response->bodylen = htonl(sizeof(NOT_IMPL_MSG) - 1);
00670
00671 memcpy(i->sendBuffer + sizeof(raw_response_header), NOT_IMPL_MSG,
00672 sizeof(NOT_IMPL_MSG) - 1);
00673 }
00674
00675 i->cursor += sizeof(raw_request_header) + header->header.bodylen;
00676 resetBuffer(i);
00677
00678 quiet = (header->header.opcode & 0x10 && header->header.opcode & 0x0f)
00679 || header->header.opcode == 0x0d;
00680
00681 if (i->queueBuffer || quiet) {
00682
00683
00684 while (i->queueCapacity - i->queueLength < i->length) {
00685
00686
00687 if (DEBUG_BINARY) {
00688 printf("reallocing queue...\n");
00689 }
00690
00691 i->queueCapacity += QUEUE_CHUNK_LENGTH;
00692 char * buf = (char *) malloc((i->queueCapacity));
00693 memcpy(buf, i->queueBuffer, i->queueLength);
00694 free(i->queueBuffer);
00695 i->queueBuffer = buf;
00696 }
00697
00698
00699 memcpy(i->queueBuffer + i->queueLength, i->sendIt, i->length);
00700 i->queueLength += i->length;
00701
00702 if (quiet) {
00703 if (DEBUG_BINARY) {
00704 printf("quiet mode...\n");
00705 }
00706 if (i->quit) {
00707 sweepConnection(i);
00708 return;
00709 }
00710 continue;
00711 } else {
00712
00713 i->sendIt = i->queueBuffer;
00714 i->length = i->queueLength;
00715 }
00716
00717 }
00718
00719 if (!quiet) {
00720
00721 i->writeOffset = 0;
00722
00723 printBinaryBuffer((void *) i->sendIt, i->length, "--->");
00724
00725
00726 int len = sendto(i->clientSocket, i->sendIt, i->length, 0, NULL, 0);
00727
00728 if (len <= 0) {
00729 perror("binaryHandler-send");
00730 if (logSwitch) {
00731 serverLogger->logSocketError();
00732 }
00733
00734 sweepConnection(i);
00735 return;
00736 }
00737
00738 i->writeOffset += len;
00739
00740 if (i->writeOffset == i->length) {
00741
00742
00743 if (i->quit) {
00744 sweepConnection(i);
00745 return;
00746 } else {
00747 freeQueue(i);
00748 }
00749
00750 } else {
00751
00752
00753 event_del(&i->readEvent);
00754 event_add(&i->writeEvent, &connectionTimeout);
00755
00756 return;
00757 }
00758
00759 }
00760
00761 }
00762
00763 }
00764
00769 inline void asciiHandler(Info * i) {
00770
00771 while (true) {
00772
00773 if (i->state == UNDEFINED) {
00774
00775 if (i->offset - i->cursor < sizeof(SHORTEST_CMD) - 1) {
00776 return;
00777 }
00778
00779 i->tokenFlag = true;
00780 i->endFlag = false;
00781 i->tokenCount = 0;
00782
00783 parseState(i);
00784
00785 if (i->state == QUIT) {
00786
00787 sweepConnection(i);
00788 return;
00789 } else if (i->state == ERROR_STATE) {
00790 REPORT_ERROR;
00791 return;
00792 } else if (i->state == UNDEFINED) {
00793 return;
00794 }
00795 }
00796
00797 if (i->state % 2 == 0 && i->state <= PREPEND_HEADER) {
00798
00799 while (i->cursor < i->offset) {
00800
00801 if (i->endFlag) {
00802 i->endFlag = false;
00803 break;
00804 }
00805
00806 switch (i->buffer[i->cursor]) {
00807 case ' ':
00808 i->tokenFlag = true;
00809 i->buffer[i->cursor] = '\0';
00810 break;
00811 case '\r':
00812
00813 i->buffer[i->cursor] = '\0';
00814
00815 if (i->buffer[++i->cursor] == '\n') {
00816
00817 if (i->tokenCount < 4 || (i->state == CAS_HEADER && i->tokenCount < 5)) {
00818 REPORT_ERROR;
00819 return;
00820 }
00821
00822 char * endptr;
00823 errno = 0;
00824 i->length = strtoul(i->buffer + i->tokens[3], &endptr, 10);
00825
00826 if (errno || *endptr != '\0') {
00827 REPORT_ERROR;
00828 return;
00829 }
00830
00831 i->state = (State) (i->state + 1);
00832 i->endFlag = true;
00833 } else {
00834 REPORT_ERROR;
00835 return;
00836 }
00837
00838 break;
00839 case '\n':
00840 REPORT_ERROR
00841 ;
00842 return;
00843 default:
00844 if (i->tokenFlag) {
00845 if ((i->tokenCount == 5 && i->state != CAS_HEADER) || i->tokenCount
00846 == 6) {
00847 REPORT_ERROR;
00848 return;
00849 }
00850 i->tokens[i->tokenCount++] = i->cursor;
00851 i->tokenFlag = false;
00852 }
00853
00854 }
00855
00856 i->cursor++;
00857
00858 }
00859
00860 } else if (i->state == GET_HEADER || i->state == GETS_HEADER) {
00861
00862 while (i->cursor < i->offset) {
00863
00864 bool b = 0;
00865
00866 switch (i->buffer[i->cursor]) {
00867 case '\r':
00868 i->endFlag = true;
00869 case ' ':
00870
00871 i->tokenFlag = true;
00872 i->buffer[i->cursor] = '\0';
00873
00874
00875
00876 i->value.size = Cache::MAX_OBJECT_SIZE;
00877
00878 if ((b = space->get(LightString(i->buffer + i->tokGet, i->cursor
00879 - i->tokGet), i->value))) {
00880
00881 if (sizeof(i->sendBuffer) - MAX_HEADER_LENGTH < i->value.size
00882 + sizeof(LINE_TERMINATOR) - 1
00883 + (i->endFlag ? sizeof(REQUEST_TERMINATOR) - 1 : 0)) {
00884
00885 REPORT_ERROR;
00886 return;
00887 }
00888
00889
00890
00891 unsigned int flag = i->value.flag;
00892 unsigned int size = i->value.size;
00893
00894
00895 char * end = i->sendBuffer + MAX_HEADER_LENGTH;
00896
00897 *--end = '\n';
00898 *--end = '\r';
00899
00900 if (i->state == GETS_HEADER) {
00901 unsigned int cas = i->value.cas_id;
00902
00903 if (!cas) {
00904 *--end = '0';
00905 }
00906 while (cas) {
00907 *--end = cas % 10 + '0';
00908 cas /= 10;
00909 }
00910 *--end = ' ';
00911 }
00912
00913 while (size) {
00914 *--end = size % 10 + '0';
00915 size /= 10;
00916 }
00917
00918 *--end = ' ';
00919
00920 if (!flag)
00921 *--end = '0';
00922
00923 while (flag) {
00924 *--end = flag % 10 + '0';
00925 flag /= 10;
00926 }
00927
00928 *--end = ' ';
00929
00930 end -= i->cursor - i->tokGet;
00931
00932 memcpy(end, i->buffer + i->tokGet, i->cursor - i->tokGet);
00933
00934 *--end = ' ';
00935 *--end = 'E';
00936 *--end = 'U';
00937 *--end = 'L';
00938 *--end = 'A';
00939 *--end = 'V';
00940
00941 i->length = (i->sendBuffer + MAX_HEADER_LENGTH) - end;
00942
00943 i->sendBuffer[MAX_HEADER_LENGTH + i->value.size] = '\r';
00944 i->sendBuffer[MAX_HEADER_LENGTH + i->value.size + 1] = '\n';
00945
00946 i->length += i->value.size + sizeof(LINE_TERMINATOR) - 1;
00947
00948 i->sendIt = end;
00949
00950 }
00951
00952 if (i->endFlag) {
00953
00954 i->cursor++;
00955
00956 if (i->buffer[i->cursor] == '\n') {
00957
00958 if (b) {
00959 char * dst = (char *) i->sendIt + i->length;
00960 *dst = 'E';
00961 *++dst = 'N';
00962 *++dst = 'D';
00963 *++dst = '\r';
00964 *++dst = '\n';
00965 i->length += sizeof(REQUEST_TERMINATOR) - 1;
00966 } else {
00967 i->sendIt = REQUEST_TERMINATOR;
00968 i->length = sizeof(REQUEST_TERMINATOR) - 1;
00969 }
00970
00971 i->state = GET_RESPONSE_END;
00972
00973
00974 resetBuffer(i);
00975
00976 } else {
00977 REPORT_ERROR;
00978 return;
00979 }
00980
00981 }
00982
00983 break;
00984 case '\n':
00985 REPORT_ERROR
00986 ;
00987 return;
00988 default:
00989 if (i->tokenFlag) {
00990 i->tokGet = i->cursor;
00991 i->tokenFlag = false;
00992 }
00993 }
00994
00995 i->cursor++;
00996
00997 if (b || i->endFlag) {
00998 break;
00999 }
01000 }
01001 } else if (i->state == STATS_HEADER) {
01002
01003 Stats stats = space->getStats();
01004
01005 i->state = STATS_RESPONSE;
01006 i->length = 0;
01007 i->sendIt = i->sendBuffer;
01008 i->length += sprintf(i->sendBuffer, "STAT pid %u\r\n", getpid());
01009 i->length += sprintf(i->sendBuffer + i->length, "STAT uptime %ld\r\n",
01010 Cache::getCurrentTime() - startTime);
01011 i->length += sprintf(i->sendBuffer + i->length, "STAT time %lu\r\n",
01012 Cache::getCurrentTime());
01013 i->length += sprintf(i->sendBuffer + i->length, "STAT version %s-%s\r\n", PACKAGE,
01014 VERSION);
01015 i->length += sprintf(i->sendBuffer + i->length, "STAT pointer_size %u\r\n",
01016 (unsigned) (8 * sizeof(void*)));
01017 i->length += sprintf(i->sendBuffer + i->length, "STAT rusage_user 0.000000\r\n");
01018 i->length += sprintf(i->sendBuffer + i->length, "STAT rusage_system 0.000000\r\n");
01019 uint64_t conn = 0, total_conn = 0;
01020 for (int j = 0; j < threadCount; j++) {
01021 conn += threads[j].connections;
01022 total_conn += threads[j].totalConnections;
01023 }
01024 i->length += sprintf(i->sendBuffer + i->length, "STAT curr_connections %llu\r\n",
01025 (long long unsigned int) conn);
01026 i->length += sprintf(i->sendBuffer + i->length, "STAT total_connections %llu\r\n",
01027 (long long unsigned int) total_conn);
01028 i->length += sprintf(i->sendBuffer + i->length, "STAT connection_structures 0\r\n");
01029 i->length += sprintf(i->sendBuffer + i->length, "STAT cmd_get %llu\r\n",
01030 stats.getsCount);
01031 i->length += sprintf(i->sendBuffer + i->length, "STAT cmd_set %llu\r\n",
01032 stats.setsCount);
01033 i->length += sprintf(i->sendBuffer + i->length, "STAT cmd_flush 0\r\n");
01034 i->length += sprintf(i->sendBuffer + i->length, "STAT get_hits %llu\r\n",
01035 stats.hitCount);
01036 i->length += sprintf(i->sendBuffer + i->length, "STAT get_misses %llu\r\n",
01037 stats.missCount);
01038 i->length += sprintf(i->sendBuffer + i->length, "STAT delete_misses 0\r\n");
01039 i->length += sprintf(i->sendBuffer + i->length, "STAT delete_hits 0\r\n");
01040 i->length += sprintf(i->sendBuffer + i->length, "STAT incr_misses 0\r\n");
01041 i->length += sprintf(i->sendBuffer + i->length, "STAT incr_hits 0\r\n");
01042 i->length += sprintf(i->sendBuffer + i->length, "STAT decr_misses 0\r\n");
01043 i->length += sprintf(i->sendBuffer + i->length, "STAT decr_hits 0\r\n");
01044 i->length += sprintf(i->sendBuffer + i->length, "STAT cas_misses 0\r\n");
01045 i->length += sprintf(i->sendBuffer + i->length, "STAT cas_hits 0\r\n");
01046 i->length += sprintf(i->sendBuffer + i->length, "STAT cas_badval 0\r\n");
01047 i->length += sprintf(i->sendBuffer + i->length, "STAT bytes_read %llu\r\n",
01048 stats.bytesRead);
01049 i->length += sprintf(i->sendBuffer + i->length, "STAT bytes_written %llu\r\n",
01050 stats.bytesWritten);
01051 i->length += sprintf(i->sendBuffer + i->length, "STAT limit_maxbytes 0\r\n");
01052 i->length += sprintf(i->sendBuffer + i->length, "STAT accepting_conns 0\r\n");
01053 i->length += sprintf(i->sendBuffer + i->length, "STAT listen_disbled_num 0\r\n");
01054 i->length += sprintf(i->sendBuffer + i->length, "STAT threads %u\r\n", threadCount);
01055 i->length += sprintf(i->sendBuffer + i->length, "STAT conn_yields 0\r\n");
01056 i->length += sprintf(i->sendBuffer + i->length, "STAT bytes 0\r\n");
01057 i->length += sprintf(i->sendBuffer + i->length, "STAT curr_items %llu\r\n",
01058 stats.itemsCount);
01059 i->length += sprintf(i->sendBuffer + i->length, "STAT total_items 0\r\n");
01060 i->length += sprintf(i->sendBuffer + i->length, "STAT evictions 0\r\n");
01061
01062 i->length += sprintf(i->sendBuffer + i->length, "STAT used_quota %llu\r\n",
01063 stats.usedQuota);
01064 i->length += sprintf(i->sendBuffer + i->length, "STAT hard_quota %llu\r\n",
01065 stats.hardQuota);
01066 i->length += sprintf(i->sendBuffer + i->length, "END\r\n");
01067
01068 } else if (i->state == INCR_HEADER || i->state == DECR_HEADER) {
01069
01070 while (i->cursor < i->offset) {
01071
01072 bool b = 0;
01073
01074 switch (i->buffer[i->cursor]) {
01075 case '\r':
01076 i->endFlag = true;
01077 case ' ':
01078
01079 i->tokenFlag = true;
01080 i->buffer[i->cursor] = '\0';
01081
01082 if (i->endFlag) {
01083
01084 i->cursor++;
01085
01086 if (i->buffer[i->cursor] == '\n' && (i->tokenCount == 2
01087 || (i->tokenCount == 3 && !strcmp(i->buffer + i->tokens[2],
01088 "noreply")))) {
01089
01090 uint64_t retVal;
01091 SpaceResult r;
01092 if (i->state == INCR_HEADER) {
01093 r = space->increment(LightString(i->buffer + i->tokens[0],
01094 i->tokens[1] - i->tokens[0] - 1), atoll(i->buffer
01095 + i->tokens[1]), &retVal);
01096 } else {
01097 r = space->decrement(LightString(i->buffer + i->tokens[0],
01098 i->tokens[1] - i->tokens[0] - 1), atoll(i->buffer
01099 + i->tokens[1]), &retVal);
01100 }
01101
01102 if (i->tokenCount == 2) {
01103
01104 if (!r) {
01105
01106 i->length = NUM_LENGTH(retVal) + 2;
01107
01108 char *end = i->sendBuffer + i->length - 1;
01109
01110 *end-- = '\n';
01111 *end-- = '\r';
01112
01113 while (retVal) {
01114 *end-- = retVal % 10 + '0';
01115 retVal /= 10;
01116 }
01117
01118 i->sendIt = i->sendBuffer;
01119 } else {
01120 i->sendIt = NOT_FOUND_MSG;
01121 i->length = sizeof(NOT_FOUND_MSG) - 1;
01122 }
01123
01124 } else {
01125 i->state = UNDEFINED;
01126 i->cursor++;
01127 resetBuffer(i);
01128 return;
01129 }
01130
01131 i->state = INCR_DECR_RESPONSE;
01132 resetBuffer(i);
01133
01134 } else {
01135 REPORT_ERROR;
01136 return;
01137 }
01138
01139 }
01140
01141 break;
01142 case '\n':
01143 REPORT_ERROR
01144 ;
01145 return;
01146 default:
01147 if (i->tokenFlag) {
01148 i->tokens[i->tokenCount++] = i->cursor;
01149 i->tokenFlag = false;
01150 }
01151 }
01152
01153 i->cursor++;
01154
01155 if (b || i->endFlag) {
01156 break;
01157 }
01158 }
01159 } else if (i->state == VERSION_CMD) {
01160 Stats stats = space->getStats();
01161
01162 i->state = STATS_RESPONSE;
01163 i->length = 0;
01164 i->sendIt = i->sendBuffer;
01165 i->length += sprintf(i->sendBuffer, "VERSION %s\r\n", VERSION);
01166 }
01167
01168 if (i->state % 2 == 1 && i->state <= PREPEND_VALUE) {
01169
01170 if (i->offset - i->cursor < i->length + 2) {
01171 return;
01172 }
01173
01174
01175 if (i->buffer[i->cursor + i->length] != '\r' || i->buffer[i->cursor + i->length + 1]
01176 != '\n') {
01177 REPORT_ERROR;
01178 return;
01179 }
01180
01181 SpaceResult res;
01182 char * endptr;
01183
01184 errno = 0;
01185 unsigned flag = strtoul(i->buffer + i->tokens[1], &endptr, 10);
01186 if (errno || endptr != i->buffer + i->tokens[2] - 1) {
01187 REPORT_ERROR;
01188 return;
01189 }
01190
01191 errno = 0;
01192 unsigned timeout = strtoul(i->buffer + i->tokens[2], &endptr, 10);
01193 if (errno || endptr != i->buffer + i->tokens[3] - 1) {
01194 REPORT_ERROR;
01195 return;
01196 }
01197
01198 unsigned long long int cas_id;
01199
01200 switch (i->state) {
01201 case SET_VALUE:
01202 res = space->set(LightString(i->buffer + i->tokens[0], i->tokens[1]
01203 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag),
01204 timeout);
01205 break;
01206 case CAS_VALUE:
01207
01208 errno = 0;
01209 cas_id = strtoull(i->buffer + i->tokens[4], &endptr, 10);
01210 if (errno || *endptr) {
01211 REPORT_ERROR;
01212 return;
01213 }
01214
01215 res = space->cas(LightString(i->buffer + i->tokens[0], i->tokens[1]
01216 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag),
01217 timeout, cas_id);
01218 break;
01219 case ADD_VALUE:
01220 res = space->add(LightString(i->buffer + i->tokens[0], i->tokens[1]
01221 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag),
01222 timeout);
01223 break;
01224 case APPEND_VALUE:
01225 res = space->append(LightString(i->buffer + i->tokens[0], i->tokens[1]
01226 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag));
01227 break;
01228 case PREPEND_VALUE:
01229 res = space->prepend(LightString(i->buffer + i->tokens[0], i->tokens[1]
01230 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag));
01231 break;
01232 case REPLACE_VALUE:
01233 res = space->replace(LightString(i->buffer + i->tokens[0], i->tokens[1]
01234 - i->tokens[0] - 1), Value(i->buffer + i->cursor, i->length, flag),
01235 timeout);
01236 break;
01237 default:
01238 assert(0);
01239 }
01240
01241 i->cursor += i->length + 2;
01242 resetBuffer(i);
01243
01244 if ((i->tokenCount == 5 && i->state != CAS_VALUE) || (i->tokenCount == 6 && i->state
01245 == CAS_VALUE)) {
01246
01247 static int noreply1 = *((int *) "nore");
01248 static int noreply2 = *((int *) "ply");
01249
01250 char * b = i->buffer + i->tokens[i->tokenCount - 1];
01251 if (*((int *) b) != noreply1 || *((int *) (b + 4)) != noreply2) {
01252 REPORT_ERROR;
01253 return;
01254 }
01255 i->state = UNDEFINED;
01256 if (i->cursor < i->offset) {
01257 continue;
01258 } else {
01259 return;
01260 }
01261 } else {
01262 if (!res) {
01263 i->sendIt = STORED_MSG;
01264 i->length = sizeof(STORED_MSG) - 1;
01265 } else if (res == RESULT_CAS_FAIL) {
01266 i->sendIt = EXISTS_MSG;
01267 i->length = sizeof(EXISTS_MSG) - 1;
01268 } else {
01269 i->sendIt = NOT_STORED_MSG;
01270 i->length = sizeof(NOT_STORED_MSG) - 1;
01271 }
01272 }
01273
01274 }
01275
01276 i->writeOffset = 0;
01277
01278
01279 int len = sendto(i->clientSocket, i->sendIt, i->length, (i->state == GET_HEADER || i->state
01280 == GETS_HEADER ? MSG_MORE : 0) | MSG_DONTWAIT, NULL, 0);
01281
01282 if (len <= 0) {
01283 perror("asciiHandler-send");
01284 if (logSwitch) {
01285 serverLogger->logSocketError();
01286 }
01287 sweepConnection(i);
01288 return;
01289 }
01290
01291 i->writeOffset += len;
01292
01293 if (i->writeOffset == i->length) {
01294
01295
01296 if (i->state != GET_HEADER && i->state != GETS_HEADER) {
01297 i->state = UNDEFINED;
01298 }
01299
01300 } else {
01301
01302 event_del(&i->readEvent);
01303 event_add(&i->writeEvent, &connectionTimeout);
01304 return;
01305 }
01306
01307 }
01308 }
01309
01310 inline void writeCallback(int socket, short event, void * arg) {
01311
01312 Info * i = (Info *) arg;
01313
01314 if (event == EV_TIMEOUT && i->length - i->writeOffset > 0) {
01315
01316 if (logSwitch) {
01317 serverLogger->logConnectionTimeout();
01318 }
01319
01320 sweepConnectionWrite(i);
01321
01322 return;
01323 }
01324
01325 int len = sendto(socket, i->sendIt, i->length - i->writeOffset, (i->state == GET_HEADER
01326 || i->state == GETS_HEADER ? MSG_MORE : 0) | MSG_DONTWAIT, NULL, 0);
01327
01328 if (len <= 0) {
01329 perror("writeCallback-send");
01330 if (logSwitch) {
01331 serverLogger->logSocketError();
01332 }
01333
01334 sweepConnectionWrite(i);
01335 return;
01336 }
01337
01338 i->writeOffset += len;
01339
01340 if (i->writeOffset == i->length) {
01341
01342
01343 if (i->quit) {
01344 sweepConnectionWrite(i);
01345 return;
01346 }
01347 freeQueue(i);
01348
01349
01350
01351 if (i->state != GET_HEADER && i->state != GETS_HEADER) {
01352 i->state = UNDEFINED;
01353 }
01354
01355 if (i->offset > i->cursor) {
01356 i->continueFlag = true;
01357 timeval t = { 0, 0 };
01358 event_add(&i->readEvent, &t);
01359 } else {
01360 event_add(&i->readEvent, &connectionTimeout);
01361 }
01362 event_del(&i->writeEvent);
01363 return;
01364
01365 }
01366
01367 }
01368
01369 void prepareStatBinaryResponse(request_header * header, Info * i) {
01370
01371 static const char* ALL_STATS[] = { "pid", "uptime", "time", "version", "pointer_size",
01372 "rusage_user", "rusage_system", "curr_items", "total_items", "bytes",
01373 "curr_connections", "total_connections", "connection_structures", "cmd_get", "cmd_set",
01374 "get_hits", "get_misses", "evictions", "bytes_read", "bytes_written", "limit_maxbytes",
01375 "threads", "" };
01376
01377 Stats stats = space->getStats();
01378 i->sendIt = i->sendBuffer;
01379 i->length = 0;
01380 raw_request_header * response;
01381
01382 char * key = (char *) header + sizeof(raw_request_header);
01383 for (unsigned int j = 0; j < sizeof(ALL_STATS) / sizeof(char *); j++) {
01384 if (!header->header.keylen || !strncmp(ALL_STATS[j], key, header->header.keylen)
01385 || !*ALL_STATS[j]) {
01386 response = (raw_request_header *) (i->sendBuffer + i->length);
01387 memset(response, 0, sizeof(raw_request_header));
01388
01389 int keylen = strlen(ALL_STATS[j]);
01390
01391 response->magic = 0x81;
01392 response->opcode = 0x10;
01393 response->keylen = htons(keylen);
01394 response->opaque = header->header.opaque;
01395
01396 strncpy((char *) response + sizeof(raw_response_header), ALL_STATS[j], keylen);
01397 int len = getStat(&stats, ALL_STATS[j], (char *) response + sizeof(raw_response_header)
01398 + keylen, 100);
01399 response->bodylen = htonl(keylen + len);
01400
01401 i->length += sizeof(raw_response_header) + keylen + len;
01402 }
01403 }
01404
01405 }
01406
01407 int getStat(Stats * stats, const char * arg, char * buffer, int maxLength) {
01408
01409 #define __STAT(_arg, format, value) if (!strcmp(arg, _arg)) { \
01410 return snprintf(buffer, maxLength, format, value); \
01411 }
01412
01413 #define __STAT2(_arg, format, value, value2) if (!strcmp(arg, _arg)) { \
01414 return snprintf(buffer, maxLength, format, value, value2); \
01415 }
01416
01417 __STAT("pid", "%d", getpid())
01418 __STAT("uptime", "%ld", (long int) (Cache::getCurrentTime() - startTime))
01419 __STAT("time", "%ld", (long int) Cache::getCurrentTime())
01420 __STAT2("version", "%s-%s", PACKAGE, VERSION)
01421 __STAT("pointer_size", "%u", (unsigned) (8 * sizeof (void*)))
01422 __STAT("rusage_user", "%8.6f", 0.0)
01423 __STAT("rusage_system", "%8.6f", 0.0)
01424 __STAT("curr_items", "%d", 0)
01425 __STAT("total_items", "%d", 0)
01426 __STAT("bytes", "%d", 0)
01427 __STAT("curr_connections", "%d", 0)
01428 __STAT("total_connections", "%d", 0)
01429 __STAT("connection_structures", "%d", 0)
01430 __STAT("cmd_get", "%llu", stats->getsCount)
01431 __STAT("cmd_set", "%llu", stats->setsCount)
01432 __STAT("get_hits", "%llu", stats->getsCount)
01433 __STAT("get_misses", "%llu", stats->missCount)
01434 __STAT("evictions", "%d", 0)
01435 __STAT("bytes_read", "%llu", stats->bytesRead)
01436 __STAT("bytes_written", "%llu", stats->bytesWritten)
01437 __STAT("limit_maxbytes", "%d", 0)
01438 __STAT("threads", "%d", threadCount)
01439
01440 return 0;
01441 }
01442
01443 inline void * threadWorker(void * arg) {
01444
01445 Thread * thread = (Thread *) arg;
01446
01447 int ret = event_base_loop(thread->eventBase, 0);
01448
01449 if (ret == -1) {
01450 perror("event_base_loop");
01451 exit(1);
01452 } else if (ret == 0) {
01453 cerr << "no events added...\n";
01454 exit(1);
01455 } else {
01456 cout << "Thread " << thread->index << " has ended\n";
01457 }
01458
01459 return NULL;
01460 }
01461
01462 inline void resetBuffer(Info * i) {
01463 if (i->offset == i->cursor) {
01464 i->offset = i->cursor = 0;
01465 } else if (i->offset > i->cursor && sizeof(i->buffer) - i->cursor < Cache::MAX_OBJECT_SIZE
01466 + MAX_HEADER_LENGTH) {
01467 for (char * dst = i->buffer, *src = i->buffer + i->cursor; src != i->buffer + i->offset; dst++, src++) {
01468 *dst = *src;
01469 }
01470 i->offset -= i->cursor;
01471 i->cursor = 0;
01472 }
01473 }
01474
01475 inline void parseState(Info * i) {
01476
01477 #define __CHECK_STATE(s, st) if (*((int*)(i->buffer + i->cursor)) == *((int*)(s))) { \
01478 if (st == GET_HEADER || st <= ADD_HEADER){ \
01479 i->state = st; \
01480 i->cursor += 4; \
01481 return; \
01482 } else if (!strncmp(i->buffer+i->cursor+4, s+4, \
01483 sizeof(s)-5)){ \
01484 i->state = st; \
01485 i->cursor += sizeof(s)-1; \
01486 return; \
01487 } \
01488 }
01489
01490 __CHECK_STATE("get ", GET_HEADER)
01491 __CHECK_STATE ("set ", SET_HEADER)
01492 __CHECK_STATE ("cas ", CAS_HEADER)
01493 __CHECK_STATE ("add ", ADD_HEADER)
01494 __CHECK_STATE("gets ", GETS_HEADER)
01495 __CHECK_STATE("quit\r\n", QUIT)
01496 __CHECK_STATE("append ", APPEND_HEADER)
01497 __CHECK_STATE("replace ", REPLACE_HEADER)
01498 __CHECK_STATE("prepend ", PREPEND_HEADER)
01499 __CHECK_STATE("incr ", INCR_HEADER)
01500 __CHECK_STATE("decr ", DECR_HEADER)
01501 __CHECK_STATE("stats\r\n", STATS_HEADER)
01502 __CHECK_STATE("version\r\n", VERSION_CMD)
01503 i->state = ERROR_STATE;
01504 if (DEBUG_ASCII) {
01505 cout << "Nieznany stan! " << i->buffer + i->cursor << "\n";
01506 }
01507 }
01508
01509 void loadConfFile(const string& serverConfFilename) {
01510
01511 ifstream file(serverConfFilename.c_str());
01512
01513 if (!file) {
01514 throw FileNotFound(serverConfFilename);
01515 }
01516
01517 bool assigned[7] = { };
01518 unsigned int index;
01519
01520 const int bufferSize = 128;
01521 char buffer[bufferSize];
01522
01523 int timeout = 0;
01524
01525 int line = 0;
01526
01527 while (file.getline(buffer, bufferSize)) {
01528
01529 ++line;
01530
01531 string s = removeBlanks(string(buffer));
01532
01533 if (s.length() == 0) {
01534 continue;
01535 }
01536
01537 index = -1;
01538
01539 #define __CHECK_OPT_N(opt, var) if (++index + 1 && !s.compare(0, sizeof(opt), opt "=")) { \
01540 var = extractValue(s, sizeof(opt), serverConfFilename, line); \
01541 } else
01542 #define __CHECK_OPT_S(opt, var) if (++index + 1 && !s.compare(0, sizeof(opt), opt "=")) { \
01543 var = s.substr(sizeof(opt)); \
01544 } else
01545
01546 __CHECK_OPT_N("port", port)
01547
01548 __CHECK_OPT_N ("hardQuota", hardQuota)
01549 __CHECK_OPT_N("threadCount", threadCount)
01550 __CHECK_OPT_N("saveServerLogs", logSwitch)
01551 __CHECK_OPT_N("connectionTimeout", timeout)
01552 __CHECK_OPT_N("serverLoggerInterval", serverLoggerInterval)
01553 __CHECK_OPT_S("serverLogsDirectory", serverLogsDirectory)
01554 throw ConfigParseException("Config parse error", serverConfFilename, line);
01555
01556 assert(sizeof(assigned) > index);
01557
01558 if (assigned[index]) {
01559 throw ConfigSemanticException("Double variable definition", serverConfFilename, line);
01560 }
01561 assigned[index] = 1;
01562
01563 }
01564
01565 for (unsigned int i = 0; i < sizeof(assigned); i++) {
01566 if (!assigned[i]) {
01567 throw ConfigSemanticException("Configuration file is insufficient. "
01568 "Check the documentation!", serverConfFilename, line);
01569 }
01570 }
01571
01572 if (!file.eof() && file.fail()) {
01573 throw IOException(serverConfFilename);
01574 }
01575
01576 connectionTimeout.tv_sec = timeout / 1000;
01577 connectionTimeout.tv_usec = (timeout % 1000) * 1000;
01578
01579 std::cout << "Server:\tserver configuration read from: " << serverConfFilename << endl;
01580
01581 }
01582
01583 void printBinaryBuffer(void * buffer, int n, const char * label) {
01584
01585 if (DEBUG_BINARY >= 10) {
01586
01587 static const int row = 8;
01588
01589 if (label) {
01590 printf("%s\n", label);
01591 }
01592 for (int i = 0; i < n;) {
01593 printf("%5d: ", i);
01594 int j;
01595 for (j = 0; j < row && i < n; j++, i++) {
01596 printf(" %02x", *((unsigned char *) buffer + i));
01597 }
01598 printf("%*s | ", 4 * (row - j) + 1, " ");
01599 i -= j;
01600 for (j = 0; j < row && i < n; j++, i++) {
01601 char c = *((unsigned char *) buffer + i);
01602 if (c >= ' ' && c <= 'z') {
01603 printf(".%c", c);
01604 } else {
01605 printf(". ");
01606 }
01607 }
01608 printf(".");
01609 printf("\n");
01610 }
01611 printf("\n");
01612
01613 }
01614
01615 }
01616
01617 void printBinaryCommand(uint8_t cmd) {
01618
01619 #define __COMMAND(__code, __text) if (cmd == __code) { \
01620 printf(__text); \
01621 }
01622
01623 if (DEBUG_BINARY >= 3) {
01624 printf(" === ");
01625 __COMMAND(0x00, "get");
01626 __COMMAND(0x01, "set");
01627 __COMMAND(0x02, "add");
01628 __COMMAND(0x03, "replace");
01629 __COMMAND(0x0c, "getk");
01630 __COMMAND(0x0e, "append");
01631 __COMMAND(0x0f, "prepend");
01632 __COMMAND(0x07, "quit");
01633 __COMMAND(0x0b, "version");
01634 __COMMAND(0x10, "stat");
01635 __COMMAND(0x05, "increment");
01636 __COMMAND(0x06, "decrement");
01637 __COMMAND(0x0d, "getKQ");
01638 __COMMAND(0x09, "getQ");
01639 __COMMAND(0x11, "setQ");
01640 __COMMAND(0x12, "addQ");
01641 __COMMAND(0x13, "replaceQ");
01642 __COMMAND(0x19, "appendQ");
01643 __COMMAND(0x1a, "prependQ");
01644 __COMMAND(0x0a, "no-op");
01645 printf(" ===\n");
01646 }
01647
01648 }
01649
01650 void freeQueue(Info * i) {
01651 if (i->queueBuffer) {
01652 free(i->queueBuffer);
01653 i->queueBuffer = NULL;
01654 i->queueLength = 0;
01655 i->queueCapacity = 0;
01656 }
01657 }
01658
/**
 * Signal handler: announces the received SIGINT on stdout and terminates the
 * process with status 0. In builds with DEBUG_BINARY >= 20 the debug file is
 * closed first.
 *
 * NOTE(review): stdio calls and exit() are not async-signal-safe; consider
 * setting a flag and shutting down from the event loop instead.
 *
 * @param n signal number (unused)
 */
void signalHandler(int n) {
    fputs("Server:\tSIGINT received!\n", stdout);

#if DEBUG_BINARY >= 20
    fclose(f);
#endif

    exit(0);
}
01666
/**
 * Detaches the server from the terminal: forks, lets the parent exit, makes
 * the child a session leader and redirects the standard streams to /dev/null.
 * Terminates the process with status 1 when fork() or setsid() fails.
 */
void daemonize() {
    pid_t forkval = fork();

    if (forkval < 0) {
        perror("fork");
        exit(1);
    }

    if (forkval > 0) {
        // Parent: report the child's PID and leave.
        printf("Demonizing server (PID - %d) ...\n\n", (int) forkval);
        exit(0);
    }

    // Child: start a new session.
    if (setsid() < 0) {
        perror("setsid error!");
        exit(1);
    }

    // Redirect the standard streams to /dev/null instead of merely closing
    // descriptors 0-2. With closed descriptors the next sockets or files
    // opened by the server would receive those numbers, and later
    // printf()/perror()/cout output would be written into them.
    // If /dev/null cannot be opened, freopen() leaves the stream closed,
    // which is what the previous implementation did unconditionally.
    if (!freopen("/dev/null", "r", stdin)) {
        // stdin stays closed
    }
    if (!freopen("/dev/null", "w", stdout)) {
        // stdout stays closed
    }
    if (!freopen("/dev/null", "w", stderr)) {
        // stderr stays closed
    }

}