00001
00002
00003
00004
00005
00006
00007
00008 #include <stdio.h>
00009 #include <stdlib.h>
00010 #include <syslog.h>
00011 #include <errno.h>
00012 #include <unistd.h>
00013 #include <string.h>
00014
00015
00016 #include <sys/types.h>
00017 #include <sys/socket.h>
00018
00019
00020 #include <fcntl.h>
00021
00022 #include <netinet/in.h>
00023 #include <arpa/inet.h>
00024
00025
00026 #include <time.h>
00027
00028 #include <pthread.h>
00029
00030
00031 #include <event.h>
00032
00033 #include <assert.h>
00034
00035 #include <signal.h>
00036
00037 #include <stdlib.h>
00038
/* Non-zero while the workers should be sending requests; admin() sets it to
 * 1 on START and 0 on STOP, write_callback() sleeps while it is 0. */
int attack;

/* TCP port of the admin control channel (argv[1]). */
int admin_port;

/* Data set loaded by init_objects(): keys[i] / values[i] are NUL-terminated
 * buffers, keyLength[i] / valueLength[i] their lengths without terminator. */
char **keys;
char **values;
int *keyLength;
int *valueLength;

/* Number of entries in the four tables above. */
int count = 0;

/* Number of worker threads, one connection each (argv[5]). */
int threadCount;

/* GET/SET mix (argv[6]); -1 makes write_callback() issue GETs only. */
int getsPerSet;

/* Counters that main() resets to zero. */
int reqSent;
int valReqSent;

/* Upper bound used by build_req() for the random expiry of SET (argv[7]). */
int timeout;

/* Timeout handed to every event_add() call. */
struct timeval tv;

/* Worker thread handles, threadCount entries, allocated in main(). */
pthread_t *thread;
00061
/*
 * Counters shared by all workers. admin() copies this structure byte for
 * byte onto the admin socket, so the order and types of the members are part
 * of the wire format.
 */
struct statistics {
    /* GET hits whose size or payload differed from the loaded value. */
    unsigned long long wrongAnswerCount;
    /* Responses accepted as correct (SET replies, GET misses, matching hits). */
    unsigned long long correctAnswerCount;
    /* GET responses that carried a value. */
    unsigned long long hitCount;
    /* GET responses consisting only of the END line. */
    unsigned long long missCount;
    /* Fully written SET requests. */
    unsigned long long sentSetsCount;
    /* Fully written GET requests. */
    unsigned long long sentGetsCount;
    /* SET replies validated as STORED. */
    unsigned long long storedCount;
    /* SET replies that were anything other than STORED. */
    unsigned long long notStoredCount;
    /* Seconds accumulated between resuming the workers and a STOP command. */
    double totalTime;
    /* Sum of (reply time - send time) over SET requests, in seconds. */
    double totalSetResponseTime;
    /* Sum of (reply time - send time) over GET requests, in seconds. */
    double totalGetResponseTime;
};
00108
00109 struct timespec start, stop, getSent, getResp, setSent, setResp;
00110
00111 #define TOTAL_STATS_SIZE sizeof(struct statistics)
00112
00113 struct statistics stats;
00114
00115 #define OP_SET 1
00116 #define OP_GET 2
00117 #define OP_GETS 3
00118
00119 #define TMPL_SET "set %s %d %d %d\r\n%s\r\n"
00120 #define TMPL_GET "get %s\r\n"
00121 #define TMPL_GETS "gets %s\r\n"
00122
/*
 * State of one connection: the request currently being written, the response
 * currently being read, and the cursor positions of the GET response parser.
 */
