00001
00006 #ifndef EVENTSERVER_H_
00007 #define EVENTSERVER_H_
00008
00009 #include <ctime>
00010 #include <iostream>
00011 #include <fstream>
00012 #include <string>
00013 #include <cstring>
00014 #include <vector>
00015 #include <cstdlib>
00016 #include <unistd.h>
00017 #include <ctime>
00018 #include <csignal>
00019 #include <pthread.h>
00020 #include <event.h>
00021 #include <fcntl.h>
00022 #include <cerrno>
00023 #include <sys/socket.h>
00024 #include <netinet/in.h>
00025 #include <arpa/inet.h>
00026 #include <cmath>
00027 #include <cstdlib>
00028 #include <cstdio>
00029 #include <cassert>
00030 #include <byteswap.h>
00031
00032
00033
/*
 * Pull in the build configuration header.
 *
 * HAVE_CONFIG_H is defined unconditionally on the first line of this block,
 * so the #ifdef that follows is always true: "config.h" is always included
 * and the #else / #error branch can never fire.
 * NOTE(review): if the intent was to let the build system decide, the
 * unconditional #define defeats that check — confirm.
 */
00074 #define HAVE_CONFIG_H 1
00075
00076 #ifdef HAVE_CONFIG_H
00077 #include "config.h"
00078 #else
00079 #error "No config.h provided!"
00080 #endif
00081
00082 #include "Cache.h"
00083 #include "ServerLogger.h"
00084 #include "functions.h"
00085
/**
 * Compile-time debug level for the binary-protocol code (presumably; the
 * only use visible in this header is the `>= 20` comparison below).
 * Currently 0.
 */
00089 #define DEBUG_BINARY 0
00090
/**
 * Compile-time debug level for the ASCII-protocol code (presumably; it is
 * not referenced anywhere else in this header). Currently 0.
 */
00094 #define DEBUG_ASCII 0
00095
00096 #if DEBUG_BINARY >= 20
00097
/**
 * File handle that exists only when DEBUG_BINARY is at least 20.
 * What is written to it is not visible here — see the implementation file.
 * NOTE(review): this is a definition (not an `extern` declaration) inside a
 * header, so including the header from more than one translation unit with
 * this level enabled would produce duplicate symbols.
 */
00100 FILE * f;
00101 #endif
00102
/**
 * Per-connection state stored in Info::state.
 *
 * The enumerator names follow the memcached text-protocol commands
 * (set, cas, add, replace, append, prepend, get, gets, stats, incr, decr,
 * quit, version); most storage commands come as a *_HEADER / *_VALUE pair
 * (presumably "command line" and "data block" phases — confirm in
 * parseState()). ERROR_STATE is the state REPORT_ERROR switches a
 * connection to.
 *
 * No enumerator has an explicit value, so the numeric values are implied by
 * declaration order (SET_HEADER == 0). Do not reorder or insert entries if
 * any code depends on the numeric values.
 */
00106 enum State {
00107 SET_HEADER, SET_VALUE, CAS_HEADER, CAS_VALUE, ADD_HEADER, ADD_VALUE,
00108 REPLACE_HEADER,
00109 REPLACE_VALUE,
00110 APPEND_HEADER,
00111 APPEND_VALUE,
00112 PREPEND_HEADER,
00113 PREPEND_VALUE,
00114 UNDEFINED,
00115 QUIT,
00116 VERSION_CMD,
00117 GET_HEADER,
00118 GETS_HEADER,
00119 ERROR_STATE,
00120 GET_RESPONSE_END,
00121 STATS_HEADER,
00122 STATS_RESPONSE,
00123 INCR_HEADER,
00124 DECR_HEADER,
00125 INCR_DECR_RESPONSE
00126 };
00127
/**
 * Debug switch for the event server, set to 1 here. It is not tested
 * anywhere in this header; its effect is determined by the implementation
 * file.
 */
