欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

linux內(nèi)核select/poll,epoll實現(xiàn)與區(qū)別

 更新時間:2016年11月20日 21:15:24   投稿:mdxy-dxy  
這篇文章主要介紹了linux內(nèi)核select/poll,epoll實現(xiàn)與區(qū)別,需要的朋友可以參考下

下面文章在這段時間內(nèi)研究 select/poll/epoll的內(nèi)核實現(xiàn)的一點心得體會:
select,poll,epoll都是多路復(fù)用IO的函數(shù),簡單說就是在一個線程里,可以同時處理多個文件描述符的讀寫。
select/poll的實現(xiàn)很類似,epoll是從select/poll擴展而來,主要是為了解決select/poll天生的缺陷。
epoll在內(nèi)核版本2.6以上才出現(xiàn)的新的函數(shù),而他們在linux內(nèi)核中的實現(xiàn)都是十分相似。
這三種函數(shù)都需要設(shè)備驅(qū)動提供poll回調(diào)函數(shù),對于套接字而言,他們是 tcp_poll,udp_poll和datagram_poll;
對于自己開發(fā)的設(shè)備驅(qū)動而言,是自己實現(xiàn)的poll接口函數(shù)。

select實現(xiàn)(2.6的內(nèi)核,其他版本的內(nèi)核,應(yīng)該都相差不多)
應(yīng)用程序調(diào)用select,進入內(nèi)核調(diào)用sys_select,做些簡單初始化工作,接著進入 core_sys_select,
此函數(shù)主要工作是把描述符集合從用戶空間復(fù)制到內(nèi)核空間, 最終進入do_select,完成其主要的功能。
do_select里,調(diào)用 poll_initwait,主要工作是注冊poll_wait的回調(diào)函數(shù)為__pollwait,
當(dāng)在設(shè)備驅(qū)動的poll回調(diào)函數(shù)里調(diào)用poll_wait,其實就是調(diào)用__pollwait,
__pollwait的主要工作是把當(dāng)前進程掛載到等待隊列里,當(dāng)?shù)却氖录絹砭蜁拘汛诉M程。
接著執(zhí)行for循環(huán),循環(huán)里首先遍歷每個文件描述符,調(diào)用對應(yīng)描述符的poll回調(diào)函數(shù),檢測是否就緒,
遍歷完所有描述符之后,只要有描述符處于就緒狀態(tài),信號中斷,出錯或者超時,就退出循環(huán),
否則會調(diào)用schedule_xxx函數(shù),讓當(dāng)前進程睡眠,一直到超時或者有描述符就緒被喚醒。
接著又會再次遍歷每個描述符,調(diào)用poll再次檢測。
如此循環(huán),直到符合條件才會退出。
以下是 2.6.31內(nèi)核的有關(guān)select函數(shù)的部分片段:
他們調(diào)用關(guān)系:
select --> sys_select --> core_sys_select --> do_select

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
  ktime_t expire, *to = NULL;
  struct poll_wqueues table;
  poll_table *wait;
  int retval, i, timed_out = 0;
  unsigned long slack = 0;
  
  ///這里為了獲得集合中的最大描述符,這樣可減少循環(huán)中遍歷的次數(shù)。
  ///也就是為什么linux中select第一個參數(shù)為何如此重要了
  rcu_read_lock();
  retval = max_select_fd(n, fds);
  rcu_read_unlock();
  if (retval < 0)
    return retval;
  n = retval;

  ////初始化 poll_table結(jié)構(gòu),其中一個重要任務(wù)是把 __pollwait函數(shù)地址賦值給它,
  poll_initwait(&table);
  wait = &table.pt;
  if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
    wait = NULL;
    timed_out = 1;
  }
  if (end_time && !timed_out)
    slack = estimate_accuracy(end_time);

  retval = 0;
  ///主循環(huán),將會在這里完成描述符的狀態(tài)輪訓(xùn)
  for (;;) {
    unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

    inp = fds->in; outp = fds->out; exp = fds->ex;
    rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

    for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
      unsigned long in, out, ex, all_bits, bit = 1, mask, j;
      unsigned long res_in = 0, res_out = 0, res_ex = 0;
      const struct file_operations *f_op = NULL;
      struct file *file = NULL;
      ///select中 fd_set 以及 do_select 中的 fd_set_bits 參數(shù),都是按照位來保存描述符,意思是比如申請一個1024位的內(nèi)存,
      ///如果第 28位置1,說明此集合有 描述符 28, 
      in = *inp++; out = *outp++; ex = *exp++;
      all_bits = in | out | ex; // 檢測讀寫異常3個集合中有無描述符
      if (all_bits == 0) {
        i += __NFDBITS;
        continue;
      }

      for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
        int fput_needed;
        if (i >= n)
          break;
        if (!(bit & all_bits))
          continue;
        file = fget_light(i, &fput_needed); ///通過 描述符 index 獲得 struct file結(jié)構(gòu)指針,
        if (file) {
          f_op = file->f_op; //通過 struct file 獲得 file_operations,這是操作文件的回調(diào)函數(shù)集合。
          mask = DEFAULT_POLLMASK;
          if (f_op && f_op->poll) {
            wait_key_set(wait, in, out, bit);
            mask = (*f_op->poll)(file, wait); //調(diào)用我們的設(shè)備中實現(xiàn)的 poll函數(shù),
                                     //因此,為了能讓select正常工作,在我們設(shè)備驅(qū)動中,必須要提供poll的實現(xiàn),
          }
          fput_light(file, fput_needed);
          if ((mask & POLLIN_SET) && (in & bit)) {
            res_in |= bit;
            retval++;
            wait = NULL; /// 此處包括以下的,把wait設(shè)置為NULL,是因為檢測到mask = (*f_op->poll)(file, wait); 描述符已經(jīng)就緒
                       /// 無需再把當(dāng)前進程添加到等待隊列里,do_select 遍歷完所有描述符之后就會退出。
          } 
          if ((mask & POLLOUT_SET) && (out & bit)) {
            res_out |= bit;
            retval++;
            wait = NULL;
          }
          if ((mask & POLLEX_SET) && (ex & bit)) {
            res_ex |= bit;
            retval++;
            wait = NULL;
          }
        }
      }
      if (res_in)
        *rinp = res_in;
      if (res_out)
        *routp = res_out;
      if (res_ex)
        *rexp = res_ex;
      cond_resched();
    }
    wait = NULL; //已經(jīng)遍歷完一遍,該加到等待隊列的,都已經(jīng)加了,無需再加,因此設(shè)置為NULL
    if (retval || timed_out || signal_pending(current)) //描述符就緒,超時,或者信號中斷就退出循環(huán)
      break;
    if (table.error) {//出錯退出循環(huán)
      retval = table.error;
      break;
    }

    /*
     * If this is the first loop and we have a timeout
     * given, then we convert to ktime_t and set the to
     * pointer to the expiry value.
     */
    if (end_time && !to) {
      expire = timespec_to_ktime(*end_time);
      to = &expire;
    }
    /////讓進程休眠,直到超時,或者被就緒的描述符喚醒,
    if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
            to, slack))
      timed_out = 1;
  }

  poll_freewait(&table);

  return retval;
}
void poll_initwait(struct poll_wqueues *pwq)
{
  init_poll_funcptr(&pwq->pt, __pollwait); //設(shè)置poll_table的回調(diào)函數(shù)為 __pollwait,這樣當(dāng)我們在驅(qū)動中調(diào)用poll_wait 就會調(diào)用到 __pollwait
  ........
}
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
        poll_table *p)
{
  ...................
  init_waitqueue_func_entry(&entry->wait, pollwake); // 設(shè)置喚醒進程調(diào)用的回調(diào)函數(shù),當(dāng)在驅(qū)動中調(diào)用 wake_up喚醒隊列時候,
                                          // pollwake會被調(diào)用,這里其實就是調(diào)用隊列的默認函數(shù) default_wake_function
                                          // 用來喚醒睡眠的進程。
  add_wait_queue(wait_address, &entry->wait);     //加入到等待隊列
}

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
        fd_set __user *exp, struct timespec *end_time)
{
  ........
  //把描述符集合從用戶空間復(fù)制到內(nèi)核空間
  if ((ret = get_fd_set(n, inp, fds.in)) ||
    (ret = get_fd_set(n, outp, fds.out)) ||
    (ret = get_fd_set(n, exp, fds.ex)))
  .........
  ret = do_select(n, &fds, end_time);
  .............
  ////把do_select返回集合,從內(nèi)核空間復(fù)制到用戶空間
  if (set_fd_set(n, inp, fds.res_in) ||
    set_fd_set(n, outp, fds.res_out) ||
    set_fd_set(n, exp, fds.res_ex))
    ret = -EFAULT;
   ............
}

