|
solarpowerlog trunk
|
/* ----------------------------------------------------------------------------
 solarpowerlog
 Copyright (C) 2009 Tobias Frost

 This file is part of solarpowerlog.

 Solarpowerlog is free software; However, it is dual-licenced
 as described in the file "COPYING".

 For this file (CCSVOutputFilter.cpp), the license terms are:

 You can redistribute it and/or modify it under the terms of the GNU
 General Public License as published by the Free Software Foundation; either
 version 3 of the License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Library General Public
 License along with this proramm; if not, see
 <http://www.gnu.org/licenses/>.
 ----------------------------------------------------------------------------
 */

/** \file CCSVOutputFilter.cpp
 *
 * Data filter which periodically writes the capabilities of its data source
 * into a CSV file (line format leaned on RFC 4180) and publishes the name of
 * that file and the list of logged capabilities as capabilities of its own.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_FILTER_CSVDUMP

#include <assert.h>
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <string>

#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/date_time/local_time/local_time.hpp>

#include "configuration/Registry.h"
#include "configuration/CConfigHelper.h"
#include "interfaces/CWorkScheduler.h"

#include "Inverters/Capabilites.h"
#include "patterns/CValue.h"

#include "CCSVOutputFilter.h"

#include "Inverters/interfaces/ICapaIterator.h"

using namespace std;
using namespace libconfig;
using namespace boost::gregorian;

namespace {

/// Line terminator for the CSV file. RFC 4180 requires CR LF; the file is
/// opened in binary mode so these two bytes are written unmodified.
const char CSV_EOL[] = "\x0d\x0a";

/** Fill \p ts with the cycle time of the logger.
 *
 * \param c   capability expected to carry the query interval in seconds as
 *            float value; may be NULL.
 * \param ts  receives the interval. If \p c is NULL or is not of float type,
 *            the default of 5 seconds is stored.
 *
 * \returns true if the interval was taken from \p c, false if the default
 *          has been used.
 */
bool QueryIntervalToTimespec( CCapability *c, struct timespec &ts )
{
	ts.tv_sec = 5;
	ts.tv_nsec = 0;

	if (!c || c->getValue()->GetType() != IValue::float_type) {
		return false;
	}

	// (C-style cast kept: the class hierarchy of the value classes is not
	// visible from this file.)
	CValue<float> *v = (CValue<float> *) c->getValue();
	const float interval = v->Get();
	ts.tv_sec = interval;
	// fractional part of the interval goes to the nanoseconds.
	ts.tv_nsec = ((interval - ts.tv_sec) * 1e9);
	return true;
}

/** Build the name of the logfile for the given day.
 *
 * The first "%s" in \p pattern is replaced by the date formatted as
 * YYYY-MM-DD. If \p pattern carries no "%s" the date is appended to the
 * pattern, so rotating still results in one distinct file per day.
 */
string BuildRotatedFilename( const string &pattern, int year, int month,
	int day )
{
	char stamp[40];
	snprintf(stamp, sizeof(stamp), "%04d-%02d-%02d", year, month, day);

	const string::size_type pos = pattern.find("%s");
	if (pos == string::npos) {
		return pattern + stamp;
	}
	return pattern.substr(0, pos) + stamp + pattern.substr(pos + 2);
}

/** Prepare one value for being written as CSV field (RFC 4180).
 *
 * Values containing a double quote, a comma, CR or LF are enclosed in double
 * quotes and every embedded double quote is doubled. All other values are
 * returned unchanged.
 */
string EscapeCSVField( const string &raw )
{
	if (raw.find_first_of("\",\r\n") == string::npos) {
		return raw;
	}

	string quoted;
	quoted.reserve(raw.size() + 2);
	quoted += '"';
	for (string::const_iterator p = raw.begin(); p != raw.end(); ++p) {
		if (*p == '"') {
			quoted += '"';
		}
		quoted += *p;
	}
	quoted += '"';
	return quoted;
}

} // anonymous namespace