00131 #define DEBUG_EV_SERVER 1
00132
/*
 * Fixed text-protocol response lines.
 *
 * Each one is a char array (not a pointer), so sizeof() yields the string
 * length plus the terminating NUL; code that sends them uses
 * `sizeof(X) - 1` to drop the NUL (see REPORT_ERROR and the size of
 * Info::sendBuffer). Being namespace-scope `const` objects they have
 * internal linkage in C++, so every translation unit that includes this
 * header gets its own copy.
 */
/** Reply "STORED". */
00136 const char STORED_MSG[] = "STORED\r\n";
/** Reply "NOT_STORED". */
00140 const char NOT_STORED_MSG[] = "NOT_STORED\r\n";
/** Reply "EXISTS". */
00144 const char EXISTS_MSG[] = "EXISTS\r\n";
/** Reply "NOT_FOUND". */
00148 const char NOT_FOUND_MSG[] = "NOT_FOUND\r\n";
/** Reply "ERROR"; this is what REPORT_ERROR queues for sending. */
00152 const char ERROR_MSG[] = "ERROR\r\n";
/** Bare CR LF line terminator. */
00156 const char LINE_TERMINATOR[] = "\r\n";
/** "END" line; its length is reserved at the tail of Info::sendBuffer. */
00160 const char REQUEST_TERMINATOR[] = "END\r\n";
00161
/**
 * Number of bytes reserved for a response header line. In this header it is
 * used only as one term of the Info::sendBuffer size.
 */
00165 #define MAX_HEADER_LENGTH 300
00166
/**
 * Puts the current connection into the error state and queues "ERROR\r\n"
 * for sending.
 *
 * Requirements at the expansion site:
 *  - a variable named `i` pointing to the connection's Info must be in scope
 *    (the macro takes no arguments and dereferences `i` directly);
 *  - the global `connectionTimeout` must be visible.
 *
 * Steps performed:
 *  1. cursor, offset, tokenCount and writeOffset are reset to 0;
 *  2. sendIt is pointed at ERROR_MSG and length is set to its size without
 *     the terminating NUL;
 *  3. state becomes ERROR_STATE;
 *  4. the read event is removed and the write event is added with
 *     connectionTimeout as its timeout.
 *
 * The return values of event_del() and event_add() are ignored.
 *
 * The expansion is a plain `{ ... }` block rather than `do { } while (0)`,
 * so `if (c) REPORT_ERROR; else ...` does not compile: the `;` after the
 * closing brace ends the if-statement. Use it as a standalone statement or
 * inside explicit braces.
 */
00170 #define REPORT_ERROR { \
00171 i->cursor = i->offset = i->tokenCount = i->writeOffset = 0; \
00172 i->sendIt = ERROR_MSG; \
00173 i->length = sizeof(ERROR_MSG) - 1; \
00174 \
00175 i->state = ERROR_STATE; \
00176 \
00177 event_del(&i->readEvent); \
00178 event_add(&i->writeEvent, &connectionTimeout); \
00179 \
00180 }
00181
/**
 * Bookkeeping for one worker thread. The global `threads` pointer declared
 * at the end of this header refers to objects of this type, and every Info
 * points back to one through Info::t.
 */
00185 struct Thread {
00186
00190 int index; ///< Index of this worker (presumably its position in `threads` — confirm).
00194 pthread_t threadID; ///< pthread handle of the worker.
00198 struct event_base * eventBase; ///< libevent base associated with this worker.
00203 struct event pipeEvent; ///< libevent event for the pipe (presumably dispatched to pipeCallback — confirm).
00207 int in; ///< File descriptor; presumably one end of the hand-off pipe — confirm which.
00211 int out; ///< File descriptor; presumably the other end of the hand-off pipe — confirm which.
00212
00216 uint64_t connections; ///< Live connection counter; decremented by sweepConnectionTemplate.
00217
00221 uint64_t totalConnections; ///< Presumably the lifetime total of accepted connections; never touched in this header.
00222
00223 };
00224
/**
 * Per-connection state.
 *
 * Lifetime: sweepConnectionTemplate releases an Info (and its queueBuffer)
 * with free(), so instances are expected to come from malloc()/calloc().
 * NOTE(review): that means no constructor or destructor runs for the
 * `value` member — confirm that Value is safe to use that way.
 *
 * The struct embeds two large arrays (see `buffer` and `sendBuffer`), so
 * each connection costs a little over 3 * Cache::MAX_OBJECT_SIZE bytes.
 */
