当前位置: 代码迷 >> 综合 >> SylixOS 延时工作队列实现原理详解
  详细解决方案

SylixOS 延时工作队列实现原理详解

热度:43   发布时间:2024-01-10 06:39:06.0

SylixOS系统工作队列内部其实分两部分,简单工作队列(simple work queue)和 延时工作队列(delay work queue)。

  • 简单工作队列由__wqS打头的函数簇实现,本质是基于内核工作队列并结合了执行线程创建、同步保护等相关操作实现。
  • 延时工作队列由__wqD打头的函数簇实现,本质和内核工作队无关,是基于单向链表和等待唤醒链表实现的。

这里的延时工作队列其实就是系统工作队列中具备延时时的实现,系统工作队列中还有简单工作队列的实现,为了便于讲解,这里只列出延时工作队列实现的代码。


类型定义

/*********************************************************************************************************带有延迟属性作业队列控制结构 *********************************************************************************************************/
typedef struct {
    LW_LIST_MONO         DWQN_monoList;LW_CLASS_WAKEUP_NODE DWQN_wun;VOIDFUNCPTR          DWQN_pfunc;PVOID                DWQN_pvArg[LW_JOB_ARGS];
} LW_WORK_DNODE;typedef struct {
    LW_CLASS_WAKEUP      DWQ_wakeup;                                    /* 延迟作业队列 */PLW_LIST_MONO        DWQ_pmonoPool;                                 /* 节点池 */LW_WORK_DNODE       *DWQ_pwdnPool;LW_OBJECT_HANDLE     DWQ_ulLock;                                    /* 操作锁 */LW_OBJECT_HANDLE     DWQ_ulSem;                                     /* 通知信号量 */UINT                 DWQ_uiCount;                                   /* 等待的作业数量 */UINT64               DWQ_u64Time;                                   /* 最后一次执行时间点 */ULONG                DWQ_ulScanPeriod;                              /* 循环扫描周期 */
} LW_WORK_DQUEUE;
/*********************************************************************************************************作业队列控制结构 *********************************************************************************************************/
typedef struct {
    LW_WORK_DQUEUE       WQ_dq;LW_OBJECT_HANDLE     WQ_ulTask;                                     /* 服务线程 */BOOL                 WQ_bDelReq;
} LW_WORK_QUEUE;
typedef LW_WORK_QUEUE   *PLW_WORK_QUEUE;

创建和删除一个延时工作队列

