677 lines
19 KiB
C++
677 lines
19 KiB
C++
//----------------------------------------------------------------------------
|
|
//
|
|
// Copyright (C) Intel Corporation, 2006 - 2007.
|
|
//
|
|
// File: ChannelConsumer.cpp
|
|
//
|
|
// Contents: In charge of handling messages from the socks connection
|
|
// to the tunnel.
|
|
//
|
|
// Notes:
|
|
//----------------------------------------------------------------------------
|
|
|
|
//===================================================
|
|
// INCLUDES
|
|
//===================================================
|
|
#include <ace/Message_Block.h>
|
|
#include <ace/SOCK_Stream.h>
|
|
|
|
#include "TunnelConsumer.h"
|
|
#include "ChannelConsumer.h"
|
|
#include "TcpConsumer.h"
|
|
#include "TcpSupplier.h"
|
|
#include "TunnelManager.h"
|
|
#include "TunnelSupplier.h"
|
|
#include "global.h"
|
|
#include "TunnelHandler.h"
|
|
|
|
//===================================================
|
|
// channelConsumer Implementation
|
|
//===================================================
|
|
bool Channel_Consumer::canMQContainMB(size_t mb_len)
|
|
{
|
|
return ((_msg_queue.high_water_mark() - _msg_queue.message_bytes()) > mb_len);
|
|
}
|
|
|
|
Channel_Consumer::~Channel_Consumer()
|
|
{
|
|
ACE_DEBUG((MY_DEBUG "----->Channel_Consumer\n"));
|
|
ACE_DEBUG((MY_TRACE
|
|
ACE_TEXT("(%x) Channel_Consumer::dtor - Data received: %d\tData sent: %d\tData gap: %d\n"),
|
|
this,
|
|
_total_received_data_length,
|
|
_total_sent_data_length,
|
|
(_total_received_data_length-_total_sent_data_length)));
|
|
if (_mb_remain != NULL)
|
|
{
|
|
delete _mb_remain;
|
|
}
|
|
|
|
_msg_queue.flush();
|
|
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// Open a new channel with Intel(R) AMT
|
|
// Find the tunnel according to destination.
|
|
// Called by Socks_Supplier
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::openChannel( const ACE_CString &address,
|
|
const ACE_UINT32 port,
|
|
const ACE_CString &sender_address,
|
|
const ACE_UINT32 sender_port,
|
|
Tcp_Consumer* consumer)
|
|
{
|
|
ACE_TRACE("Channel_Consumer::openChannel");
|
|
|
|
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_FAILURE);
|
|
|
|
//find if tunnel exist:
|
|
ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex ,
|
|
tunnel_lock,
|
|
Tunnel_Manager::instance().guard(),
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
|
|
|
|
AMT_Tunnel_Handler* tunnel_handler = (Tunnel_Manager::instance().find_tunnel(address, port));
|
|
if (NULL == tunnel_handler)
|
|
{
|
|
ACE_DEBUG((MY_ERROR
|
|
ACE_TEXT("Channel consumer could not find the requested tunnel - %s:%d\n"),
|
|
address.c_str(), port));
|
|
return STATUS_FAILURE;
|
|
}
|
|
_tunnel_consumer = tunnel_handler->consumer();
|
|
//send channel Request message to tunnel
|
|
STATUS res;
|
|
_state = PENDING_OPEN;
|
|
if ((res =_tunnel_consumer->send_channel_open_request(
|
|
this,
|
|
consumer,
|
|
(ACE_UINT32)address.length(),
|
|
address,
|
|
port,
|
|
(ACE_UINT32)sender_address.length(),
|
|
sender_address,
|
|
sender_port
|
|
) ) != STATUS_SUCCESS)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("[%s] Management Console failed to connect to Intel remote client %s:%d\n"),
|
|
sender_address.c_str(),address.c_str(), port));
|
|
_state = INIT;
|
|
_tunnel_consumer = NULL;
|
|
return STATUS_FAILURE;
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// add the message data to the message queue
|
|
// and register the tunnel socket to the reactor
|
|
// for outgoing meesage
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::sendData(ACE_Message_Block *mb)
|
|
{
|
|
ACE_TRACE("Channel_Consumer::sendData");
|
|
|
|
ACE_DEBUG((MY_TRACE
|
|
ACE_TEXT("Channel_Consumer::sendData. %x\n"),
|
|
this));
|
|
|
|
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_FAILURE);
|
|
if (_tunnel_consumer == NULL)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: trying to send data on invalid tunnel. ignoring data...\n"),
|
|
_tunnel_handler));
|
|
mb->release();
|
|
return STATUS_SUCCESS;
|
|
}
|
|
_total_received_data_length += mb->length();
|
|
|
|
// send data can't be called after the supplier closed this connection
|
|
ACE_ASSERT((!_mc_closed) && (_tcp_supplier != NULL));
|
|
|
|
STATUS res = STATUS_SUCCESS;
|
|
|
|
// if need to enter to queue:
|
|
if ((_win_size == 0) || (! _msg_queue.is_empty() ) || _state == PENDING_OPEN)
|
|
{
|
|
ACE_DEBUG (( MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: Channel_Consumer queuing message, window size is %x, queue is empty: %x\n"),
|
|
_tunnel_handler, _win_size, _msg_queue.is_empty()));
|
|
|
|
if (_amt_closed)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: Channel_Consumer:sendData - state is AMT_CLOSE. ignore the message\n "),
|
|
_tunnel_handler));
|
|
// We cannot fw this message, notify the caller.
|
|
ACE_Message_Block::release(mb);
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
res = putMBInQueue(mb);
|
|
}
|
|
|
|
else
|
|
{
|
|
// Queue is empty and we have window size
|
|
// We can forward the message right now
|
|
res = forwardMBtoTunnel(mb);
|
|
}
|
|
if (STATUS_QUEUE_FULL == res)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG "Channel_Consumer::sendData - going to unregister handler for read\n"));
|
|
//unregister input from reactor
|
|
_registered = false;
|
|
_tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK);
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
// forward message block to tunnel consumer queue
|
|
// This is an internal function and must be called after taking this cunsumer's lock.
|
|
STATUS Channel_Consumer::forwardMBtoTunnel(ACE_Message_Block *mb)
|
|
{
|
|
STATUS res;
|
|
ACE_DEBUG((MY_TRACE ACE_TEXT("Channel_Consumer::forwardMBtoTunnel \t(%x)\n"), this));
|
|
//message is too big to send , split and send only window size
|
|
if (_win_size < mb->length())
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: Channel_Consumer splitting message, window size = %d, message length = %d\n"),
|
|
_tunnel_handler,_win_size, mb->length()));
|
|
|
|
//Split message
|
|
ACE_Message_Block *mb2=NULL;
|
|
splitMessage(mb, mb2);
|
|
|
|
//Insert the other message to the top of the queue
|
|
if(_msg_queue.enqueue_head(mb2) == -1)
|
|
{
|
|
if (errno == EWOULDBLOCK)
|
|
res = STATUS_QUEUE_TIMEOUT;
|
|
else
|
|
res = STATUS_QUEUE_DEACTIVATED;
|
|
}
|
|
}
|
|
|
|
//insert the message in the tunnel queue
|
|
res = _tunnel_consumer->send_channel_data(_tunnel_handler, mb);
|
|
if (res != STATUS_SUCCESS)
|
|
{
|
|
// return the message into the queue and try again later
|
|
_msg_queue.enqueue_head(mb);
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: Failed to add message to tunnel queue\n"),
|
|
_tunnel_handler));
|
|
return res;
|
|
}
|
|
|
|
// managed to queue mb. Add information for debug purpose
|
|
setTotalSentDataLength( getTotalSentDataLength() + mb->length());
|
|
ACE_DEBUG((MY_TRACE
|
|
ACE_TEXT("Channel_Consumer::forwardMBtoTunnel: after sending data to tunnel, length: %d\t(%x)\n"),
|
|
mb->length(),
|
|
this));
|
|
|
|
// update window size
|
|
_win_size -= (unsigned int)mb->length();
|
|
ACE_Message_Block::release(mb);
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// Split mb to 2 in way that:
|
|
// mb will hold only _window size length
|
|
// mb2 will hold the rest of the message
|
|
//-----------------------------------------
|
|
void Channel_Consumer::splitMessage(ACE_Message_Block*& mb, ACE_Message_Block*& mb2)
|
|
{
|
|
mb2 = mb->clone();
|
|
mb2->rd_ptr(_win_size);
|
|
|
|
char *mb_start = mb->rd_ptr();
|
|
mb->rd_ptr(_win_size);
|
|
char *mb_end = mb->rd_ptr();
|
|
|
|
mb->rd_ptr(mb_start);
|
|
mb->wr_ptr(mb_end);
|
|
}
|
|
//-----------------------------------------
|
|
// Set a new window size.
|
|
// When this function is being called, we go through
|
|
// the message queue, and transfer as much messages
|
|
// as we can, to tunnel message queue.
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::setWinSize(unsigned int win)
|
|
{
|
|
ACE_TRACE("Channel_Consumer::setWinSize");
|
|
ACE_DEBUG ((MY_TRACE
|
|
ACE_TEXT (" <MC(%d)>: Channel_Consumer::setWinSize -\t(%x)\n"), _tunnel_handler, this));
|
|
// Get lock on channel manager
|
|
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_FAILURE);
|
|
_win_size += win;
|
|
size_t send_amount = 0;
|
|
int queue_full = 0;
|
|
ACE_Message_Block *mb;
|
|
STATUS rep = STATUS_SUCCESS;
|
|
|
|
//go throuth message queue:
|
|
while ((!_msg_queue.is_empty()) &&(_win_size != 0))
|
|
{
|
|
mb = NULL;
|
|
if (_msg_queue.dequeue_head(mb) == -1)
|
|
{
|
|
if (errno == EWOULDBLOCK)
|
|
rep = STATUS_QUEUE_TIMEOUT;
|
|
else
|
|
rep = STATUS_QUEUE_DEACTIVATED;
|
|
ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),rep);
|
|
}
|
|
if (mb == NULL)
|
|
{
|
|
ACE_ERROR_RETURN ((MY_DEBUG " failed dequeue"),STATUS_QUEUE_DEACTIVATED);
|
|
}
|
|
|
|
rep = handleMB(mb);
|
|
}
|
|
|
|
// If we have a previous saved message and enough room in the queue.
|
|
// Put his message in the queue
|
|
if (_mb_remain != NULL)
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: after win adjust - _mb_remain is being handle\n"),
|
|
_tunnel_handler));
|
|
|
|
//if message need to be fully inserted to channelConsumer queue:
|
|
if ((!_msg_queue.is_empty()) || (_win_size == 0))
|
|
{
|
|
if (!canMQContainMB(_mb_remain->size()))
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: _mb_remain still can't be contained in mq- svc wont be registered to reactor again\n"),
|
|
_tunnel_handler));
|
|
rep = STATUS_QUEUE_FULL;
|
|
}
|
|
else
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: putting _mb_remain back\n"), _tunnel_handler));
|
|
if (!_msg_queue.is_empty()) {
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT ("MQ is not empty yet\n"), _tunnel_handler));
|
|
}
|
|
else if(_win_size == 0) {
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT ("win size is 0\n"), _tunnel_handler));
|
|
}
|
|
|
|
if(_msg_queue.enqueue_tail(_mb_remain) == -1)
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: error in _msg_queue.enqueue_tail \n"), _tunnel_handler));
|
|
if (errno == EWOULDBLOCK)
|
|
rep = STATUS_QUEUE_TIMEOUT;
|
|
else
|
|
rep = STATUS_QUEUE_DEACTIVATED;
|
|
}
|
|
else
|
|
{
|
|
_mb_remain = NULL;
|
|
rep = STATUS_SUCCESS;
|
|
}
|
|
}
|
|
}
|
|
// message/or part of it can be send right now:
|
|
else
|
|
{
|
|
rep = handleMB(_mb_remain);
|
|
_mb_remain = NULL;
|
|
}
|
|
}
|
|
if (_tcp_supplier == NULL)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: Channel_Consumer::setWinSize - tcpSupplier is NULL\n"),
|
|
_tunnel_handler));
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
if (rep == STATUS_QUEUE_FULL)
|
|
{
|
|
//unregister input from reactor (there is no place in socket)
|
|
if ((_registered) && (_tcp_supplier != NULL))
|
|
{
|
|
_registered = false;
|
|
_tcp_supplier->unRegisterHandler(ACE_Event_Handler::READ_MASK);
|
|
}
|
|
}
|
|
else if( (!_registered) && (rep != STATUS_QUEUE_FULL) && (_tcp_supplier != NULL))
|
|
{
|
|
//register
|
|
_registered = true;
|
|
_tcp_supplier->registerHandler(ACE_Event_Handler::READ_MASK);
|
|
}
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// Handle MB.
|
|
// If mb is hangup message - send close channel to its tunnel
|
|
// otherwise - send the data to the tunnel.
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::handleMB(ACE_Message_Block* mb)
|
|
{
|
|
if (_tunnel_consumer == NULL)
|
|
{
|
|
mb->release ();
|
|
return STATUS_SUCCESS;
|
|
}
|
|
//if close connection message
|
|
if ( mb->msg_type () == ACE_Message_Block::MB_HANGUP)
|
|
{
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT (" <MC(%d)>: sending close message\n"),
|
|
_tunnel_handler));
|
|
_tunnel_consumer->send_channel_close(_tunnel_handler);
|
|
mb->release ();
|
|
_state = PENDING_CLOSE;
|
|
_mc_closed = true;
|
|
return STATUS_SUCCESS;
|
|
}
|
|
// forward message to tunnel
|
|
return forwardMBtoTunnel(mb);
|
|
}
|
|
//-----------------------------------------
|
|
// activate the channel.
|
|
// This handler will used each time we use tunnel API
|
|
// Called from the Tunnel_Supplier after getting channel open reply
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::activateChannel(unsigned int id, unsigned int win)
|
|
{
|
|
ACE_TRACE("Channel_Consumer::activateChannel");
|
|
|
|
// Get lock on channel manager
|
|
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_FAILURE);
|
|
|
|
_tunnel_handler = id;
|
|
// If we are not in pending open we have a problem.
|
|
if (_state == PENDING_OPEN)
|
|
{
|
|
_state = ACTIVE;
|
|
}
|
|
else {
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("Channel_Consumer::activateChannel. state is NOT pending open(%x)\n"),
|
|
this));
|
|
}
|
|
setWinSize(win);
|
|
return STATUS_SUCCESS;
|
|
}
|
|
//-----------------------------------------
|
|
// Channel open failure
|
|
// Called by:
|
|
// 1. Tunnel_Supplier after receiving channel open failure
|
|
// 2. Tunnel_Consumer - in open direct request, when some error occured
|
|
//-----------------------------------------
|
|
void Channel_Consumer::channelOpenFailure()
|
|
{
|
|
ACE_TRACE("Channel_Consumer::channelOpenFailure");
|
|
// Get lock
|
|
ACE_GUARD( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex);
|
|
|
|
// If MC alerady closed this connection we can die
|
|
if (_mc_closed)
|
|
{
|
|
lock.release();
|
|
delete this;
|
|
return;
|
|
}
|
|
|
|
_state = PENDING_CLOSE;
|
|
_amt_closed = true;
|
|
_tunnel_consumer = NULL;
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// Called by TCP supplier (when it's closing)
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::closeTunnelConnection()
|
|
{
|
|
ACE_TRACE("Channel_Consumer::closeTunnelConnection");
|
|
|
|
ACE_DEBUG((MY_TRACE
|
|
ACE_TEXT("Channel_Consumer::closeTunnelConnection: (%x)\n"),
|
|
this));
|
|
|
|
// Get lock on channel Consumer
|
|
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
ACE_ASSERT(!_mc_closed);
|
|
STATUS res = STATUS_SUCCESS;
|
|
|
|
if ((_state == INIT) ||
|
|
(_amt_closed && this->_tunnel_consumer == NULL)) {
|
|
// We can die safly now, since we are the only reference to this object
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: Channel_Consumer: channel already closed by tunnel going to delete\n"),
|
|
_tunnel_handler));
|
|
lock.release();
|
|
delete this;
|
|
res = STATUS_SUCCESS;
|
|
}
|
|
// If this channel is still open
|
|
else if ((_state == ACTIVE) || (_state == PENDING_OPEN))
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: ChannelConsumer:: close - going to close ACTIVE/PENDING_OPEN channel (%d)\n"),
|
|
_tunnel_handler, (int)_state));
|
|
|
|
//if close message can be send instantly:
|
|
if (_msg_queue.is_empty() && _tunnel_consumer != NULL && ( _state == ACTIVE))
|
|
{
|
|
_mc_closed = true;
|
|
this->_tunnel_consumer->send_channel_close(_tunnel_handler);
|
|
ACE_DEBUG((MY_TRACE
|
|
ACE_TEXT(" <MC(%d)>: after sending close message to tunnel consumer\n"),
|
|
_tunnel_handler));
|
|
}
|
|
else
|
|
{
|
|
// need to insert hangup mesage into channel's queueu
|
|
// (either there's date message we should send first or channel is IN PENDING_OPEN state):
|
|
_mc_closed = true;
|
|
ACE_Message_Block *mb;
|
|
_tcp_supplier = NULL;
|
|
ACE_NEW_RETURN(mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), STATUS_MALLOC_FAILURE);
|
|
res = putMBInQueue(mb);
|
|
|
|
}
|
|
_state = PENDING_CLOSE;
|
|
}
|
|
else if (_amt_closed && this->_tunnel_consumer != NULL)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: ChannelConsumer:: close - going to close PENDING_CLOSE channel\n"),
|
|
_tunnel_handler));
|
|
|
|
// We must die now
|
|
if (_tunnel_consumer->send_channel_close(_tunnel_handler) != STATUS_SUCCESS)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: Channel_Consumer::closeTunnelConnection - failed to inform tunnel consumer about closing.\n"),
|
|
_tunnel_handler));
|
|
}
|
|
_mc_closed = true;
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// Put a message block in our queue
|
|
// If the queue is full then we will save this message in
|
|
// _mb_remain and unregister the service handler from read
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::putMBInQueue(ACE_Message_Block *mb)
|
|
{
|
|
// If we don't have enough space in our queue
|
|
STATUS res = STATUS_SUCCESS;
|
|
if (!canMQContainMB(mb->size()))
|
|
{
|
|
// No enough room in the message queue, save this message and
|
|
// tell the supplier to stop reading from socket
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: current mb length = %d is larger from the lenght remains in channels queue = %d\n"),
|
|
_tunnel_handler,
|
|
mb->size(),
|
|
(_msg_queue.high_water_mark() - _msg_queue.message_bytes())));
|
|
|
|
if (_mb_remain != NULL)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: _mb_remain is not NULL\n"),
|
|
_tunnel_handler));
|
|
}
|
|
_mb_remain = mb;
|
|
res = STATUS_QUEUE_FULL;
|
|
}
|
|
|
|
else if (_msg_queue.enqueue_tail(mb) == -1)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" <MC(%d)>: ChannelConsumer failed to enqueue message\n"),
|
|
_tunnel_handler));
|
|
ACE_Message_Block::release(mb);
|
|
if (errno == EWOULDBLOCK)
|
|
res = STATUS_QUEUE_TIMEOUT;
|
|
else
|
|
res = STATUS_QUEUE_DEACTIVATED;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
// This method is called by Tunnel Supplier when AMT send a close message
|
|
void Channel_Consumer::channelClose()
|
|
{
|
|
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
;);
|
|
|
|
// We don't delete ourself here, only in connectionUnbind
|
|
_amt_closed = true;
|
|
_state = PENDING_CLOSE;
|
|
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("Channel_Consumer (%d) received channel close request from AMT\n"),
|
|
_tunnel_handler));
|
|
}
|
|
|
|
//This function called when AMT disconnected
|
|
void Channel_Consumer::connectionUnbind()
|
|
{
|
|
ACE_TRACE("Channel_Consumer::connectionUnbind");
|
|
|
|
// Get lock on channel manager
|
|
ACE_GUARD( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex);
|
|
|
|
_state = PENDING_CLOSE;
|
|
_amt_closed = true;
|
|
this->_tunnel_consumer = NULL;
|
|
if (_mc_closed)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" Channel_Consumer : connectionUnbind MC_CLOSE deleting itself\n"),
|
|
_tunnel_handler));
|
|
// TCP supplier already closed this channel
|
|
lock.release();
|
|
delete this;
|
|
}
|
|
else
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" Channel_Consumer : connectionUnbind waiting for MC to close the connection now\n"),
|
|
_tunnel_handler));
|
|
}
|
|
|
|
}
|
|
|
|
//-----------------------------------------
|
|
// This function is being called when Channel manager
|
|
// already holds amt channel for this connection,
|
|
// This happends when AMT device request for direct
|
|
// open request.
|
|
// In this function we finish creating the channel
|
|
//-----------------------------------------
|
|
STATUS Channel_Consumer::openChannelRep(ACE_INET_Addr amt_addr,
|
|
bool status,
|
|
Tcp_Consumer* consumer,
|
|
AMT_Tunnel_Consumer* tunnel_consumer)
|
|
{
|
|
ACE_TRACE("Channel_Consumer::openChannelRep");
|
|
ACE_CString addr;
|
|
getStringFullAddr(amt_addr, addr);
|
|
STATUS rep;
|
|
|
|
// Get lock on channel manager
|
|
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex,
|
|
lock,
|
|
_channel_mutex,
|
|
STATUS_FAILURE);
|
|
|
|
//find if tunnel exist:
|
|
if (status)
|
|
{
|
|
_state = ACTIVE;
|
|
}
|
|
_tunnel_consumer = tunnel_consumer;
|
|
if (NULL == _tunnel_consumer)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: tried to send openChannel Reply, but tunnel consumer doesnt exist\n"),
|
|
_tunnel_handler));
|
|
return STATUS_FATAL_ERROR; //TODO: need to close consumer as well - how?
|
|
}
|
|
if ((rep = _tunnel_consumer->channelOpenDirectRep(status, _tunnel_handler, this, consumer)) != STATUS_SUCCESS)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT(" <MC(%d)>: failed to send direct reply\n"), _tunnel_handler));
|
|
return rep;
|
|
}
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
STATUS Channel_Consumer::getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1)
|
|
{
|
|
//get full address:
|
|
ACE_TCHAR buffer[256]; //max host length
|
|
addr.addr_to_string(buffer , 256);
|
|
str1.set(buffer);
|
|
return STATUS_SUCCESS;
|
|
}
|