00229 struct Info {
00230
00234 Thread * t; ///< Worker this connection is accounted to (t->connections is decremented on sweep).
00235
00239 struct event readEvent; ///< libevent read event; removed by REPORT_ERROR and sweepConnection.
00243 struct event writeEvent; ///< libevent write event; added by REPORT_ERROR, removed by sweepConnectionWrite.
00244
00248 int clientSocket; ///< Client socket descriptor; closed by sweepConnectionTemplate.
00252 char buffer[2 * Cache::MAX_OBJECT_SIZE]; ///< Working buffer, twice the maximum object size (presumably incoming data — confirm).
00253
/** Outgoing buffer sized for one maximum-size object, one header line, a "\r\n" and a closing "END\r\n" (NULs excluded). */
00257 char sendBuffer[Cache::MAX_OBJECT_SIZE + MAX_HEADER_LENGTH + sizeof(LINE_TERMINATOR) - 1
00258 + sizeof(REQUEST_TERMINATOR) - 1];
00262 const char * sendIt; ///< Start of the data to send; may point outside sendBuffer (REPORT_ERROR points it at ERROR_MSG).
00263
00267 Value value; ///< Cache value object (project type declared elsewhere).
00271 unsigned int offset; ///< Buffer position; reset to 0 by REPORT_ERROR. Exact meaning defined by the handlers.
00275 unsigned int writeOffset; ///< Send position; reset to 0 by REPORT_ERROR (presumably bytes already written — confirm).
00279 unsigned int cursor; ///< Parse position; reset to 0 by REPORT_ERROR.
00283 int tokens[6]; ///< Up to six token slots (presumably offsets into `buffer` — confirm).
00287 int tokGet; ///< Token bookkeeping for get-style requests (presumably — confirm).
00291 int tokenCount; ///< Number of tokens recorded; reset to 0 by REPORT_ERROR.
00295 bool tokenFlag; ///< Tokenizer flag; semantics defined by the parser.
00299 bool endFlag; ///< Parser flag; semantics defined by the parser.
00304 bool continueFlag; ///< Parser flag; semantics defined by the parser.
00308 unsigned int length; ///< Byte count that goes with sendIt (REPORT_ERROR sets it to the length of ERROR_MSG).
00313 bool active; ///< Activity flag; not touched in this header.
00314
00318 State state; ///< Current protocol state; REPORT_ERROR sets ERROR_STATE.
00319
00323 char protocol; ///< Protocol selector (presumably ASCII vs. binary — confirm the encoding).
00324
00328 char quit; ///< Quit flag (presumably set when the client asked to close — confirm).
00329
00333 char * queueBuffer; ///< Optional heap buffer; freed with free() on sweep when non-null.
00334
00338 unsigned int queueLength; ///< Presumably the number of bytes used in queueBuffer.
00339
00343 unsigned int queueCapacity; ///< Presumably the allocated size of queueBuffer (see QUEUE_CHUNK_LENGTH).
00344
00345 };
00346
/**
 * Small fixed-layout record pairing a client socket with a thread number
 * (presumably the message handed to a worker for a new connection —
 * confirm against acceptCallback/pipeCallback).
 *
 * Declared packed, so it has no padding and an alignment of 1; with 4-byte
 * int the record is 8 bytes.
 */
00350 struct workerData {
00354 int clientSocket; ///< Descriptor of the client connection.
00355
00359 int threadNo; ///< Number of the worker thread concerned.
00360
00361 }__attribute__((__packed__));
00362
/**
 * 1 MiB (1024 * 1024 bytes). Presumably the allocation/growth step for
 * Info::queueBuffer — confirm in the implementation; it is not used in this
 * header.
 */
