OmniEvents
pullsupp.cc
Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pullsupp.cc              Created   : 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Pull Model supplier implementation.
00026 //      
00027 
00028 /*
00029   $Log: pullsupp.cc,v $
00030   Revision 1.9  2004/10/08 09:06:08  alextingle
00031   More robust exception minor code handling.
00032 
00033   Revision 1.8  2004/08/18 17:49:45  alextingle
00034   Added check for SIGPIPE before trying to use it.
00035 
00036   Revision 1.7  2004/08/06 16:19:23  alextingle
00037   -k & -K options removed.
00038   Naming service names may now be as complex as you like.
00039 
00040   Revision 1.6  2004/04/20 16:52:05  alextingle
00041   All examples updated for latest version on omniEvents. Server may now be
00042   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00043 
00044   Revision 1.5  2004/02/04 22:29:55  alextingle
00045   Reworked all C++ examples.
00046   Removed catch(...) as it tends to make it harder to see what's going on.
00047   Now uses POA instead of BOA.
00048   Uses omniORB4's Exception name probing.
00049   No longer uses 'naming.h/cc' utility code.
00050 
00051   Revision 1.4  2003/11/17 08:47:09  alextingle
00052   Corrected typo.
00053 
00054   Revision 1.2  2003/11/16 23:24:22  alex
00055   Corrected typo.
00056 
00057   Revision 1.1.1.1  2003/11/11 22:28:56  alex
00058   Import of testing 2.3.0
00059 
00060   Revision 1.3  2003/11/03 22:20:26  alextingle
00061   Removed all platform specific switches. Now uses autoconf, config.h.
00062   Removed stub header in order to allow makefile dependency checking to work
00063   correctly.
00064 
00065   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00066   Added ifdefs to enable omniEvents to compile
00067   with both omniORB3 and omniORB4. If __OMNIORB4__
00068   is defined during compilation, omniORB4 headers
00069   and command line option syntax is used, otherwise
00070   fall back to omniORB3 style.
00071 
00072   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00073   Import of OmniEvents source tree from release 2.1.1
00074 
00075   Revision 0.11  2000/08/30 04:39:48  naderp
00076   Port to omniORB 3.0.1.
00077 
00078   Revision 0.10  2000/03/16 05:37:27  naderp
00079   Added stdlib.h for getopt.
00080 
00081   Revision 0.9  2000/03/06 13:26:32  naderp
00082   Using util getRootNamingContext function.
00083   Using stub headers.
00084   Fixed error messages.
00085 
00086   Revision 0.8  2000/03/02 03:16:26  naderp
00087   Added retry resiliency for handling COMM_FAUILURE exceptions.
00088   Replaced condition variable by counting semaphore.
00089 
00090   Revision 0.7  1999/11/02 13:39:13  naderp
00091   Added <signal.h>
00092 
00093   Revision 0.6  1999/11/02 07:57:22  naderp
00094   Updated usage.
00095 
00096 Revision 0.5  99/11/01  16:10:12  16:10:12  naderp (Paul Nader)
00097 omniEvents 2.0 Release.
00098 Ignoring SIGPIPE for UNIX platforms.
00099 
00100 Revision 0.4  99/04/23  16:05:44  16:05:44  naderp (Paul Nader)
00101 gcc port.
00102 
00103 Revision 0.3  99/04/23  09:34:01  09:34:01  naderp (Paul Nader)
00104 Windows Port.
00105 
00106 Revision 0.2  99/04/21  18:06:25  18:06:25  naderp (Paul Nader)
00107 *** empty log message ***
00108 
00109 Revision 0.1.1.1  98/11/27  16:59:33  16:59:33  naderp (Paul Nader)
00110 Added -s option to sleep after disconnecting.
00111 
00112 Revision 0.1  98/11/25  14:08:07  14:08:07  naderp (Paul Nader)
00113 Initial Revision
00114 
00115 */
00116 
00117 #ifdef HAVE_CONFIG_H
00118 #  include "config.h"
00119 #endif
00120 
00121 #ifdef HAVE_GETOPT
00122 #  include <unistd.h>
00123 extern char* optarg;
00124 extern int optind;
00125 #else
00126 #  include "getopt.h"
00127 #endif
00128 
00129 #ifdef HAVE_IOSTREAM
00130 #  include <iostream>
00131 #else
00132 #  include <iostream.h>
00133 #endif
00134 
00135 #ifdef HAVE_STD_IOSTREAM
00136 using namespace std;
00137 #endif
00138 
00139 #ifdef HAVE_STDLIB_H
00140 #  include <stdlib.h>
00141 #endif
00142 
00143 #ifdef HAVE_SIGNAL_H
00144 #  include <signal.h>
00145 #endif
00146 
00147 #include <cstdio>
00148 
00149 #include "CosEventComm.hh"
00150 #include "CosEventChannelAdmin.hh"
00151 #include "naming.h"
00152 
00153 static omni_semaphore connect_cond(0);
00154 static void usage(int argc, char **argv);
00155 
00156 class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
00157 public:
00158   Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
00159   CORBA::Any *pull();
00160   CORBA::Any *try_pull(CORBA::Boolean &has_event);
00161   void disconnect_pull_supplier ();
00162 
00163 private:
00164   long i;
00165   long _disconnect;
00166   CORBA::ULong l;
00167 };
00168 
00169 void
00170 Supplier_i::disconnect_pull_supplier () {
00171   cout << "Pull Supplier: disconnected by channel." << endl;
00172 }
00173 
00174 CORBA::Any *
00175 Supplier_i::pull() {
00176    cout << "Pull Supplier: pull() called. Data : ";
00177    CORBA::Any *any = new CORBA::Any();
00178    *any <<= l++;
00179    cout << l-1 << endl;
00180 
00181    // Exercise Disconnect
00182    if ((_disconnect > 0) && (i == _disconnect)) {
00183       i = 0;
00184       // Signal main thread to disconnect and re-connect.
00185       connect_cond.post();
00186    }
00187    i++;
00188    return (any);
00189 }
00190 
00191 CORBA::Any *
00192 Supplier_i::try_pull(CORBA::Boolean &has_event)
00193 {
00194    cout << "Pull Supplier: try_pull() called. Data : ";
00195    CORBA::Any *any = new CORBA::Any();
00196    *any <<= l++;
00197    cout << l-1 << endl;
00198    has_event = 1;
00199 
00200    // Exercise Disconnect
00201    if ((_disconnect > 0) && (i == _disconnect)) {
00202       i = 0;
00203       // Signal main thread to disconnect and re-connect.
00204       connect_cond.post();
00205    }
00206    i++;
00207    return (any);
00208 }
00209 
00210 
00211 int
00212 main (int argc, char** argv)
00213 {
00214 #if defined(HAVE_OMNIORB4)
00215   CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
00216 #else
00217   CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
00218 #endif
00219 
00220   // Process Options
00221   int         discnum       =0;
00222   int         sleepInterval =0;
00223   const char* channelName   ="EventChannel";
00224 
00225   int c;
00226   while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
00227   {
00228      switch (c)
00229      {
00230         case 'd': discnum = atoi(optarg);
00231                   break;
00232 
00233         case 's': sleepInterval = atoi(optarg);
00234                   break;
00235 
00236         case 'n': channelName = optarg;
00237                   break;
00238 
00239         case 'h':
00240         default : usage(argc,argv);
00241                   exit(-1);
00242                   break;
00243      }
00244   }
00245 
00246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00247   // Ignore broken pipes
00248   signal(SIGPIPE, SIG_IGN);
00249 #endif
00250 
00251   Supplier_i* supplier = new Supplier_i (discnum);
00252   CosEventChannelAdmin::EventChannel_var channel;
00253 
00254   const char* action=""; // Use this variable to help report errors.
00255   try {
00256     CORBA::Object_var obj;
00257 
00258     action="resolve initial reference 'RootPOA'";
00259     obj=orb->resolve_initial_references("RootPOA");
00260     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00261     if(CORBA::is_nil(rootPoa))
00262         throw CORBA::OBJECT_NOT_EXIST();
00263 
00264     action="activate the RootPOA's POAManager";
00265     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00266     pman->activate();
00267 
00268     //
00269     // Obtain object reference to EventChannel
00270     // (from command-line argument or from the Naming Service).
00271     if(optind<argc)
00272     {
00273       action="convert URI from command line into object reference";
00274       obj=orb->string_to_object(argv[optind]);
00275     }
00276     else
00277     {
00278       action="resolve initial reference 'NameService'";
00279       obj=orb->resolve_initial_references("NameService");
00280       CosNaming::NamingContext_var rootContext=
00281         CosNaming::NamingContext::_narrow(obj);
00282       if(CORBA::is_nil(rootContext))
00283           throw CORBA::OBJECT_NOT_EXIST();
00284 
00285       action="find EventChannel in NameService";
00286       cout << action << endl;
00287       obj=rootContext->resolve(str2name(channelName));
00288     }
00289 
00290     action="narrow object reference to event channel";
00291     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00292     if(CORBA::is_nil(channel))
00293     {
00294        cerr << "Failed to narrow Event Channel reference." << endl;
00295        exit(1);
00296     }
00297 
00298   }
00299   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00300      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00301      exit(1);
00302   }
00303   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00304      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00305      exit(1);
00306   }
00307   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00308      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00309      exit(1);
00310   }
00311   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00312      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00313      exit(1);
00314   }
00315   catch(CORBA::TRANSIENT& ex) { // _narrow()
00316      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00317      exit(1);
00318   }
00319   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00320      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00321      exit(1);
00322   }
00323   catch(CORBA::SystemException& ex) {
00324      cerr<<"Failed to "<<action<<".";
00325 #if defined(HAVE_OMNIORB4)
00326      cerr<<" "<<ex._name();
00327      if(ex.NP_minorString())
00328          cerr<<" ("<<ex.NP_minorString()<<")";
00329 #endif
00330      cerr<<endl;
00331      exit(1);
00332   }
00333   catch(CORBA::Exception& ex) {
00334      cerr<<"Failed to "<<action<<"."
00335 #if defined(HAVE_OMNIORB4)
00336        " "<<ex._name()
00337 #endif
00338        <<endl;
00339      exit(1);
00340   }
00341 
00342   //
00343   // Get Supplier Admin interface - retrying on Comms Failure.
00344   CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
00345   while (1)
00346   {
00347      try {
00348         supplier_admin = channel->for_suppliers ();
00349         if (CORBA::is_nil(supplier_admin))
00350         {
00351            cerr << "Event Channel returned nil Supplier Admin!"
00352                 << endl;
00353            exit(1);
00354         }
00355         break;
00356      }
00357      catch (CORBA::COMM_FAILURE& ex) {
00358         cerr << "Caught COMM_FAILURE exception "
00359              << "obtaining Supplier Admin! Retrying..."
00360              << endl;
00361         continue;
00362      }
00363   }
00364   cout << "Obtained SupplierAdmin." << endl;
00365 
00366   while (1)
00367   {
00368      //
00369      // Get proxy consumer - retrying on Comms Failure.
00370      CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
00371      while (1)
00372      {
00373         try {
00374            proxy_consumer = supplier_admin->obtain_pull_consumer ();
00375            if (CORBA::is_nil(proxy_consumer))
00376            {
00377               cerr << "Supplier Admin returned nil proxy_consumer!"
00378                    << endl;
00379               exit(1);
00380            }
00381            break;
00382         }
00383         catch (CORBA::COMM_FAILURE& ex) {
00384            cerr << "Caught COMM_FAILURE exception "
00385                 << "obtaining Proxy Pull Consumer! Retrying..."
00386                 << endl;
00387            continue;
00388         }
00389      }
00390      cout << "Obtained ProxyPullConsumer." << endl;
00391    
00392      // Connect Pull Supplier - retrying on Comms Failure.
00393      CosEventComm::PullSupplier_var supplierRef =supplier->_this();
00394      while (1)
00395      {
00396         try {
00397            proxy_consumer->connect_pull_supplier(supplierRef.in());
00398            break;
00399         }
00400         catch (CORBA::BAD_PARAM& ex) {
00401            cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
00402            exit(1);
00403         }
00404         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00405            cerr << "Pull Supplier already connected!"
00406                 << endl;
00407            break;
00408         }
00409         catch (CORBA::COMM_FAILURE& ex) {
00410            cerr << "Caught COMM_FAILURE exception "
00411                 << "connecting Pull Supplier! Retrying..."
00412                 << endl;
00413            continue;
00414         }
00415      }
00416      cout << "Connected Pull Supplier." << endl;
00417 
00418      // Wait for indication to disconnect before re-connecting.
00419      connect_cond.wait();
00420 
00421      // Disconnect - retrying on Comms Failure.
00422      while (1)
00423      {
00424         try {
00425            proxy_consumer->disconnect_pull_consumer();
00426            break;
00427         }
00428         catch (CORBA::COMM_FAILURE& ex) {
00429            cerr << "Caught COMM_FAILURE exception "
00430                 << "disconnecting Pull Supplier! Retrying..."
00431                 << endl;
00432            continue;
00433         }
00434      }
00435      cout << "Disconnected Pull Supplier." << endl;
00436 
00437      // Yawn.
00438      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00439      omni_thread::sleep(sleepInterval);
00440   }
00441 
00442   // Not Reached
00443   return 0;
00444 }
00445 
00446 static void
00447 usage(int argc, char **argv)
00448 {
00449   cerr<<
00450 "\nCreate a PullSupplier to send events to a channel.\n"
00451 "syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n"
00452 "\n"
00453 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00454 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00455 "\n"
00456 "OPTIONS:                                         DEFAULT:\n"
00457 " -d NUM   disconnect after sending NUM events     [0 - never disconnect]\n"
00458 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00459 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00460 " -h       display this help text\n" << endl;
00461 }