/********************************************************************************************************* ** 函数名称: __wqDCreate ** 功能描述: 创建一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** uiQSize 队列大小 ** ulScanPeriod 服务线程扫描周期 ** 输 出 : ERROR_CODE *********************************************************************************************************/
static PLW_WORK_QUEUE  __wqDCreate (PLW_WORK_QUEUE  pwq, UINT  uiQSize, ULONG  ulScanPeriod)
{
    UINT                i;LW_WORK_DNODE      *pwdn;LW_CLASS_WAKEUP    *pwakeup = &pwq->WQ_dq.DWQ_wakeup;//分配工作节点空间pwdn = (LW_WORK_DNODE *)__KHEAP_ALLOC(sizeof(LW_WORK_DNODE) * uiQSize);if (pwdn == LW_NULL) {
    return  (LW_NULL);}pwq->WQ_dq.DWQ_pwdnPool = pwdn;//初始化保护锁信号量pwq->WQ_dq.DWQ_ulLock = API_SemaphoreMCreate("wqd_lock", LW_PRIO_DEF_CEILING,LW_OPTION_WAIT_PRIORITY |LW_OPTION_INHERIT_PRIORITY | LW_OPTION_DELETE_SAFE | LW_OPTION_OBJECT_GLOBAL, LW_NULL);if (pwq->WQ_dq.DWQ_ulLock == LW_OBJECT_HANDLE_INVALID) {
    __KHEAP_FREE(pwdn);return  (LW_NULL);}//初始化资源同步信号量pwq->WQ_dq.DWQ_ulSem = API_SemaphoreBCreate("wqd_sem", LW_FALSE,LW_OPTION_OBJECT_GLOBAL, LW_NULL);if (pwq->WQ_dq.DWQ_ulSem == LW_OBJECT_HANDLE_INVALID) {
    API_SemaphoreMDelete(&pwq->WQ_dq.DWQ_ulLock);__KHEAP_FREE(pwdn);return  (LW_NULL);}//初始化所有工作节点for (i = 0; i < uiQSize; i++) {
    _list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);__WAKEUP_NODE_INIT(&pwdn->DWQN_wun);pwdn++;}//初始化时间参数pwq->WQ_dq.DWQ_u64Time      = API_TimeGet64();pwq->WQ_dq.DWQ_ulScanPeriod = ulScanPeriod;//初始化等待唤醒链表控制块__WAKEUP_INIT(pwakeup, NULL, NULL);return  (pwq);
}
/********************************************************************************************************* ** 函数名称: __wqDDelete ** 功能描述: 删除一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID  __wqDDelete (PLW_WORK_QUEUE  pwq)
{
    API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);API_SemaphoreMDelete(&pwq->WQ_dq.DWQ_ulLock);API_SemaphoreBDelete(&pwq->WQ_dq.DWQ_ulSem);__KHEAP_FREE(pwq->WQ_dq.DWQ_pwdnPool);
}

清空一个工作队列

/********************************************************************************************************* ** 函数名称: __wqDFlush ** 功能描述: 清空一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID  __wqDFlush (PLW_WORK_QUEUE  pwq)
{
    PLW_CLASS_WAKEUP_NODE   pwun;LW_WORK_DNODE          *pwdn;//清除同步信号量API_SemaphoreBClear(pwq->WQ_dq.DWQ_ulSem);API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//释放所有工作节点while (pwq->WQ_dq.DWQ_wakeup.WU_plineHeader) {
    //删除等待唤醒节点pwun = _LIST_ENTRY(pwq->WQ_dq.DWQ_wakeup.WU_plineHeader, LW_CLASS_WAKEUP_NODE, WUN_lineManage);_List_Line_Del(&pwun->WUN_lineManage, &pwq->WQ_dq.DWQ_wakeup.WU_plineHeader);//回收工作节点pwdn = _LIST_ENTRY(pwun, LW_WORK_DNODE, DWQN_wun);_list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);}pwq->WQ_dq.DWQ_uiCount = 0;API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);
}

向延时工作队列插入一个工作项

/********************************************************************************************************* ** 函数名称: __wqDInsert ** 功能描述: 将一个工作插入到工作队列 ** 输 入 : pwq 工作队列控制块 ** ulDelay 最小延迟执行时间 ** pfunc 执行函数 ** pvArg0 ~ 5 执行参数 ** 输 出 : ERROR_CODE *********************************************************************************************************/
static ULONG  __wqDInsert (PLW_WORK_QUEUE  pwq, ULONG           ulDelay,VOIDFUNCPTR     pfunc, PVOID           pvArg0,PVOID           pvArg1,PVOID           pvArg2,PVOID           pvArg3,PVOID           pvArg4,PVOID           pvArg5)
{
    PLW_LIST_MONO   pmonoWDN;LW_WORK_DNODE  *pwdn;API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//如果节点池为空则返回错误,否则从节点池中分配一个节点if (!pwq->WQ_dq.DWQ_pmonoPool) {
    API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);_ErrorHandle(ENOSPC);return  (ENOSPC);} else {
    pmonoWDN = _list_mono_allocate(&pwq->WQ_dq.DWQ_pmonoPool);}//将回调工作保存到工作节点中pwdn = _LIST_ENTRY(pmonoWDN, LW_WORK_DNODE, DWQN_monoList);pwdn->DWQN_pfunc    = pfunc;pwdn->DWQN_pvArg[0] = pvArg0;pwdn->DWQN_pvArg[1] = pvArg1;pwdn->DWQN_pvArg[2] = pvArg2;pwdn->DWQN_pvArg[3] = pvArg3;pwdn->DWQN_pvArg[4] = pvArg4;pwdn->DWQN_pvArg[5] = pvArg5;//将工作节点添加到等待唤醒链表中pwdn->DWQN_wun.WUN_ulCounter = ulDelay;_WakeupAdd(&pwq->WQ_dq.DWQ_wakeup, &pwdn->DWQN_wun, LW_FALSE);pwq->WQ_dq.DWQ_uiCount++;//释放同步信号,触发等待唤醒链表检查API_SemaphoreBPost(pwq->WQ_dq.DWQ_ulSem);API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);return  (ERROR_NONE);
}