00366 const int QUEUE_CHUNK_LENGTH = 1024 * 1024;
00367
/**
 * Fixed 24-byte request header (1+1+2+1+1+2+4+4+8 bytes, packed so there is
 * no padding). Field names and widths match the memcached binary-protocol
 * request header.
 *
 * NOTE(review): multi-byte fields presumably arrive in network byte order
 * and need swapping before use (<byteswap.h> is included above) — confirm
 * in binaryHandler().
 */
00371 struct raw_request_header {
00375 uint8_t magic; ///< Magic byte identifying the packet type.
00379 uint8_t opcode; ///< Command code.
00383 uint16_t keylen; ///< Length of the key in bytes.
00387 uint8_t extlen; ///< Length of the command-specific extras in bytes.
00391 uint8_t datatype; ///< Data type field.
00395 uint16_t reserved; ///< Reserved field (the response header has `status` at this offset).
00399 uint32_t bodylen; ///< Length of the body that follows this header.
00403 uint32_t opaque; ///< Opaque value supplied by the client.
00407 uint64_t cas; ///< CAS (compare-and-swap) value.
00408 }__attribute__((__packed__));
00409
/**
 * Request header followed by the 8 bytes of extras used by storage
 * commands: 32 bytes in total, packed.
 */
00413 struct set_request_header {
00417 raw_request_header header; ///< Common 24-byte header.
00421 uint32_t flags; ///< Client flags stored with the item.
00425 uint32_t expiration; ///< Expiration time of the item.
00426 }__attribute__((__packed__));
00427
/**
 * Request header followed by the 20 bytes of extras used by the
 * increment/decrement commands: 44 bytes in total, packed.
 */
00431 struct increment_request_header {
00435 raw_request_header header; ///< Common 24-byte header.
00439 uint64_t delta; ///< Amount to add or subtract.
00443 uint64_t initial_value; ///< Initial value field.
00447 uint32_t expiration; ///< Expiration time field.
00448 }__attribute__((__packed__));
00449
/**
 * Overlay of the three request layouts on the same bytes. Every member
 * starts with a raw_request_header, so `header` can be read first and the
 * matching extended view chosen afterwards. The union is as large as its
 * biggest member (increment_request_header, 44 bytes).
 */
00453 union request_header {
00457 raw_request_header header; ///< Plain header view.
00461 set_request_header set_header; ///< View with flags/expiration extras.
00465 increment_request_header increment_header; ///< View with delta/initial/expiration extras.
00466 };
00467
/**
 * Fixed 24-byte response header, packed. Identical in layout to
 * raw_request_header except that the 16-bit field at offset 6 is `status`
 * instead of `reserved`. Field names and widths match the memcached
 * binary-protocol response header.
 *
 * NOTE(review): multi-byte fields presumably have to be written in network
 * byte order — confirm in the code that fills this struct.
 */
00472 struct raw_response_header {
00476 uint8_t magic; ///< Magic byte identifying the packet type.
00480 uint8_t opcode; ///< Command code.
00484 uint16_t keylen; ///< Length of the key in bytes.
00488 uint8_t extlen; ///< Length of the extras in bytes.
00492 uint8_t datatype; ///< Data type field.
00496 uint16_t status; ///< Status code of the response.
00500 uint32_t bodylen; ///< Length of the body that follows this header.
00504 uint32_t opaque; ///< Opaque value.
00508 uint64_t cas; ///< CAS (compare-and-swap) value.
00509 }__attribute__((__packed__));
00510
/**
 * Response header followed by a 4-byte flags field: 28 bytes in total,
 * packed. Going by the name, used when a lookup finds the item.
 */
00514 struct hit_response_header {
00518 raw_response_header header; ///< Common 24-byte header.
00522 uint32_t flags; ///< Flags of the returned item.
00523 }__attribute__((__packed__));
00524
/**
 * Response header followed by a 64-bit value: 32 bytes in total, packed.
 * Going by the name, the reply to an increment/decrement request.
 */