poll的實現(xiàn)跟select基本差不多,按照
poll --> do_sys_poll --> do_poll --> do_pollfd 的調(diào)用序列
其中do_pollfd是對每個描述符調(diào)用 其回調(diào)poll狀態(tài)輪訓(xùn)。
poll比select的好處就是沒有描述多少限制,select 有1024 的限制,描述符不能超過此值,poll不受限制。
我們從上面代碼分析,可以總結(jié)出select/poll天生的缺陷:
1)每次調(diào)用select/poll都需要要把描述符集合從用戶空間copy到內(nèi)核空間,檢測完成之后,又要把檢測的結(jié)果集合從內(nèi)核空間copy到用戶空間
當(dāng)描述符很多,而且select經(jīng)常被喚醒,這種開銷會比較大
2)如果說描述符集合來回復(fù)制不算什么,那么多次的全部描述符遍歷就比較恐怖了,
我們在應(yīng)用程序中,每次調(diào)用select/poll 都必須首先遍歷描述符,把他們加到fd_set集合里,這是應(yīng)用層的第一次遍歷,
接著進入內(nèi)核空間,至少進行一次遍歷和調(diào)用每個描述符的poll回調(diào)檢測,一般可能是2次遍歷,第一次沒發(fā)現(xiàn)就緒描述符,
加入等待隊列,第二次是被喚醒,接著再遍歷一遍。再回到應(yīng)用層,我們還必須再次遍歷所有描述符,用 FD_ISSET檢測結(jié)果集。
如果描述符很多,這種遍歷就很消耗CPU資源了。
3)描述符多少限制,當(dāng)然poll沒有限制,select卻有1024的硬性限制,除了修改內(nèi)核增加1024限制外沒別的辦法。
既然有這么些缺點 ,那不是 select/poll變得一無是處了,那就大錯特錯了。
他們依然是代碼移植的最好函數(shù),因為幾乎所有平臺都有對它們的實現(xiàn)提供接口。
在描述符不是太多,他們依然十分出色的完成多路復(fù)用IO,
而且如果每個連接上的描述符都處于活躍狀態(tài),他們的效率其實跟epoll也差不了多少。
曾經(jīng)使用多個線程+每個線程采用poll的辦法開發(fā)TCP服務(wù)器,處理文件收發(fā),連接達到幾千個,
當(dāng)時的瓶頸已經(jīng)不在網(wǎng)絡(luò)IO,而在磁盤IO了。