struct request {
    /* Write event registered by main() to start the connection. */
    struct event *ev;
    /* Event re-targeted by the callbacks (read a reply / write the next request). */
    struct event *respev;
    /* One of OP_SET, OP_GET, OP_GETS. */
    unsigned int op :2;
    /* Request text from build_req(); NULL when a new request must be built. */
    const char *req;
    /* Length of req in bytes. */
    int reqsize;
    /* Event base that owns both events. */
    struct event_base *base;
    /* Receive buffer, allocated lazily by the read callbacks. */
    char *response;
    /* Bytes of req already written. */
    off_t offset;
    /* Bytes of the current response already received. */
    off_t respoffset;
    /* GET parser: index of the next unparsed byte in response. */
    off_t resppos;
    /* GET parser: state counter advanced at token and line boundaries. */
    off_t tokcount;
    /* GET parser: index of the flags token of the VALUE line. */
    off_t flagpos;
    /* GET parser: index of the byte-count token of the VALUE line. */
    off_t objsizepos;
    /* GET parser: index of the first payload byte. */
    off_t objpos;
    /* GET parser: index where the terminating END line must start. */
    off_t mark;
};
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231 void init_objects(const char * file_name) {
00232 FILE *fp = NULL;
00233 if ((fp = fopen(file_name, "rb")) == NULL) {
00234 fprintf(stderr, "Unable to open file.\n");
00235 return;
00236 }
00237
00238 fread(&count, sizeof(int), 1, fp);
00239
00240 printf("Read %d input values.\n", count);
00241
00242 keys = (char**) malloc(count * sizeof(char*));
00243 values = (char**) malloc(count * sizeof(char*));
00244
00245 keyLength = (int*) malloc(count * sizeof(int));
00246 valueLength = (int*) malloc(count * sizeof(int));
00247
00248 int i;
00249 for (i = 0; i < count; i++) {
00250 fread(&keyLength[i], sizeof(int), 1, fp);
00251 fread(&valueLength[i], sizeof(int), 1, fp);
00252 keys[i] = (char*) malloc((keyLength[i]+1) * sizeof(char));
00253 fread(keys[i], keyLength[i], 1, fp);
00254 values[i] = (char*) malloc((valueLength[i]+1) * sizeof(char));
00255 fread(values[i], valueLength[i], 1, fp);
00256 keys[i][keyLength[i]]='\0';
00257 values[i][valueLength[i]]='\0';
00258
00259 }
00260
00261 fclose(fp);
00262 }
00263
00264
00265
00266
00267
00268
/*
 * Signal handler that deliberately does no work. writeEv() installs it for
 * SIGUSR1 so that delivering the signal interrupts a blocked thread instead
 * of triggering the default action.
 */
void nothing(int n) {
    (void) n;
}
00271
00272 void * writeEv(void* arg) {
00273
00274 static int num = 0;
00275 int ret;
00276
00277 struct sigaction action;
00278
00279 action.sa_handler = nothing;
00280 sigemptyset(&action.sa_mask);
00281 action.sa_flags = SA_NOCLDSTOP;
00282
00283 if (sigaction(SIGUSR1, &action, NULL)) {
00284 perror("sigaction");
00285 exit(1);
00286 }
00287
00288 usleep(rand() % 100000);
00289 fprintf(stderr, "start thread %d\n", ++num);
00290
00291 do {
00292 ret = event_base_loop((struct event_base *) arg, 0);
00293 printf("X");
00294 fflush(stdout);
00295 } while (ret > -1);
00296 fprintf(stderr, "Error occurred\n");
00297 return NULL;
00298
00299 }
00300
00301 void * admin(void* arg) {
00302 struct sockaddr_in serv_addr, cli_addr;
00303 int fd, cli_fd;
00304
00305 fd = socket(AF_INET, SOCK_STREAM, 0);
00306 memset(&serv_addr, 0, sizeof(serv_addr));
00307 serv_addr.sin_family = AF_INET;
00308 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
00309 serv_addr.sin_port = htons(admin_port);
00310
00311 int flag = 1;
00312 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)) < 0) {
00313 perror("setsockopt(SO_REUSEADDR)");
00314 }
00315
00316 if (bind(fd, (struct sockaddr*) &serv_addr, sizeof(serv_addr))) {
00317 perror("bind");
00318 exit(1);
00319 }
00320
00321 if (listen(fd, 1)) {
00322 perror("listen");
00323 exit(1);
00324 }
00325
00326 socklen_t cli_len;
00327 cli_len = sizeof(struct sockaddr_in);
00328
00329 printf("Started admin channel.\n");
00330
00331 while (1) {
00332 cli_fd = accept(fd, (struct sockaddr*) &cli_addr, &cli_len);
00333
00334 #define START '1'
00335 #define STOP '9'
00336 #define STATS '5'
00337
00338 char type;
00339 if (recv(cli_fd, (void*) &type, 1, 0) == 1) {
00340
00341 switch (type) {
00342 case START:
00343 printf("waking threads...\n");
00344 attack = 1;
00345 int i;
00346 for (i = 0; i < threadCount; i++) {
00347 pthread_kill(thread[i], SIGUSR1);
00348 }
00349 break;
00350 case STOP:
00351 printf("stoping...\n");
00352 attack = 0;
00353 clock_gettime(CLOCK_REALTIME, &stop);
00354 stats.totalTime += (double) (stop.tv_sec - start.tv_sec)
00355 + (double) (stop.tv_nsec - start.tv_nsec)
00356 / 1000000000.0;
00357 break;
00358 case STATS:
00359 printf("Stats requested.\n");
00360
00361 char sendBuf[TOTAL_STATS_SIZE];
00362
00363 memcpy((void*) (sendBuf), (void*) &stats, TOTAL_STATS_SIZE);
00364
00365 if (send(cli_fd, (void*) sendBuf, TOTAL_STATS_SIZE, 0)
00366 == TOTAL_STATS_SIZE) {
00367 printf("Stats sent.\n");
00368 }
00369
00370 break;
00371 }
00372 }
00373
00374 close(cli_fd);
00375 }
00376
00377 close(fd);
00378 }
00379
00380 const char * build_req(short type, int key_index, int *req_len) {
00381 int len;
00382 char *req;
00383
00384 if (type == OP_SET) {
00385 len = sizeof(TMPL_SET) / sizeof(TMPL_SET[0]) + keyLength[key_index]
00386 + valueLength[key_index] + 10 + 1;
00387 req = malloc(len);
00388 len = sprintf(req, TMPL_SET, keys[key_index], key_index, rand()
00389 % timeout + 1, valueLength[key_index], values[key_index]);
00390
00391 } else if (type == OP_GET) {
00392 len = sizeof(TMPL_GET) / sizeof(TMPL_GET[0]) + keyLength[key_index] + 1;
00393 req = malloc(len);
00394
00395 len = sprintf(req, TMPL_GET, keys[key_index]);
00396 } else {
00397 assert(0);
00398 }
00399 *req_len = len;
00400 return req;
00401 }
00402
/* Results of validate_set_response(). */
#define RET_NEED_MORE 1
#define RET_OK 2
#define RET_ERROR (-1)