00528 struct increment_response_header {
00532 raw_response_header header; ///< Common 24-byte header.
00536 uint64_t value; ///< 64-bit counter value carried in the reply.
00537 }__attribute__((__packed__));
00538
/**
 * Overlay of the three response layouts on the same bytes. Every member
 * starts with a raw_response_header. The union is as large as its biggest
 * member (increment_response_header, 32 bytes).
 */
00542 union response_header {
00546 raw_response_header header; ///< Plain header view.
00550 hit_response_header hit_header; ///< View with the trailing flags field.
00554 increment_response_header increment_header; ///< View with the trailing 64-bit value.
00555 };
00556
/*
 * Prototypes of the server's internal functions.
 *
 * All of them are declared `inline` without a body, so every translation
 * unit that calls one of them must also contain its definition.
 */

/**
 * Loads the server configuration from a file (presumably filling the
 * configuration globals declared at the end of this header — confirm in the
 * definition).
 *
 * The parameter is spelled std::string, like the other string declarations
 * in this header, so the prototype does not depend on a using-directive
 * leaking in from an earlier include.
 *
 * @param serverConfFilename path of the configuration file.
 */
00560 inline void loadConfFile(const std::string& serverConfFilename);
/**
 * Worker thread entry point; the signature matches a pthread start routine.
 * @param arg opaque argument (presumably the worker's Thread — confirm).
 * @return opaque pointer as required by the pthread start-routine signature.
 */
00564 inline void * threadWorker(void * arg);
/**
 * Callback for the listening socket; the (int, short, void *) signature is
 * the libevent callback signature.
 * @param serverSocket descriptor the event fired on.
 * @param event        libevent event flags.
 * @param arg          user argument registered with the event.
 */
00568 inline void acceptCallback(int serverSocket, short event, void * arg);
/**
 * Callback for a worker's pipe (see Thread::pipeEvent).
 * @param fd    descriptor the event fired on.
 * @param event libevent event flags.
 * @param arg   user argument registered with the event.
 */
00572 inline void pipeCallback(int fd, short event, void * arg);
/**
 * Callback for a client socket becoming readable (see Info::readEvent).
 * @param socket descriptor the event fired on.
 * @param event  libevent event flags.
 * @param arg    user argument registered with the event (presumably the Info).
 */
00578 inline void readCallback(int socket, short event, void * arg);
/**
 * Callback for a client socket becoming writable (see Info::writeEvent).
 * @param socket descriptor the event fired on.
 * @param event  libevent event flags.
 * @param arg    user argument registered with the event (presumably the Info).
 */
00583 inline void writeCallback(int socket, short event, void * arg);
/**
 * Handles binary-protocol input of a connection.
 * @param i connection state.
 */
00587 inline void binaryHandler(Info * i);
/**
 * Handles ASCII-protocol input of a connection.
 * @param i connection state.
 */
00591 inline void asciiHandler(Info * i);
/**
 * Acts on the connection's current State (Info::state).
 * @param i connection state.
 */
00596 inline void parseState(Info * i);
/**
 * Resets the connection's buffer bookkeeping (exact fields: see definition).
 * @param i connection state.
 */
00602 inline void resetBuffer(Info * i);
00603
00605
00606
00607
00608
/**
 * Tears down a connection and releases its memory.
 *
 * @param i     pointer to the connection's Info.
 * @param event the libevent event to remove (an lvalue; its address is taken).
 *
 * In order: decrements the owning thread's live-connection counter, removes
 * `event`, closes the client socket, frees queueBuffer if it is non-null and
 * finally frees the Info itself. After the macro `i` is dangling and must
 * not be used.
 *
 * Caveats:
 *  - only the event passed in is removed; the connection's other event is
 *    left untouched, so it must not be pending when this runs;
 *  - the return values of event_del() and close() are ignored;
 *  - `i` is expanded several times and the arguments are not parenthesised,
 *    so pass a plain variable, not an expression with side effects;
 *  - the expansion is a bare `{ ... }` block, not `do { } while (0)`, so it
 *    cannot be used as `if (c) sweepConnection(i); else ...`.
 */
