//---------------------------------------------------------------------------- // // Copyright (C) Intel Corporation, 2006 - 2007. // // File: ChannelManager.cpp // // Contents: Manages the channels in the MPS // // Notes: //---------------------------------------------------------------------------- #include "ChannelManager.h" #include "ChannelConsumer.h" #include "TcpConsumer.h" // Constructor Channel_Manager::Channel_Manager(int size): all_channels_(size),_state(Channel_Manager::ACTIVE) { for (size_t i = 0; i < all_channels_.size(); i++) { all_channels_[i] = NULL; } } Channel_Manager::~Channel_Manager(void) { } // Get next available mps id ACE_INT32 Channel_Manager::get_available_mps_handle() { // Find if we can reuse old channel ACE_UINT32 i = 0; for (; i < all_channels_.size(); i++) { if (all_channels_[i] == NULL) return i; } // We add another channel to the vector all_channels_.push_back(NULL); return i; } // Create a new channel with a given mps id AMT_Channel* Channel_Manager::create_channel() { if (_state == NOT_ACTIVE) { ACE_DEBUG((MY_DEBUG ACE_TEXT("Channel_Manager::create_channel() - in NOT_ACTIVE state - going to return NULL\n"))); return NULL; } ACE_INT32 mps_id = get_available_mps_handle(); if (mps_id == -1) return NULL; AMT_Channel* ch = new AMT_Channel; ch->setMPSid(mps_id); (channel_map_[MPS_ENDPOINT])[mps_id] = ch; all_channels_[mps_id] = ch; return ch; } AMT_Channel* Channel_Manager::create_channel(const ACE_UINT32 amt_id) { if ((find_channel(amt_id, AMT_ENDPOINT)) != NULL) { // We already have a channel with that AMT key. return NULL; } AMT_Channel* ch = create_channel(); if (ch != NULL) { ch->setAMTid(amt_id); (channel_map_[AMT_ENDPOINT])[amt_id] = ch; } return ch; } // Add a new key to a given channel, this method is called after open reply. int Channel_Manager::add_amt_id(const ACE_UINT32 mps_id, const ACE_UINT32 amt_id) { AMT_Channel* ch; if ((ch = find_channel(mps_id, MPS_ENDPOINT)) == NULL) return -1; if ((find_channel(amt_id, AMT_ENDPOINT)) != NULL) // We already have a channel with that AMT key. return -1; (channel_map_[AMT_ENDPOINT])[amt_id] = ch; return 0; } // Find a channel according to ID and endpoint AMT_Channel* Channel_Manager::find_channel(const ACE_UINT32 id, CHANNEL_ENDPOINT endpoint) { CHANNEL_MAP::iterator itr = channel_map_[endpoint].find(id); if (itr == channel_map_[endpoint].end()) return NULL; return itr->second; } // Delete channel from only one map (according to endpoint) int Channel_Manager::remove_channel_from_endpoint(const ACE_UINT32 id, CHANNEL_ENDPOINT endpoint) { if ((endpoint == MPS_ENDPOINT) && (id < all_channels_.size())) { all_channels_[id] = NULL; } CHANNEL_MAP::iterator itr = channel_map_[endpoint].find(id); if (itr == channel_map_[endpoint].end()) return -1; channel_map_[endpoint].erase(itr); return 0; } // Delete channel from all maps int Channel_Manager::remove_channel(const ACE_UINT32 id, CHANNEL_ENDPOINT endpoint) { CHANNEL_MAP::iterator itr = channel_map_[endpoint].find(id); if (itr == channel_map_[endpoint].end()) return -1; ACE_UINT32 mps_id = ((AMT_Channel*)(itr->second))->getMPSid(); int other_endpoint = (endpoint == AMT_ENDPOINT ? MPS_ENDPOINT : AMT_ENDPOINT); ACE_UINT32 other_id = (endpoint == AMT_ENDPOINT ? ((AMT_Channel*)(itr->second))->getMPSid() : ((AMT_Channel*)(itr->second))->getAMTid()); if (mps_id <= all_channels_.size()) { all_channels_[mps_id] = NULL; } // Remove this channel from the given endpoint channel_map_[endpoint].erase(itr); // Remove it's pair channel from other endpoint itr = channel_map_[other_endpoint].find(other_id); if (itr != channel_map_[other_endpoint].end()) channel_map_[other_endpoint].erase(itr); return 0; } // Close all channels void Channel_Manager::close_all_channels() { for (size_t i=0; i < all_channels_.size(); i++) { if (all_channels_[i] == NULL) continue; if (all_channels_[i]->getChannelConsumer() != NULL) { all_channels_[i]->getChannelConsumer()->connectionUnbind(); } if (all_channels_[i]->getTcpConsumer() != NULL) { all_channels_[i]->getTcpConsumer()->connectionClose(); } delete all_channels_[i]; all_channels_[i] = NULL; } (channel_map_[MPS_ENDPOINT]).clear(); (channel_map_[AMT_ENDPOINT]).clear(); _state = NOT_ACTIVE; }