OmniEvents
|
00001 // Package : omniEvents 00002 // ProxyPullConsumer.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003-2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "ProxyPullConsumer.h" 00025 #include "Orb.h" 00026 #include "omniEventsLog.h" 00027 #include "PersistNode.h" 00028 #include <assert.h> 00029 00030 namespace OmniEvents { 00031 00032 // 00033 // ProxyPullConsumerManager 00034 // 00035 00036 PortableServer::Servant 00037 ProxyPullConsumerManager::incarnate( 00038 const PortableServer::ObjectId& oid, 00039 PortableServer::POA_ptr poa 00040 ) 00041 { 00042 DB(20,"ProxyPullConsumerManager::incarnate()") 00043 ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue); 00044 _servants.insert(result); 00045 return result; 00046 } 00047 00048 ProxyPullConsumerManager::ProxyPullConsumerManager( 00049 PortableServer::POA_ptr parentPoa, 00050 list<CORBA::Any*>& q 00051 ) 00052 : ProxyManager(parentPoa), 00053 _queue(q) 00054 { 00055 ProxyManager::activate("ProxyPullConsumer"); 00056 } 00057 00058 ProxyPullConsumerManager::~ProxyPullConsumerManager() 00059 { 00060 DB(20,"~ProxyPullConsumerManager()") 00061 } 00062 00063 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullConsumerManager) 00064 00065 CosEventChannelAdmin::ProxyPullConsumer_ptr 00066 ProxyPullConsumerManager::createObject() 00067 { 00068 return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>( 00069 _managedPoa.in(), 00070 CosEventChannelAdmin::_tc_ProxyPullConsumer->id() 00071 ); 00072 } 00073 00074 void ProxyPullConsumerManager::collect() 00075 { 00076 // Collect events from each servant in turn. 00077 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00078 { 00079 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i); 00080 proxy->collect(); 00081 } 00082 } 00083 00084 void ProxyPullConsumerManager::triggerRequest() 00085 { 00086 // Trigger each servant in turn. 00087 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00088 { 00089 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i); 00090 proxy->triggerRequest(); 00091 } 00092 } 00093 00094 void ProxyPullConsumerManager::disconnect() 00095 { 00096 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00097 { 00098 Proxy* p =*i; // Sun's CC requires this temporary. 00099 ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p); 00100 // We are in the EventChannel's thread. 00101 // Make sure all calls go though the ProxyPullConsumer POA. 00102 CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this(); 00103 ppcv->disconnect_pull_consumer(); 00104 } 00105 } 00106 00107 00108 // 00109 // ProxyPullConsumer_i 00110 // 00111 00112 // CORBA interface methods 00113 00114 void ProxyPullConsumer_i::connect_pull_supplier( 00115 CosEventComm::PullSupplier_ptr pullSupplier 00116 ) 00117 { 00118 if(CORBA::is_nil(pullSupplier)) 00119 throw CORBA::BAD_PARAM(); 00120 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req)) 00121 throw CosEventChannelAdmin::AlreadyConnected(); 00122 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier); 00123 00124 if(omniEventsLog::exists()) 00125 { 00126 WriteLock log; 00127 output(log.os); 00128 } 00129 } 00130 00131 void ProxyPullConsumer_i::disconnect_pull_consumer() 00132 { 00133 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()"); 00134 eraseKey("SupplierAdmin/ProxyPullConsumer"); 00135 deactivateObject(); 00136 if(CORBA::is_nil(_target)) 00137 { 00138 throw CORBA::OBJECT_NOT_EXIST( 00139 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0), 00140 CORBA::COMPLETED_NO 00141 ); 00142 } 00143 else 00144 { 00145 CORBA::Request_var req=_target->_request("disconnect_pull_supplier"); 00146 _target=CosEventComm::PullSupplier::_nil(); 00147 req->send_deferred(); 00148 Orb::inst().deferredRequest(req._retn()); 00149 } 00150 } 00151 00152 // 00153 00154 ProxyPullConsumer_i::ProxyPullConsumer_i( 00155 PortableServer::POA_ptr poa, 00156 list<CORBA::Any*>& q 00157 ) 00158 : Proxy(poa), 00159 _target(CosEventComm::PullSupplier::_nil()), 00160 _queue(q), 00161 _mode(Pull), // Prefer 'pull' method calls. 00162 _exceptionCount(0) 00163 {} 00164 00165 ProxyPullConsumer_i::~ProxyPullConsumer_i() 00166 { 00167 DB(20,"~ProxyPullConsumer_i()") 00168 } 00169 00170 void ProxyPullConsumer_i::collect() 00171 { 00172 if(!CORBA::is_nil(_req) && _req->poll_response()) 00173 { 00174 const char* opname =_req->operation(); 00175 assert(opname); 00176 CORBA::Environment_ptr env =_req->env(); // No need to release environment. 00177 00178 if(!CORBA::is_nil(env) && env->exception()) 00179 { 00180 CORBA::Exception* ex =env->exception(); // No need to free exception. 00181 DB(10,"ProxyPullConsumer got exception" 00182 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname); 00183 if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname)) 00184 { 00185 ++_exceptionCount; 00186 _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time. 00187 } 00188 else 00189 DB(2,"Ignoring unrecognised response. operation:"<<opname); 00190 if(_exceptionCount>=4) 00191 { 00192 Orb::inst().reportObjectFailure(HERE,_target.in(),ex); 00193 00194 // Try to notify the Supplier that the connection is closing. 00195 CORBA::Request_var req=_target->_request("disconnect_pull_supplier"); 00196 req->send_deferred(); 00197 Orb::inst().deferredRequest(req._retn()); 00198 00199 _target=CosEventComm::PullSupplier::_nil(); // disconnected 00200 eraseKey("SupplierAdmin/ProxyPullConsumer"); 00201 deactivateObject(); 00202 } 00203 } 00204 else 00205 { 00206 // Do we have an event? 00207 bool hasEvent=false; 00208 if(0==strcmp("pull",opname)) 00209 { 00210 hasEvent=true; 00211 } 00212 else if(0==strcmp("try_pull",opname)) 00213 { 00214 CORBA::NVList_ptr args=_req->arguments(); // No need to release args. 00215 if(args->count()==1) 00216 { 00217 CORBA::NamedValue_var hasEventArg=args->item(0); 00218 if(0==strcmp(hasEventArg->name(),"has_event")) 00219 { 00220 CORBA::Any* a =hasEventArg->value(); 00221 CORBA::Boolean b; 00222 CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs! 00223 hasEvent=(((*a)>>=tb) && b); 00224 } 00225 } 00226 } 00227 // Pick up an event, if we have one. 00228 if(hasEvent) 00229 { 00230 CORBA::Any* event =new CORBA::Any(); 00231 _req->return_value() >>= (*event); 00232 _queue.push_back(event); 00233 } 00234 // Reset the exception count. 00235 _exceptionCount=0; 00236 } 00237 _req=CORBA::Request::_nil(); 00238 } 00239 } // ProxyPullConsumer_i::end collect() 00240 00241 void ProxyPullConsumer_i::triggerRequest() 00242 { 00243 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target)) 00244 { 00245 switch(_mode) 00246 { 00247 case Pull: 00248 _req=_target->_request("pull"); 00249 break; 00250 case TryPull: 00251 _req=_target->_request("try_pull"); 00252 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1); 00253 break; 00254 default: 00255 assert(0); 00256 } 00257 _req->set_return_type(CORBA::_tc_any); 00258 _req->send_deferred(); 00259 } 00260 } 00261 00262 void ProxyPullConsumer_i::reincarnate( 00263 const string& oid, 00264 const PersistNode& node 00265 ) 00266 { 00267 CosEventComm::PullSupplier_var pullSupplier = 00268 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str()); 00269 // Do not activate until we know that we have read a valid target. 00270 activateObjectWithId(oid.c_str()); 00271 connect_pull_supplier(pullSupplier.in()); 00272 } 00273 00274 void ProxyPullConsumer_i::output(ostream& os) 00275 { 00276 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in()); 00277 } 00278 00279 }; // end namespace OmniEvents