/** Constructor.
 *
 * Schedules the real initialization (CMD_INIT) via the main scheduler,
 * removes the capabilities this filter does not provide on its own and
 * registers the capabilities CAPA_CSVDUMPER_FILENAME and
 * CAPA_CSVDUMPER_LOGGEDCAPABILITES.
 *
 * \param name              name of the filter instance
 * \param configurationpath path to the configuration of this instance
 */
CCSVOutputFilter::CCSVOutputFilter( const string & name,
	const string & configurationpath ) :
	IDataFilter(name, configurationpath)
{
	headerwritten = false;
	// Both flags are evaluated by DoCYCLICmd(), possibly before the first
	// notification of the data source arrived, so they need a defined state.
	datavalid = false;
	capsupdated = false;

	// Schedule the initialization and subscriptions later...
	ICommand *cmd = new ICommand(CMD_INIT, this);
	Registry::GetMainScheduler()->ScheduleWork(cmd);

	// We do not do anything on these capabilities, so we remove them from
	// our list. Any cascaded filter will automatically use the parent's
	// one...
	CCapability *c = IInverterBase::GetConcreteCapability(
		CAPA_INVERTER_DATASTATE);
	CapabilityMap.erase(CAPA_INVERTER_DATASTATE);
	delete c;

	// Also we won't fiddle with the caps requiring our listeners to
	// unsubscribe. They also should get that info from our base.
	c = IInverterBase::GetConcreteCapability(CAPA_CAPAS_REMOVEALL);
	CapabilityMap.erase(CAPA_CAPAS_REMOVEALL);
	delete c;

	// However, to help following plugins, we will publish some data here:
	// (this enables other plugins to use our files as data source)
	c = new CCapability(CAPA_CSVDUMPER_FILENAME,
		CAPA_CSVDUMPER_FILENAME_TYPE, this);
	AddCapability(CAPA_CSVDUMPER_FILENAME, c);

	// A comma-separated list of parameters which are currently logged.
	// note: This list might grow over time, so when parsing the CSV file,
	// be prepared that not all of them might be present from the beginning
	// of the file.
	c = new CCapability(CAPA_CSVDUMPER_LOGGEDCAPABILITES,
		CAPA_CSVDUMPER_LOGGEDCAPABILITES_TYPE, this);
	AddCapability(CAPA_CSVDUMPER_LOGGEDCAPABILITES, c);
}

/** Destructor: closes the logfile if it is still open. */
CCSVOutputFilter::~CCSVOutputFilter()
{
	if (file.is_open())
		file.close();
}

/** Check the configuration of this filter.
 *
 * Validates the settings "logfile" (string), the optional settings
 * "compact_csv", "flush_file_buffer_immediatly", "format_timestamp" and
 * "rotate", the setting "data2log" (either the string "all" or an array)
 * and that "datasource" names a known inverter.
 *
 * \returns true if the configuration is ok, false on any error.
 */
bool CCSVOutputFilter::CheckConfig()
{
	string setting;
	string str;

	bool fail = false;

	CConfigHelper hlp(configurationpath);
	fail |= !hlp.CheckConfig("logfile", Setting::TypeString);

	fail |= !hlp.CheckConfig("compact_csv", Setting::TypeBoolean, true);
	fail |= !hlp.CheckConfig("flush_file_buffer_immediatly",
		Setting::TypeBoolean, true);
	fail |= !hlp.CheckConfig("format_timestamp", Setting::TypeString, true);
	fail |= !hlp.CheckConfig("rotate", Setting::TypeBoolean, true);

	// data2log is either the string "all" or an array.
	if (hlp.CheckConfig("data2log", Setting::TypeString, false, false)) {
		hlp.GetConfig("data2log", setting);
		if (setting != "all") {
			LOGERROR(logger, "Configuration Error: data2log must be \"all\" or of the type \"Array\".");
			fail = true;
		}
	} else if (!hlp.CheckConfig("data2log", Setting::TypeArray)) {
		fail = true;
	}

	hlp.GetConfig("datasource", str);
	IInverterBase *i = Registry::Instance().GetInverter(str);
	if (!i) {
		// The offending setting is "datasource" (not the data2log value
		// which is held in "setting").
		LOGERROR(logger,
			"Setting datasource in " << configurationpath
			<< "." << name
			<< ": Cannot find instance of Inverter with the name "
			<< str);
		fail = true;
	}
	return !fail;
}

