198 lines
5.9 KiB
C++

//----------------------------------------------------------------------------
//
// Copyright (C) Intel Corporation, 2006 - 2007.
//
// File: ChannelConsumer.h
//
// Contents: In charge of handling messages from the socks connection
// to the tunnel.
//
// Notes:
//----------------------------------------------------------------------------
#ifndef _MPS_CHANNEL_CONSUMER_H__
#define _MPS_CHANNEL_CONSUMER_H__
//===================================================
// INCLUDES
//===================================================
#include <ace/ACE.h>
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include <ace/Message_Queue.h>
#include <ace/Thread_Manager.h>
#include <ace/SOCK_Stream.h>
#include <ace/SStringfwd.h>
#include <ace/Global_Macros.h>
#include <ace/Synch_Options.h>
//#include "tunnelConsumer.h"
#include "OptionsUtils.h"
// FW declaration
class Tcp_Consumer;
class Tcp_Supplier;
class AMT_Tunnel_Consumer;
/***************************************************************************************
//
//
/**************************************************************************************/
class Channel_Consumer
{
public:
enum Channel_Consumer_State {
INIT,
PENDING_OPEN,
ACTIVE,
PENDING_CLOSE
};
//===================================================
// CTOR / DTOR
//===================================================
Channel_Consumer( Tcp_Supplier* supplier,
ACE_INT32 tunnel_handler = DFLT_NEGETIVE_VALUE,
ACE_UINT32 win_size = DFLT_ZERO):
_tcp_supplier(supplier),
_tunnel_handler(tunnel_handler),
_win_size(win_size),
_state(INIT),
_mb_remain(NULL),
_tunnel_consumer(NULL),
_amt_closed(false),
_mc_closed(false),
_total_received_data_length(0),
_total_sent_data_length(0),
_registered()
{
const unsigned int * maxQueueSizeP = getMaxQueueSize();
const unsigned int * maxChannelsP = getMaxChannels();
if (maxQueueSizeP != NULL && maxChannelsP != NULL)
{
_msg_queue.high_water_mark
(
size_t(*maxQueueSizeP / *maxChannelsP)
);
}
}
~Channel_Consumer();
//===================================================
// PROTOTYPE:
//===================================================
STATUS openChannel(const ACE_CString &address, const ACE_UINT32 port,
const ACE_CString &sender_address, const ACE_UINT32 sender_port,
Tcp_Consumer* consumer);
STATUS sendData(ACE_Message_Block* mb);
//-----------------------------------------
// send close message to tunnel consumer.
//-----------------------------------------
STATUS closeTunnelConnection();
//-----------------------------------------
// Set this channel state to pending close
// This message is sent by Tunnel Supplier.
//-----------------------------------------
void channelClose();
//-----------------------------------------
// unbind connection with Tunnel consumer.
//-----------------------------------------
void connectionUnbind();
//-----------------------------------------
// Set a new window size.
// When this function is being called, we go through
// the message queue, and transfer as much messages
// as we can, to tunnel message queue.
//-----------------------------------------
STATUS setWinSize(unsigned int win);
//-----------------------------------------
// activate the channel.
// This handler will used each time we use tunnel API
//-----------------------------------------
STATUS activateChannel(unsigned int id, unsigned int win);
//-----------------------------------------
// This channel failed to open (received from tunnel supplier/tunnel consumer)
//-----------------------------------------
void channelOpenFailure();
//-----------------------------------------
// This function is being called when Channel manager
// already holds amt channel for this connection,
// This happens when AMT device request for direct
// open request.
// In this function we finish creating the channel
//-----------------------------------------
STATUS openChannelRep(ACE_INET_Addr amt_addr,
bool status,
Tcp_Consumer* consumer,
AMT_Tunnel_Consumer* tunnel_consumer);
private:
void terminate();
STATUS getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1);
bool canMQContainMB(size_t mb_len);
//-----------------------------------------
// Split mb to 2 in way that:
// mb will hold only _window size length
// mb2 will hold the rest of the message
//-----------------------------------------
void splitMessage(ACE_Message_Block*& mb, ACE_Message_Block*& mb2);
//-----------------------------------------
// Handle MB.
// If mb is hangup message - send close channel to its tunnel
// otherwise - send the data to the tunnel.
//-----------------------------------------
STATUS handleMB(ACE_Message_Block* mb);
// forward message to tunnel consumer
STATUS forwardMBtoTunnel(ACE_Message_Block *mb);
//-----------------------------------------
// Put a message block in our queue
// If the queue is full then we will save this message in
// _mb_remain and return STATUS_QUEUE_FULL
//-----------------------------------------
STATUS putMBInQueue(ACE_Message_Block *mb);
//for DEBUG:
void setTotalSentDataLength(ssize_t dataLen) { _total_sent_data_length = dataLen; }
ssize_t getTotalSentDataLength() { return _total_sent_data_length; }
//===================================================
// Data Member
//===================================================
unsigned int _win_size;
unsigned int _tunnel_handler;
bool _registered;
bool _amt_closed;
bool _mc_closed;
Channel_Consumer_State _state;
ACE_Message_Queue<ACE_MT_SYNCH> _msg_queue;
Tcp_Supplier* _tcp_supplier;
AMT_Tunnel_Consumer* _tunnel_consumer;
ACE_Recursive_Thread_Mutex _channel_mutex;
ACE_Message_Block* _mb_remain;
// For debug
ssize_t _total_received_data_length;
ssize_t _total_sent_data_length;
};
#endif //_MPS_CHANNEL_CONSUMER_H__