可伸缩多线程任务队列
  xx2YH4ad7R0N 2023年11月02日 57 0


在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

  主要有以下几个功能:

    1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

    2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

    3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

    4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

  大体框架主要由3个类构成

    1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

    2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

    3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

  类图如下:


可伸缩多线程任务队列_优先级

  该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

  Job.h文件:



class CJob
{
public:
CJob();
virtual ~CJob();

BOOL m_Completed; //任务是否完成:TRUE 完成,FALSE 未完成
static long lastUsedID; //最后的ID

//================================================================================================
//函数名: setPriority
//函数描述: 设置任务优先级
//输入: [in] priority 优先级别
//输出: 无
//返回: 无
//================================================================================================
void setPriority(int priority);

//================================================================================================
//函数名: getPriority
//函数描述: 返回任务优先级
//输入: 无
//输出: 无
//返回: 任务优先级
//================================================================================================
int getPriority();

//================================================================================================
//函数名: getID
//函数描述: 返回任务ID
//输入: 无
//输出: 无
//返回: 任务ID
//================================================================================================
long getID();

//================================================================================================
//函数名: setAutoDelete
//函数描述: 设置完成任务后是否删除任务
//输入: [in] autoDeleteFlag
//输出: 无
//返回: 无
//================================================================================================
void setAutoDelete(BOOL autoDeleteFlag = TRUE);

//================================================================================================
//函数名: AutoDelete
//函数描述: 返回删除任务标记
//输入: 无
//输出: 无
//返回: 任务标记
//================================================================================================
BOOL AutoDelete();

//================================================================================================
//函数名: execute
//函数描述: 任务真正工作的函数,纯虚函数,需要子类化实现
//输入: 无
//输出: 无
//返回: 任务ID
//================================================================================================
virtual void execute() = 0;
private:
long m_ID; //任务ID
BOOL m_autoDeleteFlag; //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
int m_priority; //任务优先级,默认为5

};



  Job.cpp文件:



long CJob::lastUsedID = 0;

CJob::CJob()
{
this->m_ID = InterlockedIncrement(&lastUsedID);
this->m_autoDeleteFlag = TRUE;
this->m_priority = 5;
this->m_Completed= FALSE;
}

CJob::~CJob()
{
}

BOOL CJob::AutoDelete()
{
return m_autoDeleteFlag;
}

void CJob::setAutoDelete(BOOL autoDeleteFlag)
{
m_autoDeleteFlag = autoDeleteFlag;
}

long CJob::getID()
{
return this->m_ID;
}

int CJob::getPriority()
{
return this->m_priority;
}

void CJob::setPriority(int priority)
{
this->m_priority = priority;
}



  JobExecuter.h文件:



//一个对象对应一个线程,执行任务Job
class CJobExecuter
{
public:
CJobExecuter(CMThreadedJobQ *pJobQ);
virtual ~CJobExecuter();

//================================================================================================
//函数名: stop
//函数描述: 停止执行任务
//输入: 无
//输出: 无
//返回: 无
//================================================================================================
void stop();

//================================================================================================
//函数名: execute
//函数描述: 执行一个任务
//输入: [in] pJob 任务指针
//输出: 无
//返回: 无
//================================================================================================
void execute(CJob* pJob);

static UINT ThreadFunction(LPVOID pParam); //线程函数

CMThreadedJobQ* m_pJobQ; //指向线程任务队列指针
CJob* m_pJob2Do; //指向正在执行任务的指针
int m_flag; //线程执行标记
CWinThread* m_pExecuterThread; //线程标识符
};



  JobExecuter.cpp文件:



#define STOP_WORKING -1
#define KEEP_WORKING 0

CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
{
this->m_pJobQ= pJobQ;
this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
this->m_pJob2Do = NULL;
this->m_flag = KEEP_WORKING;
}

CJobExecuter::~CJobExecuter()
{
if(this->m_pExecuterThread!= NULL )
{
this->m_pExecuterThread->ExitInstance();
delete m_pExecuterThread;
}
}

UINT CJobExecuter::ThreadFunction(LPVOID pParam)
{
CJobExecuter *pExecuter = (CJobExecuter *)pParam;
pExecuter->m_flag = 1;
::Sleep(1);
CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
while(pExecuter->m_flag !=STOP_WORKING )
{
if(pExecuter->m_pJob2Do!= NULL)
{
pExecuter->m_pJob2Do->execute();
pExecuter->m_pJob2Do->m_Completed = TRUE;
if(pExecuter->m_pJob2Do->AutoDelete())
delete pExecuter->m_pJob2Do;
pExecuter->m_pJob2Do = NULL;
}

if(pExecuter->m_pJobQ == NULL) break;

CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
singleLock.Lock();
if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
{
pExecuter->stop();
singleLock.Unlock();
}
else
{
pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter); //完成任务后,添加到CMThreadedJobQ的空闲队列中
singleLock.Unlock();
pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();
pExecuter->m_pExecuterThread->SuspendThread();
}
}

