OmniEvents
pushcons.cc
Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pushcons.cc              Created   : 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Push Model consumer implementation
00026 //
00027 
00028 /*
00029   $Log: pushcons.cc,v $
00030   Revision 1.12.2.1  2005/06/16 09:39:49  alextingle
00031   Fixed theoretical race caused by sloppy use of condition variable.
00032 
00033   Revision 1.12  2004/10/08 09:06:08  alextingle
00034   More robust exception minor code handling.
00035 
00036   Revision 1.11  2004/08/18 17:49:45  alextingle
00037   Added check for SIGPIPE before trying to use it.
00038 
00039   Revision 1.10  2004/08/06 16:19:23  alextingle
00040   -k & -K options removed.
00041   Naming service names may now be as complex as you like.
00042 
00043   Revision 1.9  2004/04/30 17:54:47  alextingle
00044   Corrected handling of CORBA::Any.
00045 
00046   Revision 1.8  2004/04/20 16:52:17  alextingle
00047   All examples updated for latest version on omniEvents. Server may now be
00048   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00049 
00050   Revision 1.7  2004/04/01 22:28:36  alextingle
00051   Corrected usage message.
00052 
00053   Revision 1.6  2004/03/23 19:09:26  alextingle
00054   Fixed typos.
00055 
00056   Revision 1.5  2004/02/21 19:07:45  alextingle
00057   Corrected servants to use POA instead of BOA.
00058 
00059   Revision 1.4  2004/02/04 22:29:55  alextingle
00060   Reworked all C++ examples.
00061   Removed catch(...) as it tends to make it harder to see what's going on.
00062   Now uses POA instead of BOA.
00063   Uses omniORB4's Exception name probing.
00064   No longer uses 'naming.h/cc' utility code.
00065 
00066   Revision 1.3  2003/11/03 22:19:56  alextingle
00067   Removed all platform specific switches. Now uses autoconf, config.h.
00068   Removed stub header in order to allow makefile dependency checking to work
00069   correctly.
00070   Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by
00071   the same thread that locked them.
00072 
00073   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00074   Added ifdefs to enable omniEvents to compile
00075   with both omniORB3 and omniORB4. If __OMNIORB4__
00076   is defined during compilation, omniORB4 headers
00077   and command line option syntax is used, otherwise
00078   fall back to omniORB3 style.
00079 
00080   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00081   Import of OmniEvents source tree from release 2.1.1
00082 
00083   Revision 0.13  2000/08/30 04:39:48  naderp
00084   Port to omniORB 3.0.1.
00085 
00086   Revision 0.12  2000/03/16 05:37:27  naderp
00087   Added stdlib.h for getopt.
00088 
00089   Revision 0.11  2000/03/06 13:27:02  naderp
00090   Using util getRootNamingContext function.
00091   Using stub headers.
00092   Fixed error messages.
00093 
00094   Revision 0.10  2000/03/02 03:20:24  naderp
00095   Added retry resiliency for handling COMM_FAUILURE exceptions.
00096 
00097   Revision 0.9  1999/11/02 13:39:15  naderp
00098   Added <signal.h>
00099 
00100   Revision 0.8  1999/11/02 07:57:04  naderp
00101   Updated usage.
00102 
00103 Revision 0.7  99/11/01  18:10:29  18:10:29  naderp (Paul Nader)
00104 Added ahndling of COMM_FAILURE exception for connect_push_consumer.
00105 
00106 Revision 0.6  99/11/01  16:11:03  16:11:03  naderp (Paul Nader)
00107 omniEvents 2.0 Release.
00108 
00109 Revision 0.5  99/10/27  19:46:01  19:46:01  naderp (Paul Nader)
00110 Ignoring Unix SIGPIPE signal.
00111 Catching COMM_FAILURE exception for obtain_push_supplier.
00112 Continuing if it fails to obtain Proxy Supplier.
00113 Try/Catch block for disconnect_push_supplier.
00114 
00115 Revision 0.4  99/04/23  16:05:46  16:05:46  naderp (Paul Nader)
00116 gcc port.
00117 
00118 Revision 0.3  99/04/23  09:34:03  09:34:03  naderp (Paul Nader)
00119 Windows Port.
00120 
00121 Revision 0.2  99/04/21  18:06:26  18:06:26  naderp (Paul Nader)
00122 *** empty log message ***
00123 
00124 Revision 0.1.1.1  98/11/27  16:59:37  16:59:37  naderp (Paul Nader)
00125 Added -s option to sleep after disconnecting.
00126 
00127 Revision 0.1  98/11/25  14:08:21  14:08:21  naderp (Paul Nader)
00128 Initial Revision
00129 
00130 */
00131 
00132 #ifdef HAVE_CONFIG_H
00133 #  include "config.h"
00134 #endif
00135 
00136 #ifdef HAVE_GETOPT
00137 #  include <unistd.h>
00138 extern char* optarg;
00139 extern int optind;
00140 #else
00141 #  include "getopt.h"
00142 #endif
00143 
00144 #ifdef HAVE_IOSTREAM
00145 #  include <iostream>
00146 #else
00147 #  include <iostream.h>
00148 #endif
00149 
00150 #ifdef HAVE_STD_IOSTREAM
00151 using namespace std;
00152 #endif
00153 
00154 #ifdef HAVE_STDLIB_H
00155 #  include <stdlib.h>
00156 #endif
00157 
00158 #ifdef HAVE_SIGNAL_H
00159 #  include <signal.h>
00160 #endif
00161 
00162 #include <cstdio>
00163 
00164 #include "CosEventComm.hh"
00165 #include "CosEventChannelAdmin.hh"
00166 #include "naming.h"
00167 
00168 static omni_mutex mutex;
00169 static omni_condition connect_cond(&mutex);
00170 static void usage(int argc, char **argv);
00171 
00172 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00173 public:
00174   Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00175 
00176   void push(const CORBA::Any& data);
00177   void disconnect_push_consumer ();
00178 
00179 private:
00180   long _disconnect;
00181 };
00182 
00183 void Consumer_i::push(const CORBA::Any& data) {
00184   CORBA::ULong l;
00185   static int i = 0;
00186 
00187   i++;
00188   if( data>>=l )
00189   {
00190     cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00191 
00192     // Exercise Disconnect
00193     if (i == _disconnect)
00194     {
00195        i = 0;
00196        // NOTE : The proxy_supplier object is disposed at the server
00197        //        during the disconnect_push_supplier call. Do NOT
00198        //        use the proxy_supplier reference after disconnecting.
00199 
00200        // Signal main thread to disconnect and re-connect.
00201        omni_mutex_lock condition_lock(mutex); // ensure main thread in wait()
00202        connect_cond.signal();
00203     }
00204   }
00205   else
00206   {
00207     cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00208   }
00209 }
00210 
00211 void Consumer_i::disconnect_push_consumer () {
00212   cout << "Push Consumer: disconnected." << endl;
00213 }
00214 
00215 int
00216 main(int argc, char **argv)
00217 {
00218   //
00219   // Start orb.
00220   CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00221 
00222   // Process Options
00223   int         discnum       =0;
00224   int         sleepInterval =0;
00225   const char* channelName   ="EventChannel";
00226 
00227   int c;
00228   while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00229   {
00230      switch (c)
00231      {
00232         case 'd': discnum = atoi(optarg);
00233                   break;
00234 
00235         case 's': sleepInterval = atoi(optarg);
00236                   break;
00237 
00238         case 'n': channelName = optarg;
00239                   break;
00240 
00241         case 'h': usage(argc,argv);
00242                   exit(0);
00243         default : usage(argc,argv);
00244                   exit(-1);
00245      }
00246   }
00247 
00248 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00249   // Ignore broken pipes
00250   signal(SIGPIPE, SIG_IGN);
00251 #endif
00252 
00253   Consumer_i* consumer = new Consumer_i (discnum);
00254   CosEventChannelAdmin::EventChannel_var channel;
00255 
00256   const char* action=""; // Use this variable to help report errors.
00257   try {
00258     CORBA::Object_var obj;
00259 
00260     action="resolve initial reference 'RootPOA'";
00261     obj=orb->resolve_initial_references("RootPOA");
00262     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00263     if(CORBA::is_nil(rootPoa))
00264         throw CORBA::OBJECT_NOT_EXIST();
00265 
00266     action="activate the RootPOA's POAManager";
00267     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00268     pman->activate();
00269 
00270     //
00271     // Obtain object reference to EventChannel
00272     // (from command-line argument or from the Naming Service).
00273     if(optind<argc)
00274     {
00275       action="convert URI from command line into object reference";
00276       obj=orb->string_to_object(argv[optind]);
00277     }
00278     else
00279     {
00280       action="resolve initial reference 'NameService'";
00281       obj=orb->resolve_initial_references("NameService");
00282       CosNaming::NamingContext_var rootContext=
00283         CosNaming::NamingContext::_narrow(obj);
00284       if(CORBA::is_nil(rootContext))
00285           throw CORBA::OBJECT_NOT_EXIST();
00286 
00287       action="find EventChannel in NameService";
00288       cout << action << endl;
00289       obj=rootContext->resolve(str2name(channelName));
00290     }
00291 
00292     action="narrow object reference to event channel";
00293     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00294     if(CORBA::is_nil(channel))
00295     {
00296        cerr << "Failed to narrow Event Channel reference." << endl;
00297        exit(1);
00298     }
00299 
00300   }
00301   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00302      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00303      exit(1);
00304   }
00305   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00306      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00307      exit(1);
00308   }
00309   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00310      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00311      exit(1);
00312   }
00313   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00314      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00315      exit(1);
00316   }
00317   catch(CORBA::TRANSIENT& ex) { // _narrow()
00318      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00319      exit(1);
00320   }
00321   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00322      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00323      exit(1);
00324   }
00325   catch(CORBA::SystemException& ex) {
00326      cerr<<"Failed to "<<action<<".";
00327 #if defined(HAVE_OMNIORB4)
00328      cerr<<" "<<ex._name();
00329      if(ex.NP_minorString())
00330          cerr<<" ("<<ex.NP_minorString()<<")";
00331 #endif
00332      cerr<<endl;
00333      exit(1);
00334   }
00335   catch(CORBA::Exception& ex) {
00336      cerr<<"Failed to "<<action<<"."
00337 #if defined(HAVE_OMNIORB4)
00338        " "<<ex._name()
00339 #endif
00340        <<endl;
00341      exit(1);
00342   }
00343 
00344   //
00345   // Get Consumer admin interface - retrying on Comms Failure.
00346   CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00347   while (1)
00348   {
00349      try {
00350         consumer_admin = channel->for_consumers ();
00351         if (CORBA::is_nil (consumer_admin))
00352         {
00353            cerr << "Event Channel returned nil Consumer Admin!" << endl;
00354            exit(1);
00355         }
00356         break;
00357      }
00358      catch (CORBA::COMM_FAILURE& ex) {
00359         cerr << "Caught COMM_FAILURE exception "
00360              << "obtaining Consumer Admin! Retrying..."
00361              << endl;
00362         continue;
00363      }
00364   }
00365   cout << "Obtained ConsumerAdmin." << endl;
00366 
00367   omni_mutex_lock condition_lock(mutex);
00368   while (1) {
00369      //
00370      // Get proxy supplier - retrying on Comms Failure.
00371      CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00372      while (1)
00373      {
00374         try {
00375            proxy_supplier = consumer_admin->obtain_push_supplier ();
00376            if (CORBA::is_nil (proxy_supplier))
00377            {
00378               cerr << "Consumer Admin returned nil proxy_supplier!"
00379                    << endl;
00380               exit (1);
00381            }
00382            break;
00383         }
00384         catch (CORBA::COMM_FAILURE& ex) {
00385            cerr << "Caught COMM_FAILURE Exception "
00386                 << "obtaining Push Supplier! Retrying..."
00387                 << endl;
00388            continue;
00389         }
00390      }
00391      cout << "Obtained ProxyPushSupplier." << endl;
00392    
00393      //
00394      // Connect Push Consumer - retrying on Comms Failure.
00395      while (1)
00396      {
00397         try {
00398            proxy_supplier->connect_push_consumer(consumer->_this());
00399            break;
00400         }
00401         catch (CORBA::BAD_PARAM& ex) {
00402            cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00403                 << endl;
00404            exit (1);
00405         }
00406         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00407            cerr << "Proxy Push Supplier already connected!"
00408                 << endl;
00409            break;
00410         }
00411         catch (CORBA::COMM_FAILURE& ex) {
00412            cerr << "Caught COMM_FAILURE exception "
00413                 << "connecting Push Consumer! Retrying..."
00414                 << endl;
00415            continue;
00416         }
00417      }
00418      cout << "Connected Push Consumer." << endl;
00419 
00420      // Wait for indication to disconnect before re-connecting.
00421      connect_cond.wait();
00422 
00423      // Disconnect - retrying on Comms Failure.
00424      while (1)
00425      {
00426         try {
00427            proxy_supplier->disconnect_push_supplier();
00428            break;
00429         }
00430         catch (CORBA::COMM_FAILURE& ex) {
00431            cerr << "Caught COMM_FAILURE Exception "
00432                 << "disconnecting Push Consumer! Retrying..."
00433                 << endl;
00434            continue;
00435         }
00436      }
00437      cout << "Disconnected Push Consumer." << endl;
00438    
00439      // Yawn
00440      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00441      omni_thread::sleep(sleepInterval);
00442   }
00443 
00444   // NEVER GET HERE
00445   return 0;
00446 }
00447 
00448 static void
00449 usage(int argc, char **argv)
00450 {
00451   cerr<<
00452 "\nCreate a PushConsumer to receive events from a channel.\n"
00453 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
00454 "\n"
00455 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00456 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00457 "\n"
00458 "OPTIONS:                                         DEFAULT:\n"
00459 " -d NUM   disconnect after receiving NUM events   [0 - never disconnect]\n"
00460 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00461 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00462 " -h       display this help text\n" << endl;
00463 }