00612 #define sweepConnectionTemplate(i, event) { \
00613 i->t->connections--; \
00614 event_del(&event); \
00615 close(i->clientSocket); \
00616 if (i->queueBuffer) { \
00617 free(i->queueBuffer); \
00618 } \
00619 free(i); \
00620 }
00621
/** Tears down connection `i`, removing its read event. */
00625 #define sweepConnection(i) sweepConnectionTemplate(i, i->readEvent)
00626
/** Tears down connection `i`, removing its write event. */
00630 #define sweepConnectionWrite(i) sweepConnectionTemplate(i, i->writeEvent)
00631
/**
 * Debug helper that prints a raw buffer.
 * @param buffer start of the data.
 * @param n      number of bytes (presumably — confirm in the definition).
 * @param label  optional caption; defaults to NULL.
 */
00635 void printBinaryBuffer(void * buffer, int n, const char * label = NULL);
00636
/**
 * Debug helper that prints a binary-protocol command.
 * @param cmd one-byte command code (same width as raw_request_header::opcode).
 */
00640 void printBinaryCommand(uint8_t cmd);
00641
/**
 * Signal handler; the signature matches what signal()/sigaction() expect.
 * @param n signal number.
 */
00645 void signalHandler(int n);
00646
/**
 * Prepares the binary-protocol reply to a stats request.
 * @param header the parsed request.
 * @param i      connection the reply is prepared for.
 */
00650 void prepareStatBinaryResponse(request_header * header, Info * i);
00651
/**
 * Formats one statistic into a caller-supplied buffer.
 * @param stats     statistics source (project type declared elsewhere).
 * @param arg       name or selector of the statistic.
 * @param buffer    destination buffer.
 * @param maxLength capacity of `buffer`.
 * @return an int whose meaning is set by the definition (presumably the
 *         number of bytes written — confirm).
 */
00655 int getStat(Stats * stats, const char * arg, char * buffer, int maxLength);
00656
/**
 * Releases the connection's queue (see Info::queueBuffer).
 * @param i connection state.
 */
00660 void freeQueue(Info * i);
00661
/** Turns the process into a daemon (details in the definition). */
00665 void daemonize();
00666
00667
00668
00669
00670
/*
 * Server-wide configuration and runtime state.
 *
 * NOTE(review): these are definitions, not `extern` declarations. The
 * header therefore works only if exactly one translation unit includes it;
 * a second includer would produce duplicate-symbol link errors.
 * Variables without an initializer are zero-initialized, as all
 * namespace-scope objects are.
 */

/** Port number (presumably the TCP port the server listens on). */
00674 unsigned port;
00675
/** User name; starts out as SINGLE_USER (defined elsewhere). */
00679 std::string userName = SINGLE_USER;
00680
/** Space name; starts out as SINGLE_SPACE (defined elsewhere). */
00684 std::string spaceName = SINGLE_SPACE;
00685
/** Soft quota; units are not visible here. */
00689 unsigned softQuota;
00690
/** Hard quota; units are not visible here. */
00694 unsigned hardQuota;
00695
/** Interval used by the server logger; units are not visible here. */
00699 unsigned serverLoggerInterval;
00700
/** Logging on/off switch (presumably). */
00704 bool logSwitch;
00705
/** Directory for the server log files. */
00709 std::string serverLogsDirectory;
00710
/** Number of worker threads (presumably the length of `threads`). */
00714 int threadCount;
00715
/** Timeout passed to event_add() when a connection's event is armed (see REPORT_ERROR). */
00719 timeval connectionTimeout;
00720
/** Worker thread records; presumably an array of `threadCount` entries. */
00724 Thread * threads;
00725
/** Time stamp; presumably taken at server start (e.g. for uptime statistics). */
00729 time_t startTime;
00730
/** The cache instance (project type from "Cache.h"). */
00734 Cache * cache;
00735
/** The space in use (project type declared elsewhere). */
00739 Space * space;
00740
/** The server logger (project type from "ServerLogger.h"). */
00744 ServerLogger * serverLogger;
00745
00746 #endif