612 lines
18 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: TunnelHandler.h
//
// Contents: Handles tunnel connection
//
// Notes:
//----------------------------------------------------------------------------
#include "TunnelHandler.h"
#include "TunnelConsumer.h"
#include "TunnelSupplier.h"
#include "global.h"
#include "TunnelManager.h"
#define MPS_MAX_CHANNELS 12
// Constructor
AMT_Tunnel_Handler::AMT_Tunnel_Handler(void):
channel_mgr_(*getMaxChannels()),
tunnel_state_(SESSION_INIT),
_notification_strategy(NULL,NULL,ACE_Event_Handler::NULL_MASK),
flg_mask_ (ACE_Event_Handler::NULL_MASK),
_identifier("UNKNOWN"),
_dispatch_output_counter(0),
_active_counter(0),
consumer_(NULL),
supplier_(NULL),
AMT_ID_(),
_tcp_forward_connections()
{
}
AMT_Tunnel_Handler::~AMT_Tunnel_Handler(void)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("--->AMT_Tunnel_Handler dtor\n")));
TUNNEL_COUNTER.Decrement();
peer().close();
}
// Receive and process incoming APF message.
int AMT_Tunnel_Handler::handle_input (ACE_HANDLE)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::handle_input"));
ACE_DEBUG((MY_TRACE
ACE_TEXT ("AMT_Tunnel_Handler::handle_input - (%x)\n"), this));
// if someone else is operating on the disconnect mutex than wait
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
lock,
_mutex_disconnect,
STATUS_LOCK_FAILURE);
if (this->supplier() == NULL)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler::EXIT handle_input with error code\n\t supplier is NULL\n")));
return -1;
}
STATUS res = this->supplier()->handle_input();
switch (res)
{
case STATUS_SUCCESS:
return 0;
case STATUS_GENERAL_ERROR:
ACE_DEBUG((MY_ERROR
ACE_TEXT("[%s] Some general error occurred while processing APF request\n"),identifier()));
return 0;
case STATUS_RECOVER_ERROR:
case STATUS_NETWORK_ERROR:
case STATUS_FATAL_ERROR:
ACE_ERROR_RETURN ((MY_INFO
ACE_TEXT("[%s] Intel remote client has shutdown unexpectedly\n"),identifier()),
-1);
case STATUS_CONNECTION_CLOSED:
ACE_DEBUG ((MY_INFO
ACE_TEXT("[%s] Intel remote client has disconnected\n"),identifier()));
return -1;
default:
ACE_ERROR_RETURN ((MY_DEBUG
"ILLEGAL reply status in handle input: %d\n"), res
-1);
}
}
// Send APF messages to AMT.
int AMT_Tunnel_Handler::handle_output (ACE_HANDLE)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::handle_output"));
ACE_DEBUG((MY_TRACE
ACE_TEXT("AMT_Tunnel_Handler handle_output %x\n"), this));
int reply = 0;
if (needReturnWithoutWorking(&reply))
{
return reply;
}
STATUS res = STATUS_SUCCESS;
while (( ! this->msg_queue()->is_empty()) &&(reply != -1))
{
ACE_DEBUG((MY_TRACE
ACE_TEXT("AMT_Tunnel_Handler::handle_output inside while loop, mutex going to be taken\n")));
//try to aquire the mutex (on failure return 0)
TRY_ACQUIRE_GUARD( ACE_Recursive_Thread_Mutex ,
lock,
output_mutex_);
if (lock.locked () == 0)
{
_active_counter--;
ACE_DEBUG((MY_TRACE
ACE_TEXT ("AMT_Tunnel_Handler::handle_output- some other thread is active now. going to return 0 (%x)\n"), this));
return 0;
}
if (this->consumer() == NULL)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler::EXIT handle_output:\n\tconsumer is NULL\n")));
reply = 0;
break;
}
ACE_DEBUG((MY_TRACE
ACE_TEXT("AMT_Tunnel_Handler::inside handle_output, mutex was taken!\n")));
res = this->consumer()->handle_output();
switch (res)
{
case STATUS_SUCCESS:
break;
case STATUS_FAILURE:
ACE_DEBUG((MY_DEBUG ACE_TEXT("failed to process APF message\n")));
reply = -1;
break;
case STATUS_NETWORK_ERROR:
ACE_DEBUG((MY_DEBUG
ACE_TEXT(" Intel(R) AMT peer has shutdown unexpectedly\n")));
reply = -1;
break;
case STATUS_CONNECTION_CLOSED:
ACE_DEBUG ((MY_DEBUG
ACE_TEXT("Intel(R) AMT peer disconnected\n")));
reply = -1;
break;
default:
ACE_DEBUG ((MY_DEBUG
ACE_TEXT("ILLEGAL reply status in handle output: %d\n")));
reply = -1;
break;
}
}
ACE_DEBUG((MY_TRACE
ACE_TEXT("AMT_Tunnel_Handler::handle_output, after loop\n")));
return processReturnVal(reply);
}
// handle close
int AMT_Tunnel_Handler::handle_close(ACE_HANDLE, ACE_Reactor_Mask close_mask)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::handle_close"));
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Handler handle_close (%x)\n"), this));
//This mutex is taken to prevent scenario where handle_close on READ and handle_close on
// WRITE happens concurrently (they can both try to delete this)
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock,
_close_mutex,
-1);
// Check if we received close on read mask
if (ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::READ_MASK))
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler handle close on read\n")));
// Cancel timer on object (it can be canceled a couple of times, it doesn't matter).
this->reactor()->cancel_timer(this);
if (supplier() != NULL)
{
//if we enter here NOT due to disconnect message-
//we need try to inform AMT that we are closing connection
// (like when socket closed for READ)
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
lock,
_mutex_disconnect,
STATUS_LOCK_FAILURE);
if ((_disconnect_data._is_consumer_hold_disconnect == false) && (consumer() != NULL))
{
// notify the consumer to close itself
disconnect();
}
else
{
//the consumer already being notified about disconnection.
ACE_DEBUG((MY_TRACE
ACE_TEXT("TunnelConsumer already contain disconnect/hangup message\n")));
}
peer().close_reader();
supplier()->handle_close();
delete supplier_;
supplier_ = NULL;
}
else
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler handle close on read - supplier is null\n"), this));
}
}
// delete the consumer if write is closed
if (ACE_BIT_ENABLED(close_mask, ACE_Event_Handler::WRITE_MASK) ||
ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::EXCEPT_MASK))
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler handle close on write %x\n"), this));
this->_notification_strategy.mask(ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
this->reactor()->purge_pending_notifications(this,ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
if ((consumer() != NULL))
{
consumer()->handle_close();
delete consumer_;
consumer_ = NULL;
}
}
if ((supplier() == NULL) && (consumer() == NULL))
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler BOTH supplier and consumer are NULL\n\tgoing to delete AMTtunnelHandler.........%x\n"), this));
lock.release();
this->shutdown_tunnel();
}
return 0;
}
// Upcall from the <ACE_Acceptor> that delegates
// control to our Connection_Handler.
int AMT_Tunnel_Handler::open(void *)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::open"));
if (!TUNNEL_COUNTER.IncrementCheckMax()){
ACE_DEBUG((MY_INFO ACE_TEXT("Connection closed. Maximum number of %d TLS tunnels exceeded.\n"),MAX_TUNNEL_COUNTER));
return -1;
}
// Create the supplier and consumer
ACE_NEW_RETURN(consumer_, AMT_Tunnel_Consumer(this),STATUS_MALLOC_FAILURE);
ACE_NEW_RETURN(supplier_, AMT_Tunnel_Supplier(this),STATUS_MALLOC_FAILURE);
// Set the queue size for outgoing messages
this->msg_queue()->high_water_mark(*getMaxQueueSize());
ACE_Reactor *TPReactor = ACE_Reactor::instance ();
this->reactor (TPReactor);
_notification_strategy.reactor(ACE_Reactor::instance());
_notification_strategy.event_handler(this);
_notification_strategy.mask(ACE_Event_Handler::WRITE_MASK);
this->msg_queue()->notification_strategy(&_notification_strategy);
if (initiate_io (ACE_Event_Handler::READ_MASK) != STATUS_SUCCESS)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMTTunnelHandler failed to initiate_io with READ_MASK\n")));
return -1;
}
const bool* needAuthenticationPtr = getAmtNeedAuthentication();
if (needAuthenticationPtr == NULL) {
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMTTunnelHandler failed - missing NeedAuthentication element\n")));
return -1;
}
if (*needAuthenticationPtr)
{
// Start timeout counter here for authentication if needed.
// If there is no authentication, than timeout is not needed - This should be verified for MPS09
this->reactor()->schedule_timer(this, NULL,
ACE_Time_Value(TIMEOUT_DELAY), ACE_Time_Value(TIMEOUT_INTERVAL));
}
return 0;
}
// Schedule this handle for a new events (given in mask)
STATUS AMT_Tunnel_Handler::initiate_io (ACE_Reactor_Mask mask)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::initiate_io"));
if (ACE_BIT_ENABLED (flg_mask_, mask))
{
return STATUS_SUCCESS;
}
if (ACE_Reactor::instance()->register_handler(this, mask) == -1)
{
ACE_DEBUG((MY_DEBUG ACE_TEXT("schedule wakeup error\n")));
return STATUS_FATAL_ERROR;
}
ACE_SET_BITS (flg_mask_, mask);
return STATUS_SUCCESS;
}
// Cancel registration of this handle to events (given in mask)
STATUS AMT_Tunnel_Handler::cancel_io (ACE_Reactor_Mask mask)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::cancel_io"));
if (ACE_BIT_DISABLED (flg_mask_, mask))
return STATUS_SUCCESS;
//if (ACE_Reactor::instance ()->remove_handler(this, mask) == -1)
// return STATUS_FATAL_ERROR;
if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
return STATUS_FATAL_ERROR;
ACE_CLR_BITS (flg_mask_, mask);
return STATUS_SUCCESS;
}
STATUS AMT_Tunnel_Handler::terminate_io (ACE_Reactor_Mask mask)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::cancel_io"));
if (ACE_BIT_DISABLED (flg_mask_, mask))
return STATUS_SUCCESS;
if (ACE_Reactor::instance ()->remove_handler (this, mask) == -1)
return STATUS_FATAL_ERROR;
ACE_CLR_BITS (flg_mask_, mask);
return STATUS_SUCCESS;
}
// Delete this handler
void AMT_Tunnel_Handler::shutdown_tunnel()
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::shutdown"));
ACE_Reactor * TPReactor = ACE_Reactor::instance ();
TPReactor->remove_handler (this,
ACE_Event_Handler::ALL_EVENTS_MASK |
ACE_Event_Handler::DONT_CALL); // Don't call handle_close
this->msg_queue_->flush();
this->reactor(0);
this->destroy();
}
int AMT_Tunnel_Handler::processReturnVal(int result)
{
if (result == -1)
{
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Handler::processReturnVal - going to return -1\n")));
return -1;
}
//else:
//maybe the last dispatcher was called during this thread working
//(because this thread was at the middle of working, the last dispatcher couldn't return -1)
//so this thread MUST return -1 (otherwise we will have memory leak...)
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock2,
_dispatch_output_counter_mutex,
STATUS_LOCK_FAILURE);
//going to work on this thread
ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
lock,
_active_mutex,
STATUS_LOCK_FAILURE);
ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
state_lock,
_state_mutex,
STATUS_LOCK_FAILURE);
//update active flag (since we finish working)
_active_counter--;
if ((_dispatch_output_counter == 0) && (tunnelState() == DISCONNECTED) && (_active_counter ==0))
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
return -1;
}
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Handler::processReturnVal - going to return 0\n")));
return 0;
}
//-------------------------------------------------------
// Handle tunnel timeout event:
// Once a connection is created we set a timer to make
// sure authentication is performed within this period.
// In case AMT didn't perform authentication in this time we will
// close the connection.
//
//-------------------------------------------------------
int AMT_Tunnel_Handler::handle_timeout(const ACE_Time_Value &current_time, const void * arg)
{
ACE_DEBUG((MY_INFO ACE_TEXT ("Timeout while trying to establish a tunnel. Disconnecting from Intel Remote Client %s.\n"), identifier()));
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
lock,
_mutex_disconnect,
STATUS_LOCK_FAILURE);
_disconnect_data.setParams(APF_DISCONNECT_CONNECTION_TIMED_OUT, true, PRIO_LOW);
reactor()->cancel_timer(this);
// removed the timeout handler, do not want handle_close to be called
reactor()->remove_handler(this, ACE_Event_Handler::TIMER_MASK | ACE_Event_Handler::DONT_CALL);
// Remove read mask - this will cause handle_close on read mask.
reactor()->remove_handler(this, ACE_Event_Handler::READ_MASK);
return 0;
}
//-------------------------------------------------------
// return true on the following cases:
// (1) some other thread is currently handling this object message's
// (so it will handle all messages currently in the queue)
// (2) the object is in DISCONNECT state, but we are NOT the last dispatcher
// (it can happen when some earlier thread handled several of messages,
// so we don't have anything to do)
//-------------------------------------------------------
bool AMT_Tunnel_Handler::needReturnWithoutWorking(int* reply)
{
//update counter:
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock2,
_dispatch_output_counter_mutex,
false);
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock,
_active_mutex,
false);
_dispatch_output_counter--;
//check if some other thread handles output data of this object:
if (_active_counter > 0)
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - active counter is %x state=%d counter=%d\n going to return 0\n"),
_active_counter,
tunnelState(),
_dispatch_output_counter));
}
//else
//check if this is the last dispatcher in disconnect state:
ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
lock3,
_state_mutex,
false);
if ((_dispatch_output_counter == 0) && (tunnelState() == DISCONNECTED) && (_active_counter == 0))
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
*reply = -1;
return true;
}
//else:
//update active counter
_active_counter++;
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - not the last dispatcher. tcp=[%x] state=%d counter=%d\n"),
this,
tunnelState(),
_dispatch_output_counter));
return false;
}
//-------------------------------------------------------
// NOTE:
// you must take TunnelMgr mutex on WRITE, BEFORE calling this function
//-------------------------------------------------------
void AMT_Tunnel_Handler::removeTcpForwardConnection(Port_Address& addr)
{
vector<Port_Address>::iterator iter;
for(iter =_tcp_forward_connections.begin();
iter!=_tcp_forward_connections.end();
iter++)
{
if((Port_Address)(*iter) == addr)
{
_tcp_forward_connections.erase(iter);
break;
}
}
Tunnel_Manager::instance().delete_tunnel(addr.fqdn, addr.port);
}
//-------------------------------------------------------
// Remove the references to the open ports of this connection
// from the tunnel manager map
// NOTE:
// you must take TunnelMgr mutex on WRITE, BEFORE calling this function
//-------------------------------------------------------
void AMT_Tunnel_Handler::removeFromTunnelMgr()
{
for(vector<Port_Address>::iterator iter = _tcp_forward_connections.begin();
iter!= _tcp_forward_connections.end();
iter++)
{
Tunnel_Manager::instance().delete_tunnel(iter->fqdn, iter->port);
}
}
//-----------------------------------------------------------
// Disconnect this tunnel due to some error
//
//NOTES:
// (1) _mutex_disconnect MUST be taken before invoking this method!
// This function should be called only ONCE on each tunnel
// to make sure the consumer is still running.
// (2) the disconnect uses the params in _disconnect_data.
//-----------------------------------------------------------
STATUS AMT_Tunnel_Handler::disconnect()
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Handler::disconnect"));
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Handler sending disconnect message to AMT\n")));
// set a flag to make sure this method is called only once
_disconnect_data._is_consumer_hold_disconnect = true;
peer().close_reader();
// remove this connection from tunnel manager
{
ACE_WRITE_GUARD_RETURN(ACE_RW_Thread_Mutex ,
tunnel_lock,
Tunnel_Manager::instance().guard(),
STATUS_LOCK_FAILURE);
// Remove ourself from the tunnel manager
removeFromTunnelMgr();
}
//send alerts to subscribers list:
for (unsigned int i=0; i < _tcp_forward_connections.size(); i++)
{
supplier()->sendAlertToSubscribMCList(mps__ConnectionStateTypeDefinition__DISCONNECTED,
_tcp_forward_connections[i].fqdn, _tcp_forward_connections[i].port);
}
{
// Get lock on channel manager
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
lock,
channel_mgr().guard(),
STATUS_FAILURE);
// notify all channels that we're closed
channel_mgr().close_all_channels();
}
if (_disconnect_data._is_APF_send)
{
// send APF disconnect message
APF_Disconnect apf_disconnect;
apf_disconnect.reason = _disconnect_data._disconnect_reason;
apf_disconnect.reserved = APF_ZERO_RESERVE;
if (consumer()->send_APF_message(apf_disconnect, _disconnect_data._hangup_prio) != STATUS_SUCCESS)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Supplier failed sending disconnect message to AMT\n")));
}
else
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Disconnection from AMT - reason: %d\n"), apf_disconnect.reason));
}
}
else
{
// Put HANGUP message in the consumer queue - this will cause the
// consumer to start closing itself.
ACE_Message_Block* consumer_disconnect = new ACE_Message_Block(0, MB_DISCONNECT);
if ( consumer()->putq_wrapper (consumer_disconnect, _disconnect_data._hangup_prio) != STATUS_SUCCESS)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Supplier failed sending consumer_disconnect message to AMT\n")));
}
}
return STATUS_CONNECTION_CLOSED;
}