638 lines
19 KiB
C++
638 lines
19 KiB
C++
//----------------------------------------------------------------------------
|
|
//
|
|
// Copyright (C) Intel Corporation, 2006 - 2007.
|
|
//
|
|
// File: TunnelConsumer.cpp
|
|
//
|
|
// Contents: Handles outgoing data on APF tunnel.
|
|
//
|
|
// Notes:
|
|
//----------------------------------------------------------------------------
|
|
|
|
// Include for ignoring SIGPIPE under linux
|
|
#if !defined (ACE_WIN32)
|
|
#include <ace/Signal.h>
|
|
#endif /* !(ACE_WIN32) */
|
|
#include "TunnelConsumer.h"
|
|
#include "ChannelManager.h"
|
|
#include "ChannelConsumer.h"
|
|
#include "TunnelHandler.h"
|
|
#include "TunnelManager.h"
|
|
#include "APF.h"
|
|
#include "global.h"
|
|
#include "TcpConsumer.h"
|
|
|
|
// Filler values for APF message fields: all bits set / all bits cleared.
// Used below for the "reserved" field of channel-open requests and replies
// (the zero value is also used for the window size of a failure reply).
static const ACE_UINT32 APF_RESERVED_F_FIELD = 0xFFFFFFFF;
static const ACE_UINT32 APF_RESERVED_0_FIELD = 0x0;
|
|
|
|
// Builds a consumer bound to the given tunnel handler.
// The pointer is stored as-is (no copy, no ownership change visible here);
// the debug message counter starts at zero.
AMT_Tunnel_Consumer::AMT_Tunnel_Consumer(AMT_Tunnel_Handler *tunnel_handler)
    : tunnel_handler_(tunnel_handler),
      _msg_counter(0)
{
}
|
|
|
|
// Destructor: releases nothing itself, it only logs. The second log line
// reports the debug message counter (messages enqueued minus dequeued) and
// the handler pointer, which helps spot messages left in the queue.
AMT_Tunnel_Consumer::~AMT_Tunnel_Consumer(void)
{
    ACE_DEBUG((MY_DEBUG "----->Tunnel_Consumer dtor\n"));

    ACE_DEBUG((MY_TRACE
               ACE_TEXT("AMT_Tunnel_Consumer::dtor .message counter remain in queue= %d. (%x)\n"),
               _msg_counter,
               tunnel_handler_));
}
|
|
|
|
|
|
// Get reference to this tunnel socket stream
|
|
ACE_SOCK_Stream& AMT_Tunnel_Consumer::getPeer(void) const
|
|
{
|
|
return (ACE_SOCK_Stream &)tunnel_handler_->peer();
|
|
}
|
|
|
|
// Get reference to this tunnel channel manager
|
|
Channel_Manager& AMT_Tunnel_Consumer::channel_manager() const
|
|
{
|
|
return tunnel_handler_->channel_mgr();
|
|
}
|
|
|
|
// Get this tunnel handler state
|
|
int AMT_Tunnel_Consumer::tunnelState (void) const
|
|
{
|
|
return tunnel_handler_->tunnelState();
|
|
}
|
|
|
|
void AMT_Tunnel_Consumer::tunnelState (int state)
|
|
{
|
|
tunnel_handler_->tunnelState(state);
|
|
}
|
|
// Put message in this tunnel handler queue
|
|
STATUS AMT_Tunnel_Consumer::putq_wrapper(ACE_Message_Block *mb, PRIORITY prio , ACE_Time_Value *tv)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::putq"));
|
|
|
|
{
|
|
// every time we enqueue message, a new notification is created in reactor.
|
|
// we count those notification so we will know when the reactor doesn't contain
|
|
// any notification dispatcher for this handler (important for deletion)
|
|
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
|
|
lock,
|
|
tunnel_handler_->_dispatch_output_counter_mutex,
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
tunnel_handler_->_dispatch_output_counter++;
|
|
}
|
|
if (this->tunnel_handler_->msg_queue()->is_full())
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("AMT_Tunnel_Consumer: Queue is full\n")));
|
|
}
|
|
|
|
//for DEBUG purpose only:
|
|
_msg_counter++;
|
|
int rc;
|
|
if (prio == PRIO_HIGH)
|
|
{
|
|
rc = this->tunnel_handler_->msg_queue()->enqueue_head(mb);
|
|
}
|
|
else
|
|
{
|
|
rc = this->tunnel_handler_->putq(mb);
|
|
}
|
|
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Consumer::putq_wrapper - after putting in queue %x\n"), tunnel_handler_));
|
|
|
|
if (rc == -1)
|
|
{
|
|
ACE_Message_Block::release(mb);
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT ("AMT_Tunnel_Consumer failed to put message in queue \n")));
|
|
if (errno == EWOULDBLOCK)
|
|
return STATUS_QUEUE_TIMEOUT;
|
|
else
|
|
return STATUS_FAILURE;
|
|
}
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
|
|
// Send outgoing APF message.
|
|
STATUS AMT_Tunnel_Consumer::handle_output()
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_output"));
|
|
|
|
ACE_Message_Block *mb = 0;
|
|
|
|
int qcount;
|
|
|
|
STATUS status;
|
|
|
|
if (tunnel_handler_->msg_queue()->message_count() <= 0)
|
|
return STATUS_SUCCESS;
|
|
qcount = this->tunnel_handler_->getq(mb);
|
|
|
|
//for DEBUG purpose only:
|
|
_msg_counter--;
|
|
|
|
if ((mb != 0) && (qcount != -1))
|
|
{
|
|
switch(mb->msg_type())
|
|
{
|
|
case MB_DISCONNECT:
|
|
{
|
|
// this mb can contain data - in case of APF-disconnect message
|
|
if (mb->length() != 0)
|
|
{
|
|
send_data(mb);
|
|
}
|
|
ACE_Message_Block::release(mb);
|
|
getPeer().close_writer();
|
|
|
|
// if there are more output notifications for this handler in reactor, we dont want to return STATUS_CONNECTION_CLOSED
|
|
// because it can cause to delete this handler (while reactor still holding it)
|
|
// only the last dispatcher return STATUS_CONNECTION_CLOSED;
|
|
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
|
|
lock2,
|
|
tunnel_handler_->_dispatch_output_counter_mutex,
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
|
|
lock,
|
|
tunnel_handler_->_active_mutex,
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex ,
|
|
lock3,
|
|
tunnel_handler_->_state_mutex,
|
|
STATUS_LOCK_FAILURE);
|
|
|
|
// Go over queue and send channel open failure for remaining channel open requests
|
|
empty_queue();
|
|
|
|
tunnel_handler_->tunnelState(AMT_Tunnel_Handler::DISCONNECTED);
|
|
if ((tunnel_handler_->_dispatch_output_counter == 0) &&(tunnel_handler_->_active_counter == 1))
|
|
{
|
|
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - got hangup message, and it IS the last dispatcher\n")));
|
|
return STATUS_CONNECTION_CLOSED;
|
|
}
|
|
else
|
|
{
|
|
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - got hangup message, but its NOT the last dispatcher [%x]\n"), tunnel_handler_));
|
|
return STATUS_SUCCESS;
|
|
}
|
|
break;
|
|
|
|
}
|
|
case MB_CHANNEL_OPEN:
|
|
status = handle_channel_open_request(mb);
|
|
break;
|
|
case MB_CHANNEL_OPEN_REPLY:
|
|
status = handle_channel_open_reply(mb);
|
|
break;
|
|
case MB_CHANNEL_CLOSE:
|
|
status = handle_channel_close(mb);
|
|
break;
|
|
default:
|
|
status = send_data(mb);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT("AMT_Tunnel_Consumer::handle_output - error while trying to dequeue mb\n")));
|
|
}
|
|
|
|
ACE_Message_Block::release (mb);
|
|
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
// This function remove all messages from queue and handles
|
|
// channel open request by replying with channel open failure.
|
|
void AMT_Tunnel_Consumer::empty_queue()
|
|
{
|
|
ACE_Message_Block *mb = NULL;
|
|
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Consumer::empty_queue- going to dequeue all messages from queue. msg_counter = %d\n"), _msg_counter));
|
|
|
|
//go through message queue:
|
|
while (tunnel_handler_->msg_queue()->message_count() > 0)
|
|
{
|
|
_msg_counter--;
|
|
if (this->tunnel_handler_->getq(mb) == -1 ||
|
|
mb == NULL)
|
|
{
|
|
return;
|
|
}
|
|
if (mb->msg_type() == MB_CHANNEL_OPEN)
|
|
{
|
|
Internal_Channel_Open_Msg *msg = (Internal_Channel_Open_Msg*)mb;
|
|
msg->channel_consumer->channelOpenFailure();
|
|
((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
|
|
msg->tcp_consumer->connectionClose();
|
|
}
|
|
|
|
if (mb->msg_type() == MB_CHANNEL_OPEN_REPLY)
|
|
{
|
|
Internal_Channel_Open_Reply_Msg *msg = (Internal_Channel_Open_Reply_Msg*)mb;
|
|
if (msg->channel_consumer != NULL)
|
|
msg->channel_consumer->connectionUnbind();
|
|
if (msg->tcp_consumer != NULL)
|
|
msg->tcp_consumer->connectionClose();
|
|
}
|
|
mb = NULL;
|
|
}
|
|
}
|
|
// Insert the APF message into the outgoing queue
|
|
STATUS AMT_Tunnel_Consumer::send_APF_message(APF_BasicMessage &msg, PRIORITY prio)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_APF_message"));
|
|
ACE_Message_Block *mb = 0;
|
|
msg.write(mb);
|
|
return putq_wrapper (mb, prio);
|
|
}
|
|
|
|
// Insert a new status message into the outgoing queue
|
|
STATUS AMT_Tunnel_Consumer::send_APF_message(ACE_UINT8 msg, PRIORITY prio)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_APF_message"));
|
|
ACE_Message_Block *mb = 0;
|
|
|
|
ACE_NEW_RETURN( mb,
|
|
ACE_Message_Block (1),
|
|
STATUS_MALLOC_FAILURE);
|
|
if (memcpy_s(mb->wr_ptr(), 1, &msg, 1)) {
|
|
ACE_ERROR((MY_ERROR
|
|
ACE_TEXT(" AMT_Tunnel_Consumer:: Failed to send APF message\n")));
|
|
}
|
|
mb->wr_ptr(1);
|
|
|
|
return putq_wrapper(mb,prio);
|
|
}
|
|
|
|
// Channel specific messages (functions access by channel consumer)
|
|
STATUS AMT_Tunnel_Consumer::send_channel_open_request( Channel_Consumer *channel_consumer,
|
|
Tcp_Consumer *tcp_consumer,
|
|
ACE_UINT32 connectedAddressStringLen,
|
|
ACE_CString connectedAddressString,
|
|
ACE_UINT32 connectedPort,
|
|
ACE_UINT32 originatorIpAddressLen,
|
|
ACE_CString originatorIpAddress,
|
|
ACE_UINT32 originatorPort)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_open_request"));
|
|
|
|
|
|
Internal_Channel_Open_Msg* msg = new Internal_Channel_Open_Msg();
|
|
msg->channel_consumer = channel_consumer;
|
|
msg->tcp_consumer = tcp_consumer;
|
|
msg->connectedAddressStringLen = connectedAddressStringLen;
|
|
msg->connectedAddressString = connectedAddressString;
|
|
msg->connectedPort = connectedPort;
|
|
msg->originatorIpAddressLen = originatorIpAddressLen;
|
|
msg->originatorIpAddress = originatorIpAddress;
|
|
msg->originatorPort = originatorPort;
|
|
|
|
return putq_wrapper(msg);
|
|
}
|
|
|
|
// Channel specific messages (functions access by channel consumer)
|
|
STATUS AMT_Tunnel_Consumer::channelOpenDirectRep( bool status,
|
|
ACE_INT32 amt_id,
|
|
Channel_Consumer *channel_consumer,
|
|
Tcp_Consumer *tcp_consumer,
|
|
ACE_INT32 mps_id)
|
|
{
|
|
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::channelOpenDirectRep"));
|
|
|
|
Internal_Channel_Open_Reply_Msg* msg = new Internal_Channel_Open_Reply_Msg();
|
|
msg->status = status;
|
|
msg->MPS_id = mps_id;
|
|
msg->AMT_id = amt_id;
|
|
msg->channel_consumer = channel_consumer;
|
|
msg->tcp_consumer = tcp_consumer;
|
|
|
|
return putq_wrapper(msg);
|
|
}
|
|
|
|
// close channel
|
|
STATUS AMT_Tunnel_Consumer::send_channel_close(ACE_UINT32 recipent_channel)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_close"));
|
|
|
|
Internal_Channel_Close_Msg* msg = new Internal_Channel_Close_Msg();
|
|
msg->recipent_channel = recipent_channel;
|
|
|
|
return putq_wrapper(msg);
|
|
}
|
|
|
|
// Send data over channel
|
|
STATUS AMT_Tunnel_Consumer::send_channel_data( ACE_UINT32 recipent_channel,
|
|
ACE_Message_Block *data)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_data"));
|
|
|
|
ACE_DEBUG ((MY_DEBUG
|
|
ACE_TEXT ("[%s] AMT_Tunnel_CONSUMER Channel data: MC(%d) => AMT(%d), size = %d\n"),
|
|
identifier(),
|
|
0,
|
|
recipent_channel,
|
|
data->length()));
|
|
APF_ChannelData channel_data;
|
|
channel_data.recipientChannel = recipent_channel;
|
|
channel_data.dataLen = (ACE_UINT32)data->length();
|
|
channel_data.data = data;
|
|
|
|
// Send this packet to AMT
|
|
if (send_APF_message(channel_data) != STATUS_SUCCESS)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("failed to send APF message\n")));
|
|
return STATUS_FAILURE;
|
|
}
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
// Close this consumer
|
|
void AMT_Tunnel_Consumer::handle_close()
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::close"));
|
|
|
|
// This method must only be called after the supplier notified all channels
|
|
// that we are closing. So no channel consumer can call us.
|
|
ACE_DEBUG ((MY_TRACE
|
|
ACE_TEXT ("AMT_Tunnel_Consumer::close\n")));
|
|
getPeer().close_writer();
|
|
|
|
}
|
|
|
|
// Handle a queued channel-open request (MB_CHANNEL_OPEN).
//
//   mb - the dequeued block; it is treated as an Internal_Channel_Open_Msg.
//        It is not released here.
//
// Creates a pending channel in the channel manager, builds the APF
// channel-open (forwarded) request from the message fields and writes it to
// the tunnel socket. If the channel cannot be created or the request cannot
// be sent, the requesting consumers are told about the failure and their TCP
// side is closed.
//
// Returns STATUS_SUCCESS when the tunnel is not in PFWD_OPEN state (request
// ignored) or the channel could not be created, STATUS_FAILURE if the channel
// manager guard cannot be taken, otherwise the status of send_data().
STATUS AMT_Tunnel_Consumer::handle_channel_open_request(ACE_Message_Block *mb)
{
    ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_open_request"));

    if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT("TunnelConsumer, in handle_channel_open_request with illegal state %d\n"),
            tunnelState()));
        return STATUS_SUCCESS;
    }

    ACE_DEBUG((MY_DEBUG
        ACE_TEXT("Tunnel_Consumer, handling channel open request from MC\n")));

    Internal_Channel_Open_Msg *msg = (Internal_Channel_Open_Msg*)mb;
    APF_ChannelOpenForwardedRequest request;

    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);

    // Create a new pending channel
    AMT_Channel *ch = channel_manager().create_channel();

    if (ch == NULL)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT("Tunnel_Consumer fails to create new channel\n")));
        msg->channel_consumer->channelOpenFailure();
        ((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
        msg->tcp_consumer->connectionClose();
        return STATUS_SUCCESS;
    }

    // Construct a new Channel Open request
    request.channelTypeStringLen = (ACE_UINT32)strnlen_s(APF_CHANNEL_OPEN_FORWARD, 15);
    request.channelTypeString.set(APF_CHANNEL_OPEN_FORWARD);

    request.senderChannel = ch->getMPSid();
    request.initialWindowSize = *getMaximumWindowSize();
    request.reserved = APF_RESERVED_F_FIELD;

    request.connectedAddressStringLen = msg->connectedAddressStringLen;
    request.connectedAddressString = msg->connectedAddressString;
    request.connectedPort = msg->connectedPort;

    request.originatorIpAddressLen = msg->originatorIpAddressLen;
    request.originatorIpAddress = msg->originatorIpAddress;
    request.originatorPort = msg->originatorPort;

    // Update the channel details
    ch->setState(AMT_Channel::PENDING_OPEN);
    ch->setWindowSize(request.initialWindowSize);
    ch->setTcpConsumer(msg->tcp_consumer);
    ch->setChannelConsumer(msg->channel_consumer);

    // The guard is dropped for the duration of the socket write.
    lock.release();

    ACE_Message_Block *apf_mb = 0;
    request.write(apf_mb);

    STATUS status = send_data(apf_mb);

    // If we failed to send the message then return channel open failure
    if (status != STATUS_SUCCESS)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT("Tunnel_Consumer failed to send channel open request - connection closed\n")));
        msg->channel_consumer->channelOpenFailure();
        ((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
        msg->tcp_consumer->connectionClose();

        // Re-take the channel manager guard before changing its map, as the
        // close and open-reply handlers do. If the acquire fails the removal
        // still happens (unguarded, as before) and release() is a no-op.
        // NOTE(review): another thread may have removed the channel while the
        // guard was dropped - confirm whether 'ch' must be looked up again.
        lock.acquire();
        channel_manager().remove_channel(ch->getMPSid(), Channel_Manager::MPS_ENDPOINT);
        delete ch;
        lock.release();
    }
    ACE_Message_Block::release(apf_mb);
    return status;
}
|
|
|
|
// Write a message block to the tunnel socket.
//
//   mb - block to send; it is not released here.
//
// The send is bounded by the value behind getMaxTunnelTimeout(). On
// non-Windows builds SIGPIPE is ignored for the duration of the send and the
// previous action is restored afterwards.
//
// Returns STATUS_FAILURE for a NULL block, STATUS_NETWORK_ERROR if send_n()
// returns zero or a negative value, STATUS_SUCCESS otherwise. A byte count
// different from mb->length() is only logged.
STATUS AMT_Tunnel_Consumer::send_data(ACE_Message_Block *mb)
{
    // Nothing to send; also protects the length check below.
    if (mb == 0)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT ("AMT_Tunnel_Consumer::send_data - NULL message block\n")));
        return STATUS_FAILURE;
    }

    ssize_t res = 0;

    // Handle SIGPIPE under linux
