solarpowerlog trunk
/home/tobi/workspace/solarpowerlog/src/DataFilters/CCSVOutputFilter.cpp
Go to the documentation of this file.
00001 /* ----------------------------------------------------------------------------
00002  solarpowerlog
00003  Copyright (C) 2009  Tobias Frost
00004 
00005  This file is part of solarpowerlog.
00006 
00007  Solarpowerlog is free software; However, it is dual-licenced
00008  as described in the file "COPYING".
00009 
00010  For this file (CCSVOutputFilter.cpp), the license terms are:
00011 
00012  You can redistribute it and/or modify it under the terms of the GNU
00013  General Public License as published by the Free Software Foundation; either
00014  version 3 of the License, or (at your option) any later version.
00015 
00016  This program is distributed in the hope that it will be useful, but
00017  WITHOUT ANY WARRANTY; without even the implied warranty of
00018  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00019  Lesser General Public License for more details.
00020 
00021  You should have received a copy of the GNU Library General Public
00022  License along with this program; if not, see
00023  <http://www.gnu.org/licenses/>.
00024  ----------------------------------------------------------------------------
00025  */
00026 
00034 #ifdef HAVE_CONFIG_H
00035 #include "config.h"
00036 #endif
00037 
00038 #ifdef  HAVE_FILTER_CSVDUMP
00039 
00040 #include <assert.h>
00041 #include <fstream>
00042 #include <iostream>
00043 #include <iomanip>
00044 #include <cstdio>
00045 #include <memory>
00046 
00047 #include <boost/date_time/gregorian/gregorian.hpp>
00048 #include <boost/date_time/local_time/local_time.hpp>
00049 
00050 #include "configuration/Registry.h"
00051 #include "configuration/CConfigHelper.h"
00052 #include "interfaces/CWorkScheduler.h"
00053 
00054 #include "Inverters/Capabilites.h"
00055 #include "patterns/CValue.h"
00056 
00057 #include "CCSVOutputFilter.h"
00058 
00059 #include "Inverters/interfaces/ICapaIterator.h"
00060 
00061 using namespace std;
00062 using namespace libconfig;
00063 using namespace boost::gregorian;
00064 
00065 CCSVOutputFilter::CCSVOutputFilter( const string & name,
00066      const string & configurationpath ) :
00067      IDataFilter(name, configurationpath)
00068 {
00069      headerwritten = false;
00070 
00071      // Schedule the initialization and subscriptions later...
00072      ICommand *cmd = new ICommand(CMD_INIT, this);
00073      Registry::GetMainScheduler()->ScheduleWork(cmd);
00074 
00075      // We do not anything on these capabilities, so we remove our list.
00076      // any cascaded filter will automatically use the parents one...
00077      CCapability *c = IInverterBase::GetConcreteCapability(
00078           CAPA_INVERTER_DATASTATE);
00079      CapabilityMap.erase(CAPA_INVERTER_DATASTATE);
00080      delete c;
00081 
00082      // Also we wont fiddle with the caps requiring our listeners to unsubscribe.
00083      // They also should get that info from our base.
00084      c = IInverterBase::GetConcreteCapability(CAPA_CAPAS_REMOVEALL);
00085      CapabilityMap.erase(CAPA_CAPAS_REMOVEALL);
00086      delete c;
00087 
00088      // However, to help following plugins, we will publish some data here:
00089      // (this enables other plugins to use our files as data source)
00090      c = new CCapability(CAPA_CSVDUMPER_FILENAME, CAPA_CSVDUMPER_FILENAME_TYPE, this);
00091      AddCapability(CAPA_CSVDUMPER_FILENAME,c);
00092 
00093      // A comma-seperated list of parameters which are currently logged.
00094      // note: This list might grow over time, so when parsing the CSV File,
00095      // be prepared that there might be not all given from the beginning of the
00096      // file
00097      c = new CCapability(CAPA_CSVDUMPER_LOGGEDCAPABILITES, CAPA_CSVDUMPER_LOGGEDCAPABILITES_TYPE, this);
00098      AddCapability(CAPA_CSVDUMPER_LOGGEDCAPABILITES,c);
00099 }
00100 
00101 CCSVOutputFilter::~CCSVOutputFilter()
00102 {
00103      if (file.is_open())
00104           file.close();
00105      // TODO Auto-generated destructor stub
00106 }
00107 
00108 bool CCSVOutputFilter::CheckConfig()
00109 {
00110      string setting;
00111      string str;
00112 
00113      bool fail = false;
00114 
00115      CConfigHelper hlp(configurationpath);
00116      fail |= !hlp.CheckConfig("logfile", Setting::TypeString);
00117 
00118      fail |= !hlp.CheckConfig("compact_csv", Setting::TypeBoolean, true);
00119      fail |= !hlp.CheckConfig("flush_file_buffer_immediatly", Setting::TypeBoolean, true);
00120      fail |= !hlp.CheckConfig("format_timestamp", Setting::TypeString, true);
00121      fail |= !hlp.CheckConfig("rotate", Setting::TypeBoolean, true);
00122 
00123      if (hlp.CheckConfig("data2log", Setting::TypeString, false, false)) {
00124           hlp.GetConfig("data2log", setting);
00125           if (setting != "all") {
00126                LOGERROR(logger, "Configuration Error: data2log must be \"all\" or of the type \"Array\".");
00127                fail = true;
00128           }
00129      } else if (!hlp.CheckConfig("data2log", Setting::TypeArray)) {
00130           fail = true;
00131      }
00132 
00133      hlp.GetConfig("datasource", str);
00134      IInverterBase *i = Registry::Instance().GetInverter(str);
00135      if (!i) {
00136           LOGERROR(logger,
00137                "Setting " << setting << " in " << configurationpath
00138                << "." << name
00139                << ": Cannot find instance of Inverter with the name "
00140                << str);
00141           fail = true;
00142      }
00143      return !fail;
00144 }
00145 
00146 void CCSVOutputFilter::Update( const IObserverSubject *subject )
00147 {
00148      assert (subject);
00149      CCapability *c, *cap = (CCapability *) subject;
00150 
00151      // Datastate changed.
00152      if (cap->getDescription() == CAPA_INVERTER_DATASTATE) {
00153           this->datavalid = ((CValue<bool> *) cap->getValue())->Get();
00154           return;
00155      }
00156 
00157      // Unsubscribe plea -- we do not offer this Capa, our customers will
00158      // ask our base directly.
00159      if (cap->getDescription() == CAPA_CAPAS_REMOVEALL) {
00160           auto_ptr<ICapaIterator> it(base->GetCapaNewIterator());
00161           while (it->HasNext()) {
00162                pair<string, CCapability*> cappair = it->GetNext();
00163                cap = (cappair).second;
00164                cap->UnSubscribe(this);
00165           }
00166           return;
00167      }
00168 
00169      // propagate "caps updated"
00170      if (cap->getDescription() == CAPA_CAPAS_UPDATED) {
00171           c = IInverterBase::GetConcreteCapability(CAPA_CAPAS_UPDATED);
00172           *(CValue<bool> *) c->getValue()
00173                = *(CValue<bool> *) cap->getValue();
00174           c->Notify();
00175           capsupdated = true;
00176           return;
00177      }
00178 
00179 }
00180 
00181 void CCSVOutputFilter::ExecuteCommand( const ICommand *cmd )
00182 {
00183      switch (cmd->getCmd()) {
00184 
00185      case CMD_INIT:
00186      {
00187           DoINITCmd(cmd);
00188 
00189           ICommand *ncmd = new ICommand(CMD_CYCLIC, this);
00190           struct timespec ts;
00191           // Set cyclic timer to the query interval.
00192           ts.tv_sec = 5;
00193           ts.tv_nsec = 0;
00194 
00195           CCapability *c = GetConcreteCapability(
00196                CAPA_INVERTER_QUERYINTERVAL);
00197           if (c && c->getValue()->GetType() == IValue::float_type) {
00198                CValue<float> *v = (CValue<float> *) c->getValue();
00199                ts.tv_sec = v->Get();
00200                ts.tv_nsec = ((v->Get() - ts.tv_sec) * 1e9);
00201           } else {
00202                LOGINFO(logger,
00203                     "INFO: The associated inverter does not specify the "
00204                     "queryinterval. Defaulting to 5 seconds");
00205           }
00206 
00207           Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);
00208 
00209      }
00210           break;
00211 
00212      case CMD_CYCLIC:
00213      {
00214           DoCYCLICmd(cmd);
00215 
00216           // Set cyclic timer to the query interval.
00217           ICommand *ncmd = new ICommand(CMD_CYCLIC, this);
00218           struct timespec ts;
00219           ts.tv_sec = 5;
00220           ts.tv_nsec = 0;
00221 
00222           CCapability *c = GetConcreteCapability(
00223                CAPA_INVERTER_QUERYINTERVAL);
00224           if (c && c->getValue()->GetType() == IValue::float_type) {
00225                CValue<float> *v = (CValue<float> *) c->getValue();
00226                ts.tv_sec = v->Get();
00227                ts.tv_nsec = ((v->Get() - ts.tv_sec) * 1e9);
00228           }
00229 
00230           Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);
00231 
00232      }
00233           break;
00234 
00235      case CMD_ROTATE:
00236           DoINITCmd(cmd);
00237           break;
00238 
00239      }
00240 }
00241 
00242 void CCSVOutputFilter::DoINITCmd( const ICommand * )
00243 {
00244      // Do init
00245      string tmp;
00246      CConfigHelper cfghlp(configurationpath);
00247      // Config is already checked (exists, type ok)
00248      cfghlp.GetConfig("datasource", tmp);
00249 
00250      base = Registry::Instance().GetInverter(tmp);
00251      if (base) {
00252           CCapability *cap = base->GetConcreteCapability(
00253                CAPA_CAPAS_UPDATED);
00254           assert(cap); // this is required to have....
00255           if (!cap->CheckSubscription(this))
00256                cap->Subscribe(this);
00257 
00258           cap = base->GetConcreteCapability(CAPA_CAPAS_REMOVEALL);
00259           assert(cap);
00260           if (!cap->CheckSubscription(this))
00261                cap->Subscribe(this);
00262 
00263           cap = base->GetConcreteCapability(CAPA_INVERTER_DATASTATE);
00264           assert(cap);
00265           if (!cap->CheckSubscription(this))
00266                cap->Subscribe(this);
00267      } else {
00268           LOGERROR(logger, "Could not find data source to connect. Filter: "
00269                << configurationpath << "." << name );
00270           abort();
00271      }
00272 
00273      // Try to open the file
00274      if (file.is_open()) {
00275           file.close();
00276      }
00277 
00278      cfghlp.GetConfig("logfile", tmp);
00279      bool rotate;
00280      cfghlp.GetConfig("rotate", rotate, false);
00281 
00282      if (rotate) {
00283           date today(day_clock::local_day());
00284           //note: the %s will be removed, so +10 is enough.
00285           char buf[tmp.size() + 10];
00286           int year = today.year();
00287           int month = today.month();
00288           int day = today.day();
00289 
00290           snprintf(buf, sizeof(buf) - 1, "%s%04d-%02d-%02d%s",
00291                tmp.substr(0, tmp.find("%s")).c_str(), year, month,
00292                day,
00293                tmp.substr(tmp.find("%s") + 2, string::npos).c_str());
00294 
00295           tmp = buf;
00296      }
00297 
00298      // Open the file. We use binary mode, as we want end the line ourself (LF+CR)
00299      // leaned on RFC4180
00300      file.clear(); // clear errorstates of fstream.
00301      file.open(tmp.c_str(), fstream::out | fstream::in | fstream::app
00302           | fstream::binary);
00303 
00304 #ifdef HAVE_WIN32_API
00305      if (file.fail()) {
00306           file.clear();
00307           file.open(tmp.c_str(), fstream::out | fstream::app | fstream::binary);
00308      }
00309 #endif
00310      if (file.fail()) {
00311           LOGWARN(logger,"Failed to open file " << tmp <<". Logger " << name
00312                << " will not work. " );
00313           file.close();
00314           tmp = "";
00315      }
00316 
00317      // Update the filename. If empty, the subsequent plugin knows that there
00318      // was a problem.
00319      CCapability *cap = this->GetConcreteCapability(
00320                     CAPA_CSVDUMPER_FILENAME);
00321      ((CValue<std::string> *) cap->getValue())->Set(tmp);
00322      cap->Notify();
00323 
00324      // a new file needs a new header
00325      headerwritten = false;
00326      // Technically seen, the file is now empty and the we-are-logging-this capa
00327      // CAPA_CSVDUMPER_LOGGEDCAPABILITES is wrong. But in some seconds, we probably
00328      // write the same as the last day, so we set the changes later.
00329      // (In other words: I told you, that in the file needs not to be all the
00330      // datas we claim to be there here...
00331 
00332      // Set a timer to some seconds after midnight, to enforce rotating with correct date
00333      boost::posix_time::ptime n =
00334           boost::posix_time::second_clock::local_time();
00335 
00336      date d = n.date() + days(1);
00337      boost::posix_time::ptime tomorrow(d);
00338      boost::posix_time::time_duration remaining = tomorrow - n;
00339 
00340      struct timespec ts;
00341      ts.tv_sec = remaining.hours() * 3600UL + remaining.minutes() * 60
00342           + remaining.seconds() + 10;
00343      ts.tv_nsec = 0;
00344      ICommand *ncmd = new ICommand(CMD_ROTATE, this);
00345      Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);
00346 
00347 }
00348 
00349 void CCSVOutputFilter::DoCYCLICmd( const ICommand * )
00350 {
00351      bool compact_file, flush_after_write;
00352      std::string format;
00353 
00354      CConfigHelper cfg(configurationpath);
00355 #warning document me: Config option
00356      cfg.GetConfig("Format_Timestamp", format, std::string("%Y-%m-%d %T"));
00357      #warning document me: config Uption // FIXME
00358      cfg.GetConfig("Compact_CSV", compact_file, false);
00359 #warning document me: config Uption // FIXME
00360 cfg.GetConfig("flush_file_buffer_immediatly", flush_after_write, true);
00361 
00362      std::stringstream ss;
00363 
00364      /* Check for data validty. */
00365      if (!datavalid) {
00366           return;
00367      }
00368 
00369      /* check if CSV-Header needs to be re-emitted.*/
00370      if (capsupdated || !headerwritten) {
00371           capsupdated = false;
00372           if (CMDCyclic_CheckCapas()) {
00373                headerwritten = false;
00374           }
00375      }
00376 
00377      /* check if file is ready */
00378      if (!file.is_open()) {
00379           return;
00380      }
00381 
00382      /* output CSV Header*/
00383      if (!headerwritten) {
00384           last_line.clear();
00385           bool first = true;
00386           list<string>::const_iterator it;
00387           for (it = CSVCapas.begin(); it != CSVCapas.end(); it++) {
00388                if (!first) {
00389                     ss << ",";
00390                } else {
00391                     ss << "Timestamp,";
00392                }
00393                first = false;
00394                ss << *(it);
00395           }
00396           // CSV after RFC 4180 requires CR LF
00397           file << ss.str() << (char) 0x0d << (char) 0x0a;
00398           CCapability *cap = GetConcreteCapability(CAPA_CSVDUMPER_LOGGEDCAPABILITES);
00399           assert(cap);
00400           ((CValue<std::string> *)cap->getValue())->Set(ss.str());
00401           cap->Notify();
00402           ss.str("");
00403           headerwritten = true;
00404      }
00405 
00406      /* finally, output data. */
00407 
00408      // make timestamp
00409      boost::posix_time::ptime n =
00410           boost::posix_time::second_clock::local_time();
00411 
00412      // assign facet only to a temporay stringstream.
00413      // this avoids having a persistent object.
00415      boost::posix_time::time_facet *facet = new boost::posix_time::time_facet(format.c_str());
00416      ss.imbue(std::locale(ss.getloc(), facet));
00417      ss << n;
00418      file << ss.str();
00419      ss.str("");
00420 
00421      // note: do not delete the facet. This is done by the locale.
00422      // See: http://rhubbarb.wordpress.com/2009/10/17/boost-datetime-locales-and-facets/
00423      // (the locale will delete the object, so there is no leak. If we would
00424      // delete, this crashes.)
00425 
00426      list<string>::const_iterator it;
00427      CCapability *c;
00428      IValue *v;
00429      for (it = CSVCapas.begin(); it != CSVCapas.end(); it++) {
00430           ss << ",";
00431           c = base->GetConcreteCapability(*it);
00432           if (c) {
00433                v = c->getValue();
00434                string tmp = (string) *v;
00435 
00436                if (string::npos != tmp.find('"')) {
00437                     string t2 = tmp;
00438                     size_t t;
00439                     while (string::npos != (t = t2.find('"'))) {
00440                          tmp = t2.substr(0, t);
00441                          tmp += '"';
00442                          t2 = t2.substr(t, string::npos);
00443                     }
00444                     tmp += t2;
00445                }
00446 
00447                if (string::npos != tmp.find(',') || string::npos
00448                     != tmp.find("\x0d\x0a")) {
00449                     ss << '"' << tmp << '"';
00450                } else {
00451                     ss << tmp;
00452                }
00453 
00454           } else {
00455                // file << ' ';
00456           }
00457      }
00458 
00459      if ( !compact_file ||  ss.str() != last_line) {
00460           file << ss.str() << (char) 0x0d << (char) 0x0a;
00461           last_line = ss.str();
00462           if (flush_after_write)
00463                file << flush;
00464      }
00465 
00466 }
00467 
00468 bool CCSVOutputFilter::CMDCyclic_CheckCapas( void )
00469 {
00470      // get the array
00471      CConfigHelper cfghlp(configurationpath);
00472      bool store_all = false;
00473      bool ret = false;
00474      string tmp;
00475 
00476      if (cfghlp.GetConfig("data2log", tmp) && tmp == "all") {
00477           store_all = true;
00478      }
00479 
00480      if (!store_all) {
00481           int i = 0;
00482           while (cfghlp.GetConfigArray("data2log", i++, tmp)) {
00483                if (search_list(tmp)) {
00484                     continue;
00485                }
00486                CSVCapas.push_back(tmp);
00487                ret = true;
00488           }
00489 
00490           return ret;
00491      }
00492 
00495      auto_ptr<ICapaIterator> it(base->GetCapaNewIterator());
00496      pair<string, CCapability*> pair;
00497      while (it->HasNext()) {
00498           pair = it->GetNext();
00499           if (search_list(pair.first)) {
00500                continue;
00501           }
00502           CSVCapas.push_back(pair.first);
00503           ret = true;
00504      }
00505      return ret;
00506 
00507 }
00508 
00509 bool CCSVOutputFilter::search_list( const string id ) const
00510 {
00511      list<string>::const_iterator it;
00512      for (it = CSVCapas.begin(); it != CSVCapas.end(); it++) {
00513           if (*it == id)
00514                return true;
00515      }
00516      return false;
00517 }
00518 
00519 #endif