|
solarpowerlog trunk
|
00001 /* 00002 * CSharedConnectionMaster.cpp 00003 * 00004 * Created on: Sep 13, 2010 00005 * Author: tobi 00006 */ 00007 00008 #ifdef HAVE_CONFIG_H 00009 #include "config.h" 00010 #include "porting.h" 00011 #endif 00012 00013 #ifdef HAVE_COMMS_SHAREDCONNECTION 00014 00015 #include "configuration/Registry.h" 00016 #include "CSharedConnectionMaster.h" 00017 #include "Connections/factories/IConnectFactory.h" 00018 #include "configuration/Registry.h" 00019 00020 #include <boost/date_time.hpp> 00021 #include "configuration/CConfigHelper.h" 00022 00023 enum 00024 { 00025 CMD_READ, CMD_CHECKTIMEOUTS, 00026 }; 00027 00028 // THOUGHTS: 00029 /* 00030 * Problems to solve: 00031 * TIMEOUTs 00032 * * slaves has to keep track of its timeouts, as the dispatched call to 00033 * the master can timeout before/afterwards 00034 * 00035 * - master keeps track of all timeouts, 00036 * - the slaves add the information required 00037 * for the timeouts to the command, if they are not present. 00038 * - master tracks the timeout horizon for all icommands, and if timeout 00039 * is met before a receive is completed, the receive fails. 00040 * - if the "real connection object" receive times out, it is checked 00041 * if others are not yet timed out, therefore requesting another receive 00042 * with the remaining timeout time. 00043 * 00044 * PROBLEM: Intercommand races. As FIFO for comm is used, it could be that 00045 * a receive command and its resume could be not adjacent in the queue, 00046 * but interrupted with a "send", for example. 00047 * 00048 * SOLUTION: a) Needing a command queue in the master which will timestamp 00049 * the commands received and makes sure that the next commmand is only 00050 * execeuted when all "receives" are indeed completed 00051 * b) as a, but only for receive handling completion... If receiving, all other 00052 * commands are delayed until timeouts occures. 00053 * b) ignore for now... 00054 * 00055 * 00056 * 00057 * CONNECT/DISCONNECT 00058 * * Masters/Slaves might want to disconnect/connect according to their logic, 00059 * but constant disconnection/connection might jeopadize everything. 00060 * 00061 * 00062 * 00063 * 00064 * old: 00065 * - the master talks to the real connection class 00066 * - (the first version) will just do a FIFO in comms, so getting the 00067 * requests from the inverter, adding them to an internal list and 00068 * then propagate them to the comms object, acting as a negotiater. 00069 * - it cannot just add them to the targets comms queue, as receive events 00070 * might impose race conditions so that a answer would be routed to a wrong 00071 * destiation or a message would be concentreated so that a inverter gets its 00072 * own and a foreign one (or in the other sequence) 00073 * Another issue is that there might be inverters that do not need an question 00074 * to answer, but keep talking without request. The Connection class could and 00075 * should not have logic to sort them apart. 00076 * This is solved that all inverters currently waiting for receiption will 00077 * get the answer and as they have the knowledge about the telegramms, they 00078 * easily can sort out the other telegrams. 00079 * 00080 * */ 00081 00082 CSharedConnectionMaster::CSharedConnectionMaster( 00083 const string & configurationname) : 00084 IConnect(configurationname) 00085 { 00086 connection = NULL; 00087 } 00088 00089 CSharedConnectionMaster::~CSharedConnectionMaster() 00090 { 00091 if (connection) 00092 delete connection; 00093 } 00094 00095 void CSharedConnectionMaster::ExecuteCommand(const ICommand *Command) 00096 { 00097 00098 // Command->DumpData(logger); 00099 00100 00101 switch (Command->getCmd()) 00102 { 00103 00104 // A receive command has been issued with a shorter timeout than 00105 // the current read command. 00106 // So it could be that we need to time-out something.... 00107 case CMD_CHECKTIMEOUTS: 00108 { 00109 boost::posix_time::ptime now( 00110 boost::posix_time::microsec_clock::local_time()); 00111 00112 boost::posix_time::ptime timeout; 00113 00114 // check all pending reads for timeout. 00115 list<ICommand*>::iterator it = readcommands.begin(); 00116 while (it != readcommands.end()) { 00117 ICommand *trgt = *it; 00118 it++; // increment to be sure that we can remove the item after evaluation. 00119 00120 try { 00121 timeout = boost::any_cast<boost::posix_time::ptime>( 00122 trgt->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP)); 00123 } catch (std::invalid_argument &e) { 00124 LOGDEBUG(logger,"BUG: Required timeout-key not found. " << 00125 __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what()); 00126 timeout = now; 00127 } catch (boost::bad_any_cast &e) { 00128 LOGDEBUG(logger,"BUG: Bad boost::any-cast at " << 00129 __PRETTY_FUNCTION__ << ", " << __LINE__ << " what:" 00130 << e.what()); 00131 timeout = now; 00132 } 00133 00134 if (timeout <= now) { 00135 trgt->addData(ICMD_ERRNO, -ETIMEDOUT); 00136 readcommands.remove(trgt); 00137 Registry::GetMainScheduler()->ScheduleWork(trgt); 00138 } 00139 } 00140 00141 // Saftey: Check is list is empty... (Should not happen here anyway) 00142 if (readcommands.empty()) 00143 readtimeout = boost::posix_time::not_a_date_time; 00144 00145 break; 00146 } 00147 00148 // A read command issued to the real connection object has returned. 00149 // Evaluate its status and if successful return the retrieved object 00150 // to all listeners. 00151 // On timeouts check if others are waiting for a longer time and re-issue 00152 // the receive in this case. 00153 // On other errors, inform the callers for error handling. 00154 case CMD_READ: 00155 { 00156 long errorcode; 00157 00158 try { 00159 errorcode = boost::any_cast<int>(Command->findData(ICMD_ERRNO)); 00160 } catch (std::invalid_argument &e) { 00161 LOGDEBUG(logger,"BUG: Required parameter not found." << 00162 __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what()); 00163 errorcode = -EIO; 00164 00165 } catch (boost::bad_any_cast &e) { 00166 LOGDEBUG(logger,"BUG: Required parameter cast failed " << 00167 __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what()); 00168 errorcode = -EIO; 00169 } 00170 00171 if (errorcode != ETIMEDOUT) { 00172 // Successful read. Distribute received data to all listeners. 00173 list<ICommand*>::iterator it; 00174 for (it = readcommands.begin(); it != readcommands.end(); it++) { 00175 // LOGDEBUG(logger, "Premerge:"); 00176 // (*it)->DumpData(logger); 00177 // LOGDEBUG(logger, "Merge with:"); 00178 // Command->DumpData(logger); 00179 (*it)->mergeData(*Command); 00180 // LOGDEBUG(logger, "Merged"); 00181 // (*it)->DumpData(logger); 00182 Registry::GetMainScheduler()->ScheduleWork(*it); 00183 } 00184 readcommands.clear(); 00185 readtimeout = boost::posix_time::not_a_date_time; 00186 break; 00187 00188 } else /* if (errorcode == ETIMEDOUT) */ { 00189 // timeout. 00190 // notifiy and calculate the maximum waiting time for the next 00191 // receive command. 00192 00193 boost::posix_time::ptime now( 00194 boost::posix_time::microsec_clock::local_time()); 00195 00196 boost::posix_time::ptime timeout; 00197 00198 boost::posix_time::ptime largest_timeout = now; 00199 00200 list<ICommand*>::iterator it = readcommands.begin(); 00201 while (it != readcommands.end()) { 00202 ICommand *trgt = *it; 00203 it++; // increment to be sure that we can remove the item after evaluation. 00204 00205 try { 00206 timeout = boost::any_cast<boost::posix_time::ptime>( 00207 trgt->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP)); 00208 } catch (std::invalid_argument &e) { 00209 LOGDEBUG(logger,"BUG: Required timeout-key not found. " << 00210 __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what()); 00211 timeout = now; 00212 } catch (boost::bad_any_cast &e) { 00213 LOGDEBUG(logger,"BUG: Bad boost::any-cast at " << 00214 __PRETTY_FUNCTION__ << ", " << __LINE__ << " what:" 00215 << e.what()); 00216 timeout = now; 00217 } 00218 00219 if (largest_timeout < timeout) { 00220 largest_timeout = timeout; 00221 } 00222 00223 if (timeout <= now) { 00224 trgt->addData(ICMD_ERRNO, -ETIMEDOUT); 00225 readcommands.remove(trgt); 00226 Registry::GetMainScheduler()->ScheduleWork(trgt); 00227 } 00228 } 00229 00230 if (readcommands.empty()) { 00231 // If all reads are gone, reset readtimeout. 00232 readtimeout = boost::posix_time::not_a_date_time; 00233 } else { 00234 // Make a copy of the current command to reuse for the next 00235 // receive (the lifetime of the current one ends after this 00236 // function returns.) 00237 // Schedule the largest known timeout, the shorter ones are 00238 // handled in the CHECKTIMEOUTs state. 00239 ICommand *cmd = new ICommand(*Command); 00240 boost::posix_time::time_duration dur = largest_timeout - now; 00241 long remainingtimeout = dur.total_milliseconds(); 00242 cmd->addData(ICONN_TOKEN_TIMEOUT, remainingtimeout); 00243 connection->Receive(cmd); 00244 readtimeout = largest_timeout; 00245 } 00246 00247 } 00248 00249 break; 00250 } 00251 } 00252 00253 } 00254 00255 bool CSharedConnectionMaster::Connect(ICommand *callback) 00256 { 00257 assert(connection); 00258 return connection->Connect(callback); 00259 00260 } 00261 00262 bool CSharedConnectionMaster::Disconnect(ICommand *callback) 00263 { 00264 assert(connection); 00265 return connection->Disconnect(callback); 00266 00267 } 00268 00269 void CSharedConnectionMaster::SetupLogger(const string& parentlogger, 00270 const string &) 00271 { 00272 IConnect::SetupLogger(parentlogger, ""); 00273 00274 if (connection) 00275 connection->SetupLogger(parentlogger, ""); 00276 } 00277 00278 bool CSharedConnectionMaster::Send(const char *tosend, unsigned int len, 00279 ICommand *callback) 00280 { 00281 assert (connection); 00282 return connection->Send(tosend, len, callback); 00283 } 00284 00285 bool CSharedConnectionMaster::Send(const string& tosend, ICommand *callback) 00286 { 00287 assert (connection); 00288 return connection->Send(tosend, callback); 00289 00290 } 00291 00292 bool CSharedConnectionMaster::Receive(ICommand *callback) 00293 { 00294 // Save command for later interpretation. 00295 readcommands.push_back(callback); 00296 00297 boost::posix_time::ptime timestamp( 00298 boost::posix_time::microsec_clock::local_time()); 00299 00300 // Add a timestamp for the timeout handling, if it is not already there 00301 try { 00302 // just check if it is there. 00303 timestamp = boost::any_cast<boost::posix_time::ptime>( 00304 callback->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP)); 00305 } catch (...) { 00306 // Apparently not, or the caller did not add an ptime.... 00307 // get timeout from config and calculate the timestamp. 00308 // for now, we can only use our config, as this is the best knowledge 00309 // we have, (otherwise we have a default configuration....) 00310 00311 unsigned long timeout; 00312 00313 try { 00314 timeout = boost::any_cast<long>(callback->findData( 00315 ICONN_TOKEN_TIMEOUT)); 00316 } catch (...) { 00317 CConfigHelper cfg(ConfigurationPath); 00318 cfg.GetConfig("timeout", timeout, SHARED_CONN_MASTER_DEFAULTTIMEOUT); 00319 } 00320 00321 timestamp += boost::posix_time::millisec(timeout); 00322 callback->addData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP, timestamp); 00323 } 00324 00325 // check if the new command is supposed to timeout earlier than 00326 // the current read request. In this case, add an additional worker 00327 // to get notified in time. 00328 if (readtimeout == boost::posix_time::not_a_date_time) { 00329 // not a pending read. 00330 // clone the command, divert its response to this class and the read 00331 // to the real connection class. 00332 // If no read is pending, directly issue a read commmand to the connection, 00333 // but divert result to our class for later distribution. 00334 // for this, we copy-construct a ICOmmand and modify it afterwards. 00335 00336 this->readtimeout = timestamp; 00337 ICommand *cmd = new ICommand(*callback); 00338 callback->addData(ICONNECT_TOKEN_PRV_ORIGINALCOMMAND, callback); 00339 cmd->setTrgt(this); 00340 cmd->setCmd(CMD_READ); 00341 return connection->Receive(cmd); 00342 } 00343 00344 // a receive already pending. Schedule work to monitor its timeout. 00345 ICommand *cmd = new ICommand(CMD_CHECKTIMEOUTS, this); 00346 boost::posix_time::time_duration duration; 00347 duration = timestamp - boost::posix_time::microsec_clock::local_time(); 00348 struct timespec ts; 00349 ts.tv_sec = duration.total_seconds(); 00350 ts.tv_nsec = duration.total_nanoseconds() % 1000000000; 00351 Registry::GetMainScheduler()->ScheduleWork(cmd, ts); 00352 00353 return true; 00354 } 00355 00356 bool CSharedConnectionMaster::CheckConfig(void) 00357 { 00358 00359 // Get real configuration path to extract target comms config. 00360 string commsconfig = this->ConfigurationPath + ".realcomms"; 00361 string s; 00362 CConfigHelper h(commsconfig); 00363 00364 if (! h.GetConfig("comms",s)) { 00365 LOGERROR(logger,"realcomms section: comms missing"); 00366 return false; 00367 } 00368 00369 connection = IConnectFactory::Factory(commsconfig); 00370 00371 if (connection) { 00372 connection->SetupLogger(ConfigurationPath,"realcomms"); 00373 return connection->CheckConfig(); 00374 } else { 00375 LOGERROR(logger,"No connection object for shared master comms."); 00376 return false; 00377 } 00378 } 00379 00380 bool CSharedConnectionMaster::IsConnected(void) 00381 { 00382 assert(connection); 00383 return connection->IsConnected(); 00384 } 00385 00386 #endif