638 lines
19 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: TunnelConsumer.cpp
//
// Contents: Handles outgoing data on APF tunnel.
//
// Notes:
//----------------------------------------------------------------------------
// Include for ignoring SIGPIPE under linux
#if !defined (ACE_WIN32)
#include <ace/Signal.h>
#endif /* !(ACE_WIN32) */
#include "TunnelConsumer.h"
#include "ChannelManager.h"
#include "ChannelConsumer.h"
#include "TunnelHandler.h"
#include "TunnelManager.h"
#include "APF.h"
#include "global.h"
#include "TcpConsumer.h"
static const ACE_UINT32 APF_RESERVED_F_FIELD = 0xffffffff;
static const ACE_UINT32 APF_RESERVED_0_FIELD = 0x00000000;
AMT_Tunnel_Consumer::AMT_Tunnel_Consumer(AMT_Tunnel_Handler *tunnel_handler):
tunnel_handler_(tunnel_handler), _msg_counter(0)
{
}
AMT_Tunnel_Consumer::~AMT_Tunnel_Consumer(void)
{
ACE_DEBUG((MY_DEBUG "----->Tunnel_Consumer dtor\n"));
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Consumer::dtor .message counter remain in queue= %d. (%x)\n"), _msg_counter, tunnel_handler_));
}
// Get reference to this tunnel socket stream
ACE_SOCK_Stream& AMT_Tunnel_Consumer::getPeer(void) const
{
return (ACE_SOCK_Stream &)tunnel_handler_->peer();
}
// Get reference to this tunnel channel manager
Channel_Manager& AMT_Tunnel_Consumer::channel_manager() const
{
return tunnel_handler_->channel_mgr();
}
// Get this tunnel handler state
int AMT_Tunnel_Consumer::tunnelState (void) const
{
return tunnel_handler_->tunnelState();
}
void AMT_Tunnel_Consumer::tunnelState (int state)
{
tunnel_handler_->tunnelState(state);
}
// Put message in this tunnel handler queue
STATUS AMT_Tunnel_Consumer::putq_wrapper(ACE_Message_Block *mb, PRIORITY prio , ACE_Time_Value *tv)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::putq"));
{
// every time we enqueue message, a new notification is created in reactor.
// we count those notification so we will know when the reactor doesn't contain
// any notification dispatcher for this handler (important for deletion)
ACE_GUARD_RETURN( ACE_Recursive_Thread_Mutex ,
lock,
tunnel_handler_->_dispatch_output_counter_mutex,
STATUS_LOCK_FAILURE);
tunnel_handler_->_dispatch_output_counter++;
}
if (this->tunnel_handler_->msg_queue()->is_full())
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("AMT_Tunnel_Consumer: Queue is full\n")));
}
//for DEBUG purpose only:
_msg_counter++;
int rc;
if (prio == PRIO_HIGH)
{
rc = this->tunnel_handler_->msg_queue()->enqueue_head(mb);
}
else
{
rc = this->tunnel_handler_->putq(mb);
}
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Consumer::putq_wrapper - after putting in queue %x\n"), tunnel_handler_));
if (rc == -1)
{
ACE_Message_Block::release(mb);
ACE_DEBUG((MY_DEBUG
ACE_TEXT ("AMT_Tunnel_Consumer failed to put message in queue \n")));
if (errno == EWOULDBLOCK)
return STATUS_QUEUE_TIMEOUT;
else
return STATUS_FAILURE;
}
return STATUS_SUCCESS;
}
// Send outgoing APF message.
STATUS AMT_Tunnel_Consumer::handle_output()
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_output"));
ACE_Message_Block *mb = 0;
int qcount;
STATUS status;
if (tunnel_handler_->msg_queue()->message_count() <= 0)
return STATUS_SUCCESS;
qcount = this->tunnel_handler_->getq(mb);
//for DEBUG purpose only:
_msg_counter--;
if ((mb != 0) && (qcount != -1))
{
switch(mb->msg_type())
{
case MB_DISCONNECT:
{
// this mb can contain data - in case of APF-disconnect message
if (mb->length() != 0)
{
send_data(mb);
}
ACE_Message_Block::release(mb);
getPeer().close_writer();
// if there are more output notifications for this handler in reactor, we dont want to return STATUS_CONNECTION_CLOSED
// because it can cause to delete this handler (while reactor still holding it)
// only the last dispatcher return STATUS_CONNECTION_CLOSED;
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
lock2,
tunnel_handler_->_dispatch_output_counter_mutex,
STATUS_LOCK_FAILURE);
ACE_READ_GUARD_RETURN(ACE_Recursive_Thread_Mutex ,
lock,
tunnel_handler_->_active_mutex,
STATUS_LOCK_FAILURE);
ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex ,
lock3,
tunnel_handler_->_state_mutex,
STATUS_LOCK_FAILURE);
// Go over queue and send channel open failure for remaining channel open requests
empty_queue();
tunnel_handler_->tunnelState(AMT_Tunnel_Handler::DISCONNECTED);
if ((tunnel_handler_->_dispatch_output_counter == 0) &&(tunnel_handler_->_active_counter == 1))
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - got hangup message, and it IS the last dispatcher\n")));
return STATUS_CONNECTION_CLOSED;
}
else
{
ACE_DEBUG((MY_TRACE ACE_TEXT("Tunnel_Consumer::handle_output - got hangup message, but its NOT the last dispatcher [%x]\n"), tunnel_handler_));
return STATUS_SUCCESS;
}
break;
}
case MB_CHANNEL_OPEN:
status = handle_channel_open_request(mb);
break;
case MB_CHANNEL_OPEN_REPLY:
status = handle_channel_open_reply(mb);
break;
case MB_CHANNEL_CLOSE:
status = handle_channel_close(mb);
break;
default:
status = send_data(mb);
}
}
else
{
ACE_DEBUG((MY_DEBUG ACE_TEXT("AMT_Tunnel_Consumer::handle_output - error while trying to dequeue mb\n")));
}
ACE_Message_Block::release (mb);
return STATUS_SUCCESS;
}
// This function remove all messages from queue and handles
// channel open request by replying with channel open failure.
void AMT_Tunnel_Consumer::empty_queue()
{
ACE_Message_Block *mb = NULL;
ACE_DEBUG((MY_TRACE ACE_TEXT("AMT_Tunnel_Consumer::empty_queue- going to dequeue all messages from queue. msg_counter = %d\n"), _msg_counter));
//go through message queue:
while (tunnel_handler_->msg_queue()->message_count() > 0)
{
_msg_counter--;
if (this->tunnel_handler_->getq(mb) == -1 ||
mb == NULL)
{
return;
}
if (mb->msg_type() == MB_CHANNEL_OPEN)
{
Internal_Channel_Open_Msg *msg = (Internal_Channel_Open_Msg*)mb;
msg->channel_consumer->channelOpenFailure();
((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
msg->tcp_consumer->connectionClose();
}
if (mb->msg_type() == MB_CHANNEL_OPEN_REPLY)
{
Internal_Channel_Open_Reply_Msg *msg = (Internal_Channel_Open_Reply_Msg*)mb;
if (msg->channel_consumer != NULL)
msg->channel_consumer->connectionUnbind();
if (msg->tcp_consumer != NULL)
msg->tcp_consumer->connectionClose();
}
mb = NULL;
}
}
// Insert the APF message into the outgoing queue
STATUS AMT_Tunnel_Consumer::send_APF_message(APF_BasicMessage &msg, PRIORITY prio)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_APF_message"));
ACE_Message_Block *mb = 0;
msg.write(mb);
return putq_wrapper (mb, prio);
}
// Insert a new status message into the outgoing queue
STATUS AMT_Tunnel_Consumer::send_APF_message(ACE_UINT8 msg, PRIORITY prio)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_APF_message"));
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN( mb,
ACE_Message_Block (1),
STATUS_MALLOC_FAILURE);
if (memcpy_s(mb->wr_ptr(), 1, &msg, 1)) {
ACE_ERROR((MY_ERROR
ACE_TEXT(" AMT_Tunnel_Consumer:: Failed to send APF message\n")));
}
mb->wr_ptr(1);
return putq_wrapper(mb,prio);
}
// Channel specific messages (functions access by channel consumer)
STATUS AMT_Tunnel_Consumer::send_channel_open_request( Channel_Consumer *channel_consumer,
Tcp_Consumer *tcp_consumer,
ACE_UINT32 connectedAddressStringLen,
ACE_CString connectedAddressString,
ACE_UINT32 connectedPort,
ACE_UINT32 originatorIpAddressLen,
ACE_CString originatorIpAddress,
ACE_UINT32 originatorPort)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_open_request"));
Internal_Channel_Open_Msg* msg = new Internal_Channel_Open_Msg();
msg->channel_consumer = channel_consumer;
msg->tcp_consumer = tcp_consumer;
msg->connectedAddressStringLen = connectedAddressStringLen;
msg->connectedAddressString = connectedAddressString;
msg->connectedPort = connectedPort;
msg->originatorIpAddressLen = originatorIpAddressLen;
msg->originatorIpAddress = originatorIpAddress;
msg->originatorPort = originatorPort;
return putq_wrapper(msg);
}
// Channel specific messages (functions access by channel consumer)
STATUS AMT_Tunnel_Consumer::channelOpenDirectRep( bool status,
ACE_INT32 amt_id,
Channel_Consumer *channel_consumer,
Tcp_Consumer *tcp_consumer,
ACE_INT32 mps_id)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::channelOpenDirectRep"));
Internal_Channel_Open_Reply_Msg* msg = new Internal_Channel_Open_Reply_Msg();
msg->status = status;
msg->MPS_id = mps_id;
msg->AMT_id = amt_id;
msg->channel_consumer = channel_consumer;
msg->tcp_consumer = tcp_consumer;
return putq_wrapper(msg);
}
// close channel
STATUS AMT_Tunnel_Consumer::send_channel_close(ACE_UINT32 recipent_channel)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_close"));
Internal_Channel_Close_Msg* msg = new Internal_Channel_Close_Msg();
msg->recipent_channel = recipent_channel;
return putq_wrapper(msg);
}
// Send data over channel
STATUS AMT_Tunnel_Consumer::send_channel_data( ACE_UINT32 recipent_channel,
ACE_Message_Block *data)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::send_channel_data"));
ACE_DEBUG ((MY_DEBUG
ACE_TEXT ("[%s] AMT_Tunnel_CONSUMER Channel data: MC(%d) => AMT(%d), size = %d\n"),
identifier(),
0,
recipent_channel,
data->length()));
APF_ChannelData channel_data;
channel_data.recipientChannel = recipent_channel;
channel_data.dataLen = (ACE_UINT32)data->length();
channel_data.data = data;
// Send this packet to AMT
if (send_APF_message(channel_data) != STATUS_SUCCESS)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("failed to send APF message\n")));
return STATUS_FAILURE;
}
return STATUS_SUCCESS;
}
// Close this consumer
void AMT_Tunnel_Consumer::handle_close()
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::close"));
// This method must only be called after the supplier notified all channels
// that we are closing. So no channel consumer can call us.
ACE_DEBUG ((MY_TRACE
ACE_TEXT ("AMT_Tunnel_Consumer::close\n")));
getPeer().close_writer();
}
STATUS AMT_Tunnel_Consumer::handle_channel_open_request(ACE_Message_Block *mb)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_open_request"));
if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("TunnelConsumer, in handle_channel_open_request with illegal state %d\n"),
tunnelState()));
return STATUS_SUCCESS;
}
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Tunnel_Consumer, handling channel open request from MC\n")));
Internal_Channel_Open_Msg *msg = (Internal_Channel_Open_Msg*)mb;
APF_ChannelOpenForwardedRequest request;
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);
// Create a new pending channel
AMT_Channel *ch = channel_manager().create_channel();
if (ch == NULL)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Tunnel_Consumer fails to create new channel\n")));
msg->channel_consumer->channelOpenFailure();
((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
msg->tcp_consumer->connectionClose();
return STATUS_SUCCESS;
}
// Construct a new Channel Open request
request.channelTypeStringLen = (ACE_UINT32)strnlen_s(APF_CHANNEL_OPEN_FORWARD, 15);
request.channelTypeString.set(APF_CHANNEL_OPEN_FORWARD);
request.senderChannel = ch->getMPSid();
request.initialWindowSize = *getMaximumWindowSize();
request.reserved = APF_RESERVED_F_FIELD;
request.connectedAddressStringLen = msg->connectedAddressStringLen;
request.connectedAddressString = msg->connectedAddressString;
request.connectedPort = msg->connectedPort;
request.originatorIpAddressLen = msg->originatorIpAddressLen;
request.originatorIpAddress = msg->originatorIpAddress;
request.originatorPort = msg->originatorPort;
// Update the channel details
ch->setState(AMT_Channel::PENDING_OPEN);
ch->setWindowSize(request.initialWindowSize);
ch->setTcpConsumer(msg->tcp_consumer);
ch->setChannelConsumer(msg->channel_consumer);
lock.release();
ACE_Message_Block *apf_mb = 0;
request.write(apf_mb);
STATUS status = send_data(apf_mb);
// If we failed to send the message then return channel open failure
if (status != STATUS_SUCCESS)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("Tunnel_Consumer failed to send channel open request - connection closed\n")));
msg->channel_consumer->channelOpenFailure();
((SocksConsumer*)msg->tcp_consumer)->openRep(Tcp_Consumer::FAILURE);
msg->tcp_consumer->connectionClose();
channel_manager().remove_channel(ch->getMPSid(), Channel_Manager::MPS_ENDPOINT);
delete ch;
}
ACE_Message_Block::release(apf_mb);
return status;
}
STATUS AMT_Tunnel_Consumer::send_data(ACE_Message_Block *mb)
{
ssize_t res = 0;
// Handle SIGPIPE under linux
#if !defined (ACE_WIN32)
ACE_Sig_Action no_sigpipe((ACE_SignalHandler) SIG_IGN);
ACE_Sig_Action original_action;
no_sigpipe.register_action(SIGPIPE, &original_action);
#endif /* !(ACE_WIN32) */
res = this->getPeer().send_n(mb, &ACE_Time_Value(*getMaxTunnelTimeout()));
#if !defined (ACE_WIN32)
no_sigpipe.restore_action(SIGPIPE, original_action);
#endif /* !(ACE_WIN32) */
if (res <= 0)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT ("AMT_Tunnel_Consumer failed to send message - tunnel socket is closed\n")));
return STATUS_NETWORK_ERROR;
}
if (res != mb->length())
// Failed to send the entire message
ACE_DEBUG((MY_DEBUG
ACE_TEXT ("AMT_Tunnel_Consumer failed to send the entire message\n")));
return STATUS_SUCCESS;
}
// Handle a close channel message
STATUS AMT_Tunnel_Consumer::handle_channel_close(ACE_Message_Block *mb)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_close"));
if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("TunnelConsumer, in handle_channel_close with illegal state %d\n"),
tunnelState()));
return STATUS_SUCCESS;
}
Internal_Channel_Close_Msg *msg = (Internal_Channel_Close_Msg*)mb;
// Get the channel from map
AMT_Channel *channel = NULL;
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);
channel = channel_manager().find_channel(msg->recipent_channel,
Channel_Manager::AMT_ENDPOINT);
if (channel == NULL)
{
// Channel doesn't exists - error
ACE_DEBUG((MY_DEBUG
ACE_TEXT("TunnelConsumer, in handle_channel_close channel doesn't exists\n")));
return STATUS_SUCCESS;
}
if (channel->getState() == AMT_Channel::TUNNEL_CLOSED)
{
ACE_DEBUG((MY_DEBUG ACE_TEXT(" TunnelConsumer - got close channel request on PENDING_CLOSE state - going to delete channel\n")));
// AMT closed the connection before us, we can delete this channel
// Remove this channel from mps map
if (channel->getChannelConsumer() != NULL)
{
channel->getChannelConsumer()->connectionUnbind();
}
channel_manager().remove_channel(msg->recipent_channel ,Channel_Manager::AMT_ENDPOINT);
delete channel;
}
else if (channel->getState() == AMT_Channel::OPEN)
{
ACE_DEBUG((MY_DEBUG ACE_TEXT(" TunnelConsumer - got close channel request on OPEN state\n")));
// MC requesting first
channel->setState(AMT_Channel::MC_CLOSED);
}
lock.release();
// send close request to AMT
APF_ChannelClose close_req;
close_req.recipientChannel = msg->recipent_channel;
ACE_Message_Block *apf_mb = 0;
close_req.write(apf_mb);
STATUS status = send_data(apf_mb);
ACE_Message_Block::release(apf_mb);
return status;
}
const char* AMT_Tunnel_Consumer::identifier()
{
return tunnel_handler_->identifier();
}
STATUS AMT_Tunnel_Consumer::handle_channel_open_reply(ACE_Message_Block *mb)
{
ACE_TRACE(ACE_TEXT("AMT_Tunnel_Consumer::handle_channel_open_direct"));
if (tunnelState() != AMT_Tunnel_Handler::PFWD_OPEN)
{
ACE_DEBUG((MY_DEBUG
ACE_TEXT("TunnelConsumer, in handle_channel_open_direct with illegal state %d\n"),
tunnelState()));
return STATUS_SUCCESS;
}
Internal_Channel_Open_Reply_Msg *msg = (Internal_Channel_Open_Reply_Msg*)mb;
ACE_DEBUG((MY_DEBUG
ACE_TEXT("TunnelConsumer, \tsending CHANNEL_OPEN_REPLY to AMT. status = %b\n"),
msg->status));
// Get the channel from map
APF_ChannelOpenReply reply;
AMT_Channel *channel = NULL;
ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, lock, channel_manager().guard(),STATUS_FAILURE);
//find channel:
if (msg->MPS_id != -1)
{
channel = channel_manager().find_channel(msg->MPS_id, Channel_Manager::MPS_ENDPOINT);
}
else if (msg->channel_consumer != NULL)
{
// happens when ChannelConsumer called this function
channel = channel_manager().find_channel(msg->AMT_id, Channel_Manager::AMT_ENDPOINT);
//in case channel wasn't found - channel = NULL
}
//else - there isn't any matched channel (never been created)
ACE_UINT32 amt_id;
if (channel != NULL)
amt_id = channel->getAMTid();
else if (msg->AMT_id != -1) //channel = NULL can happens in case of failure during creation of channel
amt_id = msg->AMT_id;
else
{
ACE_DEBUG((MY_DEBUG ACE_TEXT("MT_Tunnel_Consumer::handle_channel_open_reply - cant reach AMT id")));
return STATUS_FATAL_ERROR;
}
if (false == msg->status)
{
reply.openChannelStatus = APF_CHANNEL_OPEN_FAILURE;
reply.recipientChannel = amt_id;
reply.senderChannel = OPEN_CONNECT_FAILED;
reply.initialWindowSize = APF_RESERVED_0_FIELD;
reply.reserved = APF_RESERVED_0_FIELD;
if (channel != NULL)
{
channel_manager().remove_channel(msg->MPS_id, Channel_Manager::MPS_ENDPOINT);
delete channel;
}
}
else
{
if (channel != NULL)
{
//fill open reply message:
reply.openChannelStatus = APF_CHANNEL_OPEN_CONFIRMATION;
reply.recipientChannel = amt_id;
reply.senderChannel = channel->getMPSid();
reply.initialWindowSize = channel->getWindowSize();
reply.reserved = APF_RESERVED_F_FIELD;
//update channel data:
channel->setState(AMT_Channel::OPEN);
channel->setChannelConsumer(msg->channel_consumer);
channel->setTcpConsumer(msg->tcp_consumer);
}
else
{
//can happen when disconnect occurred between the time channelConsumer enqueued this message and here
// (the time we handle it). in this case disconnect will remove all channels from channelMgr
// (including this pending direct request)
//notify the all TCP connection component to unbind the connection with this tunnel:
//Commenting these lines as part of KW fix, because we cannot call null channel.
//channel->getChannelConsumer()->connectionUnbind();
//channel->getTcpConsumer()->connectionClose();
}
}
lock.release();
// send this packet to AMT
ACE_Message_Block *apf_mb = 0;
reply.write(apf_mb);
STATUS status = send_data(apf_mb);
ACE_Message_Block::release(apf_mb);
return status;
}