我們再來看epoll為了解決select/poll天生的缺陷,是如何實現(xiàn)的。
epoll只是select/poll的擴展,他不是在linux內(nèi)核中另起爐灶,做顛覆性的設(shè)計的,他只是在select的基礎(chǔ)上來解決他們的缺陷。
他的底層依然需要設(shè)備驅(qū)動提供poll回調(diào)來作為狀態(tài)檢測基礎(chǔ)。
epoll分為三個函數(shù) epoll_create,epoll_ctl, epoll_wait 。
他們的實現(xiàn)在 eventpoll.c代碼里。
epoll_create創(chuàng)建epoll設(shè)備,用來管理所有添加進去的描述符,epoll_ctl 用來添加新的描述符,修改或者刪除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已經(jīng)不再是輪訓(xùn)方式的等待了,epoll內(nèi)部有個描述符就緒隊列,epoll_wait只檢測這個隊列即可,
他采用睡眠一會檢測一下的方式,如果發(fā)現(xiàn)描述符就緒隊列不為空,就把此隊列中的描述符copy到用戶空間,然后返回。
描述符就緒隊列里的數(shù)據(jù)又是從何而來的?
原來使用 epoll_ctl添加新描述符時候,epoll_ctl內(nèi)核實現(xiàn)里會修改兩個回調(diào)函數(shù),
一個是 poll_table結(jié)構(gòu)里的qproc回調(diào)函數(shù)指針,
在 select中是 __pollwait函數(shù),在epoll中換成 ep_ptable_queue_proc,
當(dāng)在epoll_ctl中調(diào)用新添加的描述符的poll回調(diào)時候,底層驅(qū)動就會調(diào)用 poll_wait添加等待隊列,
底層驅(qū)動調(diào)用poll_wait時候,
其實就是調(diào)用ep_ptable_queue_proc,此函數(shù)會修改等待隊列的回調(diào)函數(shù)為 ep_poll_callback, 并加入到等待隊列頭里;
一旦底層驅(qū)動發(fā)現(xiàn)數(shù)據(jù)就緒,就會調(diào)用wake_up喚醒等待隊列,從而 ep_poll_callback將被調(diào)用,
在ep_poll_callback中 會把這個就緒的描述符添加到 epoll的描述符就緒隊列里,并同時喚醒 epoll_wait 所在的進程。
如此這般,就是epoll的內(nèi)核實現(xiàn)的精髓。
看他是如何解決 select/poll的缺陷的, 首先他通過 epoll_ctl的EPOLL_CTL_ADD命令把描述符添加進epoll內(nèi)部管理器里,
只需添加一次即可,直到用 epoll_ctl的EPOLL_CTL_DEL命令刪除此描述符為止,
而不像select/poll是每次執(zhí)行都必須添加,很顯然大量減少了描述符在內(nèi)核和用戶空間不斷的來回copy的開銷。
其次雖然 epoll_wait內(nèi)部也是循環(huán)檢測,但是它只需檢測描述符就緒隊列是否為空即可,
比起select/poll必須輪訓(xùn)每個描述符的poll,其開銷簡直可以忽略不計。
他同時也沒描述符多少的限制,只要你機器的內(nèi)存夠大,就能容納非常多的描述符。