/*
 * The two replies to a SET that this client distinguishes, and their lengths
 * without the terminating NUL. The length macros are fully parenthesised so
 * they can be used inside larger expressions.
 */
#define SHORTEST_SET_RESPONSE "STORED\r\n"
#define LONGEST_SET_RESPONSE "NOT_STORED\r\n"
#define SHORTEST_SET_RESPONSE_LENGTH (sizeof(SHORTEST_SET_RESPONSE) - 1)
#define LONGEST_SET_RESPONSE_LENGTH (sizeof(LONGEST_SET_RESPONSE) - 1)

/* Shortest reply to a GET (a miss) and the size of the GET receive buffer. */
#define SHORTEST_GET_RESPONSE "END\r\n"

#define LONGEST_GET_RESPONSE_LENGTH (2*1024*1024)
#define SHORTEST_GET_RESPONSE_LENGTH 5

/*
 * Classifies the bytes received so far in reply to a SET.
 *
 * response: receive buffer; it does not have to be NUL-terminated.
 * len:      number of valid bytes in response.
 *
 * Returns RET_NEED_MORE while the reply may still be incomplete (fewer bytes
 * than "STORED\r\n", or a reply starting with 'N' that is shorter than
 * "NOT_STORED\r\n"), RET_OK when the reply starts with "STORED\r\n", and
 * RET_ERROR for everything else, which includes a complete NOT_STORED.
 */
