538 lines
13 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: TcpSvcHandler.cpp
//
// Contents: Handles TCP connection
//
// Notes:
//----------------------------------------------------------------------------
#include "TcpSvcHandler.h"
#include "TcpSupplier.h"
#include "ChannelConsumer.h"
#include "TcpConsumer.h"
#include "SocksSvcHandler.h"
#include "TunnelManager.h"
//#include "MPS_common.h"
#include "global.h"
#include <ace/Log_Msg.h>
#include <ostream>
Tcp_Svc_Handler::~Tcp_Svc_Handler (void)
{
ACE_DEBUG((MY_DEBUG ACE_TEXT("---->Tcp_Svc_Handler dtor\n")));
TCP_COUNTER.Decrement();
peer().close();
}
int Tcp_Svc_Handler::open(void *)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::open"));
if (!TCP_COUNTER.IncrementCheckMax()){
ACE_DEBUG((MY_INFO ACE_TEXT("Connection closed. Maximum number of %d Management Console TCP connections exceeded.\n"),MAX_TCP_COUNTER));
return -1;
}
//create supplier/consumer
_consumer = createConsumer(this);
_supplier = createSupplier(this);
if (initConnection() == -1)
{
ACE_ERROR_RETURN ((MY_DEBUG
"%p\n",
"init connection failed. going to exit thread"),
-1);
}
_notification_strategy.reactor(ACE_Reactor::instance());
_notification_strategy.event_handler(this);
_notification_strategy.mask(ACE_Event_Handler::WRITE_MASK);
this->msg_queue()->notification_strategy(&_notification_strategy);
this->msg_queue()->high_water_mark(
size_t(*getMaxQueueSize() / *getMaxChannels()));
this->reactor (ACE_Reactor::instance ());
_flg_mask = ACE_Event_Handler::READ_MASK;
// Get the host name we are connected to
ACE_INET_Addr sender_addr;
peer().get_remote_addr(sender_addr);
char hoststr[MAX_HOST_NAME_LEN+1];
sender_addr.get_host_addr (hoststr, MAX_HOST_NAME_LEN+1);
_identifier = ACE_CString(hoststr);
ACE_DEBUG((MY_DEBUG ACE_TEXT("Open new connection with management console [%s]\n"),_identifier.c_str()));
//register for input handler:
if (ACE_Reactor::instance ()->register_handler
(this, _flg_mask) == -1)
ACE_ERROR_RETURN ((MY_DEBUG
"Tcp_Svc_Handler::open - failed to register to reactor for READ mask\n"),
-1);
return 0;
}
// Schedule this handle for a new events (given in mask)
STATUS Tcp_Svc_Handler::initiate_io(ACE_Reactor_Mask mask)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initiate_io"));
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
locker,
_flag_mutex,
STATUS_LOCK_FAILURE);
if (ACE_BIT_ENABLED (_flg_mask, mask))
{
return STATUS_SUCCESS;
}
if (this->peer().get_handle() != ACE_INVALID_HANDLE)
{
if (ACE_Reactor::instance ()->register_handler(this, mask) == -1)
{
ACE_DEBUG((MY_DEBUG ACE_TEXT("failed to initiate io\n")));
return STATUS_FATAL_ERROR;
}
}
else
{
ACE_DEBUG ((MY_DEBUG
ACE_TEXT( "Tcp_Svc_Handler - initiate_io with illegal handler\n")));
}
ACE_LOG_MSG->msg_ostream()->flush();
ACE_SET_BITS (_flg_mask, mask);
return STATUS_SUCCESS;
}
// Cancel registration of this handle to events (given in mask)
STATUS Tcp_Svc_Handler::cancel_io(ACE_Reactor_Mask mask)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::cancel_io"));
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
locker,
_flag_mutex,
STATUS_LOCK_FAILURE);
if (ACE_BIT_DISABLED (_flg_mask, mask))
{
return STATUS_SUCCESS;
}
if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
return STATUS_FATAL_ERROR;
ACE_CLR_BITS (_flg_mask, mask);
return STATUS_SUCCESS;
}
int Tcp_Svc_Handler::handle_input (ACE_HANDLE h)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_input"));
ACE_DEBUG((MY_TRACE
ACE_TEXT ("Tcp_Svc_Handler::handle_input - (%x)\n"), this));
if (_supplier == NULL)
{
ACE_DEBUG((MY_ERROR
ACE_TEXT("[%s] TCP SUPPLIER = NULL!!!!\n"),identifier()));
return -1;
}
STATUS rep = _supplier->handle_input(h);
switch (rep)
{
case STATUS_SUCCESS:
return 0;
case STATUS_FATAL_ERROR:
ACE_ERROR_RETURN ((MY_INFO
ACE_TEXT("[%s] management console has shutdown unexpectedly\n"),
identifier()),
-1);
case STATUS_NETWORK_ERROR:
case STATUS_CONNECTION_CLOSED:
ACE_DEBUG ((MY_INFO
ACE_TEXT("[%s] management console has shutdown\n"),identifier()));
return -1;
default:
ACE_ERROR_RETURN ((MY_DEBUG
ACE_TEXT("reply status from handle_input()= %d\n"),rep),
0);
}
}
int Tcp_Svc_Handler::handle_output(ACE_HANDLE h)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_output\n"));
ACE_DEBUG((MY_TRACE
ACE_TEXT ("Tcp_Svc_Handler::handle_output (%x)\n"), this));
int reply = 0;
if (needReturnWithoutWorking(&reply))
{
return reply;
}
while ((! this->msg_queue()->is_empty()) &&(reply != -1))
{
//try to aquire the mutex (on failure return 0)
TRY_ACQUIRE_GUARD( ACE_Recursive_Thread_Mutex ,
lock,
_output_mutex);
if (lock.locked () == 0)
{
_active_counter--;
ACE_DEBUG((MY_TRACE
ACE_TEXT ("some other is active now- going to exit with return value 0 (%x)\n"), this));
return 0;
}
//sanity check:
if (_consumer == NULL)
{
reply = -1;
break;
}
ACE_LOG_MSG->msg_ostream()->flush();
STATUS rep = _consumer->handle_output(h);
switch (rep)
{
case STATUS_SUCCESS:
break ;
case STATUS_GENERAL_ERROR:
case STATUS_FAILURE:
case STATUS_BAD_REQUEST:
case STATUS_MALLOC_FAILURE:
case STATUS_ILLEGAL_PARAMS:
case STATUS_QUEUE_TIMEOUT:
case STATUS_QUEUE_FULL:
ACE_DEBUG((MY_ERROR
ACE_TEXT("[%s] Some general error occured while processing data from Intel client to MC\n"),identifier()));
break;
case STATUS_CONNECTION_CLOSED:
//in this states tunnel handler already unbind the connection with tcp_Consumer
ACE_DEBUG((MY_DEBUG
ACE_TEXT("[%s] Intel remote client closed the connection\n"),identifier()));
reply = -1; //will cause handle_close to be called for write mask.
break;
case STATUS_NETWORK_ERROR:
case STATUS_FATAL_ERROR:
case STATUS_QUEUE_DEACTIVATED:
ACE_DEBUG ((MY_DEBUG
ACE_TEXT("[%s] Fatal error occured while handling outgoing data to MC - setting state to disconnect\n"),identifier()));
{
ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex ,
lock,
_state_mutex,
STATUS_LOCK_FAILURE);
state(DISCONNECTING);
}
reply= 0;
break;
//sanity check:
default:
ACE_DEBUG ((MY_DEBUG
ACE_TEXT("ILLEGAL reply status from handle_output()\n")));
reply= 0;
break;
}
}
return processReturnVal(reply);
}
int Tcp_Svc_Handler::processReturnVal(int result)
{
if (result == -1)
return -1;
//else:
//maybe the last dispacher was called during this thead working
//(because this thread was at the middle of working, the last dispacher couldn't return -1)
//so this thread MUST return -1 (otherwise we will have memory leak...)
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
dispatch_output_lock,
_dispatch_output_counter_mutex,
STATUS_LOCK_FAILURE);
//going to work on this thread
ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
active_lock,
_active_mutex,
STATUS_LOCK_FAILURE);
ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
state_lock,
_state_mutex,
STATUS_LOCK_FAILURE);
//update active flag (since we finish working)
_active_counter--;
if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED)&& (_active_counter == 0))
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
return -1;
}
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::processReturnVal - going to return 0\n")));
return 0;
}
int Tcp_Svc_Handler::handle_close (ACE_HANDLE h , ACE_Reactor_Mask close_mask)
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_close"));
ACE_DEBUG((MY_DEBUG ACE_TEXT("Tcp_Svc_Handler::handle_close on handle:%d\n"),_unique_id));
//This mutex is taken to prevent scenario where handle_close on READ and handle_close on
// WRITE happends concurrently (they can both try to delete this)
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock,
_close_mutex,
-1);
//read closed:
//or MC closed the socket, or we had error hanle input data from socket:
if ((ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::READ_MASK)) &&
(_supplier != NULL))
{
closeSupplier();
}
// close on write mask
if (ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::WRITE_MASK) ||
ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::EXCEPT_MASK))
{
this->_notification_strategy.mask(ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
this->reactor()->purge_pending_notifications(this,ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
if(_consumer != NULL)
{
closeConsumer();
}
}
if ((this->_supplier == NULL) && (this->_consumer == NULL))
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Tcp_Svc_Handler BOTH supplier and consumer are NULL\n\tgoing to delete Tcp_Svc_Handler.........%x\n"), this));
lock.release();
this->shutdown_tcp_svc();
}
return 0;
}
void Tcp_Svc_Handler::closeSupplier()
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeSupplier"));
if (_supplier != NULL)
{
_supplier->handle_close();
delete _supplier;
_supplier = NULL;
}
peer().close_reader();
}
void Tcp_Svc_Handler::closeConsumer()
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeConsumer"));
peer().close_writer();
_consumer->handle_close();
delete _consumer;
_consumer = NULL;
}
void Tcp_Svc_Handler::state(Tcp_Svc_Handler::ConnectState s)
{
_tcp_state = s;
}
// Return the current state of the Proxy.
Tcp_Svc_Handler::ConnectState Tcp_Svc_Handler::state(void) const
{
return _tcp_state;
}
const ACE_INET_Addr &
Tcp_Svc_Handler::remote_addr (void) const
{
return this->_remote_addr;
}
void Tcp_Svc_Handler::remote_addr(ACE_INET_Addr &ra)
{
this->_remote_addr = ra;
}
const ACE_INET_Addr & Tcp_Svc_Handler::local_addr (void) const
{
return this->_local_addr;
}
void Tcp_Svc_Handler::local_addr (ACE_INET_Addr &la)
{
this->_local_addr = la;
}
STATUS Tcp_Svc_Handler::initConnection()
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initConnection"));
bool is_connected = true;
ACE_CString local, remote;
getStringFullAddr(_local_addr, local);
getStringFullAddr(_remote_addr, remote);
ACE_DEBUG((MY_DEBUG
ACE_TEXT ("INIT tcp connection\n\tamt address = %s\n\tMC addr = %s\n connection state = %b"),
local.c_str(),
remote.c_str(),
is_connected));
ACE_UINT32 res = 0;
res = _supplier->channelOpenRep(_local_addr, true, _tunnel_handler, _amt_win_size, _tunnel_consumer);
if ((res != STATUS_SUCCESS) || (!is_connected))
{
return STATUS_FAILURE;
}
return STATUS_SUCCESS;
}
Tcp_Supplier* Tcp_Svc_Handler::createSupplier(Tcp_Svc_Handler* h)
{
return new Tcp_Supplier(h);
}
Tcp_Consumer* Tcp_Svc_Handler::createConsumer(Tcp_Svc_Handler* h)
{
return new Tcp_Consumer(h);
}
Tcp_Supplier* Socks_Svc_Handler::createSupplier(Tcp_Svc_Handler* h)
{
return new SocksSupplier(h);
}
Tcp_Consumer* Socks_Svc_Handler::createConsumer(Tcp_Svc_Handler* h)
{
return new SocksConsumer(h);
}
STATUS Tcp_Svc_Handler::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1)
{
//get full address:
ACE_TCHAR buffer[256]; //max host length
addr.addr_to_string(buffer , 256);
//str1.fast_resize(addr.get_addr_size());
str1.set(buffer);
return STATUS_SUCCESS;
}
// Delete this handler
void Tcp_Svc_Handler::shutdown_tcp_svc()
{
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::shutdown"));
ACE_Reactor * TPReactor = ACE_Reactor::instance ();
TPReactor->remove_handler (this,
ACE_Event_Handler::ALL_EVENTS_MASK |
ACE_Event_Handler::DONT_CALL); // Don't call handle_close
this->msg_queue_->flush();
this->reactor(0);
this->destroy();
}
//-------------------------------------------------------
// return true on the following cases:
// (1) some other thread is currently handling this object message's
// (so it will handle all messages currently in the queue)
// (2) the object is in DISCONNECT state, but we are NOT the last dispacher
// (it can happend whan some earlier thread handled several of messages,
// so we dont have anything to do)
//-------------------------------------------------------
bool Tcp_Svc_Handler::needReturnWithoutWorking(int* reply)
{
//update counter:
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock2,
_dispatch_output_counter_mutex,
false);
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock,
_active_mutex,
false);
_dispatch_output_counter--;
//check if some other thread handles output data of this object:
if (_active_counter > 0)
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Svc_Handler::handle_output - active counter is %x state=%d counter=%d\n "),
_active_counter,
state(),
_dispatch_output_counter));
}
//else
//check if this is the last dispacher in disconnect state:
ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
lock3,
_state_mutex,
false);
if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED) && (_active_counter == 0))
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
*reply = -1;
return true;
}
//else:
//update is_active flag
_active_counter++;
ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - not the last dispatcher. tcp=[%x] state=%d dispach counter=%d\n"),
this,
state(),
_dispatch_output_counter));
return false;
}