OmniEvents
|
00001 // Package : omniEvents 00002 // ProxyPushConsumer.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003,2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "ProxyPushConsumer.h" 00025 #include "ConsumerAdmin.h" 00026 #include "Orb.h" 00027 #include "omniEventsLog.h" 00028 #include "PersistNode.h" 00029 00030 #include <assert.h> 00031 00032 namespace OmniEvents { 00033 00034 void ProxyPushConsumer_i::connect_push_supplier( 00035 CosEventComm::PushSupplier_ptr pushSupplier) 00036 { 00037 // pushSupplier is permitted to be nil. 00038 if(CORBA::is_nil(pushSupplier)) 00039 return; 00040 00041 string oidstr =currentObjectId(); 00042 Connections_t::iterator pos =_connections.find(oidstr); 00043 00044 if(pos!=_connections.end()) 00045 throw CosEventChannelAdmin::AlreadyConnected(); 00046 00047 Connection* newConnection = 00048 new Connection( 00049 _channelName.in(), 00050 oidstr, 00051 CosEventComm::PushSupplier::_duplicate(pushSupplier) 00052 ); 00053 _connections.insert( Connections_t::value_type(oidstr,newConnection) ); 00054 00055 // Test to see whether pushSupplier is a ProxyPushSupplier. 00056 // If so, then we will aggressively try to reconnect, when we are reincarnated 00057 CORBA::Request_var req =pushSupplier->_request("_is_a"); 00058 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id(); 00059 req->set_return_type(CORBA::_tc_boolean); 00060 req->send_deferred(); 00061 Orb::inst().deferredRequest(req._retn(),newConnection); // Register callback 00062 00063 if(omniEventsLog::exists()) 00064 { 00065 WriteLock log; 00066 newConnection->output(log.os); 00067 } 00068 } 00069 00070 00071 void ProxyPushConsumer_i::disconnect_push_consumer() 00072 { 00073 #ifdef HAVE_OMNIORB4 00074 DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()") 00075 string oidstr =currentObjectId(); 00076 Connections_t::iterator pos =_connections.find(oidstr); 00077 00078 if(pos!=_connections.end()) 00079 { 00080 CORBA::Request_var req = 00081 pos->second->_target->_request("disconnect_push_supplier"); 00082 pos->second->_remove_ref(); 00083 _connections.erase(pos); 00084 // The following line could result in a reentrant callback, if this call was 00085 // not made through the POA => must erase the connection BEFORE this point. 00086 req->send_deferred(); 00087 Orb::inst().deferredRequest(req._retn()); 00088 if(omniEventsLog::exists()) 00089 { 00090 // Erase this connection from the log file. 00091 WriteLock log; 00092 log.os<<"-ecf/"<<_channelName.in(); 00093 log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n'; 00094 } 00095 } 00096 #else /* Silently ignore disconnects with omniORB3 */ 00097 DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!") 00098 #endif 00099 } 00100 00101 00102 void ProxyPushConsumer_i::push(const CORBA::Any& event) 00103 { 00104 #ifdef OMNIEVENTS_REAL_TIME_PUSH 00105 if(!_useLocalQueue) 00106 { 00107 _consumerAdmin.send(new CORBA::Any(event)); 00108 _useLocalQueue=true; 00109 } 00110 else 00111 #endif 00112 _queue.push_back(new CORBA::Any(event)); 00113 } 00114 00115 00116 ProxyPushConsumer_i::ProxyPushConsumer_i( 00117 PortableServer::POA_ptr p, 00118 list<CORBA::Any*>& q, 00119 ConsumerAdmin_i& consumerAdmin 00120 ) 00121 : Servant(PortableServer::POA::_nil()), 00122 _connections(), 00123 _channelName(p->the_name()), 00124 _consumerAdmin(consumerAdmin), 00125 _queue(q), 00126 _useLocalQueue(false) 00127 { 00128 _consumerAdmin._add_ref(); 00129 00130 using namespace PortableServer; 00131 00132 // POLICIES: 00133 // Lifespan =PERSISTENT // we can persist 00134 // Assignment =USER_ID // write our own oid 00135 // Uniqueness =MULTIPLE_ID // only one servant 00136 // ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation 00137 // RequestProcessing =USE_DEFAULT_SERVANT // only one servant 00138 // ServantRetention =NON_RETAIN // stateless POA 00139 // Thread =SINGLE_THREAD_MODEL // keep it simple 00140 00141 CORBA::PolicyList policies; 00142 policies.length(7); 00143 policies[0]=p->create_lifespan_policy(PERSISTENT); 00144 policies[1]=p->create_id_assignment_policy(USER_ID); 00145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID); 00146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION); 00147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT); 00148 policies[5]=p->create_servant_retention_policy(NON_RETAIN); 00149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL); 00150 00151 try 00152 { 00153 // Create a POA for this proxy type in this channel. 00154 string poaName =string(_channelName.in())+".ProxyPushConsumer"; 00155 POAManager_var parentManager =p->the_POAManager(); 00156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies); 00157 } 00158 catch(POA::AdapterAlreadyExists&) // create_POA 00159 { 00160 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - " 00161 "POA::AdapterAlreadyExists") 00162 } 00163 catch(POA::InvalidPolicy& ex) // create_POA 00164 { 00165 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - " 00166 "POA::InvalidPolicy: "<<ex.index) 00167 } 00168 00169 // Destroy the policy objects (Not strictly necessary in omniORB) 00170 for(CORBA::ULong i=0; i<policies.length(); ++i) 00171 policies[i]->destroy(); 00172 00173 // This object is the POA's default servant. 00174 _poa->set_servant(this); 00175 } 00176 00177 00178 ProxyPushConsumer_i::~ProxyPushConsumer_i() 00179 { 00180 DB(20,"~ProxyPushConsumer_i()") 00181 for(Connections_t::iterator i =_connections.begin(); 00182 i!=_connections.end(); 00183 ++i) 00184 { 00185 i->second->_remove_ref(); 00186 } 00187 _connections.clear(); 00188 00189 _consumerAdmin._remove_ref(); 00190 } 00191 00192 00193 CosEventChannelAdmin::ProxyPushConsumer_ptr 00194 ProxyPushConsumer_i::createObject() 00195 { 00196 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>( 00197 _poa.in(), 00198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id() 00199 ); 00200 } 00201 00202 00203 void ProxyPushConsumer_i::disconnect() 00204 { 00205 // Note. We are (probably) in the EventChannel's thread. 00206 Connections_t::iterator curr,next=_connections.begin(); 00207 while(next!=_connections.end()) 00208 { 00209 curr=next++; 00210 CORBA::Request_var req = 00211 curr->second->_target->_request("disconnect_push_supplier"); 00212 curr->second->_remove_ref(); 00213 _connections.erase(curr); 00214 // The following line could result in a reentrant callback 00215 // => must erase the connection BEFORE this point. 00216 req->send_deferred(); 00217 Orb::inst().deferredRequest(req._retn()); 00218 } 00219 } 00220 00221 00222 void ProxyPushConsumer_i::reincarnate(const PersistNode& node) 00223 { 00224 // Reincarnate all connections from node's children. 00225 for(map<string,PersistNode*>::const_iterator i=node._child.begin(); 00226 i!=node._child.end(); 00227 ++i) 00228 { 00229 const char* oidstr =i->first.c_str(); 00230 string ior( i->second->attrString("IOR") ); 00231 bool isProxy( i->second->attrLong("proxy") ); 00232 assert(_connections.find(oidstr)==_connections.end()); 00233 try 00234 { 00235 using namespace CosEventComm; 00236 using namespace CosEventChannelAdmin; 00237 00238 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str()); 00239 _connections.insert(Connections_t::value_type( 00240 oidstr, 00241 new Connection(_channelName.in(),oidstr,supp._retn(),isProxy) 00242 )); 00243 DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr) 00244 00245 // If supp is a ProxyPushSupplier, then try to reconnect. 00246 if(isProxy) 00247 { 00248 DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr) 00249 // This will only work if the proxy is implemented in the same way as 00250 // omniEvents, so connect_() automatically creates a proxy. 00251 ProxyPushSupplier_var proxySupp = 00252 string_to_<ProxyPushSupplier>(ior.c_str()); 00253 PortableServer::ObjectId_var objectId = 00254 PortableServer::string_to_ObjectId(oidstr); 00255 CORBA::Object_var obj = 00256 _poa->create_reference_with_id( 00257 objectId.in(), 00258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id() 00259 ); 00260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj); 00261 proxySupp->connect_push_consumer(thisCons.in()); 00262 DB(7,"Reconnected ProxyPushConsumer: "<<oidstr) 00263 } 00264 } 00265 catch(CORBA::BAD_PARAM&) { 00266 // This will happen when IOR fails to narrow. 00267 DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr) 00268 } 00269 catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer() 00270 // The supplier doesn't need to be reconnected. 00271 DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr) 00272 } 00273 catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer() 00274 // Don't know what to make of this... 00275 DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr) 00276 } 00277 catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding. 00278 catch(CORBA::TRANSIENT& ) {} // object 'supp' not responding. 00279 catch(CORBA::COMM_FAILURE& ) {} // object 'supp' not responding. 00280 } // end loop for(i) 00281 } 00282 00283 00284 void ProxyPushConsumer_i::output(ostream& os) const 00285 { 00286 for(Connections_t::const_iterator i=_connections.begin(); 00287 i!=_connections.end(); 00288 ++i) 00289 { 00290 i->second->output(os); 00291 } 00292 } 00293 00294 00295 string ProxyPushConsumer_i::currentObjectId() const 00296 { 00297 #ifdef HAVE_OMNIORB4 00298 try 00299 { 00300 using namespace PortableServer; 00301 ObjectId_var oid =Orb::inst()._POACurrent->get_object_id(); 00302 CORBA::String_var oidStr =ObjectId_to_string(oid.in()); 00303 return string(oidStr.in()); 00304 } 00305 catch(PortableServer::Current::NoContext&) // get_object_id() 00306 { 00307 DB(0,"No context!!") 00308 } 00309 catch(CORBA::BAD_PARAM&) // ObjectId_to_string() 00310 { 00311 // Should never get here in omniORB, because ObjectID is a char*. 00312 assert(0); 00313 } 00314 return "ERROR"; 00315 #else 00316 throw CORBA::NO_IMPLEMENT(); 00317 #endif 00318 } 00319 00320 00321 // 00322 // ProxyPushConsumer_i::Connection 00323 // 00324 00325 #if OMNIEVENTS__DEBUG_SERVANT 00326 int ProxyPushConsumer_i::Connection::_objectCount =0; 00327 #endif 00328 00329 ProxyPushConsumer_i::Connection::Connection( 00330 const char* channelName, 00331 const string& oidstr, 00332 CosEventComm::PushSupplier_ptr pushSupplier, 00333 bool isProxy 00334 ):Callback(), 00335 _channelName(channelName), 00336 _oidstr(oidstr), 00337 _target(pushSupplier), 00338 _targetIsProxy(isProxy) 00339 { 00340 #if OMNIEVENTS__DEBUG_SERVANT 00341 ++_objectCount; 00342 DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount) 00343 #endif 00344 } 00345 00346 ProxyPushConsumer_i::Connection::~Connection() 00347 { 00348 #if OMNIEVENTS__DEBUG_SERVANT 00349 --_objectCount; 00350 DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount) 00351 #else 00352 DB(20,"ProxyPushConsumer_i::Connection::~Connection()") 00353 #endif 00354 } 00355 00356 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushConsumer_i::Connection) 00357 00358 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req) 00359 { 00360 bool save =_targetIsProxy; 00361 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy)) 00362 { 00363 if(_targetIsProxy && omniEventsLog::exists()) 00364 { 00365 WriteLock log; 00366 output(log.os); 00367 DB(15,"ProxyPushConsumer is federated."); 00368 } 00369 } 00370 else 00371 { 00372 DB(2,"ProxyPushConsumer got unexpected callback."); 00373 _targetIsProxy=save; // Reset it just to be sure. 00374 } 00375 } 00376 00377 void ProxyPushConsumer_i::Connection::output(ostream& os) const 00378 { 00379 os<<"ecf/"<<_channelName; 00380 os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr; 00381 00382 if(!CORBA::is_nil(_target.in())) 00383 { 00384 CORBA::String_var iorstr; 00385 iorstr = Orb::inst()._orb->object_to_string(_target.in()); 00386 os<<" IOR="<<iorstr.in(); 00387 if(_targetIsProxy) 00388 os<<" proxy=1"; 00389 } 00390 os<<" ;;\n"; 00391 } 00392 00393 00394 }; // end namespace OmniEvents