获取当前队列中作业数量

/********************************************************************************************************* ** 函数名称: __wqDStatus ** 功能描述: 获取工作队列状态 ** 输 入 : pwq 工作队列控制块 ** pulCount 当前队列中作业数量 ** 输 出 : NONE *********************************************************************************************************/
static VOID  __wqDStatus (PLW_WORK_QUEUE  pwq, UINT  *puiCount)
{
    *puiCount = pwq->WQ_dq.DWQ_uiCount;
}

执行一次工作队列,需要在一个线程的死循环中调用

/********************************************************************************************************* ** 函数名称: __wqDExec ** 功能描述: 执行一次工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID  __wqDExec (PLW_WORK_QUEUE  pwq)
{
    UINT                   i;PLW_CLASS_WAKEUP_NODE  pwun;LW_WORK_DNODE         *pwdn;VOIDFUNCPTR            pfunc;PVOID                  pvArg[LW_JOB_ARGS];UINT64                 u64Now;ULONG                  ulCounter;LW_CLASS_WAKEUP       *pwakeup = &pwq->WQ_dq.DWQ_wakeup;//等待同步信号,通过超时设置保证该函数能周期运行API_SemaphoreBPend(pwq->WQ_dq.DWQ_ulSem, pwq->WQ_dq.DWQ_ulScanPeriod);API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//获取当前心跳计数,u64Now = API_TimeGet64();//获取并更新据上次轮询已走过的时间ulCounter = (ULONG)(u64Now - pwq->WQ_dq.DWQ_u64Time);pwq->WQ_dq.DWQ_u64Time = u64Now;//获取队列首项pwakeup->WU_plineOp = pwakeup->WU_plineHeader;//对队列首项进行循环访问,知道队列为空,或首项计时未到而退出while (pwakeup->WU_plineOp) {
        //获取等待唤醒对象pwun = _LIST_ENTRY(pwakeup->WU_plineOp, LW_CLASS_WAKEUP_NODE, WUN_lineManage);  //首项等待时间大于已运行时间,则更新首项等待时间并退出if (pwun->WUN_ulCounter > ulCounter) {
        pwun->WUN_ulCounter -= ulCounter; break;  } else {
      //首项计时已到,处理首项并从队列中删除 //更新已运行时间ulCounter -= pwun->WUN_ulCounter; pwun->WUN_ulCounter = 0;//获取等待节点对应的工作节点对象pwdn = _LIST_ENTRY(pwun, LW_WORK_DNODE, DWQN_wun);//将等待节点从等待队列中删除_WakeupDel(&pwq->WQ_dq.DWQ_wakeup, pwun, LW_FALSE);pwq->WQ_dq.DWQ_uiCount--;//备份工作节点回调函数和回调函数参数,因为函数回调时已被回收,且已可能被再次分配,所以要先备份后再调用pfunc = pwdn->DWQN_pfunc;for (i = 0; i < LW_JOB_ARGS; i++) {
    pvArg[i] = pwdn->DWQN_pvArg[i];}//释放工作节点,即将使用后的工作节点放回空闲节点池_list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);//回调工作节点操作,DWQ_ulLock只保护节点操作不保护回调操作,所以回调前要先解锁再上锁if (pfunc) {
    pfunc(pvArg[0], pvArg[1], pvArg[2], pvArg[3], pvArg[4], pvArg[5]);}API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);}//获取新的工作节点首项pwakeup->WU_plineOp = _list_line_get_next(pwakeup->WU_plineOp); }API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);
}