solarpowerlog trunk
/home/tobi/workspace/solarpowerlog/src/Connections/CSharedConnectionMaster.cpp
Go to the documentation of this file.
00001 /*
00002  * CSharedConnectionMaster.cpp
00003  *
00004  *  Created on: Sep 13, 2010
00005  *      Author: tobi
00006  */
00007 
00008 #ifdef HAVE_CONFIG_H
00009 #include "config.h"
00010 #include "porting.h"
00011 #endif
00012 
00013 #ifdef HAVE_COMMS_SHAREDCONNECTION
00014 
00015 #include "configuration/Registry.h"
00016 #include "CSharedConnectionMaster.h"
00017 #include "Connections/factories/IConnectFactory.h"
00018 #include "configuration/Registry.h"
00019 
00020 #include <boost/date_time.hpp>
00021 #include "configuration/CConfigHelper.h"
00022 
00023 enum
00024 {
00025      CMD_READ, CMD_CHECKTIMEOUTS,
00026 };
00027 
00028 // THOUGHTS:
00029 /*
00030  * Problems to solve:
00031  * TIMEOUTs
00032  *    * slaves has to keep track of its timeouts, as the dispatched call to
00033  *      the master can timeout before/afterwards
00034  *
00035  *          - master keeps track of all timeouts,
00036  *          - the slaves add the information required
00037  *            for the timeouts to the command, if they are not present.
00038  *          - master tracks the timeout horizon for all icommands, and if timeout
00039  *            is met before a receive is completed, the receive fails.
00040  *          - if the "real connection object" receive times out, it is checked
00041  *            if others are not yet timed out, therefore requesting another receive
00042  *            with the remaining timeout time.
00043  *
00044  *            PROBLEM: Intercommand races. As FIFO for comm is used, it could be that
00045  *            a receive command and its resume could be not adjacent in the queue,
00046  *            but interrupted with a "send", for example.
00047  *
00048  *            SOLUTION: a) Needing a command queue in the master which will timestamp
00049  *            the commands received and makes sure that the next commmand is only
00050  *            execeuted when all "receives" are indeed completed
00051  *            b) as a, but only for receive handling completion... If receiving, all other
00052  *            commands are delayed until timeouts occures.
00053  *            b) ignore for now...
00054  *
00055  *
00056  *
00057  * CONNECT/DISCONNECT
00058  *    * Masters/Slaves might want to disconnect/connect according to their logic,
00059  *      but constant disconnection/connection might jeopadize everything.
00060  *
00061  *
00062  *
00063  *
00064  * old:
00065  * - the master talks to the real connection class
00066  * - (the first version) will just do a FIFO in comms, so getting the
00067  *   requests from the inverter, adding them to an internal list and
00068  *   then propagate them to the comms object, acting as a negotiater.
00069  * - it cannot just add them to the targets comms queue, as receive events
00070  *   might impose race conditions so that a answer would be routed to a wrong
00071  *   destiation or a message would be concentreated so that a inverter gets its
00072  *   own and a foreign one (or in the other sequence)
00073  *   Another issue is that there might be inverters that do not need an question
00074  *   to answer, but keep talking without request. The Connection class could and
00075  *   should not have logic to sort them apart.
00076  *   This is solved that all inverters currently waiting for receiption will
00077  *   get the answer and as they have the knowledge about the telegramms, they
00078  *   easily can sort out the other telegrams.
00079  *
00080  * */
00081 
00082 CSharedConnectionMaster::CSharedConnectionMaster(
00083           const string & configurationname) :
00084      IConnect(configurationname)
00085 {
00086      connection = NULL;
00087 }
00088 
00089 CSharedConnectionMaster::~CSharedConnectionMaster()
00090 {
00091      if (connection)
00092           delete connection;
00093 }
00094 
00095 void CSharedConnectionMaster::ExecuteCommand(const ICommand *Command)
00096 {
00097 
00098 //   Command->DumpData(logger);
00099 
00100 
00101      switch (Command->getCmd())
00102      {
00103 
00104      // A receive command has been issued with a shorter timeout than
00105      // the current read command.
00106      // So it could be that we need to time-out something....
00107      case CMD_CHECKTIMEOUTS:
00108      {
00109           boost::posix_time::ptime now(
00110                     boost::posix_time::microsec_clock::local_time());
00111 
00112           boost::posix_time::ptime timeout;
00113 
00114           // check all pending reads for timeout.
00115           list<ICommand*>::iterator it = readcommands.begin();
00116           while (it != readcommands.end()) {
00117                ICommand *trgt = *it;
00118                it++; // increment to be sure that we can remove the item after evaluation.
00119 
00120                try {
00121                     timeout = boost::any_cast<boost::posix_time::ptime>(
00122                               trgt->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP));
00123                } catch (std::invalid_argument &e) {
00124                     LOGDEBUG(logger,"BUG: Required timeout-key not found. " <<
00125                               __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what());
00126                     timeout = now;
00127                } catch (boost::bad_any_cast &e) {
00128                     LOGDEBUG(logger,"BUG: Bad boost::any-cast at " <<
00129                               __PRETTY_FUNCTION__ << ", " << __LINE__ << " what:"
00130                               << e.what());
00131                     timeout = now;
00132                }
00133 
00134                if (timeout <= now) {
00135                     trgt->addData(ICMD_ERRNO, -ETIMEDOUT);
00136                     readcommands.remove(trgt);
00137                     Registry::GetMainScheduler()->ScheduleWork(trgt);
00138                }
00139           }
00140 
00141           // Saftey: Check is list is empty... (Should not happen here anyway)
00142           if (readcommands.empty())
00143                readtimeout = boost::posix_time::not_a_date_time;
00144 
00145           break;
00146      }
00147 
00148           // A read command issued to the real connection object has returned.
00149           // Evaluate its status and if successful return the retrieved object
00150           // to all listeners.
00151           // On timeouts check if others are waiting for a longer time and re-issue
00152           // the receive in this case.
00153           // On other errors, inform the callers for error handling.
00154      case CMD_READ:
00155      {
00156           long errorcode;
00157 
00158           try {
00159                errorcode = boost::any_cast<int>(Command->findData(ICMD_ERRNO));
00160           } catch (std::invalid_argument &e) {
00161                LOGDEBUG(logger,"BUG: Required parameter not found." <<
00162                          __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what());
00163                errorcode = -EIO;
00164 
00165           } catch (boost::bad_any_cast &e) {
00166                LOGDEBUG(logger,"BUG: Required parameter cast failed " <<
00167                          __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what());
00168                errorcode = -EIO;
00169           }
00170 
00171           if (errorcode != ETIMEDOUT) {
00172                // Successful read. Distribute received data to all listeners.
00173                list<ICommand*>::iterator it;
00174                for (it = readcommands.begin(); it != readcommands.end(); it++) {
00175 //                  LOGDEBUG(logger, "Premerge:");
00176 //                  (*it)->DumpData(logger);
00177 //                  LOGDEBUG(logger, "Merge with:");
00178 //                  Command->DumpData(logger);
00179                     (*it)->mergeData(*Command);
00180 //                  LOGDEBUG(logger, "Merged");
00181 //                  (*it)->DumpData(logger);
00182                     Registry::GetMainScheduler()->ScheduleWork(*it);
00183                }
00184                readcommands.clear();
00185                readtimeout = boost::posix_time::not_a_date_time;
00186                break;
00187 
00188           } else /* if (errorcode == ETIMEDOUT) */ {
00189                // timeout.
00190                // notifiy and calculate the maximum waiting time for the next
00191                // receive command.
00192 
00193                boost::posix_time::ptime now(
00194                          boost::posix_time::microsec_clock::local_time());
00195 
00196                boost::posix_time::ptime timeout;
00197 
00198                boost::posix_time::ptime largest_timeout = now;
00199 
00200                list<ICommand*>::iterator it = readcommands.begin();
00201                while (it != readcommands.end()) {
00202                     ICommand *trgt = *it;
00203                     it++; // increment to be sure that we can remove the item after evaluation.
00204 
00205                     try {
00206                          timeout = boost::any_cast<boost::posix_time::ptime>(
00207                                    trgt->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP));
00208                     } catch (std::invalid_argument &e) {
00209                          LOGDEBUG(logger,"BUG: Required timeout-key not found. " <<
00210                                    __PRETTY_FUNCTION__ << " at " << __LINE__ << " what:" << e.what());
00211                          timeout = now;
00212                     } catch (boost::bad_any_cast &e) {
00213                          LOGDEBUG(logger,"BUG: Bad boost::any-cast at " <<
00214                                    __PRETTY_FUNCTION__ << ", " << __LINE__ << " what:"
00215                                    << e.what());
00216                          timeout = now;
00217                     }
00218 
00219                     if (largest_timeout < timeout) {
00220                          largest_timeout = timeout;
00221                     }
00222 
00223                     if (timeout <= now) {
00224                          trgt->addData(ICMD_ERRNO, -ETIMEDOUT);
00225                          readcommands.remove(trgt);
00226                          Registry::GetMainScheduler()->ScheduleWork(trgt);
00227                     }
00228                }
00229 
00230                if (readcommands.empty()) {
00231                     // If all reads are gone, reset readtimeout.
00232                     readtimeout = boost::posix_time::not_a_date_time;
00233                } else {
00234                     // Make a copy of the current command to reuse for the next
00235                     // receive (the lifetime of the current one ends after this
00236                     // function returns.)
00237                     // Schedule the largest known timeout, the shorter ones are
00238                     // handled in the CHECKTIMEOUTs state.
00239                     ICommand *cmd = new ICommand(*Command);
00240                     boost::posix_time::time_duration dur = largest_timeout - now;
00241                     long remainingtimeout = dur.total_milliseconds();
00242                     cmd->addData(ICONN_TOKEN_TIMEOUT, remainingtimeout);
00243                     connection->Receive(cmd);
00244                     readtimeout = largest_timeout;
00245                }
00246 
00247           }
00248 
00249           break;
00250      }
00251      }
00252 
00253 }
00254 
00255 bool CSharedConnectionMaster::Connect(ICommand *callback)
00256 {
00257      assert(connection);
00258      return connection->Connect(callback);
00259 
00260 }
00261 
00262 bool CSharedConnectionMaster::Disconnect(ICommand *callback)
00263 {
00264      assert(connection);
00265      return connection->Disconnect(callback);
00266 
00267 }
00268 
00269 void CSharedConnectionMaster::SetupLogger(const string& parentlogger,
00270           const string &)
00271 {
00272      IConnect::SetupLogger(parentlogger, "");
00273 
00274      if (connection)
00275           connection->SetupLogger(parentlogger, "");
00276 }
00277 
00278 bool CSharedConnectionMaster::Send(const char *tosend, unsigned int len,
00279           ICommand *callback)
00280 {
00281      assert (connection);
00282      return connection->Send(tosend, len, callback);
00283 }
00284 
00285 bool CSharedConnectionMaster::Send(const string& tosend, ICommand *callback)
00286 {
00287      assert (connection);
00288      return connection->Send(tosend, callback);
00289 
00290 }
00291 
00292 bool CSharedConnectionMaster::Receive(ICommand *callback)
00293 {
00294      // Save command for later interpretation.
00295      readcommands.push_back(callback);
00296 
00297      boost::posix_time::ptime timestamp(
00298                boost::posix_time::microsec_clock::local_time());
00299 
00300      // Add a timestamp for the timeout handling, if it is not already there
00301      try {
00302           // just check if it is there.
00303           timestamp = boost::any_cast<boost::posix_time::ptime>(
00304                     callback->findData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP));
00305      } catch (...) {
00306           // Apparently not, or the caller did not add an ptime....
00307           // get timeout from config and calculate the timestamp.
00308           // for now, we can only use our config, as this is the best knowledge
00309           // we have, (otherwise we have a default configuration....)
00310 
00311           unsigned long timeout;
00312 
00313           try {
00314                timeout = boost::any_cast<long>(callback->findData(
00315                          ICONN_TOKEN_TIMEOUT));
00316           } catch (...) {
00317                CConfigHelper cfg(ConfigurationPath);
00318                cfg.GetConfig("timeout", timeout, SHARED_CONN_MASTER_DEFAULTTIMEOUT);
00319           }
00320 
00321           timestamp += boost::posix_time::millisec(timeout);
00322           callback->addData(ICONNECT_TOKEN_TIMEOUTTIMESTAMP, timestamp);
00323      }
00324 
00325      // check if the new command is supposed to timeout earlier than
00326      // the current read request. In this case, add an additional worker
00327      // to get notified in time.
00328      if (readtimeout == boost::posix_time::not_a_date_time) {
00329           // not a pending read.
00330           // clone the command, divert its response to this class and the read
00331           // to the real connection class.
00332           // If no read is pending, directly issue a read commmand to the connection,
00333           // but divert result to our class for later distribution.
00334           // for this, we copy-construct a ICOmmand and modify it afterwards.
00335 
00336           this->readtimeout = timestamp;
00337           ICommand *cmd = new ICommand(*callback);
00338           callback->addData(ICONNECT_TOKEN_PRV_ORIGINALCOMMAND, callback);
00339           cmd->setTrgt(this);
00340           cmd->setCmd(CMD_READ);
00341           return connection->Receive(cmd);
00342      }
00343 
00344      // a receive already pending. Schedule work to monitor its timeout.
00345      ICommand *cmd = new ICommand(CMD_CHECKTIMEOUTS, this);
00346      boost::posix_time::time_duration duration;
00347      duration = timestamp - boost::posix_time::microsec_clock::local_time();
00348      struct timespec ts;
00349      ts.tv_sec = duration.total_seconds();
00350      ts.tv_nsec = duration.total_nanoseconds() % 1000000000;
00351      Registry::GetMainScheduler()->ScheduleWork(cmd, ts);
00352 
00353      return true;
00354 }
00355 
00356 bool CSharedConnectionMaster::CheckConfig(void)
00357 {
00358 
00359      // Get real configuration path to extract target comms config.
00360      string commsconfig = this->ConfigurationPath + ".realcomms";
00361      string s;
00362      CConfigHelper h(commsconfig);
00363 
00364      if (! h.GetConfig("comms",s)) {
00365           LOGERROR(logger,"realcomms section: comms missing");
00366           return false;
00367      }
00368 
00369      connection = IConnectFactory::Factory(commsconfig);
00370 
00371      if (connection) {
00372           connection->SetupLogger(ConfigurationPath,"realcomms");
00373           return connection->CheckConfig();
00374      } else {
00375           LOGERROR(logger,"No connection object for shared master comms.");
00376           return false;
00377      }
00378 }
00379 
00380 bool CSharedConnectionMaster::IsConnected(void)
00381 {
00382      assert(connection);
00383      return connection->IsConnected();
00384 }
00385 
00386 #endif