//---------------------------------------------------------------------------- // // Copyright (C) Intel Corporation, 2006 - 2007. // // File: TcpSupplier.cpp // // Contents: Handles incoming data on TCP connection. // // Notes: //---------------------------------------------------------------------------- //=================================================== // INCLUDES //=================================================== #include #include #include #include "TcpSupplier.h" #include "ChannelConsumer.h" Tcp_Supplier::~Tcp_Supplier() { ACE_DEBUG((MY_DEBUG ACE_TEXT(" --->Tcp_Supplier dtor\n"))); } Tcp_Consumer* Tcp_Supplier::consumer() { if (_svc_handler != NULL) { return _svc_handler->_consumer; } return NULL; } STATUS Tcp_Supplier::handle_input(ACE_HANDLE h) { return tunnelData(); } STATUS Tcp_Supplier::tunnelData(void) { ACE_TRACE(ACE_TEXT("Tcp_Supplier::tunnelData")); int send_length = *getMaximumWindowSize(); ssize_t actual_send_length; //char* body_data = new char[send_length]; ACE_Message_Block *mb = new ACE_Message_Block(send_length,ACE_Message_Block::MB_DATA); actual_send_length = getPeer().recv(mb->wr_ptr(), send_length, &ACE_Time_Value(*getMaxTunnelTimeout())); ACE_DEBUG((MY_TRACE ACE_TEXT("Tcp_Supplier::tunnelData Received data: %d (Supplier pointer: %x)\t(Consumer: %x)\n"), actual_send_length, this, _channel_consummer)); //socket closed: if (actual_send_length <= 0) { ACE_DEBUG((MY_DEBUG ACE_TEXT("TCP supplier failed to read from socket - MC closed the socket. actual_bytes= %d errno = %d\n"),actual_send_length,errno)); mb->release(); return STATUS_NETWORK_ERROR; } mb->wr_ptr(actual_send_length); //send STATUS rep = _channel_consummer->sendData(mb); CHECK_STATUS_REP(rep, ACE_TEXT("Tcp_Supplier failed to send data to AMT\n")); return STATUS_SUCCESS; } //----------------------------------------- // Get reference to socket stream //----------------------------------------- ACE_SOCK_Stream& Tcp_Supplier::getPeer(void) const { return (ACE_SOCK_Stream &)_svc_handler->peer(); } //----------------------------------------- // Get reference to socket stream //----------------------------------------- void Tcp_Supplier::unRegisterHandler(ACE_Reactor_Mask reactor_flag) { ACE_TRACE("unRegisterHandler"); ACE_DEBUG((MY_TRACE ACE_TEXT("TCP supplier unRegisterHandler\n"))); _svc_handler->cancel_io(reactor_flag); } //----------------------------------------- // Get reference to socket stream //----------------------------------------- void Tcp_Supplier::registerHandler(ACE_Reactor_Mask reactor_flag) { ACE_TRACE("registerHandler"); ACE_DEBUG((MY_TRACE ACE_TEXT("TCP supplier RegisterHandler\n"))); _svc_handler->initiate_io(reactor_flag); } //----------------------------------------- // Handle close - the reactor has closed the read mask //----------------------------------------- void Tcp_Supplier::handle_close() { ACE_TRACE(ACE_TEXT("Tcp_Supplier::close")); if (_channel_consummer != NULL) { ACE_DEBUG((MY_DEBUG ACE_TEXT( "Tcp_Supplier:: close() - going closeTunnelConnection \n"))); if ((_channel_consummer->closeTunnelConnection()) != STATUS_SUCCESS) { ACE_DEBUG((MY_DEBUG ACE_TEXT( "Tcp_Supplier:: close() - closeTunnelConnection return with -1\n"))); } } getPeer().close_reader(); } //----------------------------------------- // Create Channel consumer, and through it // channel open Reply (according to is_connected). //----------------------------------------- STATUS Tcp_Supplier::channelOpenRep(ACE_INET_Addr amt_addr, bool is_connected, ACE_UINT32 tunnel_handler, ACE_UINT32 win_size, AMT_Tunnel_Consumer* tunnel_consumer) { ACE_TRACE(ACE_TEXT("Tcp_Supplier::channelOpenRep")); ACE_NEW_RETURN( _channel_consummer, Channel_Consumer(this, tunnel_handler, win_size), STATUS_MALLOC_FAILURE); return _channel_consummer->openChannelRep(amt_addr, is_connected, consumer(), tunnel_consumer); }