OmniEvents
|
00001 // -*- Mode: C++; -*- 00002 // Package : omniEvents 00003 // events.cc Created : 2004/05/02 00004 // Author : Alex Tingle 00005 // 00006 // Copyright (C) 2004 Alex Tingle 00007 // 00008 // This file is part of the omniEvents application. 00009 // 00010 // omniEvents is free software; you can redistribute it and/or 00011 // modify it under the terms of the GNU Lesser General Public 00012 // License as published by the Free Software Foundation; either 00013 // version 2.1 of the License, or (at your option) any later version. 00014 // 00015 // omniEvents is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 // Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public 00021 // License along with this library; if not, write to the Free Software 00022 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 // 00024 // Description: 00025 // Push Model streamer. 00026 // 00027 00028 #ifdef HAVE_CONFIG_H 00029 # include "config.h" 00030 #endif 00031 00032 #ifdef HAVE_GETOPT 00033 # include <unistd.h> 00034 extern char* optarg; 00035 extern int optind; 00036 #else 00037 # include "getopt.h" 00038 #endif 00039 00040 #ifdef HAVE_IOSTREAM 00041 # include <iostream> 00042 #else 00043 # include <iostream.h> 00044 #endif 00045 00046 #ifdef HAVE_STD_IOSTREAM 00047 using namespace std; 00048 #endif 00049 00050 #ifdef HAVE_STDLIB_H 00051 # include <stdlib.h> 00052 #endif 00053 00054 #include <stdio.h> 00055 00056 #if defined HAVE_UNISTD_H 00057 # include <unistd.h> // read(), write() 00058 #elif defined __WIN32__ 00059 # include <io.h> 00060 # define write(fd,buf,count) _write(fd,buf,count) 00061 # define read(fd,buf,count) _read(fd,buf,count) 00062 # define ssize_t int 00063 #endif 00064 00065 #ifdef HAVE_SIGNAL_H 00066 # include <signal.h> 00067 #endif 00068 00069 #include "CosEventComm.hh" 00070 #include "CosEventChannelAdmin.hh" 00071 #include "naming.h" 00072 00073 #ifndef STDIN_FILENO 00074 # define STDIN_FILENO 0 00075 # define STDOUT_FILENO 1 00076 #endif 00077 00078 CORBA::ORB_ptr orb; 00079 00080 static void usage(int argc, char **argv); 00081 00082 // 00083 // Time 00084 // 00085 00086 #define BILLION 1000000000 00087 00088 class Time; 00089 class Time 00090 { 00091 private: 00092 CORBA::ULong _sec; 00093 CORBA::ULong _nano; 00094 public: 00095 static Time current() 00096 { 00097 Time result; 00098 unsigned long sec,nano; 00099 omni_thread::get_time(&sec,&nano); 00100 result._sec=sec; 00101 result._nano=nano; 00102 return result; 00103 } 00104 static void sleepUntil(const Time& futureTime) 00105 { 00106 Time now =current(); 00107 if(now<futureTime) 00108 { 00109 Time offset=futureTime-now; 00110 omni_thread::sleep(offset._sec,offset._nano); 00111 } 00112 } 00113 // 00114 Time():_sec(0),_nano(0){} 00115 Time(CORBA::ULong sec,CORBA::ULong nano):_sec(sec),_nano(nano){} 00116 Time(const Time& right):_sec(right._sec),_nano(right._nano){} 00117 Time& operator=(const Time& right) 00118 { 00119 if(this!=&right) 00120 { 00121 _sec =right._sec; 00122 _nano=right._nano; 00123 } 00124 return *this; 00125 } 00126 bool operator<(const Time& right) const 00127 { 00128 if(_sec==right._sec) 00129 return _nano<right._nano; 00130 else 00131 return _sec<right._sec; 00132 } 00133 Time& operator+=(const Time& right) 00134 { 00135 _sec +=right._sec; 00136 _nano+=right._nano; 00137 if(_nano>BILLION) 00138 { 00139 _nano=_nano%BILLION; 00140 ++_sec; 00141 } 00142 return *this; 00143 } 00144 Time operator+(const Time& right) const 00145 { 00146 Time result(*this); 00147 result+=right; 00148 return result; 00149 } 00150 Time& operator-=(const Time& right) 00151 { 00152 if(operator<(right)) 00153 { 00154 cerr<<"Negative time!"<<endl; 00155 throw CORBA::BAD_PARAM(); 00156 } 00157 _sec-=right._sec; 00158 if(_nano<right._nano) 00159 { 00160 _nano+=BILLION; 00161 --_sec; 00162 } 00163 _nano-=right._nano; 00164 return *this; 00165 } 00166 Time operator-(const Time& right) const 00167 { 00168 Time result(*this); 00169 result-=right; 00170 return result; 00171 } 00172 void operator>>=(cdrMemoryStream& s) const 00173 { 00174 _sec>>=s; 00175 _nano>>=s; 00176 } 00177 void operator<<=(cdrMemoryStream& s) 00178 { 00179 _sec<<=s; 00180 _nano<<=s; 00181 } 00182 bool is_nil() const { return(_sec==0 && _nano==0); } 00183 }; // end class Time 00184 00185 00186 // 00187 // Consumer_i 00188 // 00189 00190 class Consumer_i : virtual public POA_CosEventComm::PushConsumer 00191 { 00192 public: 00193 Consumer_i(long disconnect=0): _memstream() {} 00194 void push(const CORBA::Any& data) 00195 { 00196 // Record the event timestamp. 00197 Time now=Time::current(); 00198 now>>=_memstream; 00199 // stream event data. 00200 data>>=_memstream; 00201 // Write to file. 00202 write(STDOUT_FILENO,_memstream.bufPtr(),_memstream.bufSize()); 00203 // Reset. 00204 _memstream.rewindPtrs(); 00205 } 00206 void disconnect_push_consumer() 00207 { 00208 cout<<"disconnected"<<endl; 00209 orb->shutdown(0); 00210 } 00211 void consume( 00212 CosEventChannelAdmin::EventChannel_ptr channel, 00213 const char*& action) 00214 { 00215 action="get ConsumerAdmin"; 00216 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin = 00217 channel->for_consumers(); 00218 00219 action="get ProxyPushSupplier"; 00220 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier = 00221 consumer_admin->obtain_push_supplier(); 00222 00223 action="connect to ProxyPushSupplier"; 00224 proxy_supplier->connect_push_consumer(_this()); 00225 } 00226 private: 00227 cdrMemoryStream _memstream; 00228 }; 00229 00230 00231 // 00232 // Supplier_i 00233 // 00234 00235 class Supplier_i : virtual public POA_CosEventComm::PushSupplier 00236 { 00237 public: 00238 Supplier_i(): _connected(true) {} 00239 void disconnect_push_supplier() 00240 { 00241 cout<<"disconnected"<<endl; 00242 _connected=false; 00243 } 00244 void supply( 00245 CosEventChannelAdmin::EventChannel_ptr channel, 00246 const char*& action) 00247 { 00248 action="get SupplierAdmin"; 00249 CosEventChannelAdmin::SupplierAdmin_var supplier_admin = 00250 channel->for_suppliers(); 00251 00252 action="get ProxyPushConsumer"; 00253 CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer = 00254 supplier_admin->obtain_push_consumer(); 00255 00256 action="connect to ProxyPushConsumer"; 00257 proxy_consumer->connect_push_supplier(_this()); 00258 00259 char buf[1024]; 00260 ssize_t len; 00261 action="read standard input"; 00262 // Stream start time (seconds,nanoseconds) 00263 Time offsetTime; 00264 while(_connected && (len=read(STDIN_FILENO,buf,1024))) 00265 { 00266 CORBA::Any any; 00267 cdrMemoryStream memstr; 00268 action="put_octet_array"; 00269 memstr.put_octet_array( (_CORBA_Octet*)buf, (int)len ); 00270 while(_connected && memstr.currentInputPtr()<memstr.bufSize()) 00271 { 00272 action="unmarshal"; 00273 Time eventTime; 00274 eventTime<<=memstr; 00275 any<<=memstr; 00276 00277 if(offsetTime.is_nil()) // first time special. 00278 offsetTime=Time::current()-eventTime; 00279 Time::sleepUntil(eventTime+offsetTime); 00280 00281 action="push"; 00282 proxy_consumer->push(any); 00283 } 00284 } 00285 } 00286 private: 00287 bool _connected; 00288 }; 00289 00290 00291 // 00292 // main() 00293 // 00294 00295 int main(int argc, char **argv) 00296 { 00297 // 00298 // Start orb. 00299 #if defined(HAVE_OMNIORB4) 00300 orb=CORBA::ORB_init(argc,argv,"omniORB4"); 00301 #else 00302 orb=CORBA::ORB_init(argc,argv,"omniORB3"); 00303 #endif 00304 00305 // Process Options 00306 bool supplierMode =false; 00307 const char* channelName ="EventChannel"; 00308 00309 int c; 00310 while ((c = getopt(argc,argv,"shn:")) != EOF) 00311 { 00312 switch (c) 00313 { 00314 case 's': supplierMode=true; 00315 break; 00316 00317 case 'n': channelName = optarg; 00318 break; 00319 00320 case 'h': usage(argc,argv); 00321 exit(0); 00322 default : usage(argc,argv); 00323 exit(-1); 00324 } 00325 } 00326 00327 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) 00328 // Ignore broken pipes 00329 signal(SIGPIPE, SIG_IGN); 00330 #endif 00331 00332 const char* action=""; // Use this variable to help report errors. 00333 try { 00334 CORBA::Object_var obj; 00335 00336 action="resolve initial reference 'RootPOA'"; 00337 obj=orb->resolve_initial_references("RootPOA"); 00338 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj); 00339 if(CORBA::is_nil(rootPoa)) 00340 throw CORBA::OBJECT_NOT_EXIST(); 00341 00342 action="activate the RootPOA's POAManager"; 00343 PortableServer::POAManager_var pman =rootPoa->the_POAManager(); 00344 pman->activate(); 00345 00346 // 00347 // Obtain object reference to EventChannel 00348 // (from command-line argument or from the Naming Service). 00349 if(optind<argc) 00350 { 00351 action="convert URI from command line into object reference"; 00352 obj=orb->string_to_object(argv[optind]); 00353 } 00354 else 00355 { 00356 action="resolve initial reference 'NameService'"; 00357 obj=orb->resolve_initial_references("NameService"); 00358 CosNaming::NamingContext_var rootContext= 00359 CosNaming::NamingContext::_narrow(obj); 00360 if(CORBA::is_nil(rootContext)) 00361 throw CORBA::OBJECT_NOT_EXIST(); 00362 00363 action="find EventChannel in NameService"; 00364 cout << action << endl; 00365 obj=rootContext->resolve(str2name(channelName)); 00366 } 00367 00368 action="narrow object reference to event channel"; 00369 CosEventChannelAdmin::EventChannel_var channel = 00370 CosEventChannelAdmin::EventChannel::_narrow(obj); 00371 if(CORBA::is_nil(channel)) 00372 { 00373 cerr << "Failed to narrow Event Channel reference." << endl; 00374 exit(1); 00375 } 00376 00377 if(supplierMode) 00378 { 00379 action="construct PushSupplier"; 00380 Supplier_i* supplier =new Supplier_i(); 00381 supplier->supply(channel,action); 00382 } 00383 else 00384 { 00385 action="construct PushConsumer"; 00386 Consumer_i* consumer =new Consumer_i(); 00387 consumer->consume(channel,action); 00388 00389 action="run ORB"; 00390 orb->run(); 00391 } 00392 00393 return 0; 00394 00395 } 00396 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references 00397 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl; 00398 } 00399 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve 00400 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl; 00401 } 00402 catch(CosNaming::NamingContext::NotFound& ex) { // resolve 00403 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl; 00404 } 00405 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve 00406 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl; 00407 } 00408 catch(CORBA::TRANSIENT& ex) { // _narrow() 00409 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl; 00410 } 00411 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow() 00412 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl; 00413 } 00414 catch(CORBA::SystemException& ex) { 00415 cerr<<"Failed to "<<action<<"." 00416 #if defined(HAVE_OMNIORB4) 00417 " "<<ex._name()<<" ("<<ex.NP_minorString()<<")" 00418 #endif 00419 <<endl; 00420 } 00421 catch(CORBA::Exception& ex) { 00422 cerr<<"Failed to "<<action<<"." 00423 #if defined(HAVE_OMNIORB4) 00424 " "<<ex._name() 00425 #endif 00426 <<endl; 00427 } 00428 00429 return 1; 00430 } 00431 00432 static void usage(int argc, char **argv) 00433 { 00434 cerr<< 00435 "\nStream events from a channel to stdout, or (-s) from stdin to a channel.\n" 00436 "syntax: "<<(argc?argv[0]:"events")<<" OPTIONS [CHANNEL_URI]\n" 00437 "\n" 00438 "CHANNEL_URI: The event channel may be specified as a URI.\n" 00439 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n" 00440 "\n" 00441 "OPTIONS: DEFAULT:\n" 00442 " -s supply mode. Read events from stdin.\n" 00443 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n" 00444 " -h display this help text\n" << endl; 00445 }