if(pExecuter->m_pJobQ != NULL)
{
pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
}
else
{
delete pExecuter;
}

return 0;
}

void CJobExecuter::execute(CJob* pJob)
{
this->m_pJob2Do = pJob;
::Sleep(0);
this->m_pExecuterThread->ResumeThread();
}

void CJobExecuter::stop()
{
this->m_flag = STOP_WORKING;
this->m_pExecuterThread->ResumeThread();
}



  MThreadedJobQ.h文件:



typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;

//线程池任务队列
class CMThreadedJobQ
{

public:
typedef struct THNODE
{
CJobExecuter* pExecuter;
THNODE * pNext ;
} THNODE;

CMThreadedJobQ();
virtual ~CMThreadedJobQ();

//================================================================================================
//函数名: deleteJobExecuter
//函数描述: 删除一个JobExecuter对象
//输入: [in] pEx
//输出: 无
//返回: 无
//================================================================================================
void deleteJobExecuter(CJobExecuter *pEx);

//================================================================================================
//函数名: setMaxNoOfExecuter
//函数描述: 设置CJobExecuter的个数
//输入: [in] value
//输出: 无
//返回: 无
//================================================================================================
void setMaxNoOfExecuter(int value);

//================================================================================================
//函数名: addJobExecuter
//函数描述: 添加一个CJobExecuter
//输入: [in] pEx
//输出: 无
//返回: 无
//================================================================================================
void addJobExecuter(CJobExecuter *pEx);

//================================================================================================
//函数名: getJobExecuter
//函数描述: 返回一个CJobExecuter
//输入: 无
//输出: 无
//返回: 处理任务的指针
//================================================================================================
CJobExecuter* getJobExecuter();

//================================================================================================
//函数名: addFreeJobExecuter
//函数描述: 添加一个CJobExecuter
//输入: [in] pEx
//输出: 无
//返回: 无
//================================================================================================
void addFreeJobExecuter(CJobExecuter *pEx);

//================================================================================================
//函数名: addJob
//函数描述: 添加一个任务
//输入: [in] pJob
//输出: 无
//返回: 无
//================================================================================================
void addJob(CJob *pJob);

//================================================================================================
//函数名: getMaxNoOfExecuter
//函数描述: 获取CJobExecuter个数的最大值
//输入: 无
//输出: 无
//返回: 无
//================================================================================================
int getMaxNoOfExecuter();

//================================================================================================
//函数名: getNoOfExecuter
//函数描述: 获取当前CJobExecuter的个数
//输入: 无
//输出: 无
//返回: 无
//================================================================================================
int getNoOfExecuter();

static UINT JobObserverThreadFunction(LPVOID);

//================================================================================================
//函数名: pause
//函数描述: 挂起JobObserverThread线程
//输入: 无
//输出: 无
//返回: 无
//================================================================================================
void pause();

//================================================================================================
//函数名: resume
//函数描述: 唤醒JobObserverThread线程
//输入: 无
//输出: 无
//返回: 无
//================================================================================================
void resume();

CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
CCriticalSection m_cs; //关键代码段,用于互斥
CJobQList m_jobQList; //任务队列
private :
BOOL m_pause; //JobObserverThread线程运行标记
int m_MaxNoOfExecuter; //CJobExecuter最大个数
int m_NoOfExecuter; //当前CJobExecuter个数
THNODE* m_pFreeEList; //维护空闲处理任务线程的队列
THNODE* m_pAllEList; //维护所有处理任务线程的队列
};



  MThreadedJobQ.cpp文件:



CMThreadedJobQ::CMThreadedJobQ()
{
m_MaxNoOfExecuter = 2;
m_pause = FALSE;
m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
m_pFreeEList =NULL;
m_NoOfExecuter =0;
m_pAllEList = NULL;
}

CMThreadedJobQ::~CMThreadedJobQ()
{
THNODE* pTempNode;
while (m_pAllEList != NULL)
{
pTempNode = m_pAllEList->pNext;
delete m_pAllEList->pExecuter;
delete m_pAllEList;
m_pAllEList = pTempNode;
}

while (m_pFreeEList != NULL)
{ pTempNode = m_pFreeEList->pNext;
delete m_pFreeEList;
m_pFreeEList = pTempNode;
}

m_pObserverThread->ExitInstance();
delete m_pObserverThread;
}


