OmniEvents
|
00001 // Package : omniEvents 00002 // SupplierAdmin.h Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003-2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "SupplierAdmin.h" 00025 00026 #include "EventChannel.h" 00027 #include "ProxyPushConsumer.h" 00028 #include "ProxyPullConsumer.h" 00029 #include "Orb.h" 00030 #include "PersistNode.h" 00031 00032 #define MILLION 1000000 00033 #define BILLION 1000000000 00034 00035 namespace OmniEvents { 00036 00037 CosEventChannelAdmin::ProxyPushConsumer_ptr 00038 SupplierAdmin_i::obtain_push_consumer() 00039 { 00040 return _pushConsumer->createObject(); 00041 } 00042 00043 00044 CosEventChannelAdmin::ProxyPullConsumer_ptr 00045 SupplierAdmin_i::obtain_pull_consumer() 00046 { 00047 if(!_pullConsumer) 00048 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue); 00049 return _pullConsumer->createObject(); 00050 } 00051 00052 00053 SupplierAdmin_i::SupplierAdmin_i( 00054 const EventChannel_i& channel, 00055 PortableServer::POA_ptr poa 00056 ) 00057 : Servant(poa), 00058 _channel(channel), 00059 _pushConsumer(NULL), 00060 _pullConsumer(NULL), 00061 _queue(), 00062 _nextPull(0,0) 00063 { 00064 // Initialise _nextPull. Only set it if the cycle period is LESS than the 00065 // pull retry period - otherwise just pull every cycle. 00066 if(_channel.pullRetryPeriod_ms() > (_channel.cyclePeriod_ns()/MILLION)) 00067 { 00068 omni_thread::get_time(&(_nextPull.first),&(_nextPull.second)); 00069 } 00070 00071 // Always create the ProxyPushConsumer_i default servant. This allows 00072 // lazy clients to connect suppliers without having to go through the 00073 // proper procedure - they can make up an appropriate ObjectId, call push() 00074 // and it will just work (TM). 00075 // Note: A SupplierAdmin_i is always created by the EventChannel to allow this 00076 // behaviour. 00077 _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin()); 00078 00079 activateObjectWithId("SupplierAdmin"); 00080 } 00081 00082 00083 SupplierAdmin_i::~SupplierAdmin_i() 00084 { 00085 DB(20,"~SupplierAdmin_i()") 00086 if(_pullConsumer) 00087 { 00088 _pullConsumer->_remove_ref(); 00089 _pullConsumer=NULL; 00090 } 00091 if(_pushConsumer) 00092 { 00093 delete _pushConsumer; 00094 _pushConsumer=NULL; 00095 } 00096 for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i) 00097 delete *i; 00098 } 00099 00100 00101 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(SupplierAdmin_i) 00102 00103 00104 void SupplierAdmin_i::collect(list<CORBA::Any*>& events) 00105 { 00106 if(_pullConsumer) 00107 { 00108 _pullConsumer->collect(); 00109 if(0==_nextPull.first) 00110 { // No delay between pulls. 00111 _pullConsumer->triggerRequest(); 00112 } 00113 else 00114 { // Only trigger new pull() calls if `pullRetry' ms have passed. 00115 pair<unsigned long,unsigned long> now; 00116 omni_thread::get_time(&(now.first),&(now.second)); 00117 if(now>=_nextPull) 00118 { 00119 _pullConsumer->triggerRequest(); 00120 00121 CORBA::ULong p =_channel.pullRetryPeriod_ms(); 00122 do{ 00123 _nextPull.second += (p%1000)*MILLION; // nsec 00124 _nextPull.first += p/1000 + _nextPull.second/BILLION; // sec 00125 _nextPull.second %= BILLION; // nsec 00126 } while(now>=_nextPull); 00127 } 00128 } 00129 } 00130 _pushConsumer->trigger(); 00131 // Pick up events from both pull & push consumers. 00132 events=_queue; 00133 _queue.clear(); 00134 } 00135 00136 00137 void SupplierAdmin_i::disconnect() 00138 { 00139 if(_pushConsumer) 00140 _pushConsumer->disconnect(); 00141 if(_pullConsumer) 00142 _pullConsumer->disconnect(); 00143 } 00144 00145 00146 void SupplierAdmin_i::reincarnate(const PersistNode& node) 00147 { 00148 // Build Push Consumer proxies 00149 PersistNode* pushcNode =node.child("ProxyPushConsumer"); 00150 if(pushcNode && !pushcNode->_child.empty()) 00151 { 00152 assert(_pushConsumer!=NULL); 00153 _pushConsumer->reincarnate(*pushcNode); 00154 } 00155 00156 // Build Pull Consumer proxies 00157 PersistNode* pullcNode =node.child("ProxyPullConsumer"); 00158 if(pullcNode && !pullcNode->_child.empty()) 00159 { 00160 if(!_pullConsumer) 00161 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue); 00162 _pullConsumer->reincarnate(*pullcNode); 00163 } 00164 } 00165 00166 00167 void SupplierAdmin_i::output(ostream& os) 00168 { 00169 if(_pushConsumer) 00170 _pushConsumer->output(os); 00171 if(_pullConsumer) 00172 _pullConsumer->output(os); 00173 } 00174 00175 00176 }; // end namespace OmniEvents