159 lines
3.6 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: MPSReactorTask.h
//
// Contents: MPS_Reactor_Task is the MPS task for the TP_Reactor thread pool.
//
// Notes:
//----------------------------------------------------------------------------
// Include guard: keeps the declarations and definitions below from being
// processed more than once per translation unit.
#ifndef _MPS_MY_TASK__H__
#define _MPS_MY_TASK__H__
// Forward declaration. `global` is not referenced by the code visible in
// this header.
class global;
/***************************************************************
// class MPS_Reactor_Task
//
// MPS_Reactor_Task manages the TP_Reactor thread pool.
//
// MPS_Reactor_Task is the ACE_Task responsible for:
// 1. Creation and deletion of TP_Reactor and TP_Reactor thread pool
// 2. Running TP_Reactor event loop
//
***************************************************************/
// ACE task that owns a thread-pool reactor together with the threads that
// run its event loop. start() builds the reactor and spawns the workers,
// each worker executes svc(), and stop() ends the loop, joins the workers
// and destroys the reactor.
class MPS_Reactor_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
  // Builds an idle task: the start-up semaphore begins at zero and no
  // reactor exists yet.
  MPS_Reactor_Task (void)
    : sem_ (0U),
      my_reactor_ (0)
  {
  }

  // The destructor does not call stop() (the call is commented out), so
  // within this file the reactor is only released through an explicit
  // stop().
  virtual ~MPS_Reactor_Task ()
  {
    /*stop ();*/
  }

  // Worker-thread entry point: signals readiness, then runs the reactor
  // event loop until it is marked done.
  virtual int svc (void);

  // Creates the reactor, spawns num_threads workers and waits until each
  // of them has signalled that it is running.
  int start (int num_threads);

  // Ends the event loop, joins the workers and deletes the reactor.
  int stop (void);

private:
  // Allocate / release the reactor; both run under lock_.
  int create_reactor (void);
  int delete_reactor (void);

  // Serialises create_reactor() and delete_reactor().
  ACE_SYNCH_RECURSIVE_MUTEX lock_;

  // Released once by every worker in svc(); acquired num_threads times by
  // start().
  ACE_Thread_Semaphore sem_;

  // Reactor created by create_reactor(); 0 while none exists.
  ACE_Reactor *my_reactor_;
};
int
MPS_Reactor_Task::create_reactor (void)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
monitor,
this->lock_,
-1);
ACE_ASSERT (this->my_reactor_ == 0);
ACE_TP_Reactor * pImpl = 0;
ACE_NEW_RETURN (pImpl,ACE_TP_Reactor, -1);
ACE_NEW_RETURN (my_reactor_,
ACE_Reactor (pImpl ,1),
-1);
ACE_DEBUG ((MY_DEBUG ACE_TEXT ("Thread Pool Reactor created\n")));
ACE_Reactor::instance (this->my_reactor_);
this->reactor (my_reactor_);
return 0;
}
int
MPS_Reactor_Task::delete_reactor (void)
{
ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
monitor,
this->lock_,
-1);
ACE_DEBUG ((MY_DEBUG
ACE_TEXT ("Thread Pool Reactor deleted\n")));
delete this->my_reactor_;
ACE_Reactor::instance ((ACE_Reactor *) 0);
this->my_reactor_ = 0;
this->reactor (0);
return 0;
}
// Creates the reactor, spawns num_threads worker threads running svc()
// and blocks until every worker has signalled that it is up.
//
// num_threads: number of worker threads to spawn and wait for.
//
// Returns 0 on success, or -1 when the reactor cannot be created or the
// threads cannot be activated. On activation failure the reactor created
// here is torn down again so the task is left in its initial state.
//
// Declared inline because the definition lives in a header: without it,
// including the header from more than one translation unit produces
// multiple definitions of this function.
inline int
MPS_Reactor_Task::start (int num_threads)
{
  if (this->create_reactor () == -1)
    ACE_ERROR_RETURN ((MY_ERROR
                       ACE_TEXT ("%p.\n"),
                       ACE_TEXT ("Unable to create reactor")),
                      -1);

  if (this->activate (THR_NEW_LWP, num_threads) == -1)
    {
      // Log before cleaning up so that "%p" still reports activate()'s
      // errno.
      ACE_ERROR ((MY_ERROR
                  ACE_TEXT ("%p.\n"),
                  ACE_TEXT ("Unable to activate thread pool")));

      // Roll back through stop(): it ends the event loop and joins any
      // worker that may already have been spawned before the reactor is
      // deleted, so no thread is left running on a destroyed reactor.
      this->stop ();
      return -1;
    }

  // Each worker releases sem_ once at the top of svc(); collect one
  // release per spawned thread.
  for (; num_threads > 0; --num_threads)
    this->sem_.acquire ();

  return 0;
}
// Shuts the thread pool down: ends the reactor event loop, waits for the
// worker threads to leave svc() and deletes the reactor.
//
// Failures of wait() and delete_reactor() are logged but not propagated;
// the function always returns 0.
//
// Declared inline because the definition lives in a header: without it,
// including the header from more than one translation unit produces
// multiple definitions of this function.
inline int
MPS_Reactor_Task::stop (void)
{
  if (this->my_reactor_ != 0)
    {
      ACE_DEBUG ((MY_DEBUG ACE_TEXT ("End TP_Reactor event loop\n")));
      // The workers loop on the singleton reactor (see svc()), so the
      // singleton is the one that has to be told to stop.
      ACE_Reactor::instance ()->end_reactor_event_loop ();
    }

  // Join the workers before the reactor they are using is destroyed.
  if (this->wait () == -1)
    ACE_ERROR ((MY_ERROR
                ACE_TEXT ("%p.\n"),
                ACE_TEXT ("Unable to stop thread pool")));

  if (this->delete_reactor () == -1)
    ACE_ERROR ((MY_ERROR
                ACE_TEXT ("%p.\n"),
                ACE_TEXT ("Unable to delete reactor")));

  return 0;
}
int
MPS_Reactor_Task::svc (void)
{
//ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%I (%t) MPS_Reactor_Task started\n")));
// signal that we are ready
sem_.release (1);
while (ACE_Reactor::instance()->reactor_event_loop_done () == 0)
{
ACE_Reactor::instance()->run_reactor_event_loop ();
}
ACE_DEBUG ((MY_DEBUG ACE_TEXT ("MPS_Reactor_Task finished\n")));
return 0;
}
#endif // _MPS_MY_TASK__H__