OmniEvents
pullcons.cc
Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 // pullcons.cc                Created on: 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Pull Model consumer implementation.
00026 //      
00027 
00028 /*
00029   $Log: pullcons.cc,v $
00030   Revision 1.12  2004/10/08 09:06:08  alextingle
00031   More robust exception minor code handling.
00032 
00033   Revision 1.11  2004/08/18 17:49:45  alextingle
00034   Added check for SIGPIPE before trying to use it.
00035 
00036   Revision 1.10  2004/08/06 16:19:23  alextingle
00037   -k & -K options removed.
00038   Naming service names may now be as complex as you like.
00039 
00040   Revision 1.9  2004/04/20 16:52:02  alextingle
00041   All examples updated for latest version on omniEvents. Server may now be
00042   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00043 
00044   Revision 1.8  2004/03/08 18:00:13  alextingle
00045   One too many newlines.
00046 
00047   Revision 1.7  2004/03/08 17:48:25  alextingle
00048   Added newlines to cirrectly format output when exceptions occur.
00049 
00050   Revision 1.6  2004/02/21 19:07:45  alextingle
00051   Corrected servants to use POA instead of BOA.
00052 
00053   Revision 1.5  2004/02/04 22:29:55  alextingle
00054   Reworked all C++ examples.
00055   Removed catch(...) as it tends to make it harder to see what's going on.
00056   Now uses POA instead of BOA.
00057   Uses omniORB4's Exception name probing.
00058   No longer uses 'naming.h/cc' utility code.
00059 
00060   Revision 1.4  2004/01/11 16:59:28  alextingle
00061   Added more helpful exception handling error messages to pullcons.cc.
00062 
00063   Revision 1.3  2003/11/03 22:20:54  alextingle
00064   Removed all platform specific switches. Now uses autoconf, config.h.
00065   Removed stub header in order to allow makefile dependency checking to work
00066   correctly.
00067   Changed int to bool where appropriate.
00068 
00069   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00070   Added ifdefs to enable omniEvents to compile
00071   with both omniORB3 and omniORB4. If __OMNIORB4__
00072   is defined during compilation, omniORB4 headers
00073   and command line option syntax is used, otherwise
00074   fall back to omniORB3 style.
00075 
00076   Revision 1.1.1.1  2002/09/25 19:00:25  shamus13
00077   Import of OmniEvents source tree from release 2.1.1
00078 
00079   Revision 0.11  2000/10/11 01:16:21  naderp
00080   *** empty log message ***
00081 
00082   Revision 0.10  2000/08/30 04:39:48  naderp
00083   Port to omniORB 3.0.1.
00084 
00085   Revision 0.9  2000/03/16 05:37:27  naderp
00086   Added stdlib.h for getopt.
00087 
00088   Revision 0.8  2000/03/06 13:25:44  naderp
00089   Using util getRootNamingContext function.
00090   Using stub headers.
00091   Fixed error messages.
00092 
00093   Revision 0.7  2000/03/02 02:11:27  naderp
00094   Added -r option to connect using nil reference.
00095   Added retry resiliency for handling COMM_FAUILURE exceptions.
00096 
00097   Revision 0.6  1999/11/02 13:38:57  naderp
00098   Added <signal.h>
00099 
00100   Revision 0.5  1999/11/01 15:55:11  naderp
00101   omniEvents 2.0 Release.
00102   Ignoring SIGPIPE for UNIX platforms.
00103 
00104 Revision 0.4  99/04/23  16:05:38  16:05:38  naderp (Paul Nader)
00105 gcc port.
00106 
00107 Revision 0.3  99/04/23  09:33:40  09:33:40  naderp (Paul Nader)
00108 Windows Port.
00109 
00110 Revision 0.2  99/04/21  18:06:25  18:06:25  naderp (Paul Nader)
00111 *** empty log message ***
00112 
00113 Revision 0.1.1.1  98/11/27  16:59:07  16:59:07  naderp (Paul Nader)
00114 Added -s option to sleep after disconnecting.
00115 
00116 Revision 0.1  98/11/25  14:08:04  14:08:04  naderp (Paul Nader)
00117 Initial Revision
00118 
00119 */
00120 
00121 #ifdef HAVE_CONFIG_H
00122 #  include "config.h"
00123 #endif
00124 
00125 #ifdef HAVE_GETOPT
00126 #  include <unistd.h>
00127 extern char* optarg;
00128 extern int optind;
00129 #else
00130 #  include "getopt.h"
00131 #endif
00132 
00133 #ifdef HAVE_IOSTREAM
00134 #  include <iostream>
00135 #else
00136 #  include <iostream.h>
00137 #endif
00138 
00139 #ifdef HAVE_STD_IOSTREAM
00140 using namespace std;
00141 #endif
00142 
00143 #ifdef HAVE_STDLIB_H
00144 #  include <stdlib.h>
00145 #endif
00146 
00147 #ifdef HAVE_SIGNAL_H
00148 #  include <signal.h>
00149 #endif
00150 
00151 #include <cstdio>
00152 
00153 #include "CosEventComm.hh"
00154 #include "CosEventChannelAdmin.hh"
00155 #include "naming.h"
00156 
00157 static void usage(int argc, char **argv);
00158 
00159 class Consumer_i : virtual public POA_CosEventComm::PullConsumer {
00160 public:
00161   Consumer_i () {};
00162   void disconnect_pull_consumer ();
00163 };
00164 
00165 void Consumer_i::disconnect_pull_consumer () {
00166   cout << "Pull Consumer: disconnected." << endl;
00167 }
00168 
00169 int
00170 main(int argc, char **argv)
00171 {
00172   //
00173   // Start orb.
00174   CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00175 
00176   // Process Options
00177   bool        trymode       =false;
00178   int         discnum       =0;
00179   bool        refnil        =false;
00180   int         sleepInterval =0;
00181   const char* channelName   ="EventChannel";
00182 
00183   int c;
00184   while ((c = getopt(argc,argv,"td:rs:n:h")) != EOF)
00185   {
00186      switch (c)
00187      {
00188         case 't': trymode = true;
00189                   break;
00190 
00191         case 'd': discnum = atoi(optarg);
00192                   break;
00193 
00194         case 'r': refnil = true;
00195                   break;
00196 
00197         case 's': sleepInterval = atoi(optarg);
00198                   break;
00199 
00200         case 'n': channelName = optarg;
00201                   break;
00202 
00203         case 'h':
00204         default : usage(argc,argv);
00205                   exit(-1);
00206                   break;
00207      }
00208   }
00209 
00210 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00211   // Ignore broken pipes
00212   signal(SIGPIPE, SIG_IGN);
00213 #endif
00214 
00215   Consumer_i* consumer =NULL;
00216   CosEventChannelAdmin::EventChannel_var channel;
00217 
00218   const char* action=""; // Use this variable to help report errors.
00219   try {
00220     CORBA::Object_var obj;
00221 
00222     // A Pull Consumer can be implemented as a pure client or as a mixed
00223     // client-server process, depending on whether it requires and is
00224     // prepared to service disconnect requests from the channel.
00225     // If it is, then create the servant object and activate the POA.
00226     if (! refnil)
00227     {
00228        consumer=new Consumer_i();
00229 
00230        action="resolve initial reference 'RootPOA'";
00231        obj=orb->resolve_initial_references("RootPOA");
00232        PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00233        if(CORBA::is_nil(rootPoa))
00234            throw CORBA::OBJECT_NOT_EXIST();
00235 
00236        action="activate the RootPOA's POAManager";
00237        PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00238        pman->activate();
00239     }
00240    
00241     //
00242     // Obtain object reference to EventChannel
00243     // (from command-line argument or from the Naming Service).
00244     if(optind<argc)
00245     {
00246       action="convert URI from command line into object reference";
00247       obj=orb->string_to_object(argv[optind]);
00248     }
00249     else
00250     {
00251       action="resolve initial reference 'NameService'";
00252       obj=orb->resolve_initial_references("NameService");
00253       CosNaming::NamingContext_var rootContext=
00254         CosNaming::NamingContext::_narrow(obj);
00255       if(CORBA::is_nil(rootContext))
00256           throw CORBA::OBJECT_NOT_EXIST();
00257 
00258       action="find EventChannel in NameService";
00259       cout << action << endl;
00260       obj=rootContext->resolve(str2name(channelName));
00261     }
00262 
00263     action="narrow object reference to event channel";
00264     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00265     if(CORBA::is_nil(channel))
00266     {
00267        cerr << "Failed to narrow Event Channel reference." << endl;
00268        exit(1);
00269     }
00270 
00271   }
00272   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00273      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00274      exit(1);
00275   }
00276   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00277      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00278      exit(1);
00279   }
00280   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00281      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00282      exit(1);
00283   }
00284   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00285      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00286      exit(1);
00287   }
00288   catch(CORBA::TRANSIENT& ex) { // _narrow()
00289      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00290      exit(1);
00291   }
00292   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00293      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00294      exit(1);
00295   }
00296   catch(CORBA::SystemException& ex) {
00297      cerr<<"Failed to "<<action<<".";
00298 #if defined(HAVE_OMNIORB4)
00299      cerr<<" "<<ex._name();
00300      if(ex.NP_minorString())
00301          cerr<<" ("<<ex.NP_minorString()<<")";
00302 #endif
00303      cerr<<endl;
00304      exit(1);
00305   }
00306   catch(CORBA::Exception& ex) {
00307      cerr<<"Failed to "<<action<<"."
00308 #if defined(HAVE_OMNIORB4)
00309        " "<<ex._name()
00310 #endif
00311        <<endl;
00312      exit(1);
00313   }
00314 
00315   //
00316   // Get Consumer admin interface - retrying on Comms Failure.
00317   CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00318   while (1)
00319   {
00320      try {
00321         consumer_admin = channel->for_consumers ();
00322         if (CORBA::is_nil (consumer_admin))
00323         {
00324            cerr << "Event Channel returned nil Consumer Admin!" << endl;
00325            exit (1);
00326         }
00327         break;
00328      }
00329      catch (CORBA::COMM_FAILURE& ex) {
00330         cerr << "Caught COMM_FAILURE exception "
00331              << "obtaining Consumer Admin! Retrying..."
00332              << endl;
00333         continue;
00334      }
00335   }
00336   cout << "Obtained Consumer Admin." << endl;
00337 
00338   while (1)
00339   {
00340      //
00341      // Get proxy supplier - retrying on Comms Failure.
00342      CosEventChannelAdmin::ProxyPullSupplier_var proxy_supplier;
00343      while (1)
00344      {
00345         try {
00346            proxy_supplier = consumer_admin->obtain_pull_supplier ();
00347            if (CORBA::is_nil (proxy_supplier))
00348            {
00349               cerr << "Consumer Admin returned nil Proxy Supplier!" << endl;
00350               exit (1);
00351            }
00352            break;
00353         }
00354         catch (CORBA::COMM_FAILURE& ex) {
00355            cerr << "Caught COMM_FAILURE Exception "
00356                 << "obtaining Pull Supplier! Retrying..."
00357                 << endl;
00358            continue;
00359         }
00360      }
00361      cout << "Obtained ProxyPullSupplier." << endl;
00362    
00363      //
00364      // Connect Pull Consumer - retrying on Comms Failure.
00365      CosEventComm::PullConsumer_ptr cptr =CosEventComm::PullConsumer::_nil();
00366      if (! refnil) {
00367         cptr=consumer->_this();
00368      }
00369 
00370      while (1)
00371      {
00372         try {
00373            proxy_supplier->connect_pull_consumer(cptr);
00374            break;
00375         }
00376         catch (CORBA::BAD_PARAM& ex) {
00377            cerr << "Caught BAD_PARAM exception connecting Pull Consumer!"<<endl;
00378            exit (1);
00379         }
00380         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00381            cerr << "Proxy Pull Supplier already connected!" 
00382                 << endl;
00383            break;
00384         }
00385         catch (CORBA::COMM_FAILURE& ex) {
00386            cerr << "Caught COMM_FAILURE Exception "
00387                 << "connecting Pull Consumer! Retrying..." 
00388                 << endl;
00389            continue;
00390         }
00391      }
00392      cout << "Connected Pull Consumer." << endl;
00393 
00394      // Pull data.
00395      CORBA::Any *data;
00396      CORBA::ULong l = 0;
00397      for (int i=0; (discnum == 0) || (i < discnum); i++)
00398      {
00399         if(trymode)
00400         {
00401            try {
00402                CORBA::Boolean has_event;
00403                data = proxy_supplier->try_pull(has_event);
00404                cout << "Consumer: try_pull() called. Data : " << flush;
00405                if (has_event)
00406                {
00407                   l = 0;
00408                   *data >>= l;
00409                   cout << l << endl;
00410                   delete data;
00411                }
00412                else
00413                {
00414                   cout << "None" << endl;
00415                }
00416            }
00417            catch (CosEventComm::Disconnected& ex) {
00418               cout << endl;
00419               cerr << "Failed. Caught Disconnected Exception !" << endl;
00420            }
00421            catch (CORBA::COMM_FAILURE& ex) {
00422               cout << endl;
00423               cerr << "Failed. Caught COMM_FAILURE Exception !" << endl;
00424            }
00425            catch (CORBA::Exception& ex) {
00426               cout << endl;
00427               cerr<<"CORBA exception, unable to try_pull()"
00428 #ifdef HAVE_OMNIORB4
00429                   <<": "<<ex._name()
00430 #endif
00431                   << endl;
00432            }
00433         }
00434         else
00435         {
00436            try {
00437                cout << "Pull Consumer: pull() called. ";
00438                cout.flush();
00439                data = proxy_supplier->pull();
00440                l = 0;
00441                *data >>= l;
00442                cout << "Data : " << l << endl;
00443                delete data;
00444            }
00445            catch(CORBA::TRANSIENT&) {
00446               cout << "caught TRANSIENT." << endl;
00447               omni_thread::sleep(1);
00448            }
00449            catch (CosEventComm::Disconnected& ex) {
00450               cout << endl;
00451               cerr << "Failed. Caught Disconnected exception!" << endl;
00452               exit(1);
00453            }
00454            catch (CORBA::COMM_FAILURE& ex) {
00455               cout << endl;
00456               cerr << "Failed. Caught COMM_FAILURE exception!" << endl;
00457               exit(1);
00458            }
00459            catch (CORBA::SystemException& ex) {
00460               cout << endl;
00461               cerr<<"System exception, unable to pull()";
00462 #ifdef HAVE_OMNIORB4
00463               cerr<<": "<<ex._name();
00464               if(ex.NP_minorString())
00465                   cerr<<" ("<<ex.NP_minorString()<<")";
00466 #endif
00467               cerr<< endl;
00468               exit(1);
00469            }
00470            catch (CORBA::Exception& ex) {
00471               cout << endl;
00472               cerr<<"CORBA exception, unable to pull()"
00473 #ifdef HAVE_OMNIORB4
00474                   <<": "<<ex._name()
00475 #endif
00476                   << endl;
00477               exit(1);
00478            }
00479         }
00480      }
00481 
00482      // Disconnect - retrying on Comms Failure.
00483      while (1)
00484      {
00485         try {
00486            proxy_supplier->disconnect_pull_supplier();
00487            break;
00488         }
00489         catch (CORBA::COMM_FAILURE& ex) {
00490            cerr << "Caught COMM_FAILURE exception "
00491                 << "disconnecting Pull Consumer! Retrying..." 
00492                 << endl;
00493            continue;
00494         }
00495      }
00496      cout << "Disconnected Pull Consumer." << endl;
00497 
00498      // Yawn
00499      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00500      omni_thread::sleep(sleepInterval);
00501   }
00502 
00503   // Not Reached
00504   return 0;
00505 }
00506 
00507 static void
00508 usage(int argc, char **argv)
00509 {
00510   cerr<<
00511 "\nCreate a PullConsumer to receive events from a channel.\n"
00512 "syntax: "<<(argc?argv[0]:"pullcons")<<" OPTIONS [CHANNEL_URI]\n"
00513 "\n"
00514 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00515 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00516 "\n"
00517 "OPTIONS:                                         DEFAULT:\n"
00518 " -t       enable try_pull mode\n"
00519 " -r       connect using a nil reference\n"
00520 " -d NUM   disconnect after receiving NUM events   [0 - never disconnect]\n"
00521 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00522 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00523 " -h       display this help text\n" << endl;
00524 }