void CMThreadedJobQ::pause()
{
this->m_pause = TRUE;
}

void CMThreadedJobQ::resume()
{
this->m_pause = FALSE;
this->m_pObserverThread->ResumeThread();
}

UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
{
CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
CJobExecuter *pJExecuter;

while(TRUE)
{
Sleep(100);
if(pMTJQ->m_pause != TRUE)
{
while(!pMTJQ->m_jobQList.IsEmpty() )
{
pJExecuter = pMTJQ->getJobExecuter();
if( pJExecuter!=NULL)
{
pMTJQ->m_cs.Lock();
pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
pMTJQ->m_jobQList.RemoveHead();
AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
pMTJQ->m_cs.Unlock();
}
else
{
break;
}
if(pMTJQ->m_pause == TRUE)
break;
}
}
pMTJQ->m_pObserverThread->SuspendThread();
}
return 0;
}

int CMThreadedJobQ::getNoOfExecuter()
{
return this->m_NoOfExecuter;
}

int CMThreadedJobQ::getMaxNoOfExecuter()
{
return this->m_MaxNoOfExecuter;
}

void CMThreadedJobQ::addJob(CJob *pJob)
{
CJob * pTempJob;
CSingleLock sLock(&this->m_cs);
sLock.Lock();
POSITION pos,lastPos;
pos = this->m_jobQList.GetHeadPosition();
lastPos = pos;
if(pos != NULL)
pTempJob =this->m_jobQList.GetHead();
while(pos != NULL )
{
if( pJob->getPriority() > pTempJob->getPriority())
break;
lastPos = pos;
pTempJob = this->m_jobQList.GetNext(pos);
}
if(pos == NULL)
this->m_jobQList.AddTail(pJob);
else
this->m_jobQList.InsertBefore(lastPos,pJob);
this->m_pObserverThread->ResumeThread();
sLock.Unlock();
}

void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
{
m_cs.Lock();
THNODE* node = new THNODE;
node->pExecuter = pEx;
node->pNext = this->m_pFreeEList;
this->m_pFreeEList = node;
m_cs.Unlock();
}

CJobExecuter* CMThreadedJobQ::getJobExecuter()
{
THNODE *pTemp;
CJobExecuter *pEx=NULL;
m_cs.Lock();

if(this->m_pFreeEList != NULL) //有空闲CJobExecuter,就返回
{
pTemp = this->m_pFreeEList;
this->m_pFreeEList = this->m_pFreeEList->pNext;
pEx = pTemp->pExecuter;
delete pTemp ;
m_cs.Unlock();
return pEx;
}

if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
{
pEx = new CJobExecuter(this);
this->addJobExecuter(pEx);
this->m_NoOfExecuter++;
m_cs.Unlock();
return pEx;
}
m_cs.Unlock();
return NULL;
}

void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
{
m_cs.Lock();
THNODE* node = new THNODE;
node->pExecuter= pEx;
node->pNext = this->m_pAllEList;
this->m_pAllEList = node;
m_cs.Unlock();
}

void CMThreadedJobQ::setMaxNoOfExecuter(int value)
{
this->m_cs.Lock();
if(value >1 && value <11)
this->m_MaxNoOfExecuter = value;
m_pObserverThread->ResumeThread();
this->m_cs.Unlock();
}

void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
{
THNODE* pNode,*pNodeP;
CSingleLock singleLock(&m_cs);
singleLock.Lock();
if(this->m_pAllEList != NULL)
{
pNode = this->m_pAllEList;
if(pNode->pExecuter == pEx )
{
this->m_pAllEList = pNode->pNext;
delete pNode;
}
else
{
pNodeP =pNode;
pNode = pNode->pNext ;
while(pNode != NULL )
{
if(pNode->pExecuter== pEx ) break;
pNodeP = pNode;
pNode = pNode->pNext ;
}
if(pNode!= NULL)
{
pNodeP->pNext = pNode->pNext;
delete pNode;
}
}
}
this->m_NoOfExecuter--;
singleLock.Unlock();
pEx->stop();
Sleep(1);
delete pEx;
}



  以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  SosSVDJm4HSq   2023年11月02日   89   0   0 大小写html优先级
  EeGZtZT5Jsfk   2023年11月02日   43   0   0 redis2d用户组
xx2YH4ad7R0N