SylixOS系统工作队列内部其实分两部分,简单工作队列(simple work queue)和 延时工作队列(delay work queue)。
- 简单工作队列由__wqS打头的函数簇实现,本质是基于内核工作队列并结合了执行线程创建、同步保护等相关操作实现。
- 延时工作队列由__wqD打头的函数簇实现,本质和内核工作队无关,是基于单向链表和等待唤醒链表实现的。
这里的延时工作队列其实就是系统工作队列中具备延时时的实现,系统工作队列中还有简单工作队列的实现,为了便于讲解,这里只列出延时工作队列实现的代码。
类型定义
/*********************************************************************************************************带有延迟属性作业队列控制结构 *********************************************************************************************************/
typedef struct {
LW_LIST_MONO DWQN_monoList;LW_CLASS_WAKEUP_NODE DWQN_wun;VOIDFUNCPTR DWQN_pfunc;PVOID DWQN_pvArg[LW_JOB_ARGS];
} LW_WORK_DNODE;typedef struct {
LW_CLASS_WAKEUP DWQ_wakeup; /* 延迟作业队列 */PLW_LIST_MONO DWQ_pmonoPool; /* 节点池 */LW_WORK_DNODE *DWQ_pwdnPool;LW_OBJECT_HANDLE DWQ_ulLock; /* 操作锁 */LW_OBJECT_HANDLE DWQ_ulSem; /* 通知信号量 */UINT DWQ_uiCount; /* 等待的作业数量 */UINT64 DWQ_u64Time; /* 最后一次执行时间点 */ULONG DWQ_ulScanPeriod; /* 循环扫描周期 */
} LW_WORK_DQUEUE;
/*********************************************************************************************************作业队列控制结构 *********************************************************************************************************/
typedef struct {
LW_WORK_DQUEUE WQ_dq;LW_OBJECT_HANDLE WQ_ulTask; /* 服务线程 */BOOL WQ_bDelReq;
} LW_WORK_QUEUE;
typedef LW_WORK_QUEUE *PLW_WORK_QUEUE;
创建和删除一个延时工作队列
/********************************************************************************************************* ** 函数名称: __wqDCreate ** 功能描述: 创建一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** uiQSize 队列大小 ** ulScanPeriod 服务线程扫描周期 ** 输 出 : ERROR_CODE *********************************************************************************************************/
static PLW_WORK_QUEUE __wqDCreate (PLW_WORK_QUEUE pwq, UINT uiQSize, ULONG ulScanPeriod)
{
UINT i;LW_WORK_DNODE *pwdn;LW_CLASS_WAKEUP *pwakeup = &pwq->WQ_dq.DWQ_wakeup;//分配工作节点空间pwdn = (LW_WORK_DNODE *)__KHEAP_ALLOC(sizeof(LW_WORK_DNODE) * uiQSize);if (pwdn == LW_NULL) {
return (LW_NULL);}pwq->WQ_dq.DWQ_pwdnPool = pwdn;//初始化保护锁信号量pwq->WQ_dq.DWQ_ulLock = API_SemaphoreMCreate("wqd_lock", LW_PRIO_DEF_CEILING,LW_OPTION_WAIT_PRIORITY |LW_OPTION_INHERIT_PRIORITY | LW_OPTION_DELETE_SAFE | LW_OPTION_OBJECT_GLOBAL, LW_NULL);if (pwq->WQ_dq.DWQ_ulLock == LW_OBJECT_HANDLE_INVALID) {
__KHEAP_FREE(pwdn);return (LW_NULL);}//初始化资源同步信号量pwq->WQ_dq.DWQ_ulSem = API_SemaphoreBCreate("wqd_sem", LW_FALSE,LW_OPTION_OBJECT_GLOBAL, LW_NULL);if (pwq->WQ_dq.DWQ_ulSem == LW_OBJECT_HANDLE_INVALID) {
API_SemaphoreMDelete(&pwq->WQ_dq.DWQ_ulLock);__KHEAP_FREE(pwdn);return (LW_NULL);}//初始化所有工作节点for (i = 0; i < uiQSize; i++) {
_list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);__WAKEUP_NODE_INIT(&pwdn->DWQN_wun);pwdn++;}//初始化时间参数pwq->WQ_dq.DWQ_u64Time = API_TimeGet64();pwq->WQ_dq.DWQ_ulScanPeriod = ulScanPeriod;//初始化等待唤醒链表控制块__WAKEUP_INIT(pwakeup, NULL, NULL);return (pwq);
}
/********************************************************************************************************* ** 函数名称: __wqDDelete ** 功能描述: 删除一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID __wqDDelete (PLW_WORK_QUEUE pwq)
{
API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);API_SemaphoreMDelete(&pwq->WQ_dq.DWQ_ulLock);API_SemaphoreBDelete(&pwq->WQ_dq.DWQ_ulSem);__KHEAP_FREE(pwq->WQ_dq.DWQ_pwdnPool);
}
清空一个工作队列
/********************************************************************************************************* ** 函数名称: __wqDFlush ** 功能描述: 清空一个带有延迟功能的工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID __wqDFlush (PLW_WORK_QUEUE pwq)
{
PLW_CLASS_WAKEUP_NODE pwun;LW_WORK_DNODE *pwdn;//清除同步信号量API_SemaphoreBClear(pwq->WQ_dq.DWQ_ulSem);API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//释放所有工作节点while (pwq->WQ_dq.DWQ_wakeup.WU_plineHeader) {
//删除等待唤醒节点pwun = _LIST_ENTRY(pwq->WQ_dq.DWQ_wakeup.WU_plineHeader, LW_CLASS_WAKEUP_NODE, WUN_lineManage);_List_Line_Del(&pwun->WUN_lineManage, &pwq->WQ_dq.DWQ_wakeup.WU_plineHeader);//回收工作节点pwdn = _LIST_ENTRY(pwun, LW_WORK_DNODE, DWQN_wun);_list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);}pwq->WQ_dq.DWQ_uiCount = 0;API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);
}
向延时工作队列插入一个工作项
/********************************************************************************************************* ** 函数名称: __wqDInsert ** 功能描述: 将一个工作插入到工作队列 ** 输 入 : pwq 工作队列控制块 ** ulDelay 最小延迟执行时间 ** pfunc 执行函数 ** pvArg0 ~ 5 执行参数 ** 输 出 : ERROR_CODE *********************************************************************************************************/
static ULONG __wqDInsert (PLW_WORK_QUEUE pwq, ULONG ulDelay,VOIDFUNCPTR pfunc, PVOID pvArg0,PVOID pvArg1,PVOID pvArg2,PVOID pvArg3,PVOID pvArg4,PVOID pvArg5)
{
PLW_LIST_MONO pmonoWDN;LW_WORK_DNODE *pwdn;API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//如果节点池为空则返回错误,否则从节点池中分配一个节点if (!pwq->WQ_dq.DWQ_pmonoPool) {
API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);_ErrorHandle(ENOSPC);return (ENOSPC);} else {
pmonoWDN = _list_mono_allocate(&pwq->WQ_dq.DWQ_pmonoPool);}//将回调工作保存到工作节点中pwdn = _LIST_ENTRY(pmonoWDN, LW_WORK_DNODE, DWQN_monoList);pwdn->DWQN_pfunc = pfunc;pwdn->DWQN_pvArg[0] = pvArg0;pwdn->DWQN_pvArg[1] = pvArg1;pwdn->DWQN_pvArg[2] = pvArg2;pwdn->DWQN_pvArg[3] = pvArg3;pwdn->DWQN_pvArg[4] = pvArg4;pwdn->DWQN_pvArg[5] = pvArg5;//将工作节点添加到等待唤醒链表中pwdn->DWQN_wun.WUN_ulCounter = ulDelay;_WakeupAdd(&pwq->WQ_dq.DWQ_wakeup, &pwdn->DWQN_wun, LW_FALSE);pwq->WQ_dq.DWQ_uiCount++;//释放同步信号,触发等待唤醒链表检查API_SemaphoreBPost(pwq->WQ_dq.DWQ_ulSem);API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);return (ERROR_NONE);
}
获取当前队列中作业数量
/********************************************************************************************************* ** 函数名称: __wqDStatus ** 功能描述: 获取工作队列状态 ** 输 入 : pwq 工作队列控制块 ** pulCount 当前队列中作业数量 ** 输 出 : NONE *********************************************************************************************************/
static VOID __wqDStatus (PLW_WORK_QUEUE pwq, UINT *puiCount)
{
*puiCount = pwq->WQ_dq.DWQ_uiCount;
}
执行一次工作队列,需要在一个线程的死循环中调用
/********************************************************************************************************* ** 函数名称: __wqDExec ** 功能描述: 执行一次工作队列 ** 输 入 : pwq 工作队列控制块 ** 输 出 : NONE *********************************************************************************************************/
static VOID __wqDExec (PLW_WORK_QUEUE pwq)
{
UINT i;PLW_CLASS_WAKEUP_NODE pwun;LW_WORK_DNODE *pwdn;VOIDFUNCPTR pfunc;PVOID pvArg[LW_JOB_ARGS];UINT64 u64Now;ULONG ulCounter;LW_CLASS_WAKEUP *pwakeup = &pwq->WQ_dq.DWQ_wakeup;//等待同步信号,通过超时设置保证该函数能周期运行API_SemaphoreBPend(pwq->WQ_dq.DWQ_ulSem, pwq->WQ_dq.DWQ_ulScanPeriod);API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);//获取当前心跳计数,u64Now = API_TimeGet64();//获取并更新据上次轮询已走过的时间ulCounter = (ULONG)(u64Now - pwq->WQ_dq.DWQ_u64Time);pwq->WQ_dq.DWQ_u64Time = u64Now;//获取队列首项pwakeup->WU_plineOp = pwakeup->WU_plineHeader;//对队列首项进行循环访问,知道队列为空,或首项计时未到而退出while (pwakeup->WU_plineOp) {
//获取等待唤醒对象pwun = _LIST_ENTRY(pwakeup->WU_plineOp, LW_CLASS_WAKEUP_NODE, WUN_lineManage); //首项等待时间大于已运行时间,则更新首项等待时间并退出if (pwun->WUN_ulCounter > ulCounter) {
pwun->WUN_ulCounter -= ulCounter; break; } else {
//首项计时已到,处理首项并从队列中删除 //更新已运行时间ulCounter -= pwun->WUN_ulCounter; pwun->WUN_ulCounter = 0;//获取等待节点对应的工作节点对象pwdn = _LIST_ENTRY(pwun, LW_WORK_DNODE, DWQN_wun);//将等待节点从等待队列中删除_WakeupDel(&pwq->WQ_dq.DWQ_wakeup, pwun, LW_FALSE);pwq->WQ_dq.DWQ_uiCount--;//备份工作节点回调函数和回调函数参数,因为函数回调时已被回收,且已可能被再次分配,所以要先备份后再调用pfunc = pwdn->DWQN_pfunc;for (i = 0; i < LW_JOB_ARGS; i++) {
pvArg[i] = pwdn->DWQN_pvArg[i];}//释放工作节点,即将使用后的工作节点放回空闲节点池_list_mono_free(&pwq->WQ_dq.DWQ_pmonoPool, &pwdn->DWQN_monoList);API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);//回调工作节点操作,DWQ_ulLock只保护节点操作不保护回调操作,所以回调前要先解锁再上锁if (pfunc) {
pfunc(pvArg[0], pvArg[1], pvArg[2], pvArg[3], pvArg[4], pvArg[5]);}API_SemaphoreMPend(pwq->WQ_dq.DWQ_ulLock, LW_OPTION_WAIT_INFINITE);}//获取新的工作节点首项pwakeup->WU_plineOp = _list_line_get_next(pwakeup->WU_plineOp); }API_SemaphoreMPost(pwq->WQ_dq.DWQ_ulLock);
}