OmniEvents
|
00001 // -*- Mode: C++; -*- 00002 // Package : omniEvents 00003 // pullsupp.cc Created : 1/4/98 00004 // Author : Paul Nader (pwn) 00005 // 00006 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle 00007 // 00008 // This file is part of the omniEvents application. 00009 // 00010 // omniEvents is free software; you can redistribute it and/or 00011 // modify it under the terms of the GNU Lesser General Public 00012 // License as published by the Free Software Foundation; either 00013 // version 2.1 of the License, or (at your option) any later version. 00014 // 00015 // omniEvents is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 // Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public 00021 // License along with this library; if not, write to the Free Software 00022 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 // 00024 // Description: 00025 // Pull Model supplier implementation. 00026 // 00027 00028 /* 00029 $Log: pullsupp.cc,v $ 00030 Revision 1.9 2004/10/08 09:06:08 alextingle 00031 More robust exception minor code handling. 00032 00033 Revision 1.8 2004/08/18 17:49:45 alextingle 00034 Added check for SIGPIPE before trying to use it. 00035 00036 Revision 1.7 2004/08/06 16:19:23 alextingle 00037 -k & -K options removed. 00038 Naming service names may now be as complex as you like. 00039 00040 Revision 1.6 2004/04/20 16:52:05 alextingle 00041 All examples updated for latest version on omniEvents. Server may now be 00042 specified as a 'corbaloc' string or IOR, instead of as naming service id/kind. 00043 00044 Revision 1.5 2004/02/04 22:29:55 alextingle 00045 Reworked all C++ examples. 00046 Removed catch(...) as it tends to make it harder to see what's going on. 00047 Now uses POA instead of BOA. 00048 Uses omniORB4's Exception name probing. 00049 No longer uses 'naming.h/cc' utility code. 00050 00051 Revision 1.4 2003/11/17 08:47:09 alextingle 00052 Corrected typo. 00053 00054 Revision 1.2 2003/11/16 23:24:22 alex 00055 Corrected typo. 00056 00057 Revision 1.1.1.1 2003/11/11 22:28:56 alex 00058 Import of testing 2.3.0 00059 00060 Revision 1.3 2003/11/03 22:20:26 alextingle 00061 Removed all platform specific switches. Now uses autoconf, config.h. 00062 Removed stub header in order to allow makefile dependency checking to work 00063 correctly. 00064 00065 Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13 00066 Added ifdefs to enable omniEvents to compile 00067 with both omniORB3 and omniORB4. If __OMNIORB4__ 00068 is defined during compilation, omniORB4 headers 00069 and command line option syntax is used, otherwise 00070 fall back to omniORB3 style. 00071 00072 Revision 1.1.1.1 2002/09/25 19:00:26 shamus13 00073 Import of OmniEvents source tree from release 2.1.1 00074 00075 Revision 0.11 2000/08/30 04:39:48 naderp 00076 Port to omniORB 3.0.1. 00077 00078 Revision 0.10 2000/03/16 05:37:27 naderp 00079 Added stdlib.h for getopt. 00080 00081 Revision 0.9 2000/03/06 13:26:32 naderp 00082 Using util getRootNamingContext function. 00083 Using stub headers. 00084 Fixed error messages. 00085 00086 Revision 0.8 2000/03/02 03:16:26 naderp 00087 Added retry resiliency for handling COMM_FAUILURE exceptions. 00088 Replaced condition variable by counting semaphore. 00089 00090 Revision 0.7 1999/11/02 13:39:13 naderp 00091 Added <signal.h> 00092 00093 Revision 0.6 1999/11/02 07:57:22 naderp 00094 Updated usage. 00095 00096 Revision 0.5 99/11/01 16:10:12 16:10:12 naderp (Paul Nader) 00097 omniEvents 2.0 Release. 00098 Ignoring SIGPIPE for UNIX platforms. 00099 00100 Revision 0.4 99/04/23 16:05:44 16:05:44 naderp (Paul Nader) 00101 gcc port. 00102 00103 Revision 0.3 99/04/23 09:34:01 09:34:01 naderp (Paul Nader) 00104 Windows Port. 00105 00106 Revision 0.2 99/04/21 18:06:25 18:06:25 naderp (Paul Nader) 00107 *** empty log message *** 00108 00109 Revision 0.1.1.1 98/11/27 16:59:33 16:59:33 naderp (Paul Nader) 00110 Added -s option to sleep after disconnecting. 00111 00112 Revision 0.1 98/11/25 14:08:07 14:08:07 naderp (Paul Nader) 00113 Initial Revision 00114 00115 */ 00116 00117 #ifdef HAVE_CONFIG_H 00118 # include "config.h" 00119 #endif 00120 00121 #ifdef HAVE_GETOPT 00122 # include <unistd.h> 00123 extern char* optarg; 00124 extern int optind; 00125 #else 00126 # include "getopt.h" 00127 #endif 00128 00129 #ifdef HAVE_IOSTREAM 00130 # include <iostream> 00131 #else 00132 # include <iostream.h> 00133 #endif 00134 00135 #ifdef HAVE_STD_IOSTREAM 00136 using namespace std; 00137 #endif 00138 00139 #ifdef HAVE_STDLIB_H 00140 # include <stdlib.h> 00141 #endif 00142 00143 #ifdef HAVE_SIGNAL_H 00144 # include <signal.h> 00145 #endif 00146 00147 #include <cstdio> 00148 00149 #include "CosEventComm.hh" 00150 #include "CosEventChannelAdmin.hh" 00151 #include "naming.h" 00152 00153 static omni_semaphore connect_cond(0); 00154 static void usage(int argc, char **argv); 00155 00156 class Supplier_i : virtual public POA_CosEventComm::PullSupplier { 00157 public: 00158 Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {}; 00159 CORBA::Any *pull(); 00160 CORBA::Any *try_pull(CORBA::Boolean &has_event); 00161 void disconnect_pull_supplier (); 00162 00163 private: 00164 long i; 00165 long _disconnect; 00166 CORBA::ULong l; 00167 }; 00168 00169 void 00170 Supplier_i::disconnect_pull_supplier () { 00171 cout << "Pull Supplier: disconnected by channel." << endl; 00172 } 00173 00174 CORBA::Any * 00175 Supplier_i::pull() { 00176 cout << "Pull Supplier: pull() called. Data : "; 00177 CORBA::Any *any = new CORBA::Any(); 00178 *any <<= l++; 00179 cout << l-1 << endl; 00180 00181 // Exercise Disconnect 00182 if ((_disconnect > 0) && (i == _disconnect)) { 00183 i = 0; 00184 // Signal main thread to disconnect and re-connect. 00185 connect_cond.post(); 00186 } 00187 i++; 00188 return (any); 00189 } 00190 00191 CORBA::Any * 00192 Supplier_i::try_pull(CORBA::Boolean &has_event) 00193 { 00194 cout << "Pull Supplier: try_pull() called. Data : "; 00195 CORBA::Any *any = new CORBA::Any(); 00196 *any <<= l++; 00197 cout << l-1 << endl; 00198 has_event = 1; 00199 00200 // Exercise Disconnect 00201 if ((_disconnect > 0) && (i == _disconnect)) { 00202 i = 0; 00203 // Signal main thread to disconnect and re-connect. 00204 connect_cond.post(); 00205 } 00206 i++; 00207 return (any); 00208 } 00209 00210 00211 int 00212 main (int argc, char** argv) 00213 { 00214 #if defined(HAVE_OMNIORB4) 00215 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4"); 00216 #else 00217 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3"); 00218 #endif 00219 00220 // Process Options 00221 int discnum =0; 00222 int sleepInterval =0; 00223 const char* channelName ="EventChannel"; 00224 00225 int c; 00226 while ((c = getopt(argc,argv,"d:s:n:h")) != EOF) 00227 { 00228 switch (c) 00229 { 00230 case 'd': discnum = atoi(optarg); 00231 break; 00232 00233 case 's': sleepInterval = atoi(optarg); 00234 break; 00235 00236 case 'n': channelName = optarg; 00237 break; 00238 00239 case 'h': 00240 default : usage(argc,argv); 00241 exit(-1); 00242 break; 00243 } 00244 } 00245 00246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) 00247 // Ignore broken pipes 00248 signal(SIGPIPE, SIG_IGN); 00249 #endif 00250 00251 Supplier_i* supplier = new Supplier_i (discnum); 00252 CosEventChannelAdmin::EventChannel_var channel; 00253 00254 const char* action=""; // Use this variable to help report errors. 00255 try { 00256 CORBA::Object_var obj; 00257 00258 action="resolve initial reference 'RootPOA'"; 00259 obj=orb->resolve_initial_references("RootPOA"); 00260 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj); 00261 if(CORBA::is_nil(rootPoa)) 00262 throw CORBA::OBJECT_NOT_EXIST(); 00263 00264 action="activate the RootPOA's POAManager"; 00265 PortableServer::POAManager_var pman =rootPoa->the_POAManager(); 00266 pman->activate(); 00267 00268 // 00269 // Obtain object reference to EventChannel 00270 // (from command-line argument or from the Naming Service). 00271 if(optind<argc) 00272 { 00273 action="convert URI from command line into object reference"; 00274 obj=orb->string_to_object(argv[optind]); 00275 } 00276 else 00277 { 00278 action="resolve initial reference 'NameService'"; 00279 obj=orb->resolve_initial_references("NameService"); 00280 CosNaming::NamingContext_var rootContext= 00281 CosNaming::NamingContext::_narrow(obj); 00282 if(CORBA::is_nil(rootContext)) 00283 throw CORBA::OBJECT_NOT_EXIST(); 00284 00285 action="find EventChannel in NameService"; 00286 cout << action << endl; 00287 obj=rootContext->resolve(str2name(channelName)); 00288 } 00289 00290 action="narrow object reference to event channel"; 00291 channel=CosEventChannelAdmin::EventChannel::_narrow(obj); 00292 if(CORBA::is_nil(channel)) 00293 { 00294 cerr << "Failed to narrow Event Channel reference." << endl; 00295 exit(1); 00296 } 00297 00298 } 00299 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references 00300 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl; 00301 exit(1); 00302 } 00303 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve 00304 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl; 00305 exit(1); 00306 } 00307 catch(CosNaming::NamingContext::NotFound& ex) { // resolve 00308 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl; 00309 exit(1); 00310 } 00311 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve 00312 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl; 00313 exit(1); 00314 } 00315 catch(CORBA::TRANSIENT& ex) { // _narrow() 00316 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl; 00317 exit(1); 00318 } 00319 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow() 00320 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl; 00321 exit(1); 00322 } 00323 catch(CORBA::SystemException& ex) { 00324 cerr<<"Failed to "<<action<<"."; 00325 #if defined(HAVE_OMNIORB4) 00326 cerr<<" "<<ex._name(); 00327 if(ex.NP_minorString()) 00328 cerr<<" ("<<ex.NP_minorString()<<")"; 00329 #endif 00330 cerr<<endl; 00331 exit(1); 00332 } 00333 catch(CORBA::Exception& ex) { 00334 cerr<<"Failed to "<<action<<"." 00335 #if defined(HAVE_OMNIORB4) 00336 " "<<ex._name() 00337 #endif 00338 <<endl; 00339 exit(1); 00340 } 00341 00342 // 00343 // Get Supplier Admin interface - retrying on Comms Failure. 00344 CosEventChannelAdmin::SupplierAdmin_var supplier_admin; 00345 while (1) 00346 { 00347 try { 00348 supplier_admin = channel->for_suppliers (); 00349 if (CORBA::is_nil(supplier_admin)) 00350 { 00351 cerr << "Event Channel returned nil Supplier Admin!" 00352 << endl; 00353 exit(1); 00354 } 00355 break; 00356 } 00357 catch (CORBA::COMM_FAILURE& ex) { 00358 cerr << "Caught COMM_FAILURE exception " 00359 << "obtaining Supplier Admin! Retrying..." 00360 << endl; 00361 continue; 00362 } 00363 } 00364 cout << "Obtained SupplierAdmin." << endl; 00365 00366 while (1) 00367 { 00368 // 00369 // Get proxy consumer - retrying on Comms Failure. 00370 CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer; 00371 while (1) 00372 { 00373 try { 00374 proxy_consumer = supplier_admin->obtain_pull_consumer (); 00375 if (CORBA::is_nil(proxy_consumer)) 00376 { 00377 cerr << "Supplier Admin returned nil proxy_consumer!" 00378 << endl; 00379 exit(1); 00380 } 00381 break; 00382 } 00383 catch (CORBA::COMM_FAILURE& ex) { 00384 cerr << "Caught COMM_FAILURE exception " 00385 << "obtaining Proxy Pull Consumer! Retrying..." 00386 << endl; 00387 continue; 00388 } 00389 } 00390 cout << "Obtained ProxyPullConsumer." << endl; 00391 00392 // Connect Pull Supplier - retrying on Comms Failure. 00393 CosEventComm::PullSupplier_var supplierRef =supplier->_this(); 00394 while (1) 00395 { 00396 try { 00397 proxy_consumer->connect_pull_supplier(supplierRef.in()); 00398 break; 00399 } 00400 catch (CORBA::BAD_PARAM& ex) { 00401 cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl; 00402 exit(1); 00403 } 00404 catch (CosEventChannelAdmin::AlreadyConnected& ex) { 00405 cerr << "Pull Supplier already connected!" 00406 << endl; 00407 break; 00408 } 00409 catch (CORBA::COMM_FAILURE& ex) { 00410 cerr << "Caught COMM_FAILURE exception " 00411 << "connecting Pull Supplier! Retrying..." 00412 << endl; 00413 continue; 00414 } 00415 } 00416 cout << "Connected Pull Supplier." << endl; 00417 00418 // Wait for indication to disconnect before re-connecting. 00419 connect_cond.wait(); 00420 00421 // Disconnect - retrying on Comms Failure. 00422 while (1) 00423 { 00424 try { 00425 proxy_consumer->disconnect_pull_consumer(); 00426 break; 00427 } 00428 catch (CORBA::COMM_FAILURE& ex) { 00429 cerr << "Caught COMM_FAILURE exception " 00430 << "disconnecting Pull Supplier! Retrying..." 00431 << endl; 00432 continue; 00433 } 00434 } 00435 cout << "Disconnected Pull Supplier." << endl; 00436 00437 // Yawn. 00438 cout << "Sleeping " << sleepInterval << " seconds." << endl; 00439 omni_thread::sleep(sleepInterval); 00440 } 00441 00442 // Not Reached 00443 return 0; 00444 } 00445 00446 static void 00447 usage(int argc, char **argv) 00448 { 00449 cerr<< 00450 "\nCreate a PullSupplier to send events to a channel.\n" 00451 "syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n" 00452 "\n" 00453 "CHANNEL_URI: The event channel may be specified as a URI.\n" 00454 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n" 00455 "\n" 00456 "OPTIONS: DEFAULT:\n" 00457 " -d NUM disconnect after sending NUM events [0 - never disconnect]\n" 00458 " -s SECS sleep SECS seconds after disconnecting [0]\n" 00459 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n" 00460 " -h display this help text\n" << endl; 00461 }