以下是 epoll相關(guān)部分內(nèi)核代碼片段:

struct epitem {
  /* RB tree node used to link this structure to the eventpoll RB tree */
  struct rb_node rbn; //   紅黑樹節(jié)點,
  
  struct epoll_filefd ffd;  // 存儲此變量對應(yīng)的描述符

  struct epoll_event event; //用戶定義的結(jié)構(gòu)
  /*其他成員*/

};

struct eventpoll {
  /*其他成員*/
  .......

  /* Wait queue used by file->poll() */
  wait_queue_head_t poll_wait;

  /* List of ready file descriptors */
  struct list_head rdllist;      ///描述符就緒隊列,掛載的是 epitem結(jié)構(gòu) 

  /* RB tree root used to store monitored fd structs */
  struct rb_root rbr; /// 存儲 新添加的 描述符的紅黑樹根, 此成員用來存儲添加進來的所有描述符。掛載的是epitem結(jié)構(gòu)
  
   .........
};

//epoll_create
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
  int error;
  struct eventpoll *ep = NULL;

  /*其他代碼*/
  ......
  //分配 eventpoll結(jié)構(gòu),這個結(jié)構(gòu)是epoll的靈魂,他包含了所有需要處理得數(shù)據(jù)。 
  error = ep_alloc(&ep);
  if (error < 0)
    return error;
  
  error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
         flags & O_CLOEXEC); ///打開 eventpoll 的描述符,并把 ep存儲到 file->private_data變量里。
  if (error < 0)
    ep_free(ep);

  return error;
}

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
    struct epoll_event __user *, event)
{
  /*其他代碼*/
  .....
  ep = file->private_data;
  ......
  epi = ep_find(ep, tfile, fd);  ///從 eventpoll的 rbr里查找描述符是 fd 的 epitem,