/** Observer callback: called when a subscribed capability changed.
 *
 * Handles CAPA_INVERTER_DATASTATE (remember the validity of the data),
 * CAPA_CAPAS_REMOVEALL (unsubscribe from all capabilities of the data
 * source) and CAPA_CAPAS_UPDATED (propagate to our own listeners and
 * remember that the CSV header might need to be re-emitted).
 *
 * \param subject the capability which changed. Must not be NULL.
 */
void CCSVOutputFilter::Update( const IObserverSubject *subject )
{
	assert (subject);
	CCapability *c, *cap = (CCapability *) subject;

	// Datastate changed.
	if (cap->getDescription() == CAPA_INVERTER_DATASTATE) {
		this->datavalid = ((CValue<bool> *) cap->getValue())->Get();
		return;
	}

	// Unsubscribe plea -- we do not offer this Capa, our customers will
	// ask our base directly.
	if (cap->getDescription() == CAPA_CAPAS_REMOVEALL) {
		auto_ptr<ICapaIterator> it(base->GetCapaNewIterator());
		while (it->HasNext()) {
			pair<string, CCapability*> cappair = it->GetNext();
			cap = (cappair).second;
			cap->UnSubscribe(this);
		}
		return;
	}

	// propagate "caps updated"
	if (cap->getDescription() == CAPA_CAPAS_UPDATED) {
		c = IInverterBase::GetConcreteCapability(CAPA_CAPAS_UPDATED);
		*(CValue<bool> *) c->getValue()
			= *(CValue<bool> *) cap->getValue();
		c->Notify();
		capsupdated = true;
		return;
	}

}

/** Execute a command scheduled by this filter.
 *
 * - CMD_INIT:   initialize, then start the cyclic logging.
 * - CMD_CYCLIC: log one record and re-schedule CMD_CYCLIC.
 * - CMD_ROTATE: re-run the initialization (this re-opens the logfile).
 *
 * The cycle time is the query interval of the inverter
 * (CAPA_INVERTER_QUERYINTERVAL) or 5 seconds if that is not available.
 */
void CCSVOutputFilter::ExecuteCommand( const ICommand *cmd )
{
	switch (cmd->getCmd()) {

	case CMD_INIT:
	{
		DoINITCmd(cmd);

		// Set cyclic timer to the query interval.
		struct timespec ts;
		if (!QueryIntervalToTimespec(
			GetConcreteCapability(CAPA_INVERTER_QUERYINTERVAL), ts)) {
			LOGINFO(logger,
				"INFO: The associated inverter does not specify the "
				"queryinterval. Defaulting to 5 seconds");
		}

		ICommand *ncmd = new ICommand(CMD_CYCLIC, this);
		Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);
	}
		break;

	case CMD_CYCLIC:
	{
		DoCYCLICmd(cmd);

		// Set cyclic timer to the query interval.
		struct timespec ts;
		QueryIntervalToTimespec(
			GetConcreteCapability(CAPA_INVERTER_QUERYINTERVAL), ts);

		ICommand *ncmd = new ICommand(CMD_CYCLIC, this);
		Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);
	}
		break;

	case CMD_ROTATE:
		DoINITCmd(cmd);
		break;

	}
}

/** Handle CMD_INIT / CMD_ROTATE.
 *
 * Subscribes to the required capabilities of the data source (aborts the
 * program if the data source cannot be found), (re-)opens the logfile,
 * publishes its name via CAPA_CSVDUMPER_FILENAME (empty on failure) and
 * schedules CMD_ROTATE for 10 seconds after the next midnight.
 */
