ae.c是redis事件框架的具體實現,這篇blog對這份源碼進行簡單說明。其中談到了作者已經標記的一些未來可能做的改進。
ae.c
1 #include <stdio.h> 2 #include <sys/time.h> 3 #include <sys/types.h> 4 #include <unistd.h> 5 #include <stdlib.h> 6 7 #include " ae.h " 8 #include " zmalloc.h " 9 #include " config.h " 10 11 /* Include the best multiplexing layer supported by this system. 12 * The following should be ordered by performances, descending. */
//為了支持不同的平臺,redis用相同的接口封裝了系統提供的多路復用層代碼。接口共提供如下函數:aeApiCreate\aeApiFree\aeApiAddEvent\aeApiDelEvent\aeApiPoll\aeApiName函數,以及一個struct aeApiState
//通過包含不同的頭文件,選擇不同的底層實現
//按如下順序,效率遞減: epoll > kqueue > select 13 #ifdef HAVE_EPOLL 14 #include " ae_epoll.c " 15 #else 16 #ifdef HAVE_KQUEUE 17 #include " ae_kqueue.c " 18 #else 19 #include " ae_select.c " 20 #endif 21 #endif 22
//初始化函數(shù),創(chuàng)建事件循環(huán),函數(shù)內(nèi)部alloc一個(gè)結(jié)構(gòu),用于表示事件狀態(tài),供后續(xù)其他函數(shù)作為參數(shù)使用 23 aeEventLoop *aeCreateEventLoop( void ) { 24 aeEventLoop * eventLoop; 25 int i; 26 27 eventLoop = zmalloc( sizeof (* eventLoop)); 28 if (!eventLoop) return NULL;
//時(shí)間event用鏈表存儲(chǔ) 29 eventLoop->timeEventHead = NULL; 30 eventLoop->timeEventNextId = 0 ;
//表示是否停止事件循環(huán) 31 eventLoop->stop = 0 ;
//maxfd只由ae_select.c使用,后續(xù)有些相關(guān)的處理,如果使用epoll的話,其實(shí)可以進(jìn)行簡(jiǎn)化 32 eventLoop->maxfd = - 1 ;
//每次調(diào)用epoll\select前調(diào)用的函數(shù),由框架使用者注冊(cè) 33 eventLoop->beforesleep = NULL; 34 if (aeApiCreate(eventLoop) == - 1 ) { 35 zfree(eventLoop); 36 return NULL; 37 } 38 /* Events with mask == AE_NONE are not set. So let's initialize the 39 * vector with it. */
//將所有的文件描述符的mask設(shè)置為無(wú)效值,作為初始化 40 for (i = 0 ; i < AE_SETSIZE; i++ ) 41 eventLoop->events[i].mask = AE_NONE; 42 return eventLoop; 43 } 44
//底層實(shí)現(xiàn)執(zhí)行釋放操作后,釋放state的內(nèi)存 45 void aeDeleteEventLoop(aeEventLoop * eventLoop) { 46 aeApiFree(eventLoop); 47 zfree(eventLoop); 48 } 49
//停止事件循環(huán),redis作為一個(gè)無(wú)限循環(huán)的server,在redis.c中并沒有任何一處調(diào)用此函數(shù) 50 void aeStop(aeEventLoop * eventLoop) { 51 eventLoop->stop = 1 ; 52 } 53
//創(chuàng)建文件fd事件,加入事件循環(huán)監(jiān)控列表,使得后續(xù)epoll\select時(shí)將會(huì)測(cè)試這個(gè)文件描述符的可讀性(可寫性) 54 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 55 aeFileProc *proc, void * clientData) 56 { 57 if (fd >= AE_SETSIZE) return AE_ERR; 58 aeFileEvent *fe = &eventLoop-> events[fd]; 59 60 if (aeApiAddEvent(eventLoop, fd, mask) == - 1 ) 61 return AE_ERR; 62 fe->mask |= mask;
//注冊(cè)函數(shù) 63 if (mask & AE_READABLE) fe->rfileProc = proc; 64 if (mask & AE_WRITABLE) fe->wfileProc = proc; 65 fe->clientData = clientData;
//更新maxfd 66 if (fd > eventLoop-> maxfd) 67 eventLoop->maxfd = fd; 68 return AE_OK; 69 } 70 71 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) 72 { 73 if (fd >= AE_SETSIZE) return ; 74 aeFileEvent *fe = &eventLoop-> events[fd]; 75 76 if (fe->mask == AE_NONE) return ; 77 fe->mask = fe->mask & (~ mask); 78 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { 79 /* Update the max fd */ 80 int j; 81 82 for (j = eventLoop->maxfd- 1 ; j >= 0 ; j-- ) 83 if (eventLoop->events[j].mask != AE_NONE) break ; 84 eventLoop->maxfd = j; 85 } 86 aeApiDelEvent(eventLoop, fd, mask); 87 } 88
//返回值為該文件描述符關(guān)注的事件類型(可讀、可寫) 89 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { 90 if (fd >= AE_SETSIZE) return 0 ; 91 aeFileEvent *fe = &eventLoop-> events[fd]; 92 93 return fe-> mask; 94 } 95 96 static void aeGetTime( long *seconds, long * milliseconds) 97 { 98 struct timeval tv; 99 100 gettimeofday(& tv, NULL); 101 *seconds = tv.tv_sec; 102 *milliseconds = tv.tv_usec/ 1000 ; 103 } 104 105 static void aeAddMillisecondsToNow( long long milliseconds, long *sec, long * ms) { 106 long cur_sec, cur_ms, when_sec, when_ms; 107 108 aeGetTime(&cur_sec, & cur_ms); 109 when_sec = cur_sec + milliseconds/ 1000 ; 110 when_ms = cur_ms + milliseconds% 1000 ; 111 if (when_ms >= 1000 ) { 112 when_sec ++ ; 113 when_ms -= 1000 ; 114 } 115 *sec = when_sec; 116 *ms = when_ms; 117 } 118 119 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, 120 aeTimeProc *proc, void * clientData, 121 aeEventFinalizerProc * finalizerProc) 122 { 123 long long id = eventLoop->timeEventNextId++ ; 124 aeTimeEvent * te; 125 126 te = zmalloc( sizeof (* te)); 127 if (te == NULL) return AE_ERR; 128 te->id = id;
//time event執(zhí)行時(shí)間為絕對(duì)時(shí)間點(diǎn) 129 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te-> when_ms); 130 te->timeProc = proc; 131 te->finalizerProc = finalizerProc; 132 te->clientData = clientData;
//time event鏈表為無(wú)序表,直接插入到鏈表頭 133 te->next = eventLoop-> timeEventHead; 134 eventLoop->timeEventHead = te; 135 return id; 136 } 137
//從鏈表中刪除,是以id作為key查找的(順序遍歷) 138 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) 139 { 140 aeTimeEvent *te, *prev = NULL; 141 142 te = eventLoop-> timeEventHead; 143 while (te) { 144 if (te->id == id) { 145 if (prev == NULL) 146 eventLoop->timeEventHead = te-> next; 147 else 148 prev->next = te-> next; 149 if (te-> finalizerProc) 150 te->finalizerProc(eventLoop, te-> clientData); 151 zfree(te); 152 return AE_OK; 153 } 154 prev = te; 155 te = te-> next; 156 } 157 return AE_ERR; /* NO event with the specified ID found */ 158 } 159 160 /* Search the first timer to fire. 161 * This operation is useful to know how many time the select can be 162 * put in sleep without to delay any event. (沒大看懂),也許是給一個(gè)阻塞時(shí)間的上限值 163 * If there are no timers NULL is returned. 164 * 165 * Note that's O(N) since time events are unsorted. 166 * Possible optimizations (not needed by Redis so far, but...): 167 * 1) Insert the event in order, so that the nearest is just the head. 168 * Much better but still insertion or deletion of timers is O(N). 169 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). 170 */ 171 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop * eventLoop) 172 { 173 aeTimeEvent *te = eventLoop-> timeEventHead; 174 aeTimeEvent *nearest = NULL; 175 176 while (te) { 177 if (!nearest || te->when_sec < nearest->when_sec || 178 (te->when_sec == nearest->when_sec && 179 te->when_ms < nearest-> when_ms)) 180 nearest = te; 181 te = te-> next; 182 } 183 return nearest; 184 } 185
//對(duì)time event進(jìn)行處理 186 /* Process time events */ 187 static int processTimeEvents(aeEventLoop * eventLoop) {
//處理的事件數(shù)
188 int processed = 0 ; 189 aeTimeEvent * te; 190 long long maxId; 191 192 te = eventLoop-> timeEventHead; 193 maxId = eventLoop->timeEventNextId- 1 ; 194 while (te) { 195 long now_sec, now_ms; 196 long long id; 197 198 if (te->id > maxId) { 199 te = te-> next; 200 continue ; 201 } 202 aeGetTime(&now_sec, & now_ms); 203 if (now_sec > te->when_sec || 204 (now_sec == te->when_sec && now_ms >= te-> when_ms)) 205 { 206 int retval; 207 208 id = te-> id;
//如果執(zhí)行時(shí)間到或已超出,則執(zhí)行對(duì)應(yīng)的時(shí)間處理函數(shù) 209 retval = te->timeProc(eventLoop, id, te-> clientData); 210 processed++ ; 211 /* After an event is processed our time event list may 212 * no longer be the same, so we restart from head. //因?yàn)闀r(shí)間處理函數(shù)timeProc可能改變此鏈表 213 * Still we make sure to don't process events registered 214 * by event handlers itself in order to don't loop forever.? 215 * To do so we saved the max ID we want to handle. 216 * 217 * FUTURE OPTIMIZATIONS: 218 * Note that this is NOT great algorithmically. Redis uses 219 * a single time event so it's not a problem but the right 220 * way to do this is to add the new elements on head, and 221 * to flag deleted elements in a special way for later 222 * deletion (putting references to the nodes to delete into 223 * another linked list). */
//如果這個(gè)時(shí)間處理函數(shù)不再繼續(xù)執(zhí)行,則從time event的鏈表中刪除事件;否則,retval為繼續(xù)執(zhí)行的時(shí)間間隔(單位為ms),在當(dāng)前的timeevent struct的時(shí)間值上進(jìn)行增加 224 if (retval != AE_NOMORE) { 225 aeAddMillisecondsToNow(retval,&te->when_sec,&te-> when_ms); 226 } else { 227 aeDeleteTimeEvent(eventLoop, id); 228 } 229 te = eventLoop-> timeEventHead; 230 } else { 231 te = te-> next; 232 } 233 } 234 return processed; 235 } 236 237 /* Process every pending(懸而未決) time event, then every pending file event 238 * (that may be registered by time event callbacks just processed). 239 * Without special flags the function sleeps until some file event 240 * fires, or when the next time event occurrs (if any). 241 * 242 * If flags is 0, the function does nothing and returns. 243 * if flags has AE_ALL_EVENTS set, all the kind of events are processed. 244 * if flags has AE_FILE_EVENTS set, file events are processed. 245 * if flags has AE_TIME_EVENTS set, time events are processed. 246 * if flags has AE_DONT_WAIT set the function returns ASAP until all 247 * the events that's possible to process without to wait are processed. 248 * 249 * The function returns the number of events processed. */ 250 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 251 { 252 int processed = 0 , numevents; 253 254 /* Nothing to do? return ASAP */ 255 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0 ; 256 257 /* Note that we want call select() even if there are no 258 * file events to process as long as we want to process time 259 * events, in order to sleep until the next time event is ready 260 * to fire. 
*/ 261 if (eventLoop->maxfd != - 1 || 262 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 263 int j; 264 aeTimeEvent *shortest = NULL; 265 struct timeval tv, * tvp; 266 267 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 268 shortest = aeSearchNearestTimer(eventLoop); 269 if (shortest) { 270 long now_sec, now_ms; 271 272 /* Calculate the time missing for the nearest 273 * timer to fire. */ 274 aeGetTime(&now_sec, & now_ms); 275 tvp = & tv; 276 tvp->tv_sec = shortest->when_sec - now_sec; 277 if (shortest->when_ms < now_ms) { 278 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; 279 tvp->tv_sec --; 280 } else { 281 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; 282 } 283 if (tvp->tv_sec < 0) tvp->tv_sec = 0; 284 if (tvp->tv_usec < 0) tvp->tv_usec = 0; 疑似有bug,比如當(dāng)前時(shí)間為 1s +2ms ,shortest時(shí)間為0s+1ms的case 285 } else { 286 /* If we have to check for events but need to return 287 * ASAP because of AE_DONT_WAIT we need to se the timeout 288 * to zero */ 289 if (flags & AE_DONT_WAIT) { 290 tv.tv_sec = tv.tv_usec = 0 ; 291 tvp = & tv; 292 } else { 293 /* Otherwise we can block */ 294 tvp = NULL; /* wait forever */ 295 } 296 } 297
//指定這個(gè)tvp是說(shuō)這個(gè)tvp到期時(shí),至少由time event可以執(zhí)行 298 numevents = aeApiPoll(eventLoop, tvp); 299 for (j = 0 ; j < numevents; j++ ) { 300 aeFileEvent *fe = &eventLoop->events[eventLoop-> fired[j].fd]; 301 int mask = eventLoop-> fired[j].mask; 302 int fd = eventLoop-> fired[j].fd; 303 int rfired = 0 ; 304 305 /* note the fe->mask & mask & ... code: maybe an already processed 306 * event removed an element that fired and we still didn't 307 * processed, so we check if the event is still valid. */ 308 if (fe->mask & mask & AE_READABLE) { 309 rfired = 1 ; 310 fe->rfileProc(eventLoop,fd,fe-> clientData,mask); 311 }
//避免同一個(gè)注冊(cè)函數(shù)被調(diào)用兩次 312 if (fe->mask & mask & AE_WRITABLE) { 313 if (!rfired || fe->wfileProc != fe-> rfileProc) 314 fe->wfileProc(eventLoop,fd,fe-> clientData,mask); 315 } 316 processed++ ; 317 } 318 } 319 /* Check time events */ 320 if (flags & AE_TIME_EVENTS) 321 processed += processTimeEvents(eventLoop); 322 323 return processed; /* return the number of processed file/time events */ 324 } 325
//對(duì)select的一個(gè)簡(jiǎn)單封裝,沒有太大意義 326 /* Wait for millseconds until the given file descriptor becomes 327 * writable/readable/exception */ 328 int aeWait( int fd, int mask, long long milliseconds) { 329 struct timeval tv; 330 fd_set rfds, wfds, efds; 331 int retmask = 0 , retval; 332 333 tv.tv_sec = milliseconds/ 1000 ; 334 tv.tv_usec = (milliseconds% 1000 )* 1000 ; 335 FD_ZERO(& rfds); 336 FD_ZERO(& wfds); 337 FD_ZERO(& efds); 338 339 if (mask & AE_READABLE) FD_SET(fd,& rfds); 340 if (mask & AE_WRITABLE) FD_SET(fd,& wfds); 341 if ((retval = select (fd+ 1 , &rfds, &wfds, &efds, &tv)) > 0 ) { 342 if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; 343 if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; 344 return retmask; 345 } else { 346 return retval; 347 } 348 } 349 350 void aeMain(aeEventLoop * eventLoop) { 351 eventLoop->stop = 0 ; 352 while (!eventLoop-> stop) { 353 if (eventLoop->beforesleep != NULL) 354 eventLoop-> beforesleep(eventLoop); 355 aeProcessEvents(eventLoop, AE_ALL_EVENTS); 356 } 357 } 358 359 char *aeGetApiName( void ) { 360 return aeApiName(); 361 } 362 363 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc * beforesleep) { 364 eventLoop->beforesleep = beforesleep; 365 }
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
