首页 > 无锁队列讲解(多生产者多消费者场景)
头像
总想玩世不恭
编辑于 2021-02-27 20:16
+ 关注

无锁队列讲解(多生产者多消费者场景)

(有什么问题欢迎讨论区留言,b站链接为本人录制的视频讲解~)

代码根据论文  [Implementing Lock-Free Queues](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf)  复现。
代码地址:https://github.com/zxwsbg/lock-free-queue
b站链接:https://www.bilibili.com/video/BV1q54y1Y71W/

背景介绍

生产者和消费者问题是操作系统中老生常谈的一个问题。针对不同的场景(单生产者-单消费者、单生产者-多消费者、多生产者-多消费者)有着不同的解决方法,本文研究的是多生产者多消费者的场景。

解决多生产者多消费者的常见方法是`互斥锁+信号量`,而互斥锁开销过大,在高并发常见下会有很大的性能影响。而采用无锁队列的方法可以避免锁的使用来达到减小开销的目的。

无锁队列的实现的要点就是 CAS 操作,后续在考虑到内存重新分配的时候会造成安全性问题(ABA问题),故引入了 DoubleCAS 或者其他内存分配机制来解决问题。



无锁队列实现

数据结构介绍


按照论文中提到的两种方法,选了一种较优的方法实现。
以单向链表的形式实现这个队列,每个节点的数据结构为
class QueueNode {
  public:
    int        val;
    QueueNode* next;
    QueueNode(int val) : val(val) {
        next = NULL;
    }
};
而对于一个无锁队列对象而言,包含以下部分
- 数据:队列的最大长度、链表头结点、链表尾节点
- 函数:初始化(构造函数)、入队、出队
class LockFreeQueue {
  public:
    LockFreeQueue();
    bool enqueue(int val);
    int  dequeue();
    ~LockFreeQueue();

  private:
    int        queue_size; // 暂时未使用,论文里并没有提及最大资源数
    QueueNode* tail;
    QueueNode* head;
};
其数据结构包含关系如下图所示

入队操作

底下我们以入队操作为例
bool LockFreeQueue::enqueue(int val)
{
    QueueNode* cur_node;
    QueueNode* add_node = new QueueNode(val);
    while (1) {
        cur_node = tail;
        if (__sync_bool_compare_and_swap(&(cur_node->next), NULL, add_node)) {
            break;
        }
        else {
            __sync_bool_compare_and_swap(&tail, cur_node, cur_node->next);
        }
    }
    __sync_bool_compare_and_swap(&tail, cur_node, add_node);
    return 1;
}
这里的 `__sync_bool_compare_and_swap`就是 GCC 提供的 CAS 算法。
第7行的CAS首先尝试将新加的节点放到链表末尾,这里有两种结果:
1. 加入成功,那么退出循环进入倒数第三行
2. 加入不成功,说明这时候有其他线程抢先往里面插入了一个节点,那么就把当前节点的位置更新为尾节点,再次进入循环直到能正确更新

到了最后一个CAS的时候,只需要进行一次置尾操作,并不需要循环,原因是:如果当前线程将节点已经加进去了的话,那么其他所有线程的操作都会失败,只有当前线程更新尾节点完成后,其他线程的第二个CAS操作才能成功。

这里有一个小trick,明明第6行每次循环时都会更新节点,为什么还需要第二个CAS操作呢?因为在极端情况下,一个线程已经完成了增加节点操作,在置尾操作(第三个CAS)之前突然挂了,这时候就导致其他所有线程全部不能更新。

在左耳朵耗子的博客中,对于上述问题的解决方法描述是选用了《Implementing Lock-Free Queues》论文中提到的第二个方法,该方法采用的是一开始在head,然后不断找next直到找尾节点(通过多个线程共用fetch来减少开销)。本文以论文中提到的第一个方法为例进行描述(因为我觉得它更优雅)。

出队操作

与入队操作类似。
int LockFreeQueue::dequeue()
{
    QueueNode* cur_node;
    int        val;
    while (1) {
        cur_node = head;
        if (cur_node->next == NULL) {
            return -1;
        }

        if (__sync_bool_compare_and_swap(&head, cur_node, cur_node->next)) {
            break;
        }
    }
    val = cur_node->next->val;
    delete cur_node;
    return val;
}


多生产者-多消费者模型

我们采用 C++ 的 Thread 库,模拟多个生产者和多个消费者的情况。多个生产者线程向无所队列中插入数据,多个消费者从无锁队列中取出数据。


测试部分的代码实现在 `lockfreequeue_test.cpp` 文件中。以生产者为例,其代码如下所示。
int            thread_number;
int            task_number; // 每个线程需要入队/出队的资源个数
LockFreeQueue* lfq;

void produce(int offset)
{
    // 算上偏移量,保证不会出现重复
    for (int i = task_number * offset; i < task_number * (offset + 1); i++) { 
        printf("produce %d\n", i);
        lfq->enqueue(i);
    }
}
int main(int argc, char** argv)
{
    lfq = new LockFreeQueue;
    std::vector<std::thread> thread_vector1;

    for (int i = 0; i < thread_number; i++) {
        thread_vector1.push_back(std::thread(produce, i));
    }

    for (auto& thr1 : thread_vector1) {
        thr1.join();
    }
}

正确性检测

对于结果,首先做正确性检验,具体实现在 `check.py` 文件中,对于produce,要求之前没有生产过该资源;对于consume,要求之前生产过并且没有消费过该资源。

ABA 问题

在高并发场景下会出现该问题。如上述情况,以默认值参数运行后经常不能通过正确性检测。原因根据我的判断,是因为ABA问题:当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了。而判断的方法也是个小 trick ,只需要将deque操作中释放内存的步骤注释掉,那么就不会出现内存重用的问题,这样做后也确实可以通过正确性测试。这个问题在论文中也提到了并给出了相应解决方法,但其需要自己实现一套内存分配系统(麻烦)。于是本文参考另一篇论文[Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf)
,利用`Double-CAS`来解决此问题,每访问一个节点,都用引用计数的方式给其+1,`Double-CAS`则可以一次既判断原始的节点值,又判断引用计数值,保证其未被置换出去过。这一块的具体代码实现可见github项目代码的`fix_aba_problem`分支。

对比检验

为了对比实验,写了个几乎差不多的正常的多生产者多消费者单链表实现(互斥锁+信号量)。经过测试,时间几乎没差(并发场景不够)。

全部评论

(0) 回帖
加载中...
话题 回帖

推荐话题

相关热帖

近期热帖

近期精华帖

热门推荐