//---------------------------------------------------------------------------- // // Copyright (C) Intel Corporation, 2006 - 2007. // // File: ChannelConsumer.h // // Contents: In charge of handling messages from the socks connection // to the tunnel. // // Notes: //---------------------------------------------------------------------------- #ifndef _MPS_CHANNEL_CONSUMER_H__ #define _MPS_CHANNEL_CONSUMER_H__ //=================================================== // INCLUDES //=================================================== #include #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ #include #include #include #include #include #include //#include "tunnelConsumer.h" #include "OptionsUtils.h" // FW declaration class Tcp_Consumer; class Tcp_Supplier; class AMT_Tunnel_Consumer; /*************************************************************************************** // // /**************************************************************************************/ class Channel_Consumer { public: enum Channel_Consumer_State { INIT, PENDING_OPEN, ACTIVE, PENDING_CLOSE }; //=================================================== // CTOR / DTOR //=================================================== Channel_Consumer( Tcp_Supplier* supplier, ACE_INT32 tunnel_handler = DFLT_NEGETIVE_VALUE, ACE_UINT32 win_size = DFLT_ZERO): _tcp_supplier(supplier), _tunnel_handler(tunnel_handler), _win_size(win_size), _state(INIT), _mb_remain(NULL), _tunnel_consumer(NULL), _amt_closed(false), _mc_closed(false), _total_received_data_length(0), _total_sent_data_length(0), _registered() { const unsigned int * maxQueueSizeP = getMaxQueueSize(); const unsigned int * maxChannelsP = getMaxChannels(); if (maxQueueSizeP != NULL && maxChannelsP != NULL) { _msg_queue.high_water_mark ( size_t(*maxQueueSizeP / *maxChannelsP) ); } } ~Channel_Consumer(); //=================================================== // PROTOTYPE: //=================================================== STATUS openChannel(const ACE_CString &address, const ACE_UINT32 port, const ACE_CString &sender_address, const ACE_UINT32 sender_port, Tcp_Consumer* consumer); STATUS sendData(ACE_Message_Block* mb); //----------------------------------------- // send close message to tunnel consumer. //----------------------------------------- STATUS closeTunnelConnection(); //----------------------------------------- // Set this channel state to pending close // This message is sent by Tunnel Supplier. //----------------------------------------- void channelClose(); //----------------------------------------- // unbind connection with Tunnel consumer. //----------------------------------------- void connectionUnbind(); //----------------------------------------- // Set a new window size. // When this function is being called, we go through // the message queue, and transfer as much messages // as we can, to tunnel message queue. //----------------------------------------- STATUS setWinSize(unsigned int win); //----------------------------------------- // activate the channel. // This handler will used each time we use tunnel API //----------------------------------------- STATUS activateChannel(unsigned int id, unsigned int win); //----------------------------------------- // This channel failed to open (received from tunnel supplier/tunnel consumer) //----------------------------------------- void channelOpenFailure(); //----------------------------------------- // This function is being called when Channel manager // already holds amt channel for this connection, // This happens when AMT device request for direct // open request. // In this function we finish creating the channel //----------------------------------------- STATUS openChannelRep(ACE_INET_Addr amt_addr, bool status, Tcp_Consumer* consumer, AMT_Tunnel_Consumer* tunnel_consumer); private: void terminate(); STATUS getStringFullAddr(ACE_INET_Addr& addr , ACE_CString& str1); bool canMQContainMB(size_t mb_len); //----------------------------------------- // Split mb to 2 in way that: // mb will hold only _window size length // mb2 will hold the rest of the message //----------------------------------------- void splitMessage(ACE_Message_Block*& mb, ACE_Message_Block*& mb2); //----------------------------------------- // Handle MB. // If mb is hangup message - send close channel to its tunnel // otherwise - send the data to the tunnel. //----------------------------------------- STATUS handleMB(ACE_Message_Block* mb); // forward message to tunnel consumer STATUS forwardMBtoTunnel(ACE_Message_Block *mb); //----------------------------------------- // Put a message block in our queue // If the queue is full then we will save this message in // _mb_remain and return STATUS_QUEUE_FULL //----------------------------------------- STATUS putMBInQueue(ACE_Message_Block *mb); //for DEBUG: void setTotalSentDataLength(ssize_t dataLen) { _total_sent_data_length = dataLen; } ssize_t getTotalSentDataLength() { return _total_sent_data_length; } //=================================================== // Data Member //=================================================== unsigned int _win_size; unsigned int _tunnel_handler; bool _registered; bool _amt_closed; bool _mc_closed; Channel_Consumer_State _state; ACE_Message_Queue _msg_queue; Tcp_Supplier* _tcp_supplier; AMT_Tunnel_Consumer* _tunnel_consumer; ACE_Recursive_Thread_Mutex _channel_mutex; ACE_Message_Block* _mb_remain; // For debug ssize_t _total_received_data_length; ssize_t _total_sent_data_length; }; #endif //_MPS_CHANNEL_CONSUMER_H__