[Copyright notice: please keep the source when reposting: blog.csdn.net/gentleliu. Email: shallnew*163.com]
Each CPU has its own queue for handling received frames and its own data structure for ingress and egress traffic, so no locking is needed between CPUs. This per-CPU structure is softnet_data (defined in include/linux/netdevice.h):
/*
 * Incoming packets are placed on per-cpu queues so that
 * no locking is needed.
 */
struct softnet_data {
    struct Qdisc        *output_queue;      /* list of devices that have frames to transmit */
    struct sk_buff_head input_pkt_queue;    /* incoming frames waiting to be processed */
    struct list_head    poll_list;          /* doubly linked list of devices that have input frames waiting to be processed */
    struct sk_buff      *completion_queue;  /* buffers that have been transmitted successfully and can be freed */
    struct napi_struct  backlog;
};
The fields of this structure are used for both transmission and reception; in other words, both the NET_RX_SOFTIRQ and NET_TX_SOFTIRQ software IRQs refer to it. Ingress frames are queued on input_pkt_queue (NAPI drivers differ here).
softnet_data is initialized for each CPU in net_dev_init():

/*
 * This is called single threaded during boot, so no need
 * to take the rtnl semaphore.
 */
static int __init net_dev_init(void)
{
    int i, rc = -ENOMEM;
    ......
    /*
     * Initialise the packet receive queues.
     */
    for_each_possible_cpu(i) {
        struct softnet_data *queue;

        queue = &per_cpu(softnet_data, i);
        skb_queue_head_init(&queue->input_pkt_queue);
        queue->completion_queue = NULL;
        INIT_LIST_HEAD(&queue->poll_list);

        queue->backlog.poll = process_backlog;
        queue->backlog.weight = weight_p;
        queue->backlog.gro_list = NULL;
        queue->backlog.gro_count = 0;
    }
    ......
    open_softirq(NET_TX_SOFTIRQ, net_tx_action);
    open_softirq(NET_RX_SOFTIRQ, net_rx_action);
    ......
}

A non-NAPI driver generates one interrupt for every frame it receives; under heavy traffic a great deal of CPU time goes into interrupt handling, which wastes resources. A NAPI driver mixes interrupts with polling and performs better than the old approach under heavy load.
The main idea of NAPI is to combine interrupts with polling rather than relying on a purely interrupt-driven model: when a new frame arrives, receive interrupts are disabled and the entire ingress queue is processed in one go. From the kernel's point of view, NAPI lowers CPU load because far fewer interrupts are taken.
The xx_rx() function of a non-NAPI driver typically looks like this:
void xx_rx()
{
    struct sk_buff *skb;

    skb = dev_alloc_skb(pkt_len + 5);
    if (skb != NULL) {
        skb_reserve(skb, 2);    /* Align IP on 16 byte boundaries */
        memcpy(skb_put(skb, pkt_len), pkt, pkt_len);    /* copy the packet data into the skb */
        skb->protocol = eth_type_trans(skb, dev);
        netif_rx(skb);
    }
}

The first step is to allocate a buffer to hold the packet. Note that the buffer allocation function (dev_alloc_skb) needs to know the data length.
The second step copies the packet data into the buffer; skb_put() advances the tail pointer of the buffer and returns a pointer to the newly created space.
The third step extracts the protocol identifier and other information.
Finally, netif_rx(skb) is called for further processing; this function is defined in net/core/dev.c.
int netif_rx(struct sk_buff *skb)
{
    struct softnet_data *queue;
    unsigned long flags;

    /* if netpoll wants it, pretend we never saw it */
    if (netpoll_rx(skb))
        return NET_RX_DROP;

    if (!skb->tstamp.tv64)
        net_timestamp(skb);

    /*
     * The code is rearranged so that the path is the most
     * short when CPU is congested, but is still operating.
     */
    local_irq_save(flags);

    queue = &__get_cpu_var(softnet_data);
    __get_cpu_var(netdev_rx_stat).total++;
    if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {    /* is there still room? the queue length is capped by netdev_max_backlog */
        /* The softirq (napi_schedule()) is raised only when the queue is empty;
         * if the queue is not empty, the softirq has already been raised and
         * there is no need to raise it again. */
        if (queue->input_pkt_queue.qlen) {
enqueue:
            __skb_queue_tail(&queue->input_pkt_queue, skb);    /* the key step: add the skb to input_pkt_queue */
            local_irq_restore(flags);
            return NET_RX_SUCCESS;
        }

        napi_schedule(&queue->backlog);    /* raise the softirq */
        goto enqueue;
    }

    __get_cpu_var(netdev_rx_stat).dropped++;
    local_irq_restore(flags);

    kfree_skb(skb);
    return NET_RX_DROP;
}
EXPORT_SYMBOL(netif_rx);
static inline void napi_schedule(struct napi_struct *n)
{
    if (napi_schedule_prep(n))
        __napi_schedule(n);
}
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);    /* add the device to the poll list so its pending frames get processed */
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);    /* finally raise the softirq */
    local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);
At this point the top half of the interrupt is done; the remaining work is left to the bottom half. napi_schedule(&queue->backlog) links the NIC that has packets waiting onto the softnet_data poll_list, then raises the softirq so that the bottom half can finish processing the data.
A NAPI device, by contrast, raises the softirq directly when it receives data; there is no need to go through netif_rx() to fill the receive queue before raising the softirq. For example, the e100 hard interrupt handler is:
static irqreturn_t e100_intr(int irq, void *dev_id)
{
    struct net_device *netdev = dev_id;
    struct nic *nic = netdev_priv(netdev);
    u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);

    DPRINTK(INTR, DEBUG, "stat_ack = 0x%02X\n", stat_ack);

    if (stat_ack == stat_ack_not_ours ||    /* Not our interrupt */
        stat_ack == stat_ack_not_present)   /* Hardware is ejected */
        return IRQ_NONE;

    /* Ack interrupt(s) */
    iowrite8(stat_ack, &nic->csr->scb.stat_ack);

    /* We hit Receive No Resource (RNR); restart RU after cleaning */
    if (stat_ack & stat_ack_rnr)
        nic->ru_running = RU_SUSPENDED;

    if (likely(napi_schedule_prep(&nic->napi))) {
        e100_disable_irq(nic);
        __napi_schedule(&nic->napi);    /* the softirq is raised here */
    }

    return IRQ_HANDLED;
}

As we saw earlier, net_dev_init() registers net_rx_action() as the handler for the receive softirq; once the softirq is raised, that function is invoked.
The net_rx_action() function is:
static void net_rx_action(struct softirq_action *h)
{
    struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
    unsigned long time_limit = jiffies + 2;
    int budget = netdev_budget;
    void *have;

    local_irq_disable();

    while (!list_empty(list)) {
        struct napi_struct *n;
        int work, weight;

        /* If softirq window is exhuasted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        /* the ingress queues may still hold buffers; the softirq is re-raised at softnet_break */
        if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))
            goto softnet_break;

        local_irq_enable();

        /* Even though interrupts have been re-enabled, this
         * access is safe because interrupts can only add new
         * entries to the tail of this list, and only ->poll()
         * calls can remove this head entry from the list.
         */
        n = list_entry(list->next, struct napi_struct, poll_list);

        have = netpoll_poll_lock(n);

        weight = n->weight;

        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi(). Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call. Therefore we avoid
         * accidently calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            work = n->poll(n, weight);    /* run the poll function; it returns the number of frames processed */
            trace_napi_poll(n);
        }

        WARN_ON_ONCE(work > weight);

        budget -= work;

        local_irq_disable();

        /* Drivers must not modify the NAPI state if they
         * consume the entire weight.  In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
        /* If the whole weight was consumed, the device stays on the poll list;
         * if the queue had been drained instead, the driver's poll function
         * would have called napi_complete() itself. */
        if (unlikely(work == weight)) {
            if (unlikely(napi_disable_pending(n))) {
                local_irq_enable();
                napi_complete(n);
                local_irq_disable();
            } else
                list_move_tail(&n->poll_list, list);
        }

        netpoll_poll_unlock(have);
    }
out:
    local_irq_enable();

#ifdef CONFIG_NET_DMA
    /*
     * There may not be any more sk_buffs coming right now, so push
     * any pending DMA copies to hardware
     */
    dma_issue_pending_all();
#endif

    return;

softnet_break:
    __get_cpu_var(netdev_rx_stat).time_squeeze++;
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    goto out;
}

As shown above, the main job of the bottom half is to walk the list of devices that have frames waiting to be received and, for each device, run its poll function.
For non-NAPI devices, the poll function is initialized to process_backlog() in net_dev_init().
The process_backlog() function is defined as:
static int process_backlog(struct napi_struct *napi, int quota)
{
    int work = 0;
    struct softnet_data *queue = &__get_cpu_var(softnet_data);
    unsigned long start_time = jiffies;

    napi->weight = weight_p;
    do {
        struct sk_buff *skb;

        local_irq_disable();
        skb = __skb_dequeue(&queue->input_pkt_queue);
        if (!skb) {
            __napi_complete(napi);
            local_irq_enable();
            break;
        }
        local_irq_enable();

        netif_receive_skb(skb);
    } while (++work < quota && jiffies == start_time);

    return work;
}
For a NAPI device, the driver must provide a poll method with the following prototype:
int (*poll)(struct napi_struct *napi, int budget);
The method is registered at initialization time:
netif_napi_add(netdev, &nic->napi, xx_poll, XX_NAPI_WEIGHT);
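As a minimal sketch of where these calls usually live in a driver (the netdev, nic->napi, xx_poll and XX_NAPI_WEIGHT names are the illustrative ones already used above, not a real driver): netif_napi_add() is typically called from the probe routine, and the NAPI instance is enabled and disabled around the device's open/stop paths.

/* in the probe routine: register the poll method (a weight of 64 is common) */
netif_napi_add(netdev, &nic->napi, xx_poll, XX_NAPI_WEIGHT);

/* in ndo_open: enable NAPI before enabling the device's receive interrupts */
napi_enable(&nic->napi);

/* in ndo_stop: disable NAPI after receive interrupts have been shut off */
napi_disable(&nic->napi);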
A NAPI driver's poll method is typically implemented along the following lines (the code is borrowed from Linux Device Drivers and uses the older interface, so it does not quite match the kernel version discussed here):
static int xx_poll(struct net_device *dev, int *budget)
{
    int npackets = 0, quota = min(dev->quota, *budget);
    struct sk_buff *skb;
    struct xx_priv *priv = netdev_priv(dev);
    struct xx_packet *pkt;

    while (npackets < quota && priv->rx_queue) {
        pkt = xx_dequeue_buf(dev);

        skb = dev_alloc_skb(pkt->datalen + 2);
        if (! skb) {
            if (printk_ratelimit())
                printk(KERN_NOTICE "xx: packet dropped\n");
            priv->stats.rx_dropped++;
            xx_release_buffer(pkt);
            continue;
        }
        memcpy(skb_put(skb, pkt->datalen), pkt->data, pkt->datalen);
        skb->dev = dev;
        skb->protocol = eth_type_trans(skb, dev);
        skb->ip_summed = CHECKSUM_UNNECESSARY; /* don't check it */
        netif_receive_skb(skb);

        /* Maintain stats */
        npackets++;
        priv->stats.rx_packets++;
        priv->stats.rx_bytes += pkt->datalen;

        xx_release_buffer(pkt);
    }
    /* If we processed all packets, we're done; tell the kernel and reenable ints */
    *budget -= npackets;
    dev->quota -= npackets;
    if (! priv->rx_queue) {
        netif_rx_complete(dev);
        xx_rx_ints(dev, 1);
        return 0;
    }
    /* We couldn't process everything. */
    return 1;
}
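For comparison, here is a rough sketch of how the same poll method looks with the napi_struct-based interface used by the kernel version walked through above. xx_fetch_rx_frame(), xx_enable_rx_irq(), the netdev field and the embedded napi member of struct xx_priv are all made-up placeholders for driver-specific details:

static int xx_poll(struct napi_struct *napi, int budget)
{
    /* assumes struct xx_priv embeds the napi_struct passed to netif_napi_add() */
    struct xx_priv *priv = container_of(napi, struct xx_priv, napi);
    int work_done = 0;

    while (work_done < budget) {
        /* placeholder: build an skb from the next frame in the device's
         * RX ring, or return NULL when the ring is empty */
        struct sk_buff *skb = xx_fetch_rx_frame(priv);
        if (!skb)
            break;

        skb->protocol = eth_type_trans(skb, priv->netdev);
        netif_receive_skb(skb);
        work_done++;
    }

    /* If the ring was drained before the budget ran out, leave polled mode
     * and let the device interrupt us again for the next frame. */
    if (work_done < budget) {
        napi_complete(napi);
        xx_enable_rx_irq(priv);    /* placeholder for re-enabling RX interrupts */
    }

    return work_done;
}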
A NAPI driver provides its own poll function and its own private queue.
Whether NAPI or not, the poll function ultimately calls netif_receive_skb(skb) to handle the received frame. That function hands the skb to each registered protocol handler, after which the data enters the Linux kernel protocol stack for processing.
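As a rough illustration of what "registered protocol handler" means here, a module can hook into netif_receive_skb()'s dispatch by registering a struct packet_type with dev_add_pack(); the my_rcv/my_pt names and the choice of ETH_P_IP below are only for illustration:

static int my_rcv(struct sk_buff *skb, struct net_device *dev,
                  struct packet_type *pt, struct net_device *orig_dev)
{
    /* ... examine the frame ... */
    kfree_skb(skb);
    return NET_RX_SUCCESS;
}

static struct packet_type my_pt = {
    .type = cpu_to_be16(ETH_P_IP),    /* which ethertype to receive */
    .func = my_rcv,
};

/* in module init */
dev_add_pack(&my_pt);
/* ... and dev_remove_pack(&my_pt) in module exit */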