#if !defined (ACE_WIN32)
    ACE_Sig_Action no_sigpipe((ACE_SignalHandler) SIG_IGN);
    ACE_Sig_Action original_action;
    no_sigpipe.register_action(SIGPIPE, &original_action);
#endif /* !(ACE_WIN32) */

    // Named local: taking the address of a temporary is not valid C++.
    const ACE_Time_Value send_timeout(*getMaxTunnelTimeout());
    res = this->getPeer().send_n(mb, &send_timeout);

#if !defined (ACE_WIN32)
    no_sigpipe.restore_action(SIGPIPE, original_action);
#endif /* !(ACE_WIN32) */

    if (res <= 0)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT ("AMT_Tunnel_Consumer failed to send message - tunnel socket is closed\n")));
        return STATUS_NETWORK_ERROR;
    }

    // res is positive here, so the conversion to size_t is safe.
    if ((size_t)res != mb->length())
    {
        // Failed to send the entire message
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT ("AMT_Tunnel_Consumer failed to send the entire message\n")));
    }
    return STATUS_SUCCESS;
}
|
|
|
|
// Handle a close channel message
|
|
STATUS AMT_Tunnel_Consumer::handle_channel_close(ACE_Message_Block *mb)
|
|
{
|
|
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_close"));
|
|
|
|
if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("TunnelConsumer, in handle_channel_close with illegal state %d\n"),
|
|
tunnelState()));
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
Internal_Channel_Close_Msg *msg = (Internal_Channel_Close_Msg*)mb;
|
|
// Get the channel from map
|
|
AMT_Channel *channel = NULL;
|
|
|
|
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);
|
|
|
|
channel = channel_manager().find_channel(msg->recipent_channel,
|
|
Channel_Manager::AMT_ENDPOINT);
|
|
if (channel == NULL)
|
|
{
|
|
// Channel doesn't exists - error
|
|
ACE_DEBUG((MY_DEBUG
|
|
ACE_TEXT("TunnelConsumer, in handle_channel_close channel doesn't exists\n")));
|
|
return STATUS_SUCCESS;
|
|
}
|
|
|
|
|
|
if (channel->getState() == AMT_Channel::TUNNEL_CLOSED)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" TunnelConsumer - got close channel request on PENDING_CLOSE state - going to delete channel\n")));
|
|
// AMT closed the connection before us, we can delete this channel
|
|
// Remove this channel from mps map
|
|
if (channel->getChannelConsumer() != NULL)
|
|
{
|
|
channel->getChannelConsumer()->connectionUnbind();
|
|
}
|
|
channel_manager().remove_channel(msg->recipent_channel ,Channel_Manager::AMT_ENDPOINT);
|
|
delete channel;
|
|
}
|
|
else if (channel->getState() == AMT_Channel::OPEN)
|
|
{
|
|
ACE_DEBUG((MY_DEBUG ACE_TEXT(" TunnelConsumer - got close channel request on OPEN state\n")));
|
|
// MC requesting first
|
|
channel->setState(AMT_Channel::MC_CLOSED);
|
|
}
|
|
|
|
lock.release();
|
|
// send close request to AMT
|
|
APF_ChannelClose close_req;
|
|
close_req.recipientChannel = msg->recipent_channel;
|
|
ACE_Message_Block *apf_mb = 0;
|
|
close_req.write(apf_mb);
|
|
|
|
STATUS status = send_data(apf_mb);
|
|
ACE_Message_Block::release(apf_mb);
|
|
return status;
|
|
}
|
|
|
|
const char* AMT_Tunnel_Consumer::identifier()
|
|
{
|
|
return tunnel_handler_->identifier();
|
|
}
|
|
|
|
// Handle a queued channel-open reply (MB_CHANNEL_OPEN_REPLY).
//
//   mb - the dequeued block; it is treated as an
//        Internal_Channel_Open_Reply_Msg. It is not released here.
//
// Looks the channel up (by MPS id when set, otherwise by AMT id when a
// channel consumer is attached), fills an APF channel-open reply and writes
// it to the tunnel socket:
//   - failure reply: the channel, if found, is removed from the channel
//     manager and deleted;
//   - confirmation: the channel, if found, becomes OPEN and is bound to the
//     consumers carried by the message.
//
// Returns STATUS_SUCCESS when the tunnel is not in PFWD_OPEN state,
// STATUS_FAILURE if the channel manager guard cannot be taken,
// STATUS_FATAL_ERROR when no AMT channel id can be determined, otherwise the
// status of send_data().
STATUS AMT_Tunnel_Consumer::handle_channel_open_reply(ACE_Message_Block *mb)
{
    ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_open_direct"));

    if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
    {
        ACE_DEBUG((MY_DEBUG
            ACE_TEXT("TunnelConsumer, in handle_channel_open_direct with illegal state %d\n"),
            tunnelState()));
        return STATUS_SUCCESS;
    }

    Internal_Channel_Open_Reply_Msg *msg = (Internal_Channel_Open_Reply_Msg*)mb;
    // The status flag is logged as an int; %b expects a ssize_t argument.
    ACE_DEBUG((MY_DEBUG
        ACE_TEXT("TunnelConsumer, \tsending CHANNEL_OPEN_REPLY to AMT. status = %d\n"),
        (int)msg->status));

    // Get the channel from map
    APF_ChannelOpenReply reply;
    AMT_Channel *channel = NULL;

    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);

    //find channel:
    if (msg->MPS_id != -1)
    {
        channel = channel_manager().find_channel(msg->MPS_id, Channel_Manager::MPS_ENDPOINT);
    }
    else if (msg->channel_consumer != NULL)
    {
        // happens when ChannelConsumer called this function
        channel = channel_manager().find_channel(msg->AMT_id, Channel_Manager::AMT_ENDPOINT);
        //in case channel wasn't found - channel = NULL
    }
    //else - there isn't any matched channel (never been created)

    ACE_UINT32 amt_id;

    if (channel != NULL)
        amt_id = channel->getAMTid();
    else if (msg->AMT_id != -1) //channel = NULL can happens in case of failure during creation of channel
        amt_id = msg->AMT_id;
    else
    {
        ACE_DEBUG((MY_DEBUG ACE_TEXT("MT_Tunnel_Consumer::handle_channel_open_reply - cant reach AMT id")));
        return STATUS_FATAL_ERROR;
    }

    if (false == msg->status)
    {
        reply.openChannelStatus = APF_CHANNEL_OPEN_FAILURE;
        reply.recipientChannel = amt_id;
        reply.senderChannel = OPEN_CONNECT_FAILED;
        reply.initialWindowSize = APF_RESERVED_0_FIELD;
        reply.reserved = APF_RESERVED_0_FIELD;

        if (channel != NULL)
        {
            // Remove by the channel's own MPS id: msg->MPS_id is -1 when the
            // channel was found through its AMT id, and the entry would then
            // stay in the map pointing at the deleted channel.
            channel_manager().remove_channel(channel->getMPSid(), Channel_Manager::MPS_ENDPOINT);
            delete channel;
        }
    }
    else
    {
        if (channel != NULL)
        {
            //fill open reply message:
            reply.openChannelStatus = APF_CHANNEL_OPEN_CONFIRMATION;
            reply.recipientChannel = amt_id;
            reply.senderChannel = channel->getMPSid();
            reply.initialWindowSize = channel->getWindowSize();
            reply.reserved = APF_RESERVED_F_FIELD;

            //update channel data:
            channel->setState(AMT_Channel::OPEN);
            channel->setChannelConsumer(msg->channel_consumer);
            channel->setTcpConsumer(msg->tcp_consumer);
        }
        else
        {
            //can happen when disconnect occurred between the time channelConsumer enqueued this message and here
            // (the time we handle it). in this case disconnect will remove all channels from channelMgr
            // (including this pending direct request)

            //notify the all TCP connection component to unbind the connection with this tunnel:

            //Commenting these lines as part of KW fix, because we cannot call null channel.
            //channel->getChannelConsumer()->connectionUnbind();
            //channel->getTcpConsumer()->connectionClose();

            // NOTE(review): the reply fields are not filled on this path but
            // the reply is still written to the socket below - confirm.
        }
    }

    // Done with the channel map; do not hold the guard during the write.
    lock.release();

    // send this packet to AMT
    ACE_Message_Block *apf_mb = 0;
    reply.write(apf_mb);

    STATUS status = send_data(apf_mb);
    ACE_Message_Block::release(apf_mb);
    return status;
}
|