  error = -EINVAL;
  switch (op) {
  case EPOLL_CTL_ADD:
    if (!epi) {
      epds.events |= POLLERR | POLLHUP;
      error = ep_insert(ep, &epds, tfile, fd);   // 在這個函數(shù)里添加新描述符,同時修改重要的回調(diào)函數(shù)。
                                      //同時還調(diào)用描述符的poll,查看就緒狀態(tài)
    } else
      error = -EEXIST;
    break;
   /*其他代碼*/
   ........
}
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
       struct file *tfile, int fd)
{
  ..... /*其他代碼*/
 
  init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);//設(shè)置 poll_tabe回調(diào)函數(shù)為 ep_ptable_queue_proc
         //ep_ptable_queue_proc會設(shè)置等待隊列的回調(diào)指針為 ep_epoll_callback,同時添加等待隊列。
  
  ........ /*其他代碼*/
  
  revents = tfile->f_op->poll(tfile, &epq.pt); //調(diào)用描述符的poll回調(diào),在此函數(shù)里 ep_ptable_queue_proc會被調(diào)用

  ....... /*其他代碼*/

  ep_rbtree_insert(ep, epi); //把新生成關(guān)于epitem添加到紅黑樹里

  ...... /*其他代碼*/

  if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { 
    list_add_tail(&epi->rdllink, &ep->rdllist); //如果 上邊的poll調(diào)用,檢測到描述符就緒,添加本描述符到就緒隊列里。

    if (waitqueue_active(&ep->wq))
      wake_up_locked(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
      pwake++;
  }
  ...... /*其他代碼*/
  /* We have to call this outside the lock */
  if (pwake)
    ep_poll_safewake(&ep->poll_wait); // 如果描述符就緒隊列不為空,則喚醒 epoll_wait所在的進程。

  ......... /*其他代碼*/

}
//這個函數(shù)設(shè)置等待隊列回調(diào)函數(shù)為 ep_poll_callback,
//這樣到底層有數(shù)據(jù)喚醒等待隊列時候,ep_poll_callback就會被調(diào)用,從而把就緒的描述符加到就緒隊列。
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
         poll_table *pt)
{
  struct epitem *epi = ep_item_from_epqueue(pt);
  struct eppoll_entry *pwq;

  if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
    init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
    pwq->whead = whead;
    pwq->base = epi;
    add_wait_queue(whead, &pwq->wait);
    list_add_tail(&pwq->llink, &epi->pwqlist);
    epi->nwait++;
  } else {
    /* We have to signal that an error occurred */
    epi->nwait = -1;
  }
}
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
  int pwake = 0;
  unsigned long flags;
  struct epitem *epi = ep_item_from_wait(wait);
  struct eventpoll *ep = epi->ep;
  ......... /*其他代碼*/
  if (!ep_is_linked(&epi->rdllink))
    list_add_tail(&epi->rdllink, &ep->rdllist); // 把當(dāng)前就緒的描述epitem結(jié)構(gòu)添加到就緒隊列里
  ......... /*其他代碼*/

  if (pwake)
    ep_poll_safewake(&ep->poll_wait); //如果隊列不為空,喚醒 epoll_wait所在進程
  ......... /*其他代碼*/
}

