OmniEvents
|
00001 // Package : omniEvents 00002 // ProxyPullSupplier.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003-2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "ProxyPullSupplier.h" 00025 #include "EventChannel.h" 00026 #include "Orb.h" 00027 #include "omniEventsLog.h" 00028 #include "PersistNode.h" 00029 #include <assert.h> 00030 00031 namespace OmniEvents { 00032 00033 // 00034 // ProxyPullSupplierManager 00035 // 00036 00037 PortableServer::Servant ProxyPullSupplierManager::incarnate( 00038 const PortableServer::ObjectId& oid, 00039 PortableServer::POA_ptr poa 00040 ) 00041 { 00042 // Evict the oldest proxy servant, if we have reached the maximum number. 00043 if(_servants.size()>=_channel.maxNumProxies()) 00044 { 00045 ProxyPullSupplier_i* oldest =NULL; 00046 unsigned long age =0; 00047 for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i) 00048 if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age) 00049 { 00050 oldest=dynamic_cast<ProxyPullSupplier_i*>(*i); 00051 age=oldest->timestamp(); 00052 } 00053 DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one") 00054 try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){} 00055 } 00056 // Make a new servant. 00057 ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue); 00058 _servants.insert(result); 00059 return result; 00060 } 00061 00062 ProxyPullSupplierManager::ProxyPullSupplierManager( 00063 const EventChannel_i& channel, 00064 PortableServer::POA_ptr parentPoa, 00065 EventQueue& q 00066 ) 00067 : ProxyManager(parentPoa), 00068 _queue(q), 00069 _channel(channel) 00070 { 00071 ProxyManager::activate("ProxyPullSupplier"); 00072 } 00073 00074 ProxyPullSupplierManager::~ProxyPullSupplierManager() 00075 { 00076 DB(20,"~ProxyPullSupplierManager()") 00077 } 00078 00079 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullSupplierManager) 00080 00081 CosEventChannelAdmin::ProxyPullSupplier_ptr 00082 ProxyPullSupplierManager::createObject() 00083 { 00084 return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>( 00085 _managedPoa.in(), 00086 CosEventChannelAdmin::_tc_ProxyPullSupplier->id() 00087 ); 00088 } 00089 00090 void ProxyPullSupplierManager::disconnect() 00091 { 00092 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00093 { 00094 ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i); 00095 // We are in the EventChannel's thread. 00096 // Make sure all calls go though the ProxyPullSupplier POA. 00097 CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this(); 00098 ppsv->disconnect_pull_supplier(); 00099 00100 } 00101 } 00102 00103 00104 // 00105 // ProxyPullSupplier_i 00106 // 00107 00108 // CORBA interface methods 00109 00110 void ProxyPullSupplier_i::connect_pull_consumer( 00111 CosEventComm::PullConsumer_ptr pullConsumer 00112 ) 00113 { 00114 if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req)) 00115 throw CosEventChannelAdmin::AlreadyConnected(); 00116 touch(); 00117 _connected=true; 00118 if(!CORBA::is_nil(pullConsumer)) 00119 _target=CosEventComm::PullConsumer::_duplicate(pullConsumer); 00120 00121 if(omniEventsLog::exists()) 00122 { 00123 WriteLock log; 00124 output(log.os); 00125 } 00126 } 00127 00128 void ProxyPullSupplier_i::disconnect_pull_supplier() 00129 { 00130 DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()"); 00131 touch(); 00132 eraseKey("ConsumerAdmin/ProxyPullSupplier"); 00133 deactivateObject(); 00134 if(!_connected) 00135 { 00136 throw CORBA::OBJECT_NOT_EXIST( 00137 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0), 00138 CORBA::COMPLETED_NO 00139 ); 00140 } 00141 else if(!CORBA::is_nil(_target)) 00142 { 00143 CORBA::Request_var req=_target->_request("disconnect_pull_consumer"); 00144 _target=CosEventComm::PullConsumer::_nil(); 00145 req->send_deferred(); 00146 Orb::inst().deferredRequest(req._retn()); 00147 } 00148 } 00149 00150 CORBA::Any* ProxyPullSupplier_i::pull() 00151 { 00152 if(!_connected) 00153 throw CosEventComm::Disconnected(); 00154 touch(); 00155 if(moreEvents()) 00156 return new CORBA::Any(*nextEvent()); 00157 else 00158 throw CORBA::TRANSIENT( 00159 IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0), 00160 CORBA::COMPLETED_NO 00161 ); 00162 } 00163 00164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event) 00165 { 00166 if(!_connected) 00167 throw CosEventComm::Disconnected(); 00168 touch(); 00169 if(moreEvents()) 00170 { 00171 has_event=1; 00172 return new CORBA::Any(*nextEvent()); 00173 } 00174 else 00175 { 00176 has_event=0; 00177 return new CORBA::Any(); 00178 } 00179 } 00180 00181 // 00182 00183 ProxyPullSupplier_i::ProxyPullSupplier_i( 00184 PortableServer::POA_ptr poa, 00185 EventQueue& q 00186 ) 00187 : Proxy(poa), 00188 EventQueue::Reader(q), 00189 _target(CosEventComm::PullConsumer::_nil()), 00190 _connected(false), 00191 _timestamp(0) 00192 { 00193 touch(); 00194 } 00195 00196 ProxyPullSupplier_i::~ProxyPullSupplier_i() 00197 { 00198 DB(20,"~ProxyPullSupplier_i()") 00199 } 00200 00201 void ProxyPullSupplier_i::reincarnate( 00202 const string& oid, 00203 const PersistNode& node 00204 ) 00205 { 00206 CosEventComm::PullConsumer_var pullConsumer = 00207 string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str()); 00208 // Do not activate until we know that we have read a valid target. 00209 activateObjectWithId(oid.c_str()); 00210 connect_pull_consumer(pullConsumer.in()); 00211 } 00212 00213 void ProxyPullSupplier_i::output(ostream& os) 00214 { 00215 basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in()); 00216 } 00217 00218 inline void ProxyPullSupplier_i::touch() 00219 { 00220 unsigned long nsec; // dummy 00221 omni_thread::get_time(&_timestamp,&nsec); 00222 } 00223 00224 }; // end namespace OmniEvents