当前位置: 代码迷 >> 综合 >> (三)DPDK-Core Libraries
  详细解决方案

(三)DPDK-Core Libraries

热度:116   发布时间:2023-10-17 00:47:36.0

 

(三)DPDK-Core Libraries

环形缓冲区管理(librte_ring)

简介

dpdk的无锁队列ring是借鉴了linux内核kfifo无锁队列。ring的实质是FIFO的环形队列。

ring的特点:

  • 无锁出入队(除了cas(compare and swap)操作)
  • 多消费/生产者同时出入队

使用方法:

  • structrte_ring * rte_ring_create(constchar *name, unsigned count, int socket_id, unsignedflags)/*
    name:ring的namecount:ring队列的长度必须是2的幂次方。socket_id:ring位于的socket。flags:指定创建的ring的属性:单/多生产者、单/多消费者两者之间的组合。0表示使用默认属性(多生产者、多消费者)。不同的属性出入队的操作会有所不同。*/

出入队

  • 有不同的出入队方式(单、bulk、burst)都在rte_ring.h中。
  • 例如:rte_ring_enqueue和rte_ring_dequeue

创建ring

(三)DPDK-Core Libraries

struct rte_ring {TAILQ_ENTRY(rte_ring) next;      /**< Next in list. *///ring的唯一标示,不可能同时有两个相同name的ring存在char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */int flags;                       /**< Flags supplied at creation. *//** Ring producer status. */struct prod {uint32_t watermark;      /**< Maximum items before EDQUOT. */uint32_t sp_enqueue;     /**< True, if single producer. */uint32_t size;           /**< Size of ring. */uint32_t mask;           /**< Mask (size-1) of ring. */volatile uint32_t head;  /**< Producer head. */volatile uint32_t tail;  /**< Producer tail. */} prod __rte_cache_aligned;/** Ring consumer status. */struct cons {uint32_t sc_dequeue;     /**< True, if single consumer. */uint32_t size;           /**< Size of the ring. */uint32_t mask;           /**< Mask (size-1) of ring. */volatile uint32_t head;  /**< Consumer head. */volatile uint32_t tail;  /**< Consumer tail. */
#ifdef RTE_RING_SPLIT_PROD_CONS/*这个属性就是要求gcc在编译的时候,把cons/prod结构都单独分配到一个cache行,为什么这样做?因为如果没有这些的话,这两个结构在内存上是连续的,编译器不会把他们分配到不同cache 行,而一般上这两个结构是要被不同的核访问的,如果连续的话这两个核就会产生伪共享问题。*/} cons __rte_cache_aligned;
#else} cons;
#endif#ifdef RTE_LIBRTE_RING_DEBUGstruct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endifvoid * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.* not volatile so need to be careful* about compiler re-ordering */
};

在rte_ring_list链表中创建一个rte_tailq_entry节点。

在memzone中根据队列的大小count申请一块内存(rte_ring的大小加上count*sizeof(void *))。

紧邻着rte_ring结构的void *数组用于放置入队的对象(单纯的赋值指针值)。

rte_ring结构中有生产者结构prod、消费者结构cons。

初始化参数之后,把rte_tailq_entry的data节点指向rte_ring结构地址。

 

在使用这个结构的时候,一般是将1个核作为生产者,向这个ring队列里面添加数据;另一个core或者多个core 作为消费者从这个ring队列中获取数据。生产者核访问上面的prod结构,消费者访问cons结构。

 

 

实现多生产/消费者同时生产/消费

(三)DPDK-Core Libraries

  • 移动prod.head表示生产者预定的生产数量
  • 当该生产者生产结束,且在此之前的生产也都结束后,移动prod.tail表示实际生产的位置
  • 同样,移动cons.head表示消费者预定的消费数量
  • 当该消费者消费结束,且在此之前的消费也都结束后,移动cons.tail表示实际消费的位置

 

出/入队列

多生产者入队代码

static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,unsigned n, enum rte_ring_queue_behavior behavior)
{uint32_t prod_head, prod_next;uint32_t cons_tail, free_entries;const unsigned max = n;int success;unsigned i;uint32_t mask = r->prod.mask;int ret;/* move prod.head atomically */do {/* Reset n to the initial burst count */n = max;prod_head = r->prod.head;cons_tail = r->cons.tail;/*在这里dpdk提供的索引计算方法,能保证即使prod_head > cons_tail,*取模求得的值也始终落在0~size(ring)-1范围内*/free_entries = (mask + cons_tail - prod_head);/* check that we have enough room in ring */if (unlikely(n > free_entries)) {if (behavior == RTE_RING_QUEUE_FIXED) {__RING_STAT_ADD(r, enq_fail, n);return -ENOBUFS;}else {/* No free entry available */if (unlikely(free_entries == 0)) {__RING_STAT_ADD(r, enq_fail, n);return 0;}n = free_entries;}}prod_next = prod_head + n;/*这里使用CAS指令来移动r->prod.head,去掉了锁操作,也算优化*点*/success = rte_atomic32_cmpset(&r->prod.head, prod_head,prod_next);} while (unlikely(success == 0));/* write entries in ring */ENQUEUE_PTRS();/*COMPILER_BARRIER 是一个宏定义,它的作用就是确保上面的ENQUEUE_PTRS*宏处理在下面r->prod.tail = prod_next;之前执行?大家可能会问,*ENQUEUE_PTRS怎么可能会跑到r->prod.tail = prod_next之后执行?*其实是有可能的,GCC为了提高性能,它会优化代码,它可能会调整代码的*执行顺序,把后面的指令放到前面,GCC这些编译器是依据单核情况实现,*所以这种情况下,程序员必须介入,上面这条指令,就是告诉编译器不允许*调整这个指令顺序。*/rte_compiler_barrier();/* if we exceed the watermark */if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :(int)(n | RTE_RING_QUOT_EXCEED);__RING_STAT_ADD(r, enq_quota, n);}else {ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;__RING_STAT_ADD(r, enq_success, n);}/** If there are other enqueues in progress that preceeded us,* we need to wait for them to complete*/while (unlikely(r->prod.tail != prod_head))rte_pause();r->prod.tail = prod_next;return ret;
}

 

内存池管理(librte_mempool)

网络报文缓冲区管理(librte_mbuf)

定时器管理(librte_timer) 

 

 

 

 

 

 

 

 

 

 

 

 

 

  相关解决方案