677 lines
19 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: ChannelConsumer.cpp
//
// Contents: In charge of handling messages from the socks connection
// to the tunnel.
//
// Notes:
//----------------------------------------------------------------------------
//===================================================
// INCLUDES
//===================================================
#include <ace/Message_Block.h>
#include <ace/SOCK_Stream.h>
#include "TunnelConsumer.h"
#include "ChannelConsumer.h"
#include "TcpConsumer.h"
#include "TcpSupplier.h"
#include "TunnelManager.h"
#include "TunnelSupplier.h"
#include "global.h"
#include "TunnelHandler.h"
//===================================================
// channelConsumer Implementation
//===================================================
//-----------------------------------------
// Check whether a message block of mb_len bytes fits in the room
// that is left below the high water mark of the channel queue.
// Returns true only when the free room is strictly larger than mb_len.
//-----------------------------------------
bool Channel_Consumer::canMQContainMB(size_t mb_len)
{
    const size_t capacity = _msg_queue.high_water_mark();
    const size_t used = _msg_queue.message_bytes();
    // Both values are unsigned. If the queue already holds at least
    // high_water_mark bytes, "capacity - used" would wrap around to a
    // huge number and wrongly report free room. forwardMBtoTunnel()
    // pushes blocks back with enqueue_head() without consulting this
    // check, so that situation cannot be ruled out.
    if (used >= capacity)
    {
        return false;
    }
    return (capacity - used) > mb_len;
}
//-----------------------------------------
// Destructor.
// Logs the received/sent byte counters, then gives up the parked
// message block (if there is one) and flushes the channel queue.
//-----------------------------------------
Channel_Consumer::~Channel_Consumer()
{
    ACE_DEBUG((MY_DEBUG "----->Channel_Consumer\n"));
    ACE_DEBUG((MY_TRACE
        ACE_TEXT("(%x) Channel_Consumer::dtor - Data received: %d\tData sent: %d\tData gap: %d\n"),
        this,
        _total_received_data_length,
        _total_sent_data_length,
        (_total_received_data_length-_total_sent_data_length)));
    if (_mb_remain != NULL)
    {
        // Message blocks are given up with release() everywhere else in
        // this file; a plain delete bypasses the block's own release
        // logic, so use release() here as well.
        _mb_remain->release();
        _mb_remain = NULL;
    }
    _msg_queue.flush();
}
//-----------------------------------------
// Open a new channel with Intel(R) AMT.
// Looks up the tunnel that serves address:port and asks its consumer
// to send a channel open request on behalf of this channel.
// Called by Socks_Supplier.
//
// Returns STATUS_SUCCESS when the request was handed to the tunnel,
// STATUS_LOCK_FAILURE when the tunnel manager guard cannot be taken,
// STATUS_FAILURE in every other failure case.
//-----------------------------------------
STATUS Channel_Consumer::openChannel(const ACE_CString &address,
                                     const ACE_UINT32 port,
                                     const ACE_CString &sender_address,
                                     const ACE_UINT32 sender_port,
                                     Tcp_Consumer* consumer)
{
    ACE_TRACE("Channel_Consumer::openChannel");

    // Channel state is only touched while holding the channel mutex.
    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     channel_guard,
                     _channel_mutex,
                     STATUS_FAILURE);

    // Keep the tunnel manager read-locked for the lookup and for the
    // request that is sent through the tunnel found.
    ACE_READ_GUARD_RETURN(ACE_RW_Thread_Mutex,
                          manager_guard,
                          Tunnel_Manager::instance().guard(),
                          STATUS_LOCK_FAILURE);

    AMT_Tunnel_Handler* const handler =
        Tunnel_Manager::instance().find_tunnel(address, port);
    if (handler == NULL)
    {
        ACE_DEBUG((MY_ERROR
            ACE_TEXT("Channel consumer could not find the requested tunnel - %s:%d\n"),
            address.c_str(), port));
        return STATUS_FAILURE;
    }

    _tunnel_consumer = handler->consumer();

    // The channel is pending open from the moment the request is issued.
    _state = PENDING_OPEN;
    const STATUS res = _tunnel_consumer->send_channel_open_request(
        this,
        consumer,
        static_cast<ACE_UINT32>(address.length()),
        address,
        port,
        static_cast<ACE_UINT32>(sender_address.length()),
        sender_address,
        sender_port);
    if (res == STATUS_SUCCESS)
    {
        return res;
    }

    // The request did not go out: roll the channel back to its initial state.
    ACE_DEBUG((MY_DEBUG
        ACE_TEXT("[%s] Management Console failed to connect to Intel remote client %s:%d\n"),
        sender_address.c_str(), address.c_str(), port));
    _state = INIT;
    _tunnel_consumer = NULL;
    return STATUS_FAILURE;
}
//-----------------------------------------
// Accept a data block from the TCP side of the channel.
// The block is forwarded to the tunnel right away when the window is
// open, the channel queue is empty and the channel is not pending
// open; otherwise it is added to the channel queue.
// When the outcome is STATUS_QUEUE_FULL the TCP supplier is
// unregistered from READ events.
//
// mb - the block to send. On every path below the lock it is either
//      released, queued or parked by this object.
// Returns STATUS_FAILURE when the channel mutex cannot be taken,
// otherwise the status of the queue/forward step (STATUS_SUCCESS
// when the block was dropped on purpose).
//-----------------------------------------
STATUS Channel_Consumer::sendData(ACE_Message_Block *mb)
{
    ACE_TRACE("Channel_Consumer::sendData");
    ACE_DEBUG((MY_TRACE
        ACE_TEXT("Channel_Consumer::sendData. %x\n"),
        this));
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
        lock,
        _channel_mutex,
        STATUS_FAILURE);
    if (_tunnel_consumer == NULL)
    {
        ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: trying to send data on invalid tunnel. ignoring data...\n"),
            _tunnel_handler));
        mb->release();
        return STATUS_SUCCESS;
    }
    _total_received_data_length += mb->length();
    // send data can't be called after the supplier closed this connection
    ACE_ASSERT((!_mc_closed) && (_tcp_supplier != NULL));
    STATUS res = STATUS_SUCCESS;
    // The block has to wait in the queue when there is no window, when
    // older blocks are still queued, or while the channel is pending open.
    if ((_win_size == 0) || (! _msg_queue.is_empty() ) || _state == PENDING_OPEN)
    {
        ACE_DEBUG (( MY_DEBUG
            ACE_TEXT(" <MC(%d)>: Channel_Consumer queuing message, window size is %x, queue is empty: %x\n"),
            _tunnel_handler, _win_size, _msg_queue.is_empty()));
        // NOTE(review): the _amt_closed check only covers the queuing
        // path; the direct forward path below does not test it - confirm
        // that this is intended.
        if (_amt_closed)
        {
            ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: Channel_Consumer:sendData - state is AMT_CLOSE. ignore the message\n "),
                _tunnel_handler));
            // We cannot forward this message, drop it.
            ACE_Message_Block::release(mb);
            return STATUS_SUCCESS;
        }
        res = putMBInQueue(mb);
    }
    else
    {
        // Queue is empty and we have window size
        // We can forward the message right now
        res = forwardMBtoTunnel(mb);
    }
    // ACE_ASSERT disappears when ACE_NDEBUG is defined, so the supplier
    // pointer is checked again before it is used (setWinSize() guards
    // its calls the same way).
    if ((STATUS_QUEUE_FULL == res) && (_tcp_supplier != NULL))
    {
        ACE_DEBUG((MY_DEBUG "Channel_Consumer::sendData - going to unregister handler for read\n"));
        //unregister input from reactor
        _registered = false;
        _tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK);
    }
    return res;
}
//-----------------------------------------
// Forward a message block to the tunnel consumer.
// This is an internal function and must be called after taking this
// consumer's lock.
//
// When the block is longer than the current window it is split: the
// first _win_size bytes are sent now, the remainder is pushed to the
// head of the channel queue.
// On a successful send the window shrinks by the number of bytes sent
// and the block is released. When the tunnel refuses the block it is
// pushed back to the head of the channel queue so it can be retried.
//
// Returns STATUS_SUCCESS, the status reported by the tunnel consumer,
// or STATUS_QUEUE_TIMEOUT / STATUS_QUEUE_DEACTIVATED when the
// remainder of a split block cannot be queued.
//-----------------------------------------
STATUS Channel_Consumer::forwardMBtoTunnel(ACE_Message_Block *mb)
{
    ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::forwardMBtoTunnel \t(%x)\n"), this));
    //message is too big to send , split and send only window size
    if (_win_size < mb->length())
    {
        ACE_DEBUG ((MY_DEBUG
            ACE_TEXT (" <MC(%d)>: Channel_Consumer splitting message, window size = %d, message length = %d\n"),
            _tunnel_handler,_win_size, mb->length()));
        //Split message
        ACE_Message_Block *mb2=NULL;
        splitMessage(mb, mb2);
        //Insert the other message to the top of the queue
        if(_msg_queue.enqueue_head(mb2) == -1)
        {
            // Read errno before anything else gets a chance to change it.
            const STATUS queue_res = (errno == EWOULDBLOCK)
                ? STATUS_QUEUE_TIMEOUT
                : STATUS_QUEUE_DEACTIVATED;
            ACE_DEBUG((MY_DEBUG
                ACE_TEXT(" <MC(%d)>: Failed to queue the remainder of a split message\n"),
                _tunnel_handler));
            // The remainder cannot be kept, so sending the first part
            // would leave a hole in the stream. Give up both parts and
            // report the queue error instead of overwriting it below.
            ACE_Message_Block::release(mb2);
            ACE_Message_Block::release(mb);
            return queue_res;
        }
    }
    //insert the message in the tunnel queue
    const STATUS res = _tunnel_consumer->send_channel_data(_tunnel_handler, mb);
    if (res != STATUS_SUCCESS)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: Failed to add message to tunnel queue\n"),
            _tunnel_handler));
        // return the message into the queue and try again later
        if (_msg_queue.enqueue_head(mb) == -1)
        {
            // Nobody references the block any more; do not leak it.
            ACE_Message_Block::release(mb);
        }
        return res;
    }
    // managed to queue mb. Add information for debug purpose
    setTotalSentDataLength( getTotalSentDataLength() + mb->length());
    ACE_DEBUG((MY_TRACE
        ACE_TEXT("Channel_Consumer::forwardMBtoTunnel: after sending data to tunnel, length: %d\t(%x)\n"),
        mb->length(),
        this));
    // update window size
    _win_size -= (unsigned int)mb->length();
    ACE_Message_Block::release(mb);
    return STATUS_SUCCESS;
}
//-----------------------------------------
// Split mb in two at the current window size:
// mb  keeps only its first _win_size bytes,
// mb2 receives a clone of the message whose read pointer is
//     advanced past those bytes, i.e. the rest of the message.
//-----------------------------------------
void Channel_Consumer::splitMessage(ACE_Message_Block*& mb, ACE_Message_Block*& mb2)
{
    // The remainder: a clone that starts after the window.
    mb2 = mb->clone();
    mb2->rd_ptr(_win_size);

    // The part sent now: stop writing exactly one window after the
    // read pointer, the read pointer itself does not move.
    char * const window_end = mb->rd_ptr() + _win_size;
    mb->wr_ptr(window_end);
}
//-----------------------------------------
// Add win bytes to the channel window.
// After the window has grown:
// 1. queued messages are handed to handleMB() for as long as the
//    queue is not empty, the window is not exhausted and the
//    hand-over succeeds;
// 2. the parked message (_mb_remain), if any, is either moved into
//    the queue or handled directly;
// 3. the TCP supplier is unregistered from READ events when the
//    parked message still does not fit, and registered again otherwise.
//
// Returns STATUS_FAILURE when the channel mutex cannot be taken, a
// queue status when dequeuing fails, STATUS_SUCCESS otherwise.
//-----------------------------------------
STATUS Channel_Consumer::setWinSize(unsigned int win)
{
    ACE_TRACE("Channel_Consumer::setWinSize");
    ACE_DEBUG ((MY_TRACE
        ACE_TEXT (" <MC(%d)>: Channel_Consumer::setWinSize -\t(%x)\n"), _tunnel_handler, this));
    // Channel state is only touched while holding the channel mutex.
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
        lock,
        _channel_mutex,
        STATUS_FAILURE);
    _win_size += win;
    STATUS rep = STATUS_SUCCESS;
    // Go through the message queue.
    while ((!_msg_queue.is_empty()) && (_win_size != 0))
    {
        ACE_Message_Block *mb = NULL;
        if (_msg_queue.dequeue_head(mb) == -1)
        {
            if (errno == EWOULDBLOCK)
                rep = STATUS_QUEUE_TIMEOUT;
            else
                rep = STATUS_QUEUE_DEACTIVATED;
            ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),rep);
        }
        if (mb == NULL)
        {
            ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),STATUS_QUEUE_DEACTIVATED);
        }
        rep = handleMB(mb);
        if (rep != STATUS_SUCCESS)
        {
            // A failed forward leaves the window unchanged, so trying
            // again right away would spin in this loop with the channel
            // lock held. Stop draining; the next window adjustment
            // retries.
            break;
        }
    }
    // If we have a previously saved message, try to place it now.
    if (_mb_remain != NULL)
    {
        ACE_DEBUG ((MY_DEBUG
            ACE_TEXT (" <MC(%d)>: after win adjust - _mb_remain is being handle\n"),
            _tunnel_handler));
        // The message has to go through the channel queue when older
        // messages are still waiting or when there is no window left.
        if ((!_msg_queue.is_empty()) || (_win_size == 0))
        {
            if (!canMQContainMB(_mb_remain->size()))
            {
                ACE_DEBUG ((MY_DEBUG
                    ACE_TEXT (" <MC(%d)>: _mb_remain still can't be contained in mq- svc wont be registered to reactor again\n"),
                    _tunnel_handler));
                rep = STATUS_QUEUE_FULL;
            }
            else
            {
                ACE_DEBUG ((MY_DEBUG
                    ACE_TEXT (" <MC(%d)>: putting _mb_remain back\n"), _tunnel_handler));
                if (!_msg_queue.is_empty()) {
                    ACE_DEBUG ((MY_DEBUG
                        ACE_TEXT ("MQ is not empty yet\n")));
                }
                else if(_win_size == 0) {
                    ACE_DEBUG ((MY_DEBUG
                        ACE_TEXT ("win size is 0\n")));
                }
                if(_msg_queue.enqueue_tail(_mb_remain) == -1)
                {
                    // Read errno before logging can change it.
                    const int enqueue_errno = errno;
                    ACE_DEBUG ((MY_DEBUG
                        ACE_TEXT (" <MC(%d)>: error in _msg_queue.enqueue_tail \n"), _tunnel_handler));
                    if (enqueue_errno == EWOULDBLOCK)
                        rep = STATUS_QUEUE_TIMEOUT;
                    else
                        rep = STATUS_QUEUE_DEACTIVATED;
                }
                else
                {
                    // The queue holds the block now.
                    _mb_remain = NULL;
                    rep = STATUS_SUCCESS;
                }
            }
        }
        // The message, or part of it, can be sent right now.
        else
        {
            rep = handleMB(_mb_remain);
            _mb_remain = NULL;
        }
    }
    if (_tcp_supplier == NULL)
    {
        ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: Channel_Consumer::setWinSize - tcpSupplier is NULL\n"),
            _tunnel_handler));
        return STATUS_SUCCESS;
    }
    // _tcp_supplier is known to be non-NULL from here on.
    if (rep == STATUS_QUEUE_FULL)
    {
        // No room for more input: stop reading from the socket.
        if (_registered)
        {
            _registered = false;
            _tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK);
        }
    }
    else if (!_registered)
    {
        // Input can be accepted again: resume reading from the socket.
        _registered = true;
        _tcp_supplier->registerHandler(ACE_Event_Handler::READ_MASK);
    }
    return STATUS_SUCCESS;
}
//-----------------------------------------
// Handle one message block.
// - no tunnel consumer: the block is released and ignored;
// - hangup block (MB_HANGUP): a channel close is sent to the tunnel
//   and the channel is marked as closed by the MC side;
// - anything else: the block is forwarded to the tunnel.
//-----------------------------------------
STATUS Channel_Consumer::handleMB(ACE_Message_Block* mb)
{
    // Without a tunnel there is nowhere to send the block.
    if (_tunnel_consumer == NULL)
    {
        mb->release ();
        return STATUS_SUCCESS;
    }

    const bool is_hangup = (mb->msg_type () == ACE_Message_Block::MB_HANGUP);
    if (!is_hangup)
    {
        // Regular data: forward it to the tunnel.
        return forwardMBtoTunnel(mb);
    }

    // Close-connection message.
    ACE_DEBUG ((MY_DEBUG
        ACE_TEXT (" <MC(%d)>: sending close message\n"),
        _tunnel_handler));
    _tunnel_consumer->send_channel_close(_tunnel_handler);
    mb->release ();
    _state = PENDING_CLOSE;
    _mc_closed = true;
    return STATUS_SUCCESS;
}
//-----------------------------------------
// Activate the channel.
// Stores the channel id that is used in later calls to the tunnel
// API, moves a pending-open channel to ACTIVE and applies the
// initial window.
// Called from the Tunnel_Supplier after getting channel open reply.
//
// id  - channel id, saved as _tunnel_handler
// win - initial window size, passed to setWinSize()
//-----------------------------------------
STATUS Channel_Consumer::activateChannel(unsigned int id, unsigned int win)
{
    ACE_TRACE("Channel_Consumer::activateChannel");
    // Channel state is only touched while holding the channel mutex.
    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     guard,
                     _channel_mutex,
                     STATUS_FAILURE);

    _tunnel_handler = id;

    if (_state != PENDING_OPEN)
    {
        // Unexpected state: it is only logged, the state is left as it is.
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT("Channel_Consumer::activateChannel. state is NOT pending open(%x)\n"),
            this));
    }
    else
    {
        _state = ACTIVE;
    }

    // The result of the window update is not propagated to the caller.
    setWinSize(win);
    return STATUS_SUCCESS;
}
//-----------------------------------------
// Channel open failure
// Called by:
// 1. Tunnel_Supplier after receiving channel open failure
// 2. Tunnel_Consumer - in open direct request, when some error occured
//-----------------------------------------
void Channel_Consumer::channelOpenFailure()
{
ACE_TRACE("Channel_Consumer::channelOpenFailure");
// Get lock
ACE_GUARD( ACE_Recursive_Thread_Mutex,
lock,
_channel_mutex);
// If MC alerady closed this connection we can die
if (_mc_closed)
{
lock.release();
delete this;
return;
}
_state = PENDING_CLOSE;
_amt_closed = true;
_tunnel_consumer = NULL;
}
//-----------------------------------------
// Called by TCP supplier (when it's closing).
// Depending on the channel state this either
// - deletes the object (state INIT, or AMT closed and no tunnel
//   consumer left),
// - sends a channel close to the tunnel right away (ACTIVE channel
//   with an empty queue),
// - queues a hangup message behind the pending data (ACTIVE with
//   queued data, or PENDING_OPEN), or
// - sends a channel close for a channel AMT already closed.
//
// Returns STATUS_LOCK_FAILURE when the channel mutex cannot be taken,
// STATUS_MALLOC_FAILURE when the hangup message cannot be allocated,
// the status of putMBInQueue() when a hangup message was queued,
// STATUS_SUCCESS otherwise.
//-----------------------------------------
STATUS Channel_Consumer::closeTunnelConnection()
{
    ACE_TRACE("Channel_Consumer::closeTunnelConnection");
    ACE_DEBUG((MY_TRACE
        ACE_TEXT("Channel_Consumer::closeTunnelConnection: (%x)\n"),
        this));
    // Channel state is only touched while holding the channel mutex.
    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     guard,
                     _channel_mutex,
                     STATUS_LOCK_FAILURE);
    ACE_ASSERT(!_mc_closed);

    const bool has_tunnel = (_tunnel_consumer != NULL);

    if ((_state == INIT) || (_amt_closed && !has_tunnel))
    {
        // We can die safely now, since we are the only reference to this object
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: Channel_Consumer: channel already closed by tunnel going to delete\n"),
            _tunnel_handler));
        // The mutex is a member, so give it up before the object goes away.
        guard.release();
        delete this;
        return STATUS_SUCCESS;
    }

    STATUS res = STATUS_SUCCESS;
    if ((_state == ACTIVE) || (_state == PENDING_OPEN))
    {
        // The channel is still open.
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: ChannelConsumer:: close - going to close ACTIVE/PENDING_OPEN channel (%d)\n"),
            _tunnel_handler, (int)_state));

        const bool can_close_now =
            _msg_queue.is_empty() && has_tunnel && (_state == ACTIVE);

        // Both ways of closing mark the MC side as closed first.
        _mc_closed = true;
        if (can_close_now)
        {
            // The close message can be sent instantly.
            _tunnel_consumer->send_channel_close(_tunnel_handler);
            ACE_DEBUG((MY_TRACE
                ACE_TEXT(" <MC(%d)>: after sending close message to tunnel consumer\n"),
                _tunnel_handler));
        }
        else
        {
            // A hangup message has to go through the channel's queue:
            // either there is data we should send first, or the channel
            // is in PENDING_OPEN state.
            _tcp_supplier = NULL;
            ACE_Message_Block *hangup;
            ACE_NEW_RETURN(hangup,
                           ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP),
                           STATUS_MALLOC_FAILURE);
            res = putMBInQueue(hangup);
        }
        _state = PENDING_CLOSE;
    }
    else if (_amt_closed && has_tunnel)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: ChannelConsumer:: close - going to close PENDING_CLOSE channel\n"),
            _tunnel_handler));
        // We must die now
        const STATUS close_res = _tunnel_consumer->send_channel_close(_tunnel_handler);
        if (close_res != STATUS_SUCCESS)
        {
            ACE_DEBUG((MY_DEBUG
                ACE_TEXT(" <MC(%d)>: Channel_Consumer::closeTunnelConnection - failed to inform tunnel consumer about closing.\n"),
                _tunnel_handler));
        }
        _mc_closed = true;
    }
    return res;
}
//-----------------------------------------
// Put a message block in the channel queue.
// When the queue has no room for the block, the block is parked in
// _mb_remain and STATUS_QUEUE_FULL is returned, which tells the
// callers to stop reading from the socket.
// When the enqueue itself fails the block is released and
// STATUS_QUEUE_TIMEOUT or STATUS_QUEUE_DEACTIVATED is returned.
//-----------------------------------------
STATUS Channel_Consumer::putMBInQueue(ACE_Message_Block *mb)
{
    if (canMQContainMB(mb->size()))
    {
        if (_msg_queue.enqueue_tail(mb) != -1)
        {
            return STATUS_SUCCESS;
        }
        // Read errno before logging or releasing can change it.
        const int enqueue_errno = errno;
        ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: ChannelConsumer failed to enqueue message\n"),
            _tunnel_handler));
        ACE_Message_Block::release(mb);
        return (enqueue_errno == EWOULDBLOCK)
            ? STATUS_QUEUE_TIMEOUT
            : STATUS_QUEUE_DEACTIVATED;
    }

    // Not enough room in the message queue, save this message and
    // tell the supplier to stop reading from socket
    ACE_DEBUG((MY_DEBUG
        ACE_TEXT(" <MC(%d)>: current mb length = %d is larger from the lenght remains in channels queue = %d\n"),
        _tunnel_handler,
        mb->size(),
        (_msg_queue.high_water_mark() - _msg_queue.message_bytes())));
    if ((_mb_remain != NULL) && (_mb_remain != mb))
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: _mb_remain is not NULL\n"),
            _tunnel_handler));
        // Only one block can be parked. The new block takes the slot as
        // before, but the block that was parked is released so that it
        // is not leaked when the pointer is overwritten.
        _mb_remain->release();
    }
    _mb_remain = mb;
    return STATUS_QUEUE_FULL;
}
// This method is called by Tunnel Supplier when AMT send a close message
void Channel_Consumer::channelClose()
{
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
lock,
_channel_mutex,
;);
// We don't delete ourself here, only in connectionUnbind
_amt_closed = true;
_state = PENDING_CLOSE;
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Channel_Consumer (%d) received channel close request from AMT\n"),
_tunnel_handler));
}
//This function called when AMT disconnected
void Channel_Consumer::connectionUnbind()
{
ACE_TRACE("Channel_Consumer::connectionUnbind");
// Get lock on channel manager
ACE_GUARD( ACE_Recursive_Thread_Mutex,
lock,
_channel_mutex);
_state = PENDING_CLOSE;
_amt_closed = true;
this->_tunnel_consumer = NULL;
if (_mc_closed)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT(" Channel_Consumer : connectionUnbind MC_CLOSE deleting itself\n"),
_tunnel_handler));
// TCP supplier already closed this channel
lock.release();
delete this;
}
else
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT(" Channel_Consumer : connectionUnbind waiting for MC to close the connection now\n"),
_tunnel_handler));
}
}
//-----------------------------------------
// This function is called when the channel manager already holds
// an AMT channel for this connection. This happens when the AMT
// device sends a direct open request.
// In this function we finish creating the channel: the tunnel
// consumer is attached and the open reply is sent through it.
//
// amt_addr        - not used by this function
// status          - reply to send; when true the channel becomes ACTIVE
// consumer        - passed on to channelOpenDirectRep()
// tunnel_consumer - tunnel consumer to attach to this channel
//
// Returns STATUS_FAILURE when the channel mutex cannot be taken,
// STATUS_FATAL_ERROR when tunnel_consumer is NULL, otherwise the
// status of channelOpenDirectRep().
//-----------------------------------------
STATUS Channel_Consumer::openChannelRep(ACE_INET_Addr amt_addr,
                                        bool status,
                                        Tcp_Consumer* consumer,
                                        AMT_Tunnel_Consumer* tunnel_consumer)
{
    ACE_TRACE("Channel_Consumer::openChannelRep");
    // The textual address used to be built here but was never read.
    ACE_UNUSED_ARG(amt_addr);
    // Channel state is only touched while holding the channel mutex.
    ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
        lock,
        _channel_mutex,
        STATUS_FAILURE);
    _tunnel_consumer = tunnel_consumer;
    if (NULL == _tunnel_consumer)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: tried to send openChannel Reply, but tunnel consumer doesnt exist\n"),
            _tunnel_handler));
        return STATUS_FATAL_ERROR; //TODO: need to close consumer as well - how?
    }
    // Only a channel that really has a tunnel consumer may become
    // ACTIVE; the state is therefore updated after the check above.
    if (status)
    {
        _state = ACTIVE;
    }
    const STATUS rep = _tunnel_consumer->channelOpenDirectRep(status, _tunnel_handler, this, consumer);
    if (rep != STATUS_SUCCESS)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT(" <MC(%d)>: failed to send direct reply\n"), _tunnel_handler));
        return rep;
    }
    return STATUS_SUCCESS;
}
//-----------------------------------------
// Write the textual form of addr, as produced by
// ACE_INET_Addr::addr_to_string(), into str1.
//
// Returns STATUS_SUCCESS when str1 was set, STATUS_FAILURE when the
// address could not be converted (str1 is left untouched then).
//-----------------------------------------
STATUS Channel_Consumer::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1)
{
    ACE_TCHAR buffer[256] = { 0 }; //max host length
    const size_t buffer_len = sizeof(buffer) / sizeof(buffer[0]);
    // A failed conversion leaves nothing useful in the buffer, so it
    // must not be copied into the result.
    if (addr.addr_to_string(buffer, buffer_len) == -1)
    {
        return STATUS_FAILURE;
    }
    str1.set(buffer);
    return STATUS_SUCCESS;
}