OmniEvents
|
00001 // -*- Mode: C++; -*- 00002 // Package : omniEvents 00003 // EventChannelFactory_i.cc Created : 1/4/98 00004 // Author : Paul Nader (pwn) 00005 // 00006 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle. 00007 // 00008 // This file is part of the omniEvents application. 00009 // 00010 // omniEvents is free software; you can redistribute it and/or 00011 // modify it under the terms of the GNU Lesser General Public 00012 // License as published by the Free Software Foundation; either 00013 // version 2.1 of the License, or (at your option) any later version. 00014 // 00015 // omniEvents is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 // Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public 00021 // License along with this library; if not, write to the Free Software 00022 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 // 00024 // Description: 00025 // Implementation of the COSS Event Services Event Channel Factory 00026 // 00027 00028 #include "EventChannelFactory.h" 00029 00030 #include "Orb.h" 00031 #include "EventChannel.h" 00032 #include "PersistNode.h" 00033 00034 #include <memory> 00035 00036 #ifdef HAVE_OMNIORB4 00037 # define STR_MATCH(s1,s2) omni::strMatch((s1),(s2)) 00038 #else 00039 # define STR_MATCH(s1,s2) (0==::strcmp((s1),(s2))) 00040 #endif 00041 00042 namespace OmniEvents { 00043 00044 //------------------------------------------------------------------------ 00045 // Event Channel Factory Interface Implementation 00046 //------------------------------------------------------------------------ 00047 EventChannelFactory_i::EventChannelFactory_i(const PersistNode& node) 00048 : Servant(Orb::inst()._omniINSPOA.in()), 00049 _port(node.attrLong("port",11169)), 00050 _endPointNoListen(node.attrString("endPointNoListen")), 00051 _channels() 00052 { 00053 // Create event channels 00054 for(map<string,PersistNode*>::const_iterator i=node._child.begin(); 00055 i!=node._child.end(); 00056 ++i) 00057 { 00058 EventChannel_i* channel =new EventChannel_i(&_channels); 00059 channel->activate( 00060 i->first.c_str(), // channelName 00061 i->second // node 00062 ); 00063 } 00064 activateObjectWithId("omniEvents"); 00065 } 00066 00067 00068 EventChannelFactory_i::~EventChannelFactory_i() 00069 { 00070 DB(20, "EventChannelFactory_i::~EventChannelFactory_i()"); 00071 } 00072 00073 00074 CORBA::Boolean 00075 EventChannelFactory_i::supports(const CosLifeCycle::Key &k) 00076 { 00077 if((k.length() == 1) && 00078 (strcmp(k[0].id, "EventChannel") == 0) && 00079 (strcmp(k[0].kind, "object interface") == 0)) 00080 return 1; 00081 else 00082 return 0; 00083 } 00084 00085 00086 CORBA::Object_ptr 00087 EventChannelFactory_i::create_object( 00088 const CosLifeCycle::Key& k, 00089 const CosLifeCycle::Criteria& criteria 00090 ) 00091 { 00092 // Check the key 00093 if(!this->supports(k)) 00094 throw CosLifeCycle::NoFactory(k); 00095 00096 // Process criteria !! MAY THROW !! 00097 auto_ptr<PersistNode> criteriaNode( parseCriteria(criteria) ); 00098 00099 CORBA::String_var channelId; 00100 if(criteriaNode->hasAttr("InsName")) 00101 channelId=criteriaNode->attrString("InsName").c_str(); 00102 else 00103 channelId=newUniqueId(); 00104 00105 // Create the channel. 00106 // We place it into an auto_ptr - this will automatically clean up if anything 00107 // goes wrong. 00108 auto_ptr<EventChannel_i> channel( new EventChannel_i(&_channels) ); 00109 try 00110 { 00111 channel->activate(channelId.in(),criteriaNode.get()); // !! MAY THROW !! 00112 } 00113 catch(PortableServer::POA::ObjectAlreadyActive& ex) 00114 { 00115 throw CosLifeCycle::InvalidCriteria(criteria); //?? 00116 } 00117 catch(PortableServer::POA::AdapterAlreadyExists& ex) // create_POA 00118 { 00119 throw CosLifeCycle::InvalidCriteria(criteria); //?? 00120 } 00121 00122 // We release() the pointer, as the thread will delete it when it stops. 00123 return channel.release()->_this(); 00124 } 00125 00126 00127 CosEventChannelAdmin::EventChannel_ptr 00128 EventChannelFactory_i::create_channel(const char* channel_name) 00129 { 00130 CosEventChannelAdmin::EventChannel_var result; 00131 00132 CosLifeCycle::Key key; 00133 key.length(1); 00134 key[0].id ="EventChannel"; 00135 key[0].kind="object interface"; 00136 00137 CosLifeCycle::Criteria criteria; 00138 criteria.length(1); 00139 criteria[0].name = "InsName"; 00140 criteria[0].value <<= channel_name; 00141 00142 try 00143 { 00144 CORBA::Object_var obj=create_object(key,criteria); 00145 result=CosEventChannelAdmin::EventChannel::_narrow(obj.in()); 00146 } 00147 catch(CosLifeCycle::InvalidCriteria& ex) 00148 { 00149 if(ex.invalid_criteria.length()>0 && 00150 STR_MATCH(ex.invalid_criteria[0].name,"InsName")) 00151 { 00152 throw event::NameAlreadyUsed(); 00153 } 00154 else 00155 { 00156 DB(10,"Failed to create_channel." 00157 " Converting InvalidCriteria exception into UNKNOWN.") 00158 throw CORBA::UNKNOWN(); 00159 } 00160 } 00161 catch(CORBA::UserException& ex) 00162 { 00163 DB(2,"Failed to create_channel. Converting UserException" 00164 IFELSE_OMNIORB4(" '"<<ex._name()<<"'",<<) " into UNKNOWN.") 00165 throw CORBA::UNKNOWN(); 00166 } 00167 return result._retn(); 00168 } 00169 00170 00171 CosEventChannelAdmin::EventChannel_ptr 00172 EventChannelFactory_i::join_channel(const char* channel_name) 00173 { 00174 using namespace PortableServer; 00175 CosEventChannelAdmin::EventChannel_var result; 00176 try 00177 { 00178 ObjectId_var oid =PortableServer::string_to_ObjectId(channel_name); 00179 CORBA::Object_var obj =Orb::inst()._omniINSPOA->id_to_reference(oid.in()); 00180 result=CosEventChannelAdmin::EventChannel::_narrow(obj.in()); 00181 } 00182 catch(POA::ObjectNotActive&) 00183 { 00184 DB(10,"Failed to join_channel. Object not active.") 00185 throw event::EventChannelNotFound(); 00186 } 00187 catch(CORBA::UserException& ex) 00188 { 00189 DB(2,"Failed to join_channel. Converting UserException" 00190 IFELSE_OMNIORB4(" '"<<ex._name()<<"'",<<) " into UNKNOWN.") 00191 throw CORBA::UNKNOWN(); 00192 } 00193 return result._retn(); 00194 } 00195 00196 00197 PersistNode* EventChannelFactory_i::parseCriteria( 00198 const CosLifeCycle::Criteria &criteria 00199 ) const 00200 { 00201 using namespace CosLifeCycle; 00202 auto_ptr<PersistNode> result( new PersistNode() ); 00203 00204 for(CORBA::ULong i=0; i<criteria.length(); i++) 00205 { 00206 if(strcmp(criteria[i].name, "PullRetryPeriod_ms") == 0) 00207 { 00208 CORBA::ULong pullRetryPeriod_ms; 00209 if(! (criteria[i].value >>= pullRetryPeriod_ms)) 00210 throw InvalidCriteria(extract("PullRetryPeriod_ms",criteria)); 00211 if(pullRetryPeriod_ms <= 0) 00212 throw CannotMeetCriteria(extract("PullRetryPeriod_ms",criteria)); 00213 result->addattr("PullRetryPeriod_ms",pullRetryPeriod_ms); 00214 } 00215 else if(strcmp(criteria[i].name, "PullRetryPeriod") == 0) 00216 { 00217 // This criterion has been deprecated in favour of PullRetryPeriod_ms. 00218 // Don't overwrite any value provided by the latter. 00219 if(!result->hasAttr("PullRetryPeriod_ms")) 00220 { 00221 CORBA::ULong pullRetryPeriod; 00222 if(! (criteria[i].value >>= pullRetryPeriod)) 00223 throw InvalidCriteria(extract("PullRetryPeriod",criteria)); 00224 if(pullRetryPeriod <= 0) 00225 throw CannotMeetCriteria(extract("PullRetryPeriod",criteria)); 00226 result->addattr("PullRetryPeriod_ms",pullRetryPeriod*1000); 00227 } 00228 } 00229 else if(strcmp(criteria[i].name, "MaxQueueLength") == 0) 00230 { 00231 CORBA::ULong maxQueueLength; 00232 if(! (criteria[i].value >>= maxQueueLength)) 00233 throw InvalidCriteria(extract("MaxQueueLength",criteria)); 00234 if(maxQueueLength > 0) 00235 result->addattr("MaxQueueLength",maxQueueLength); 00236 else 00237 DB(10,"Ignoring CosLifeCycle criterion: MaxQueueLength=0"); 00238 } 00239 else if(strcmp(criteria[i].name, "MaxNumProxies") == 0) 00240 { 00241 CORBA::ULong maxNumProxies; 00242 if(! (criteria[i].value >>= maxNumProxies)) 00243 throw InvalidCriteria(extract("MaxNumProxies",criteria)); 00244 if(maxNumProxies > 0) 00245 result->addattr("MaxNumProxies",maxNumProxies); 00246 else 00247 DB(10,"Ignoring CosLifeCycle criterion: MaxNumProxies=0"); 00248 } 00249 else if(strcmp(criteria[i].name, "CyclePeriod_ns") == 0) 00250 { 00251 CORBA::ULong cyclePeriod_ns; 00252 if(! (criteria[i].value >>= cyclePeriod_ns)) 00253 throw InvalidCriteria(extract("CyclePeriod_ns",criteria)); 00254 if(cyclePeriod_ns > 0) 00255 result->addattr("CyclePeriod_ns",cyclePeriod_ns); 00256 else 00257 DB(10,"Ignoring CosLifeCycle criterion: CyclePeriod_ns=0"); 00258 } 00259 else if(strcmp(criteria[i].name, "InsName") == 0) 00260 { 00261 const char* insName; 00262 if(! (criteria[i].value >>= insName)) 00263 throw InvalidCriteria(extract("InsName",criteria)); 00264 if(insName && insName[0]) 00265 result->addattr(string("InsName=")+insName); 00266 else 00267 DB(10,"Ignoring empty CosLifeCycle criterion: InsName"); 00268 } 00269 else if(strcmp(criteria[i].name, "FilterId") == 0) 00270 { 00271 const char* repositoryId; 00272 if(! (criteria[i].value >>= repositoryId)) 00273 throw InvalidCriteria(extract("FilterId",criteria)); 00274 if(repositoryId && repositoryId[0]) 00275 result->addattr(string("FilterId=")+repositoryId); 00276 else 00277 DB(10,"Ignoring empty CosLifeCycle criterion: FilterId"); 00278 } 00279 else if(strcmp(criteria[i].name, "MaxEventsPerConsumer") == 0) 00280 { 00281 DB(10,"Ignoring obsolete CosLifeCycle criterion: MaxEventsPerConsumer"); 00282 } 00283 else 00284 { 00285 DB(10,"Ignoring unknown CosLifeCycle criterion: "<<criteria[i].name); 00286 } 00287 } // end loop for(i) 00288 00289 return result.release(); 00290 } 00291 00292 00293 CosLifeCycle::Criteria EventChannelFactory_i::extract( 00294 const char* name, 00295 const CosLifeCycle::Criteria& from 00296 ) const 00297 { 00298 CosLifeCycle::Criteria result; 00299 result.length(0); 00300 for(CORBA::ULong i=0; i<from.length(); i++) 00301 { 00302 if(strcmp(from[i].name,name) == 0) 00303 { 00304 result.length(1); 00305 result[0]=from[i]; 00306 break; 00307 } 00308 } 00309 return result; 00310 } 00311 00312 00313 void 00314 EventChannelFactory_i::output(ostream &os) 00315 { 00316 os<<"ecf port="<<_port; 00317 if(!_endPointNoListen.empty()) 00318 os<<" endPointNoListen="<<_endPointNoListen; 00319 os<<" ;;\n"; 00320 _channels.output(os); 00321 } 00322 00323 00324 }; // end namespace OmniEvents