void CCSVOutputFilter::DoINITCmd( const ICommand * )
{
	// Do init
	string tmp;
	CConfigHelper cfghlp(configurationpath);
	// Config is already checked (exists, type ok)
	cfghlp.GetConfig("datasource", tmp);

	base = Registry::Instance().GetInverter(tmp);
	if (base) {
		CCapability *cap = base->GetConcreteCapability(
			CAPA_CAPAS_UPDATED);
		assert(cap); // this is required to have....
		if (!cap->CheckSubscription(this))
			cap->Subscribe(this);

		cap = base->GetConcreteCapability(CAPA_CAPAS_REMOVEALL);
		assert(cap);
		if (!cap->CheckSubscription(this))
			cap->Subscribe(this);

		cap = base->GetConcreteCapability(CAPA_INVERTER_DATASTATE);
		assert(cap);
		if (!cap->CheckSubscription(this))
			cap->Subscribe(this);
	} else {
		LOGERROR(logger, "Could not find data source to connect. Filter: "
			<< configurationpath << "." << name );
		abort();
	}

	// Try to open the file
	if (file.is_open()) {
		file.close();
	}

	cfghlp.GetConfig("logfile", tmp);
	bool rotate;
	cfghlp.GetConfig("rotate", rotate, false);

	if (rotate) {
		// Put today's date into the filename (replacing the "%s").
		date today(day_clock::local_day());
		int year = today.year();
		int month = today.month();
		int day = today.day();

		tmp = BuildRotatedFilename(tmp, year, month, day);
	}

	// Open the file. We use binary mode, as we want to end the lines
	// ourselves (CR+LF), leaned on RFC4180
	file.clear(); // clear errorstates of fstream.
	file.open(tmp.c_str(), fstream::out | fstream::in | fstream::app
		| fstream::binary);

#ifdef HAVE_WIN32_API
	if (file.fail()) {
		file.clear();
		file.open(tmp.c_str(), fstream::out | fstream::app | fstream::binary);
	}
#endif
	if (file.fail()) {
		LOGWARN(logger,"Failed to open file " << tmp <<". Logger " << name
			<< " will not work. " );
		file.close();
		tmp = "";
	}

	// Update the filename. If empty, the subsequent plugin knows that there
	// was a problem.
	CCapability *cap = this->GetConcreteCapability(
		CAPA_CSVDUMPER_FILENAME);
	((CValue<std::string> *) cap->getValue())->Set(tmp);
	cap->Notify();

	// a new file needs a new header
	headerwritten = false;
	// Technically seen, the file is now empty and the we-are-logging-this
	// capa CAPA_CSVDUMPER_LOGGEDCAPABILITES is wrong. But in some seconds,
	// we probably write the same as the last day, so we set the changes
	// later. (In other words: the file does not necessarily contain all
	// the data announced by that capability.)

	// Set a timer to some seconds after midnight, to enforce rotating with
	// the correct date
	boost::posix_time::ptime n =
		boost::posix_time::second_clock::local_time();

	date d = n.date() + days(1);
	boost::posix_time::ptime tomorrow(d);
	boost::posix_time::time_duration remaining = tomorrow - n;

	struct timespec ts;
	ts.tv_sec = remaining.hours() * 3600UL + remaining.minutes() * 60
		+ remaining.seconds() + 10;
	ts.tv_nsec = 0;
	ICommand *ncmd = new ICommand(CMD_ROTATE, this);
	Registry::GetMainScheduler()->ScheduleWork(ncmd, ts);

}

/** Handle CMD_CYCLIC: write one record to the CSV file.
 *
 * Nothing is written while the data of the source is invalid or while the
 * file is not open. If the set of logged capabilities changed (or the file
 * is new) the CSV header is emitted first and published via
 * CAPA_CSVDUMPER_LOGGEDCAPABILITES.
 *
 * With "compact_csv" enabled a record is only written if its data differs
 * from the previously written record; the timestamp is not part of this
 * comparison and is written only together with the record.
 */
