//----------------------------------------------------------------------------
//
//  Copyright (C) Intel Corporation, 2006 - 2007.
//
//  File:       TcpSvcHandler.cpp
//
//  Contents:   Handles TCP connection
//
//  Notes:
//----------------------------------------------------------------------------
|
#include "TcpSvcHandler.h"
|
|
#include "TcpSupplier.h"
|
|
#include "ChannelConsumer.h"
|
|
#include "TcpConsumer.h"
|
|
#include "SocksSvcHandler.h"
|
|
#include "TunnelManager.h"
|
|
//#include "MPS_common.h"
|
|
#include "global.h"
|
|
#include <ace/Log_Msg.h>
|
|
#include <ostream>
|
|
|
|
//----------------------------------------------------------------------------
// Destructor: releases the global connection slot and closes the socket.
//----------------------------------------------------------------------------
Tcp_Svc_Handler::~Tcp_Svc_Handler (void)
{
    ACE_DEBUG((MY_DEBUG ACE_TEXT("---->Tcp_Svc_Handler dtor\n")));

    // Give the slot back to the global TCP connection counter; this balances
    // the IncrementCheckMax() performed in open().
    TCP_COUNTER.Decrement();

    // Close the peer stream (harmless if the socket is already closed).
    peer().close();
}
|
|
//----------------------------------------------------------------------------
// Called by the framework once a new Management Console TCP connection has
// been accepted.  Enforces the global connection limit, creates the
// supplier/consumer pair, wires the message queue's write-notification
// strategy to the reactor, and registers this handler for READ events.
// Returns 0 on success, -1 to make the framework drop the connection.
//----------------------------------------------------------------------------
int Tcp_Svc_Handler::open(void *)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::open"));

    // Enforce the global limit on concurrent MC connections.  The matching
    // Decrement() is performed in the destructor.
    if (!TCP_COUNTER.IncrementCheckMax()){
        ACE_DEBUG((MY_INFO ACE_TEXT("Connection closed. Maximum number of %d Management Console TCP connections exceeded.\n"),MAX_TCP_COUNTER));
        return -1;
    }

    //create supplier/consumer
    _consumer = createConsumer(this);
    _supplier = createSupplier(this);

    // NOTE(review): initConnection() returns a STATUS (STATUS_FAILURE /
    // STATUS_SUCCESS) — confirm STATUS_FAILURE == -1, otherwise this error
    // check can never fire.
    if (initConnection() == -1)
    {
        ACE_ERROR_RETURN ((MY_DEBUG
                           "%p\n",
                           "init connection failed. going to exit thread"),
                          -1);
    }

    // Route msg_queue() enqueues to the reactor as WRITE events so that
    // handle_output() is dispatched whenever data is queued for the peer.
    _notification_strategy.reactor(ACE_Reactor::instance());
    _notification_strategy.event_handler(this);
    _notification_strategy.mask(ACE_Event_Handler::WRITE_MASK);
    this->msg_queue()->notification_strategy(&_notification_strategy);

    // Share the configured total queue budget evenly across channels.
    this->msg_queue()->high_water_mark(
        size_t(*getMaxQueueSize() / *getMaxChannels()));

    this->reactor (ACE_Reactor::instance ());
    _flg_mask = ACE_Event_Handler::READ_MASK;

    // Get the host name we are connected to
    ACE_INET_Addr sender_addr;
    peer().get_remote_addr(sender_addr);

    char hoststr[MAX_HOST_NAME_LEN+1];
    sender_addr.get_host_addr (hoststr, MAX_HOST_NAME_LEN+1);
    _identifier = ACE_CString(hoststr);
    ACE_DEBUG((MY_DEBUG ACE_TEXT("Open new connection with management console [%s]\n"),_identifier.c_str()));

    //register for input handler:
    if (ACE_Reactor::instance ()->register_handler
        (this, _flg_mask) == -1)
        ACE_ERROR_RETURN ((MY_DEBUG
                           "Tcp_Svc_Handler::open - failed to register to reactor for READ mask\n"),
                          -1);

    return 0;
}
|
|
|
|
// Schedule this handle for a new events (given in mask)
|
|
//----------------------------------------------------------------------------
// Schedule this handler with the reactor for the events given in mask.
// Idempotent: a mask that is already active is a no-op.
//----------------------------------------------------------------------------
STATUS Tcp_Svc_Handler::initiate_io(ACE_Reactor_Mask mask)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initiate_io"));

    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     locker,
                     _flag_mutex,
                     STATUS_LOCK_FAILURE);

    // Already registered for these events — nothing to do.
    if (ACE_BIT_ENABLED (_flg_mask, mask))
        return STATUS_SUCCESS;

    if (this->peer().get_handle() == ACE_INVALID_HANDLE)
    {
        // No usable handle: skip the reactor call, but (as before) still
        // fall through and record the mask bits below.
        ACE_DEBUG ((MY_DEBUG
                    ACE_TEXT( "Tcp_Svc_Handler - initiate_io with illegal handler\n")));
    }
    else if (ACE_Reactor::instance ()->register_handler(this, mask) == -1)
    {
        ACE_DEBUG((MY_DEBUG ACE_TEXT("failed to initiate io\n")));
        return STATUS_FATAL_ERROR;
    }

    ACE_LOG_MSG->msg_ostream()->flush();
    ACE_SET_BITS (_flg_mask, mask);
    return STATUS_SUCCESS;
}
|
|
|
|
// Cancel registration of this handle to events (given in mask)
|
|
//----------------------------------------------------------------------------
// Cancel registration of this handler for the events given in mask.
// Idempotent: a mask that is already disabled is a no-op.
//----------------------------------------------------------------------------
STATUS Tcp_Svc_Handler::cancel_io(ACE_Reactor_Mask mask)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::cancel_io"));

    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     locker,
                     _flag_mutex,
                     STATUS_LOCK_FAILURE);

    // Only act when the mask is currently enabled.
    if (ACE_BIT_ENABLED (_flg_mask, mask))
    {
        if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1)
            return STATUS_FATAL_ERROR;

        ACE_CLR_BITS (_flg_mask, mask);
    }

    return STATUS_SUCCESS;
}
|
|
|
|
//----------------------------------------------------------------------------
// Reactor READ callback: delegates the socket read to the supplier and
// maps its STATUS onto the reactor's 0 (keep) / -1 (close) convention.
//----------------------------------------------------------------------------
int Tcp_Svc_Handler::handle_input (ACE_HANDLE h)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_input"));
    ACE_DEBUG((MY_TRACE
               ACE_TEXT ("Tcp_Svc_Handler::handle_input - (%x)\n"), this));

    // Sanity: input can only be processed while a supplier exists.
    if (_supplier == NULL)
    {
        ACE_DEBUG((MY_ERROR
                   ACE_TEXT("[%s] TCP SUPPLIER = NULL!!!!\n"),identifier()));
        return -1;
    }

    const STATUS rep = _supplier->handle_input(h);

    if (rep == STATUS_SUCCESS)
        return 0;

    if (rep == STATUS_FATAL_ERROR)
        ACE_ERROR_RETURN ((MY_INFO
                           ACE_TEXT("[%s] management console has shutdown unexpectedly\n"),
                           identifier()),
                          -1);

    if (rep == STATUS_NETWORK_ERROR || rep == STATUS_CONNECTION_CLOSED)
    {
        ACE_DEBUG ((MY_INFO
                    ACE_TEXT("[%s] management console has shutdown\n"),identifier()));
        return -1;
    }

    // Any other status: log it but keep the connection alive.
    ACE_ERROR_RETURN ((MY_DEBUG
                       ACE_TEXT("reply status from handle_input()= %d\n"),rep),
                      0);
}
|
|
|
|
//----------------------------------------------------------------------------
// Reactor WRITE callback: drains this handler's message queue through the
// consumer.  Several dispatcher threads may race here; the _output_mutex
// try-lock ensures only one thread actually drains while the others bail
// out, and _active_counter / _dispatch_output_counter bookkeeping decides
// which thread gets to return -1 last (see processReturnVal()).
// Returns 0 to stay registered, -1 to trigger handle_close for WRITE.
//----------------------------------------------------------------------------
int Tcp_Svc_Handler::handle_output(ACE_HANDLE h)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_output\n"));
    ACE_DEBUG((MY_TRACE
               ACE_TEXT ("Tcp_Svc_Handler::handle_output (%x)\n"), this));

    // Bail out early if another thread is already draining, or if we are the
    // last dispatcher of a DISCONNECTED handler (reply is set to -1 then).
    int reply = 0;
    if (needReturnWithoutWorking(&reply))
    {
        return reply;
    }

    // Drain the queue until empty or until a status forced reply = -1.
    while ((! this->msg_queue()->is_empty()) &&(reply != -1))
    {
        //try to aquire the mutex (on failure return 0)
        TRY_ACQUIRE_GUARD( ACE_Recursive_Thread_Mutex ,
                           lock,
                           _output_mutex);

        // Try-lock failed: someone else owns the output path.  Undo the
        // _active_counter increment done in needReturnWithoutWorking().
        if (lock.locked () == 0)
        {
            _active_counter--;
            ACE_DEBUG((MY_TRACE
                       ACE_TEXT ("some other is active now- going to exit with return value 0 (%x)\n"), this));
            return 0;
        }

        //sanity check:
        if (_consumer == NULL)
        {
            reply = -1;
            break;
        }

        ACE_LOG_MSG->msg_ostream()->flush();

        // Let the consumer push one batch of queued data to the socket.
        STATUS rep = _consumer->handle_output(h);

        switch (rep)
        {
        case STATUS_SUCCESS:
            break ;

        // Recoverable errors: log and keep draining.
        case STATUS_GENERAL_ERROR:
        case STATUS_FAILURE:
        case STATUS_BAD_REQUEST:
        case STATUS_MALLOC_FAILURE:
        case STATUS_ILLEGAL_PARAMS:
        case STATUS_QUEUE_TIMEOUT:
        case STATUS_QUEUE_FULL:
            ACE_DEBUG((MY_ERROR
                       ACE_TEXT("[%s] Some general error occured while processing data from Intel client to MC\n"),identifier()));
            break;

        case STATUS_CONNECTION_CLOSED:
            //in this state the tunnel handler already unbound the connection from the tcp_Consumer
            ACE_DEBUG((MY_DEBUG
                       ACE_TEXT("[%s] Intel remote client closed the connection\n"),identifier()));

            reply = -1; //will cause handle_close to be called for write mask.
            break;

        // Fatal outgoing errors: mark the handler DISCONNECTING under the
        // state write-lock; teardown happens on a later dispatch.
        case STATUS_NETWORK_ERROR:
        case STATUS_FATAL_ERROR:
        case STATUS_QUEUE_DEACTIVATED:
            ACE_DEBUG ((MY_DEBUG
                        ACE_TEXT("[%s] Fatal error occured while handling outgoing data to MC - setting state to disconnect\n"),identifier()));

            {
                // NOTE(review): on lock failure this returns STATUS_LOCK_FAILURE
                // from an int reactor callback — confirm that value maps onto
                // the intended 0/-1 semantics.
                ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex ,
                                        lock,
                                        _state_mutex,
                                        STATUS_LOCK_FAILURE);

                state(DISCONNECTING);
            }
            reply= 0;
            break;

        //sanity check:
        default:
            ACE_DEBUG ((MY_DEBUG
                        ACE_TEXT("ILLEGAL reply status from handle_output()\n")));
            reply= 0;
            break;
        }
    }

    // Translate reply into the final 0/-1 taking the dispatcher counters
    // and connection state into account.
    return processReturnVal(reply);
}
|
|
|
|
//----------------------------------------------------------------------------
// Decide the final return value of handle_output().  -1 propagates as-is;
// otherwise, if this thread turns out to be the last active dispatcher of a
// DISCONNECTED handler, it must return -1 itself so handle_close runs and
// the object is not leaked.
//----------------------------------------------------------------------------
int Tcp_Svc_Handler::processReturnVal(int result)
{
    if (result == -1)
        return -1;

    //else:
    //maybe the last dispacher was called during this thead working
    //(because this thread was at the middle of working, the last dispacher couldn't return -1)
    //so this thread MUST return -1 (otherwise we will have memory leak...)
    // NOTE(review): the guards below return STATUS_LOCK_FAILURE from an int
    // reactor callback — confirm that value maps onto the 0/-1 convention.
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
                      dispatch_output_lock,
                      _dispatch_output_counter_mutex,
                      STATUS_LOCK_FAILURE);

    //going to work on this thread
    ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
                          active_lock,
                          _active_mutex,
                          STATUS_LOCK_FAILURE);

    // State is read below; take the state lock after the counter locks
    // (same acquisition order as needReturnWithoutWorking()).
    ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
                           state_lock,
                           _state_mutex,
                           STATUS_LOCK_FAILURE);

    //update active flag (since we finish working)
    _active_counter--;

    // Last dispatcher of a disconnected handler: force -1 so the reactor
    // invokes handle_close and the object gets deleted.
    if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED)&& (_active_counter == 0))
    {
        ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
        return -1;
    }

    ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::processReturnVal - going to return 0\n")));

    return 0;
}
|
|
|
|
//----------------------------------------------------------------------------
// Reactor close callback.  Tears down the supplier on READ-side close and
// the consumer on WRITE/EXCEPT-side close; once both are gone the handler
// deletes itself via shutdown_tcp_svc().
//----------------------------------------------------------------------------
int Tcp_Svc_Handler::handle_close (ACE_HANDLE h , ACE_Reactor_Mask close_mask)
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_close"));

    ACE_DEBUG((MY_DEBUG ACE_TEXT("Tcp_Svc_Handler::handle_close on handle:%d\n"),_unique_id));

    //This mutex is taken to prevent the scenario where handle_close on READ and handle_close on
    // WRITE happen concurrently (they could both try to delete this)
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
                      lock,
                      _close_mutex,
                      -1);

    //read closed:
    //either MC closed the socket, or we had an error handling input data from the socket:
    if ((ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::READ_MASK)) &&
        (_supplier != NULL))
    {
        closeSupplier();
    }

    // close on write mask
    if (ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::WRITE_MASK) ||
        ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::EXCEPT_MASK))
    {
        // Stop the queue's notification strategy from scheduling any further
        // WRITE events, and drop notifications already queued in the reactor.
        this->_notification_strategy.mask(ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
        this->reactor()->purge_pending_notifications(this,ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);

        if(_consumer != NULL)
        {
            closeConsumer();
        }
    }

    // Both halves are closed — delete this handler.
    if ((this->_supplier == NULL) && (this->_consumer == NULL))
    {
        ACE_DEBUG((MY_DEBUG
                   ACE_TEXT("Tcp_Svc_Handler BOTH supplier and consumer are NULL\n\tgoing to delete Tcp_Svc_Handler.........%x\n"), this));

        // The guard must be released before self-destruction: the mutex is a
        // member of the object about to be deleted.
        lock.release();
        this->shutdown_tcp_svc();
    }
    return 0;
}
|
|
|
|
void Tcp_Svc_Handler::closeSupplier()
|
|
{
|
|
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeSupplier"));
|
|
|
|
if (_supplier != NULL)
|
|
{
|
|
_supplier->handle_close();
|
|
delete _supplier;
|
|
_supplier = NULL;
|
|
}
|
|
peer().close_reader();
|
|
}
|
|
|
|
void Tcp_Svc_Handler::closeConsumer()
|
|
{
|
|
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeConsumer"));
|
|
peer().close_writer();
|
|
_consumer->handle_close();
|
|
delete _consumer;
|
|
_consumer = NULL;
|
|
}
|
|
|
|
// Record the connection state of the Proxy.  Callers that need exclusion
// take _state_mutex first (see handle_output()).
void Tcp_Svc_Handler::state(Tcp_Svc_Handler::ConnectState s)
{
    this->_tcp_state = s;
}
|
|
|
|
// Return the current state of the Proxy.
|
|
|
|
// Return the current state of the Proxy.
Tcp_Svc_Handler::ConnectState Tcp_Svc_Handler::state(void) const
{
    return this->_tcp_state;
}
|
|
|
|
// Accessor: the remote (management console) address.
const ACE_INET_Addr &
Tcp_Svc_Handler::remote_addr (void) const
{
    return _remote_addr;
}
|
|
|
|
// Mutator: store the remote (management console) address.
void Tcp_Svc_Handler::remote_addr(ACE_INET_Addr &ra)
{
    _remote_addr = ra;
}
|
|
|
|
// Accessor: the local address of this connection.
const ACE_INET_Addr & Tcp_Svc_Handler::local_addr (void) const
{
    return _local_addr;
}
|
|
|
|
// Mutator: store the local address of this connection.
void Tcp_Svc_Handler::local_addr (ACE_INET_Addr &la)
{
    _local_addr = la;
}
|
|
|
|
//----------------------------------------------------------------------------
// Announce the freshly accepted connection to the tunnel layer.
// Returns STATUS_SUCCESS when the channel-open reply succeeds,
// STATUS_FAILURE otherwise.
//
// Cleanup of the original: `is_connected` was a constant true, so the
// `(!is_connected)` clause in the failure check was dead code; `res` was
// zero-initialized only to be overwritten on the next statement.
//----------------------------------------------------------------------------
STATUS Tcp_Svc_Handler::initConnection()
{
    ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initConnection"));

    // The handler is only created for an already-accepted socket, so the
    // connection is up by construction (kept for the %b log field below).
    const bool is_connected = true;

    ACE_CString local, remote;
    getStringFullAddr(_local_addr, local);
    getStringFullAddr(_remote_addr, remote);

    ACE_DEBUG((MY_DEBUG
               ACE_TEXT ("INIT tcp connection\n\tamt address = %s\n\tMC addr = %s\n connection state = %b"),
               local.c_str(),
               remote.c_str(),
               is_connected));

    // Ask the supplier to issue the channel-open reply toward the tunnel.
    const ACE_UINT32 res =
        _supplier->channelOpenRep(_local_addr, true, _tunnel_handler, _amt_win_size, _tunnel_consumer);

    return (res == STATUS_SUCCESS) ? STATUS_SUCCESS : STATUS_FAILURE;
}
|
|
|
|
// Factory hook: plain TCP handlers read through a Tcp_Supplier.
Tcp_Supplier* Tcp_Svc_Handler::createSupplier(Tcp_Svc_Handler* h)
{
    Tcp_Supplier* const supplier = new Tcp_Supplier(h);
    return supplier;
}
|
|
// Factory hook: plain TCP handlers write through a Tcp_Consumer.
Tcp_Consumer* Tcp_Svc_Handler::createConsumer(Tcp_Svc_Handler* h)
{
    Tcp_Consumer* const consumer = new Tcp_Consumer(h);
    return consumer;
}
|
|
|
|
// Factory override: SOCKS handlers read through a SocksSupplier.
Tcp_Supplier* Socks_Svc_Handler::createSupplier(Tcp_Svc_Handler* h)
{
    Tcp_Supplier* const supplier = new SocksSupplier(h);
    return supplier;
}
|
|
// Factory override: SOCKS handlers write through a SocksConsumer.
Tcp_Consumer* Socks_Svc_Handler::createConsumer(Tcp_Svc_Handler* h)
{
    Tcp_Consumer* const consumer = new SocksConsumer(h);
    return consumer;
}
|
|
|
|
//----------------------------------------------------------------------------
// Render addr as an "ip-address:port" string into str1.
// Returns STATUS_SUCCESS, or STATUS_FAILURE when the address cannot be
// converted (the out string is then set to empty).
//
// Fixes: the original ignored addr_to_string()'s -1 failure return, so a
// conversion error propagated uninitialized stack-buffer contents into the
// caller's string; it also repeated the magic size 256 instead of deriving
// it from the buffer.
//----------------------------------------------------------------------------
STATUS Tcp_Svc_Handler::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1)
{
    //get full address:
    ACE_TCHAR buffer[256]; //max host length

    // addr_to_string() returns -1 on failure (e.g. buffer too small);
    // never hand stale buffer bytes back to the caller.
    if (addr.addr_to_string(buffer , sizeof(buffer) / sizeof(buffer[0])) == -1)
    {
        str1.set("");
        return STATUS_FAILURE;
    }

    str1.set(buffer);
    return STATUS_SUCCESS;
}
|
|
// Delete this handler
|
|
void Tcp_Svc_Handler::shutdown_tcp_svc()
|
|
{
|
|
ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::shutdown"));
|
|
|
|
ACE_Reactor * TPReactor = ACE_Reactor::instance ();
|
|
|
|
TPReactor->remove_handler (this,
|
|
ACE_Event_Handler::ALL_EVENTS_MASK |
|
|
ACE_Event_Handler::DONT_CALL); // Don't call handle_close
|
|
this->msg_queue_->flush();
|
|
this->reactor(0);
|
|
this->destroy();
|
|
}
|
|
//-------------------------------------------------------
|
|
// return true on the following cases:
|
|
// (1) some other thread is currently handling this object message's
|
|
// (so it will handle all messages currently in the queue)
|
|
// (2) the object is in DISCONNECT state, but we are NOT the last dispacher
|
|
// (it can happend whan some earlier thread handled several of messages,
|
|
// so we dont have anything to do)
|
|
//-------------------------------------------------------
|
|
//-------------------------------------------------------
// return true on the following cases:
// (1) some other thread is currently handling this object's messages
// (so it will handle all messages currently in the queue)
// (2) the object is in DISCONNECT state, but we are NOT the last dispatcher
// (it can happen when some earlier thread handled several of the messages,
// so we don't have anything to do)
//-------------------------------------------------------
bool Tcp_Svc_Handler::needReturnWithoutWorking(int* reply)
{
    //update counter:
    // Lock order (counter mutex, then active mutex, then state mutex) matches
    // processReturnVal() — keep it that way to avoid deadlock.
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
                      lock2,
                      _dispatch_output_counter_mutex,
                      false);

    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
                      lock,
                      _active_mutex,
                      false);

    // One pending handle_output dispatch is now being consumed (the counter
    // is presumably incremented where the dispatch is scheduled — confirm).
    _dispatch_output_counter--;

    //check if some other thread handles output data of this object:
    if (_active_counter > 0)
    {
        ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Svc_Handler::handle_output - active counter is %x state=%d counter=%d\n "),
                   _active_counter,
                   state(),
                   _dispatch_output_counter));
    }

    //else
    //check if this is the last dispacher in disconnect state:
    ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
                           lock3,
                           _state_mutex,
                           false);

    // Last dispatcher of a disconnected handler: tell the caller to return
    // -1 immediately so handle_close gets invoked and the object is freed.
    if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED) && (_active_counter == 0))
    {
        ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n")));
        *reply = -1;
        return true;
    }

    //else:

    //update is_active flag
    // This thread will do the draining; the matching decrement happens in
    // processReturnVal() (or on the try-lock-failed path in handle_output()).
    _active_counter++;

    ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - not the last dispatcher. tcp=[%x] state=%d dispach counter=%d\n"),
               this,
               state(),
               _dispatch_output_counter));
    return false;
}
|
|
|