static int validate_set_response(const char *response, int len) {
#define OK_CMD "STORED\r\n"
#define FAIL_CMD "NOT_STORED\r\n"
    assert(response != NULL);

    /* Compare as int: a non-positive length must not wrap to a huge
     * unsigned value and pass this check. */
    if (len < (int) SHORTEST_SET_RESPONSE_LENGTH) {
        return RET_NEED_MORE;
    }

    /* Match exactly the bytes of the command. Using len as the bound would
     * reject a correct reply whenever more than eight bytes are buffered. */
    if (response[0] == 'S'
            && strncmp(response, OK_CMD, sizeof(OK_CMD) - 1) == 0) {
        return RET_OK;
    }

    if (response[0] == 'N' && len < (int) LONGEST_SET_RESPONSE_LENGTH) {
        return RET_NEED_MORE;
    }

    return RET_ERROR;
}
00439
00440 void write_callback(int fd, short event, void *arg);
00441
00442 void read_callback(int fd, short event, void *arg) {
00443
00444 struct request *req = (struct request *) arg;
00445 int ret;
00446
00447 if (event == EV_TIMEOUT) {
00448
00449 return;
00450 }
00451
00452 if (req->op == OP_SET) {
00453 if (req->response == NULL) {
00454 req->response = malloc(LONGEST_SET_RESPONSE_LENGTH);
00455 }
00456 assert(req->response != NULL);
00457 int toRead = 0;
00458 if (req->respoffset < SHORTEST_SET_RESPONSE_LENGTH) {
00459 toRead = SHORTEST_SET_RESPONSE_LENGTH - req->respoffset;
00460 } else if (req->respoffset > SHORTEST_SET_RESPONSE_LENGTH
00461 && req->respoffset < LONGEST_SET_RESPONSE_LENGTH) {
00462 toRead = LONGEST_SET_RESPONSE_LENGTH - req->respoffset;
00463 }
00464
00465
00466 ret = read(fd, (req->response + req->respoffset), toRead);
00467
00468
00469 if (ret < 0) {
00470 if (errno == EAGAIN || errno == EINPROGRESS) {
00471 event_add(req->respev, &tv);
00472 return;
00473 }
00474 }
00475 req->respoffset += ret;
00476 ret = validate_set_response(req->response, req->respoffset);
00477 if (ret == RET_NEED_MORE) {
00478 event_add(req->respev, &tv);
00479 return;
00480 } else if (ret == RET_OK) {
00481 stats.storedCount++;
00482 stats.correctAnswerCount++;
00483 free((void *) req->req);
00484 req->req = NULL;
00485 req->offset = 0;
00486 event_set(req->respev, fd, EV_WRITE, write_callback, req);
00487 event_base_set(req->base, req->respev);
00488 event_add(req->respev, &tv);
00489 req->respoffset = 0;
00490 clock_gettime(CLOCK_REALTIME, &setResp);
00491 stats.totalSetResponseTime += (double) (setResp.tv_sec
00492 - setSent.tv_sec) + (double) (setResp.tv_nsec
00493 - setSent.tv_nsec) / 1000000000.0;
00494 } else {
00495 stats.notStoredCount++;
00496 stats.correctAnswerCount++;
00497 free((void *) req->req);
00498 req->req = NULL;
00499 req->offset = 0;
00500 event_set(req->respev, fd, EV_WRITE, write_callback, req);
00501 event_base_set(req->base, req->respev);
00502 event_add(req->respev, &tv);
00503 req->respoffset = 0;
00504 clock_gettime(CLOCK_REALTIME, &setResp);
00505 stats.totalSetResponseTime += (double) (setResp.tv_sec
00506 - setSent.tv_sec) + (double) (setResp.tv_nsec
00507 - setSent.tv_nsec) / 1000000000.0;
00508 }
00509 }
00510 }
00511
00512 void read_get_callback(int fd, short event, void *arg) {
00513
00514 struct request *req = (struct request *) arg;
00515
00516 if (event == EV_TIMEOUT) {
00517
00518 return;
00519 }
00520
00521 if (req->response == NULL) {
00522 req->resppos = 0;
00523 req->tokcount = 0;
00524 req->response = malloc(LONGEST_GET_RESPONSE_LENGTH);
00525 req->mark = 0;
00526 }
00527 assert(req->response != NULL);
00528
00529 int len = recv(fd, req->response + req->respoffset,
00530 LONGEST_GET_RESPONSE_LENGTH, MSG_DONTWAIT);
00531
00532 if (len == 0) {
00533 event_set(req->respev, fd, EV_READ, read_get_callback, req);
00534 event_base_set(req->base, req->respev);
00535 event_add(req->respev, &tv);
00536 return;
00537 } else if (len > 0) {
00538 req->respoffset += len;
00539 }
00540
00541
00542 while (req->resppos < req->respoffset) {
00543
00544 if (req->tokcount < 3 || req->tokcount == 7) {
00545 switch (req->response[req->resppos]) {
00546 case '\r':
00547 req->resppos++;
00548 break;
00549 case '\n':
00550 if (req->resppos - req->mark == SHORTEST_GET_RESPONSE_LENGTH
00551 - 1) {
00552 if (!strncmp(req->response + req->mark,
00553 SHORTEST_GET_RESPONSE, SHORTEST_GET_RESPONSE_LENGTH)) {
00554 if (req->tokcount == 7) {
00555 int size = atoi(req->response + req->objsizepos);
00556 int flag = atoi(req->response + req->flagpos);
00557
00558 if (size == valueLength[flag] && !strncmp(
00559 req->response + req->objpos, values[flag],
00560 size)) {
00561 stats.hitCount++;
00562 stats.correctAnswerCount++;
00563 } else {
00564 stats.hitCount++;
00565 stats.wrongAnswerCount++;
00566 }
00567 } else {
00568 stats.missCount++;
00569 stats.correctAnswerCount++;
00570 }
00571 clock_gettime(CLOCK_REALTIME, &getResp);
00572 stats.totalGetResponseTime += (double) (getResp.tv_sec
00573 - getSent.tv_sec) + (double) (getResp.tv_nsec
00574 - getSent.tv_nsec) / 1000000000.0;
00575 } else {
00576 assert(0);
00577 }
00578 free((void *) req->req);
00579 req->mark = 0;
00580 req->tokcount = 0;
00581 req->req = NULL;
00582 req->offset = 0;
00583 req->respoffset = 0;
00584 req->resppos = 0;
00585
00586 event_set(req->respev, fd, EV_WRITE, write_callback, req);
00587 event_base_set(req->base, req->respev);
00588 event_add(req->respev, &tv);
00589 return;
00590 }
00591 break;
00592 case ' ':
00593 if (req->tokcount == 1) {
00594 req->flagpos = req->resppos + 1;
00595 } else if (req->tokcount == 2) {
00596 req->objsizepos = req->resppos + 1;
00597 }
00598 req->response[req->resppos] = '\0';
00599 req->tokcount++;
00600 req->resppos++;
00601 break;
00602 default:
00603 req->resppos++;
00604 break;
00605 }
00606 } else if (req->tokcount == 3) {
00607 switch (req->response[req->resppos]) {
00608 case '\r':
00609 req->tokcount++;
00610 req->response[req->resppos] = '\0';
00611 req->resppos++;
00612 break;
00613 case '\n':
00614 assert(0);
00615 break;
00616 default:
00617 req->resppos++;
00618 break;
00619 }
00620 } else if (req->tokcount == 4) {
00621 switch (req->response[req->resppos]) {
00622 case '\r':
00623 assert(0);
00624 break;
00625 case '\n':
00626 req->tokcount++;
00627 req->objpos = ++req->resppos;
00628 break;
00629 default:
00630 assert(0);
00631 break;
00632 }
00633 } else if (req->tokcount == 5) {
00634 switch (req->response[req->resppos]) {
00635 case '\r':
00636 req->tokcount++;
00637 req->resppos++;
00638 break;
00639 case '\n':
00640 assert(0);
00641 break;
00642 default:
00643 req->resppos++;
00644 break;
00645 }
00646 } else if (req->tokcount == 6) {
00647 switch (req->response[req->resppos]) {
00648 case '\r':
00649 assert(0);
00650 break;
00651 case '\n':
00652 req->tokcount++;
00653 req->mark = ++req->resppos;
00654 break;
00655 default:
00656 assert(0);
00657 break;
00658 }
00659 }
00660 }
00661
00662 event_set(req->respev, fd, EV_READ, read_get_callback, req);
00663 event_base_set(req->base, req->respev);
00664 event_add(req->respev, &tv);
00665 }
00666
00667 void write_callback(int fd, short event, void *arg) {
00668
00669
00670
00671
00672 struct request *req = (struct request *) arg;
00673 int ret;
00674
00675 if (event == EV_TIMEOUT) {
00676
00677 printf("EV_TIMEOUT\n");
00678 return;
00679 }
00680
00681 if (req->req == NULL) {
00682
00683
00684 if (!attack) {
00685 printf("atacking thread pauses...\n");
00686 while (!attack) {
00687 pause();
00688 }
00689 printf("atacking thread continues to attack!\n");
00690 clock_gettime(CLOCK_REALTIME, &start);
00691 }
00692 assert(req->offset == 0);
00693
00694 if (getsPerSet == -1) {
00695 req->op = OP_GET;
00696 } else if (!(rand() % (getsPerSet + 1))) {
00697 req->op = OP_SET;
00698 } else
00699 req->op = OP_GET;
00700
00701 req->req = build_req(req->op, rand() % count, &(req->reqsize));
00702 }
00703
00704
00705
00706 if ((ret = write(fd, req->req, req->reqsize - req->offset)) < 0) {
00707 if (errno == EAGAIN || errno == EINPROGRESS) {
00708 fprintf(stderr, "EAGAIN\n");
00709 event_add(req->ev, &tv);
00710 } else {
00711 fprintf(stderr, "Write error: %s", strerror(errno));
00712
00713 }
00714 } else {
00715 req->offset += ret;
00716 if (req->offset == req->reqsize) {
00717 if (req->op == OP_SET) {
00718 clock_gettime(CLOCK_REALTIME, &setSent);
00719 stats.sentSetsCount++;
00720 event_set(req->respev, fd, EV_READ, read_callback, req);
00721 } else if (req->op == OP_GET) {
00722 clock_gettime(CLOCK_REALTIME, &getSent);
00723 stats.sentGetsCount++;
00724 event_set(req->respev, fd, EV_READ, read_get_callback, req);
00725 }
00726 free((void *) req->req);
00727 req->req = NULL;
00728 req->offset = 0;
00729
00730 event_base_set(req->base, req->respev);
00731 event_add(req->respev, &tv);
00732 } else {
00733 event_add(req->respev, &tv);
00734 }
00735 }
00736
00737
00738 }
00739
00740 int main(int argc, char** argv) {
00741
00742 attack = 0;
00743
00744 srand(time(NULL));
00745
00746 if (argc < 8) {
00747 printf(
00748 "Usage: ./client admin_port host port fileName threadCount getsPerSet timeout\n");
00749 return 1;
00750 }
00751
00752 admin_port = atoi(argv[1]);
00753
00754 pthread_t adminThread;
00755
00756 pthread_create(&adminThread, NULL, admin, NULL);
00757
00758 memset((void*) &stats, 0, TOTAL_STATS_SIZE);
00759
00760 reqSent = 0;
00761 valReqSent = 0;
00762
00763 threadCount = atoi(argv[5]);
00764 getsPerSet = atoi(argv[6]);
00765 timeout = atoi(argv[7]);
00766
00767 init_objects(argv[4]);
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781 tv.tv_sec = 1000;
00782 tv.tv_usec = 0;
00783
00784 int fd[threadCount];
00785
00786 struct event_base * eb[threadCount + 2];
00787
00788 struct event ev[threadCount + 2];
00789 struct event respEv[threadCount];
00790
00791 int i;
00792 for (i = 0; i < threadCount + 2; i++)
00793 eb[i] = event_init();
00794
00795 struct sockaddr_in serv_addr;
00796
00797 for (i = 0; i < threadCount; i++) {
00798 fd[i] = socket(AF_INET, SOCK_STREAM, 0);
00799 memset(&serv_addr, 0, sizeof(serv_addr));
00800 serv_addr.sin_family = AF_INET;
00801 serv_addr.sin_addr.s_addr = inet_addr(argv[2]);
00802 serv_addr.sin_port = htons(atoi(argv[3]));
00803 if (connect(fd[i], (struct sockaddr*) &serv_addr, sizeof(serv_addr))
00804 == -1)
00805 printf("Connection failed\n");
00806 else
00807 printf("Connected\n");
00808 }
00809
00810 struct request reqTab[threadCount];
00811 for (i = 0; i < threadCount; i++) {
00812 reqTab[i].ev = &ev[i];
00813 reqTab[i].respev = &respEv[i];
00814 reqTab[i].req = NULL;
00815 reqTab[i].offset = 0;
00816 reqTab[i].respoffset = 0;
00817 reqTab[i].reqsize = 0;
00818 reqTab[i].response = NULL;
00819 reqTab[i].op = OP_SET;
00820 reqTab[i].base = eb[i];
00821 }
00822
00823 for (i = 0; i < threadCount; i++) {
00824 event_set(&ev[i], fd[i], EV_WRITE, write_callback, (void*) &reqTab[i]);
00825 event_base_set(eb[i], &ev[i]);
00826 event_add(&ev[i], &tv);
00827 }
00828
00829 thread = malloc(threadCount * sizeof(pthread_t));
00830
00831 for (i = 0; i < threadCount; i++)
00832 pthread_create(&thread[i], NULL, writeEv, (void *) eb[i]);
00833
00834 char c;
00835 scanf("%c", &c);
00836
00837 return 0;
00838 }