epoll_wait內(nèi)核代碼里主要是調(diào)用ep_poll,列出ep_poll部分代碼片段:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
      int maxevents, long timeout)
{
  int res, eavail;
  unsigned long flags;
  long jtimeout;
  wait_queue_t wait;
  ......... /*其他代碼*/
  
  if (list_empty(&ep->rdllist)) {
    
    init_waitqueue_entry(&wait, current);
    wait.flags |= WQ_FLAG_EXCLUSIVE;
    __add_wait_queue(&ep->wq, &wait);  
    // 如果檢測到就緒隊列為空,添加當(dāng)前進程到等待隊列,并執(zhí)行否循環(huán)
    for (;;) {
      
      set_current_state(TASK_INTERRUPTIBLE);
      if (!list_empty(&ep->rdllist) || !jtimeout)  //如果就緒隊列不為空,或者超時則退出循環(huán)
        break;
      if (signal_pending(current)) { //如果信號中斷,退出循環(huán)
        res = -EINTR;
        break;
      }

      spin_unlock_irqrestore(&ep->lock, flags);
      jtimeout = schedule_timeout(jtimeout);//睡眠,知道被喚醒或者超時為止。
      spin_lock_irqsave(&ep->lock, flags);
    }
    __remove_wait_queue(&ep->wq, &wait);

    set_current_state(TASK_RUNNING);
  }
  
  ......... /*其他代碼*/
  
  if (!res && eavail &&
    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
    goto retry;
   // ep_send_events主要任務(wù)是把就緒隊列的就緒描述符copy到用戶空間的 epoll_event數(shù)組里,

  return res;
}

可以看到 ep_poll既epoll_wait的循環(huán)是相當(dāng)輕松的循環(huán),他只是簡單檢測就緒隊列而已,因此他的開銷很小。

我們最后看看描述符就緒時候,是如何通知給select/poll/epoll的,以網(wǎng)絡(luò)套接字的TCP協(xié)議來進行說明。

tcp協(xié)議對應(yīng)的 poll回調(diào)是tcp_poll, 對應(yīng)的等待隊列頭是 struct sock結(jié)構(gòu)里 sk_sleep成員,
在tcp_poll中會把 sk_sleep加入到等待隊列,等待數(shù)據(jù)就緒。

當(dāng)物理網(wǎng)卡接收到數(shù)據(jù)包,引發(fā)硬件中斷,驅(qū)動在中斷ISR例程里,構(gòu)建skb包,把數(shù)據(jù)copy進skb,接著調(diào)用netif_rx
把skb掛載到CPU相關(guān)的 input_pkt_queue隊列,同時引發(fā)軟中斷,在軟中斷的net_rx_action回調(diào)函數(shù)里從input_pkt_queue里取出
skb數(shù)據(jù)包,通過分析,調(diào)用協(xié)議相關(guān)的回調(diào)函數(shù),這樣層層傳遞,一直到struct sock,此結(jié)構(gòu)里的 sk_data_ready回調(diào)指針被調(diào)用
sk_data_ready指向 sock_def_readable 函數(shù),sock_def_readable函數(shù)其實就是 wake_up 喚醒 sock結(jié)構(gòu)里 的 sk_sleep。
以上機制,對 select/poll/epoll都是一樣的,接下來喚醒 sk_sleep方式就不一樣了,因為他們指向了不同的回調(diào)函數(shù)。
在 select/poll實現(xiàn)中,等待隊列回調(diào)函數(shù)是 pollwake其實就是調(diào)用default_wake_function,喚醒被select阻塞住的進程。
epoll實現(xiàn)中,等待回調(diào)函數(shù)是 ep_poll_callback, 此回調(diào)函數(shù)只是把就緒描述符加入到epoll的就緒隊列里。

所以呢 select/poll/epoll其實他們在內(nèi)核實現(xiàn)中,差別也不是太大,其實都差不多。
epoll雖然效率不錯,可以跟windows平臺中的完成端口比美,但是移植性太差,
目前幾乎就只有l(wèi)inux平臺才實現(xiàn)了epoll而且必須是2.6以上的內(nèi)核版本。

相關(guān)文章

最新評論