//---------------------------------------------------------------------------- // // Copyright (C) Intel Corporation, 2006 - 2007. // // File: ChannelConsumer.cpp // // Contents: In charge of handling messages from the socks connection // to the tunnel. // // Notes: //---------------------------------------------------------------------------- //=================================================== // INCLUDES //=================================================== #include #include #include "TunnelConsumer.h" #include "ChannelConsumer.h" #include "TcpConsumer.h" #include "TcpSupplier.h" #include "TunnelManager.h" #include "TunnelSupplier.h" #include "global.h" #include "TunnelHandler.h" //=================================================== // channelConsumer Implementation //=================================================== bool Channel_Consumer::canMQContainMB(size_t mb_len) { return ((_msg_queue.high_water_mark() - _msg_queue.message_bytes()) > mb_len); } Channel_Consumer::~Channel_Consumer() { ACE_DEBUG((MY_DEBUG "----->Channel_Consumer\n")); ACE_DEBUG((MY_TRACE ACE_TEXT("(%x) Channel_Consumer::dtor - Data received: %d\tData sent: %d\tData gap: %d\n"), this, _total_received_data_length, _total_sent_data_length, (_total_received_data_length-_total_sent_data_length))); if (_mb_remain != NULL) { delete _mb_remain; } _msg_queue.flush(); } //----------------------------------------- // Open a new channel with Intel(R) AMT // Find the tunnel according to destination. // Called by Socks_Supplier //----------------------------------------- STATUS Channel_Consumer::openChannel( const ACE_CString &address, const ACE_UINT32 port, const ACE_CString &sender_address, const ACE_UINT32 sender_port, Tcp_Consumer* consumer) { ACE_TRACE("Channel_Consumer::openChannel"); ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_FAILURE); //find if tunnel exist: ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex , tunnel_lock, Tunnel_Manager::instance().guard(), STATUS_LOCK_FAILURE); AMT_Tunnel_Handler* tunnel_handler = (Tunnel_Manager::instance().find_tunnel(address, port)); if (NULL == tunnel_handler) { ACE_DEBUG((MY_ERROR ACE_TEXT("Channel consumer could not find the requested tunnel - %s:%d\n"), address.c_str(), port)); return STATUS_FAILURE; } _tunnel_consumer = tunnel_handler->consumer(); //send channel Request message to tunnel STATUS res; _state = PENDING_OPEN; if ((res =_tunnel_consumer->send_channel_open_request( this, consumer, (ACE_UINT32)address.length(), address, port, (ACE_UINT32)sender_address.length(), sender_address, sender_port ) ) != STATUS_SUCCESS) { ACE_DEBUG((MY_DEBUG ACE_TEXT("[%s] Management Console failed to connect to Intel remote client %s:%d\n"), sender_address.c_str(),address.c_str(), port)); _state = INIT; _tunnel_consumer = NULL; return STATUS_FAILURE; } return res; } //----------------------------------------- // add the message data to the message queue // and register the tunnel socket to the reactor // for outgoing meesage //----------------------------------------- STATUS Channel_Consumer::sendData(ACE_Message_Block *mb) { ACE_TRACE("Channel_Consumer::sendData"); ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::sendData. %x\n"), this)); ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_FAILURE); if (_tunnel_consumer == NULL) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : trying to send data on invalid tunnel. ignoring data...\n"), _tunnel_handler)); mb->release(); return STATUS_SUCCESS; } _total_received_data_length += mb->length(); // send data can't be called after the supplier closed this connection ACE_ASSERT((!_mc_closed) && (_tcp_supplier != NULL)); STATUS res = STATUS_SUCCESS; // if need to enter to queue: if ((_win_size == 0) || (! _msg_queue.is_empty() ) || _state == PENDING_OPEN) { ACE_DEBUG (( MY_DEBUG ACE_TEXT(" : Channel_Consumer queuing message, window size is %x, queue is empty: %x\n"), _tunnel_handler, _win_size, _msg_queue.is_empty())); if (_amt_closed) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : Channel_Consumer:sendData - state is AMT_CLOSE. ignore the message\n "), _tunnel_handler)); // We cannot fw this message, notify the caller. ACE_Message_Block::release(mb); return STATUS_SUCCESS; } res = putMBInQueue(mb); } else { // Queue is empty and we have window size // We can forward the message right now res = forwardMBtoTunnel(mb); } if (STATUS_QUEUE_FULL == res) { ACE_DEBUG((MY_DEBUG "Channel_Consumer::sendData - going to unregister handler for read\n")); //unregister input from reactor _registered = false; _tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK); } return res; } // forward message block to tunnel consumer queue // This is an internal function and must be called after taking this cunsumer's lock. STATUS Channel_Consumer::forwardMBtoTunnel(ACE_Message_Block *mb) { STATUS res; ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::forwardMBtoTunnel \t(%x)\n"), this)); //message is too big to send , split and send only window size if (_win_size < mb->length()) { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : Channel_Consumer splitting message, window size = %d, message length = %d\n"), _tunnel_handler,_win_size, mb->length())); //Split message ACE_Message_Block *mb2=NULL; splitMessage(mb, mb2); //Insert the other message to the top of the queue if(_msg_queue.enqueue_head(mb2) == -1) { if (errno == EWOULDBLOCK) res = STATUS_QUEUE_TIMEOUT; else res = STATUS_QUEUE_DEACTIVATED; } } //insert the message in the tunnel queue res = _tunnel_consumer->send_channel_data(_tunnel_handler, mb); if (res != STATUS_SUCCESS) { // return the message into the queue and try again later _msg_queue.enqueue_head(mb); ACE_DEBUG((MY_DEBUG ACE_TEXT(" : Failed to add message to tunnel queue\n"), _tunnel_handler)); return res; } // managed to queue mb. Add information for debug purpose setTotalSentDataLength( getTotalSentDataLength() + mb->length()); ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::forwardMBtoTunnel: after sending data to tunnel, length: %d\t(%x)\n"), mb->length(), this)); // update window size _win_size -= (unsigned int)mb->length(); ACE_Message_Block::release(mb); return STATUS_SUCCESS; } //----------------------------------------- // Split mb to 2 in way that: // mb will hold only _window size length // mb2 will hold the rest of the message //----------------------------------------- void Channel_Consumer::splitMessage(ACE_Message_Block*& mb, ACE_Message_Block*& mb2) { mb2 = mb->clone(); mb2->rd_ptr(_win_size); char *mb_start = mb->rd_ptr(); mb->rd_ptr(_win_size); char *mb_end = mb->rd_ptr(); mb->rd_ptr(mb_start); mb->wr_ptr(mb_end); } //----------------------------------------- // Set a new window size. // When this function is being called, we go through // the message queue, and transfer as much messages // as we can, to tunnel message queue. //----------------------------------------- STATUS Channel_Consumer::setWinSize(unsigned int win) { ACE_TRACE("Channel_Consumer::setWinSize"); ACE_DEBUG ((MY_TRACE ACE_TEXT (" : Channel_Consumer::setWinSize -\t(%x)\n"), _tunnel_handler, this)); // Get lock on channel manager ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_FAILURE); _win_size += win; size_t send_amount = 0; int queue_full = 0; ACE_Message_Block *mb; STATUS rep = STATUS_SUCCESS; //go throuth message queue: while ((!_msg_queue.is_empty()) &&(_win_size != 0)) { mb = NULL; if (_msg_queue.dequeue_head(mb) == -1) { if (errno == EWOULDBLOCK) rep = STATUS_QUEUE_TIMEOUT; else rep = STATUS_QUEUE_DEACTIVATED; ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),rep); } if (mb == NULL) { ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),STATUS_QUEUE_DEACTIVATED); } rep = handleMB(mb); } // If we have a previous saved message and enough room in the queue. // Put his message in the queue if (_mb_remain != NULL) { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : after win adjust - _mb_remain is being handle\n"), _tunnel_handler)); //if message need to be fully inserted to channelConsumer queue: if ((!_msg_queue.is_empty()) || (_win_size == 0)) { if (!canMQContainMB(_mb_remain->size())) { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : _mb_remain still can't be contained in mq- svc wont be registered to reactor again\n"), _tunnel_handler)); rep = STATUS_QUEUE_FULL; } else { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : putting _mb_remain back\n"), _tunnel_handler)); if (!_msg_queue.is_empty()) { ACE_DEBUG ((MY_DEBUG ACE_TEXT ("MQ is not empty yet\n"), _tunnel_handler)); } else if(_win_size == 0) { ACE_DEBUG ((MY_DEBUG ACE_TEXT ("win size is 0\n"), _tunnel_handler)); } if(_msg_queue.enqueue_tail(_mb_remain) == -1) { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : error in _msg_queue.enqueue_tail \n"), _tunnel_handler)); if (errno == EWOULDBLOCK) rep = STATUS_QUEUE_TIMEOUT; else rep = STATUS_QUEUE_DEACTIVATED; } else { _mb_remain = NULL; rep = STATUS_SUCCESS; } } } // message/or part of it can be send right now: else { rep = handleMB(_mb_remain); _mb_remain = NULL; } } if (_tcp_supplier == NULL) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : Channel_Consumer::setWinSize - tcpSupplier is NULL\n"), _tunnel_handler)); return STATUS_SUCCESS; } if (rep == STATUS_QUEUE_FULL) { //unregister input from reactor (there is no place in socket) if ((_registered) && (_tcp_supplier != NULL)) { _registered = false; _tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK); } } else if( (!_registered) && (rep != STATUS_QUEUE_FULL) && (_tcp_supplier != NULL)) { //register _registered = true; _tcp_supplier->registerHandler(ACE_Event_Handler::READ_MASK); } return STATUS_SUCCESS; } //----------------------------------------- // Handle MB. // If mb is hangup message - send close channel to its tunnel // otherwise - send the data to the tunnel. //----------------------------------------- STATUS Channel_Consumer::handleMB(ACE_Message_Block* mb) { if (_tunnel_consumer == NULL) { mb->release (); return STATUS_SUCCESS; } //if close connection message if ( mb->msg_type () == ACE_Message_Block::MB_HANGUP) { ACE_DEBUG ((MY_DEBUG ACE_TEXT (" : sending close message\n"), _tunnel_handler)); _tunnel_consumer->send_channel_close(_tunnel_handler); mb->release (); _state = PENDING_CLOSE; _mc_closed = true; return STATUS_SUCCESS; } // forward message to tunnel return forwardMBtoTunnel(mb); } //----------------------------------------- // activate the channel. // This handler will used each time we use tunnel API // Called from the Tunnel_Supplier after getting channel open reply //----------------------------------------- STATUS Channel_Consumer::activateChannel(unsigned int id, unsigned int win) { ACE_TRACE("Channel_Consumer::activateChannel"); // Get lock on channel manager ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_FAILURE); _tunnel_handler = id; // If we are not in pending open we have a problem. if (_state == PENDING_OPEN) { _state = ACTIVE; } else { ACE_DEBUG((MY_DEBUG ACE_TEXT("Channel_Consumer::activateChannel. state is NOT pending open(%x)\n"), this)); } setWinSize(win); return STATUS_SUCCESS; } //----------------------------------------- // Channel open failure // Called by: // 1. Tunnel_Supplier after receiving channel open failure // 2. Tunnel_Consumer - in open direct request, when some error occured //----------------------------------------- void Channel_Consumer::channelOpenFailure() { ACE_TRACE("Channel_Consumer::channelOpenFailure"); // Get lock ACE_GUARD( ACE_Recursive_Thread_Mutex, lock, _channel_mutex); // If MC alerady closed this connection we can die if (_mc_closed) { lock.release(); delete this; return; } _state = PENDING_CLOSE; _amt_closed = true; _tunnel_consumer = NULL; } //----------------------------------------- // Called by TCP supplier (when it's closing) //----------------------------------------- STATUS Channel_Consumer::closeTunnelConnection() { ACE_TRACE("Channel_Consumer::closeTunnelConnection"); ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::closeTunnelConnection: (%x)\n"), this)); // Get lock on channel Consumer ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_LOCK_FAILURE); ACE_ASSERT(!_mc_closed); STATUS res = STATUS_SUCCESS; if ((_state == INIT) || (_amt_closed && this->_tunnel_consumer == NULL)) { // We can die safly now, since we are the only reference to this object ACE_DEBUG((MY_DEBUG ACE_TEXT(" : Channel_Consumer: channel already closed by tunnel going to delete\n"), _tunnel_handler)); lock.release(); delete this; res = STATUS_SUCCESS; } // If this channel is still open else if ((_state == ACTIVE) || (_state == PENDING_OPEN)) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : ChannelConsumer:: close - going to close ACTIVE/PENDING_OPEN channel (%d)\n"), _tunnel_handler, (int)_state)); //if close message can be send instantly: if (_msg_queue.is_empty() && _tunnel_consumer != NULL && ( _state == ACTIVE)) { _mc_closed = true; this->_tunnel_consumer->send_channel_close(_tunnel_handler); ACE_DEBUG((MY_TRACE ACE_TEXT(" : after sending close message to tunnel consumer\n"), _tunnel_handler)); } else { // need to insert hangup mesage into channel's queueu // (either there's date message we should send first or channel is IN PENDING_OPEN state): _mc_closed = true; ACE_Message_Block *mb; _tcp_supplier = NULL; ACE_NEW_RETURN(mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), STATUS_MALLOC_FAILURE); res = putMBInQueue(mb); } _state = PENDING_CLOSE; } else if (_amt_closed && this->_tunnel_consumer != NULL) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : ChannelConsumer:: close - going to close PENDING_CLOSE channel\n"), _tunnel_handler)); // We must die now if (_tunnel_consumer->send_channel_close(_tunnel_handler) != STATUS_SUCCESS) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : Channel_Consumer::closeTunnelConnection - failed to inform tunnel consumer about closing.\n"), _tunnel_handler)); } _mc_closed = true; } return res; } //----------------------------------------- // Put a message block in our queue // If the queue is full then we will save this message in // _mb_remain and unregister the service handler from read //----------------------------------------- STATUS Channel_Consumer::putMBInQueue(ACE_Message_Block *mb) { // If we don't have enough space in our queue STATUS res = STATUS_SUCCESS; if (!canMQContainMB(mb->size())) { // No enough room in the message queue, save this message and // tell the supplier to stop reading from socket ACE_DEBUG((MY_DEBUG ACE_TEXT(" : current mb length = %d is larger from the lenght remains in channels queue = %d\n"), _tunnel_handler, mb->size(), (_msg_queue.high_water_mark() - _msg_queue.message_bytes()))); if (_mb_remain != NULL) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : _mb_remain is not NULL\n"), _tunnel_handler)); } _mb_remain = mb; res = STATUS_QUEUE_FULL; } else if (_msg_queue.enqueue_tail(mb) == -1) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : ChannelConsumer failed to enqueue message\n"), _tunnel_handler)); ACE_Message_Block::release(mb); if (errno == EWOULDBLOCK) res = STATUS_QUEUE_TIMEOUT; else res = STATUS_QUEUE_DEACTIVATED; } return res; } // This method is called by Tunnel Supplier when AMT send a close message void Channel_Consumer::channelClose() { ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, _channel_mutex, ;); // We don't delete ourself here, only in connectionUnbind _amt_closed = true; _state = PENDING_CLOSE; ACE_DEBUG((MY_DEBUG ACE_TEXT("Channel_Consumer (%d) received channel close request from AMT\n"), _tunnel_handler)); } //This function called when AMT disconnected void Channel_Consumer::connectionUnbind() { ACE_TRACE("Channel_Consumer::connectionUnbind"); // Get lock on channel manager ACE_GUARD( ACE_Recursive_Thread_Mutex, lock, _channel_mutex); _state = PENDING_CLOSE; _amt_closed = true; this->_tunnel_consumer = NULL; if (_mc_closed) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" Channel_Consumer : connectionUnbind MC_CLOSE deleting itself\n"), _tunnel_handler)); // TCP supplier already closed this channel lock.release(); delete this; } else { ACE_DEBUG((MY_DEBUG ACE_TEXT(" Channel_Consumer : connectionUnbind waiting for MC to close the connection now\n"), _tunnel_handler)); } } //----------------------------------------- // This function is being called when Channel manager // already holds amt channel for this connection, // This happends when AMT device request for direct // open request. // In this function we finish creating the channel //----------------------------------------- STATUS Channel_Consumer::openChannelRep(ACE_INET_Addr amt_addr, bool status, Tcp_Consumer* consumer, AMT_Tunnel_Consumer* tunnel_consumer) { ACE_TRACE("Channel_Consumer::openChannelRep"); ACE_CString addr; getStringFullAddr(amt_addr, addr); STATUS rep; // Get lock on channel manager ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex, lock, _channel_mutex, STATUS_FAILURE); //find if tunnel exist: if (status) { _state = ACTIVE; } _tunnel_consumer = tunnel_consumer; if (NULL == _tunnel_consumer) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : tried to send openChannel Reply, but tunnel consumer doesnt exist\n"), _tunnel_handler)); return STATUS_FATAL_ERROR; //TODO: need to close consumer as well - how? } if ((rep = _tunnel_consumer->channelOpenDirectRep(status, _tunnel_handler, this, consumer)) != STATUS_SUCCESS) { ACE_DEBUG((MY_DEBUG ACE_TEXT(" : failed to send direct reply\n"), _tunnel_handler)); return rep; } return STATUS_SUCCESS; } STATUS Channel_Consumer::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1) { //get full address: ACE_TCHAR buffer[256]; //max host length addr.addr_to_string(buffer , 256); str1.set(buffer); return STATUS_SUCCESS; }