void CCSVOutputFilter::DoCYCLICmd( const ICommand * )
{
	bool compact_file, flush_after_write;
	std::string format;

	CConfigHelper cfg(configurationpath);
	// The keys are spelled exactly as validated by CheckConfig().
	// TODO: document this config option
	cfg.GetConfig("format_timestamp", format, std::string("%Y-%m-%d %T"));
	// TODO: document this config option
	cfg.GetConfig("compact_csv", compact_file, false);
	// TODO: document this config option
	cfg.GetConfig("flush_file_buffer_immediatly", flush_after_write, true);

	/* Check for data validity. */
	if (!datavalid) {
		return;
	}

	/* check if CSV-Header needs to be re-emitted.*/
	if (capsupdated || !headerwritten) {
		capsupdated = false;
		if (CMDCyclic_CheckCapas()) {
			headerwritten = false;
		}
	}

	/* check if file is ready */
	if (!file.is_open()) {
		return;
	}

	list<string>::const_iterator it;

	/* output CSV Header*/
	if (!headerwritten) {
		last_line.clear();

		std::ostringstream header;
		for (it = CSVCapas.begin(); it != CSVCapas.end(); ++it) {
			// The first column is always the timestamp.
			header << (it == CSVCapas.begin() ? "Timestamp," : ",");
			header << *it;
		}
		// CSV after RFC 4180 requires CR LF
		file << header.str() << CSV_EOL;

		CCapability *cap = GetConcreteCapability(
			CAPA_CSVDUMPER_LOGGEDCAPABILITES);
		assert(cap);
		((CValue<std::string> *) cap->getValue())->Set(header.str());
		cap->Notify();
		headerwritten = true;
	}

	/* collect the data. */
	std::ostringstream data;
	for (it = CSVCapas.begin(); it != CSVCapas.end(); ++it) {
		data << ",";
		CCapability *c = base->GetConcreteCapability(*it);
		if (c) {
			IValue *v = c->getValue();
			data << EscapeCSVField((string) *v);
		}
		// else: capability (currently) not available, leave field empty.
	}

	const string line = data.str();
	if (compact_file && line == last_line) {
		// nothing changed, so nothing (not even the timestamp) to write.
		return;
	}

	/* finally, output timestamp and data. */
	boost::posix_time::ptime n =
		boost::posix_time::second_clock::local_time();

	// assign facet only to a temporary stringstream.
	// this avoids having a persistent object.
	// note: do not delete the facet. This is done by the locale.
	// See: http://rhubbarb.wordpress.com/2009/10/17/boost-datetime-locales-and-facets/
	// (the locale will delete the object, so there is no leak. If we would
	// delete, this crashes.)
	std::ostringstream stamp;
	boost::posix_time::time_facet *facet =
		new boost::posix_time::time_facet(format.c_str());
	stamp.imbue(std::locale(stamp.getloc(), facet));
	stamp << n;

	file << stamp.str() << line << CSV_EOL;
	last_line = line;
	if (flush_after_write)
		file << flush;
}

/** Update the list of capabilities to be logged (CSVCapas).
 *
 * Depending on the setting "data2log" either the configured capabilities or
 * (if set to "all") all capabilities of the data source are added to the
 * list, unless already present. Entries are never removed.
 *
 * \returns true if at least one capability has been added.
 */
bool CCSVOutputFilter::CMDCyclic_CheckCapas( void )
{
	CConfigHelper cfghlp(configurationpath);
	bool added = false;
	string tmp;

	const bool store_all = cfghlp.GetConfig("data2log", tmp) && tmp == "all";

	if (!store_all) {
		// get the array
		int i = 0;
		while (cfghlp.GetConfigArray("data2log", i++, tmp)) {
			if (search_list(tmp)) {
				continue;
			}
			CSVCapas.push_back(tmp);
			added = true;
		}
		return added;
	}

	// "all": take everything the data source offers.
	auto_ptr<ICapaIterator> it(base->GetCapaNewIterator());
	while (it->HasNext()) {
		std::pair<string, CCapability*> entry = it->GetNext();
		if (search_list(entry.first)) {
			continue;
		}
		CSVCapas.push_back(entry.first);
		added = true;
	}
	return added;

}

/** Check if the capability \p id is already in the list of logged
 * capabilities.
 *
 * \returns true if found, else false.
 */
bool CCSVOutputFilter::search_list( const string id ) const
{
	return std::find(CSVCapas.begin(), CSVCapas.end(), id)
		!= CSVCapas.end();
}

#endif