//---------------------------------------------------------------------------- // // Copyright (C) Intel Corporation, 2006 - 2007. // // File: TcpSvcHandler.cpp // // Contents: Handles TCP connection // // Notes: //---------------------------------------------------------------------------- #include "TcpSvcHandler.h" #include "TcpSupplier.h" #include "ChannelConsumer.h" #include "TcpConsumer.h" #include "SocksSvcHandler.h" #include "TunnelManager.h" //#include "MPS_common.h" #include "global.h" #include #include Tcp_Svc_Handler::~Tcp_Svc_Handler (void) { ACE_DEBUG((MY_DEBUG ACE_TEXT("---->Tcp_Svc_Handler dtor\n"))); TCP_COUNTER.Decrement(); peer().close(); } int Tcp_Svc_Handler::open(void *) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::open")); if (!TCP_COUNTER.IncrementCheckMax()){ ACE_DEBUG((MY_INFO ACE_TEXT("Connection closed. Maximum number of %d Management Console TCP connections exceeded.\n"),MAX_TCP_COUNTER)); return -1; } //create supplier/consumer _consumer = createConsumer(this); _supplier = createSupplier(this); if (initConnection() == -1) { ACE_ERROR_RETURN ((MY_DEBUG "%p\n", "init connection failed. going to exit thread"), -1); } _notification_strategy.reactor(ACE_Reactor::instance()); _notification_strategy.event_handler(this); _notification_strategy.mask(ACE_Event_Handler::WRITE_MASK); this->msg_queue()->notification_strategy(&_notification_strategy); this->msg_queue()->high_water_mark( size_t(*getMaxQueueSize() / *getMaxChannels())); this->reactor (ACE_Reactor::instance ()); _flg_mask = ACE_Event_Handler::READ_MASK; // Get the host name we are connected to ACE_INET_Addr sender_addr; peer().get_remote_addr(sender_addr); char hoststr[MAX_HOST_NAME_LEN+1]; sender_addr.get_host_addr (hoststr, MAX_HOST_NAME_LEN+1); _identifier = ACE_CString(hoststr); ACE_DEBUG((MY_DEBUG ACE_TEXT("Open new connection with management console [%s]\n"),_identifier.c_str())); //register for input handler: if (ACE_Reactor::instance ()->register_handler (this, _flg_mask) == -1) ACE_ERROR_RETURN ((MY_DEBUG "Tcp_Svc_Handler::open - failed to register to reactor for READ mask\n"), -1); return 0; } // Schedule this handle for a new events (given in mask) STATUS Tcp_Svc_Handler::initiate_io(ACE_Reactor_Mask mask) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initiate_io")); ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, locker, _flag_mutex, STATUS_LOCK_FAILURE); if (ACE_BIT_ENABLED (_flg_mask, mask)) { return STATUS_SUCCESS; } if (this->peer().get_handle() != ACE_INVALID_HANDLE) { if (ACE_Reactor::instance ()->register_handler(this, mask) == -1) { ACE_DEBUG((MY_DEBUG ACE_TEXT("failed to initiate io\n"))); return STATUS_FATAL_ERROR; } } else { ACE_DEBUG ((MY_DEBUG ACE_TEXT( "Tcp_Svc_Handler - initiate_io with illegal handler\n"))); } ACE_LOG_MSG->msg_ostream()->flush(); ACE_SET_BITS (_flg_mask, mask); return STATUS_SUCCESS; } // Cancel registration of this handle to events (given in mask) STATUS Tcp_Svc_Handler::cancel_io(ACE_Reactor_Mask mask) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::cancel_io")); ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, locker, _flag_mutex, STATUS_LOCK_FAILURE); if (ACE_BIT_DISABLED (_flg_mask, mask)) { return STATUS_SUCCESS; } if (ACE_Reactor::instance ()->cancel_wakeup (this, mask) == -1) return STATUS_FATAL_ERROR; ACE_CLR_BITS (_flg_mask, mask); return STATUS_SUCCESS; } int Tcp_Svc_Handler::handle_input (ACE_HANDLE h) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_input")); ACE_DEBUG((MY_TRACE ACE_TEXT ("Tcp_Svc_Handler::handle_input - (%x)\n"), this)); if (_supplier == NULL) { ACE_DEBUG((MY_ERROR ACE_TEXT("[%s] TCP SUPPLIER = NULL!!!!\n"),identifier())); return -1; } STATUS rep = _supplier->handle_input(h); switch (rep) { case STATUS_SUCCESS: return 0; case STATUS_FATAL_ERROR: ACE_ERROR_RETURN ((MY_INFO ACE_TEXT("[%s] management console has shutdown unexpectedly\n"), identifier()), -1); case STATUS_NETWORK_ERROR: case STATUS_CONNECTION_CLOSED: ACE_DEBUG ((MY_INFO ACE_TEXT("[%s] management console has shutdown\n"),identifier())); return -1; default: ACE_ERROR_RETURN ((MY_DEBUG ACE_TEXT("reply status from handle_input()= %d\n"),rep), 0); } } int Tcp_Svc_Handler::handle_output(ACE_HANDLE h) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_output\n")); ACE_DEBUG((MY_TRACE ACE_TEXT ("Tcp_Svc_Handler::handle_output (%x)\n"), this)); int reply = 0; if (needReturnWithoutWorking(&reply)) { return reply; } while ((! this->msg_queue()->is_empty()) &&(reply != -1)) { //try to aquire the mutex (on failure return 0) TRY_ACQUIRE_GUARD( ACE_Recursive_Thread_Mutex , lock, _output_mutex); if (lock.locked () == 0) { _active_counter--; ACE_DEBUG((MY_TRACE ACE_TEXT ("some other is active now- going to exit with return value 0 (%x)\n"), this)); return 0; } //sanity check: if (_consumer == NULL) { reply = -1; break; } ACE_LOG_MSG->msg_ostream()->flush(); STATUS rep = _consumer->handle_output(h); switch (rep) { case STATUS_SUCCESS: break ; case STATUS_GENERAL_ERROR: case STATUS_FAILURE: case STATUS_BAD_REQUEST: case STATUS_MALLOC_FAILURE: case STATUS_ILLEGAL_PARAMS: case STATUS_QUEUE_TIMEOUT: case STATUS_QUEUE_FULL: ACE_DEBUG((MY_ERROR ACE_TEXT("[%s] Some general error occured while processing data from Intel client to MC\n"),identifier())); break; case STATUS_CONNECTION_CLOSED: //in this states tunnel handler already unbind the connection with tcp_Consumer ACE_DEBUG((MY_DEBUG ACE_TEXT("[%s] Intel remote client closed the connection\n"),identifier())); reply = -1; //will cause handle_close to be called for write mask. break; case STATUS_NETWORK_ERROR: case STATUS_FATAL_ERROR: case STATUS_QUEUE_DEACTIVATED: ACE_DEBUG ((MY_DEBUG ACE_TEXT("[%s] Fatal error occured while handling outgoing data to MC - setting state to disconnect\n"),identifier())); { ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex , lock, _state_mutex, STATUS_LOCK_FAILURE); state(DISCONNECTING); } reply= 0; break; //sanity check: default: ACE_DEBUG ((MY_DEBUG ACE_TEXT("ILLEGAL reply status from handle_output()\n"))); reply= 0; break; } } return processReturnVal(reply); } int Tcp_Svc_Handler::processReturnVal(int result) { if (result == -1) return -1; //else: //maybe the last dispacher was called during this thead working //(because this thread was at the middle of working, the last dispacher couldn't return -1) //so this thread MUST return -1 (otherwise we will have memory leak...) ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex , dispatch_output_lock, _dispatch_output_counter_mutex, STATUS_LOCK_FAILURE); //going to work on this thread ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex , active_lock, _active_mutex, STATUS_LOCK_FAILURE); ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex , state_lock, _state_mutex, STATUS_LOCK_FAILURE); //update active flag (since we finish working) _active_counter--; if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED)&& (_active_counter == 0)) { ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n"))); return -1; } ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::processReturnVal - going to return 0\n"))); return 0; } int Tcp_Svc_Handler::handle_close (ACE_HANDLE h , ACE_Reactor_Mask close_mask) { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::handle_close")); ACE_DEBUG((MY_DEBUG ACE_TEXT("Tcp_Svc_Handler::handle_close on handle:%d\n"),_unique_id)); //This mutex is taken to prevent scenario where handle_close on READ and handle_close on // WRITE happends concurrently (they can both try to delete this) ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex , lock, _close_mutex, -1); //read closed: //or MC closed the socket, or we had error hanle input data from socket: if ((ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::READ_MASK)) && (_supplier != NULL)) { closeSupplier(); } // close on write mask if (ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::WRITE_MASK) || ACE_BIT_ENABLED (close_mask, ACE_Event_Handler::EXCEPT_MASK)) { this->_notification_strategy.mask(ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); this->reactor()->purge_pending_notifications(this,ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); if(_consumer != NULL) { closeConsumer(); } } if ((this->_supplier == NULL) && (this->_consumer == NULL)) { ACE_DEBUG((MY_DEBUG ACE_TEXT("Tcp_Svc_Handler BOTH supplier and consumer are NULL\n\tgoing to delete Tcp_Svc_Handler.........%x\n"), this)); lock.release(); this->shutdown_tcp_svc(); } return 0; } void Tcp_Svc_Handler::closeSupplier() { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeSupplier")); if (_supplier != NULL) { _supplier->handle_close(); delete _supplier; _supplier = NULL; } peer().close_reader(); } void Tcp_Svc_Handler::closeConsumer() { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::closeConsumer")); peer().close_writer(); _consumer->handle_close(); delete _consumer; _consumer = NULL; } void Tcp_Svc_Handler::state(Tcp_Svc_Handler::ConnectState s) { _tcp_state = s; } // Return the current state of the Proxy. Tcp_Svc_Handler::ConnectState Tcp_Svc_Handler::state(void) const { return _tcp_state; } const ACE_INET_Addr & Tcp_Svc_Handler::remote_addr (void) const { return this->_remote_addr; } void Tcp_Svc_Handler::remote_addr(ACE_INET_Addr &ra) { this->_remote_addr = ra; } const ACE_INET_Addr & Tcp_Svc_Handler::local_addr (void) const { return this->_local_addr; } void Tcp_Svc_Handler::local_addr (ACE_INET_Addr &la) { this->_local_addr = la; } STATUS Tcp_Svc_Handler::initConnection() { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::initConnection")); bool is_connected = true; ACE_CString local, remote; getStringFullAddr(_local_addr, local); getStringFullAddr(_remote_addr, remote); ACE_DEBUG((MY_DEBUG ACE_TEXT ("INIT tcp connection\n\tamt address = %s\n\tMC addr = %s\n connection state = %b"), local.c_str(), remote.c_str(), is_connected)); ACE_UINT32 res = 0; res = _supplier->channelOpenRep(_local_addr, true, _tunnel_handler, _amt_win_size, _tunnel_consumer); if ((res != STATUS_SUCCESS) || (!is_connected)) { return STATUS_FAILURE; } return STATUS_SUCCESS; } Tcp_Supplier* Tcp_Svc_Handler::createSupplier(Tcp_Svc_Handler* h) { return new Tcp_Supplier(h); } Tcp_Consumer* Tcp_Svc_Handler::createConsumer(Tcp_Svc_Handler* h) { return new Tcp_Consumer(h); } Tcp_Supplier* Socks_Svc_Handler::createSupplier(Tcp_Svc_Handler* h) { return new SocksSupplier(h); } Tcp_Consumer* Socks_Svc_Handler::createConsumer(Tcp_Svc_Handler* h) { return new SocksConsumer(h); } STATUS Tcp_Svc_Handler::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1) { //get full address: ACE_TCHAR buffer[256]; //max host length addr.addr_to_string(buffer , 256); //str1.fast_resize(addr.get_addr_size()); str1.set(buffer); return STATUS_SUCCESS; } // Delete this handler void Tcp_Svc_Handler::shutdown_tcp_svc() { ACE_TRACE(ACE_TEXT("Tcp_Svc_Handler::shutdown")); ACE_Reactor * TPReactor = ACE_Reactor::instance (); TPReactor->remove_handler (this, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); // Don't call handle_close this->msg_queue_->flush(); this->reactor(0); this->destroy(); } //------------------------------------------------------- // return true on the following cases: // (1) some other thread is currently handling this object message's // (so it will handle all messages currently in the queue) // (2) the object is in DISCONNECT state, but we are NOT the last dispacher // (it can happend whan some earlier thread handled several of messages, // so we dont have anything to do) //------------------------------------------------------- bool Tcp_Svc_Handler::needReturnWithoutWorking(int* reply) { //update counter: ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex , lock2, _dispatch_output_counter_mutex, false); ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex , lock, _active_mutex, false); _dispatch_output_counter--; //check if some other thread handles output data of this object: if (_active_counter > 0) { ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Svc_Handler::handle_output - active counter is %x state=%d counter=%d\n "), _active_counter, state(), _dispatch_output_counter)); } //else //check if this is the last dispacher in disconnect state: ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex , lock3, _state_mutex, false); if ((_dispatch_output_counter == 0) && (state() == DISCONNECTED) && (_active_counter == 0)) { ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - last dispatcher in DISCONNECTED state. going to return -1\n"))); *reply = -1; return true; } //else: //update is_active flag _active_counter++; ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Consumer::handle_output - not the last dispatcher. tcp=[%x] state=%d dispach counter=%d\n"), this, state(), _dispatch_output_counter)); return false; }