|
solarpowerlog trunk
|
/* ----------------------------------------------------------------------------
 solarpowerlog
 Copyright (C) 2009  Tobias Frost

 This file is part of solarpowerlog.

 Solarpowerlog is free software; However, it is dual-licenced
 as described in the file "COPYING".

 For this file (ConnectionTCP.cpp), the license terms are:

 You can redistribute it and/or  modify it under the terms of the GNU
 General Public License as published by the Free Software Foundation; either
 version 3 of the License, or (at your option) any later version.

 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Library General Public
 License along with this program; if not, see
 <http://www.gnu.org/licenses/>.
 ----------------------------------------------------------------------------
 */

/*
 * TCP connection class built on boost::asio.
 *
 * All socket work (connect, disconnect, send, receive) is queued as a
 * CAsyncCommand and executed by the worker loop in _main(). Callers that pass
 * no callback get synchronous behaviour: the job is still queued, but the
 * calling method blocks on a semaphore until the job has completed.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#include "porting.h"
#endif

#ifdef HAVE_COMMS_ASIOTCPIO

#define DEBUG_TCPASIO

#include "interfaces/IConnect.h"
#include "Connections/CConnectTCPAsio.h"

#include <iostream>
#include <string>

#include "configuration/CConfigHelper.h"

#include <boost/asio/write.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/streambuf.hpp>

#include "interfaces/CMutexHelper.h"

#include <errno.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/date_time/posix_time/posix_time_config.hpp>
#include <boost/bind.hpp>

using namespace boost::posix_time;

using namespace std;
using namespace boost::asio;
using namespace boost;
using namespace libconfig;

/** Completion handler for the asio async read/write operations.
 *
 * Stores the error code and the number of transferred bytes at the locations
 * handed to the constructor.
 */
struct asyncASIOCompletionHandler
{
	asyncASIOCompletionHandler( size_t *b, boost::system::error_code *ec )
	{
		bytes = b;
		this->ec = ec;
	}

	void operator()( const boost::system::error_code& e,
		std::size_t bytes_transferred )
	{
		*bytes = bytes_transferred;
		*ec = e;
	}

	// note, we need pointer as boost seems to make a copy of our handler...
	size_t *bytes;
	boost::system::error_code *ec;
};

/** Block until the semaphore is posted.
 *
 * sem_wait() may return early with EINTR when a signal arrives. In that case
 * the job is not finished yet, so the wait is simply restarted.
 */
static void wait_for_completion( sem_t *sem )
{
	while (sem_wait(sem) == -1 && errno == EINTR) {
		// interrupted, job still pending -- wait again.
	}
}

/** Constructor: creates the io_service, the socket using it and the
 * semaphore that counts the queued commands.
 *
 * \param configurationname passed on to IConnect.
 */
CConnectTCPAsio::CConnectTCPAsio( const string &configurationname ) :
	IConnect(configurationname)
{
	// Generate our own asio ioservice
	// TODO check if one central would do that too...
	ioservice = new io_service;
	sockt = new ip::tcp::socket(*ioservice);
	sem_init(&cmdsemaphore, 0, 0);
}

/** Destructor: disconnects (synchronously) if still connected and releases
 * the socket and the io_service.
 */
CConnectTCPAsio::~CConnectTCPAsio()
{
	if (IsConnected()) {
		Disconnect(NULL);
	}

	// the socket was created on top of the io_service, so it goes first.
	// (delete on a NULL pointer is a no-op, no check needed.)
	delete sockt;
	delete ioservice;
}

/** Connect to the configured host.
 *
 * The connect itself is done by the worker loop.
 *
 * \param callback command to be notified on completion. If NULL, the call
 *        blocks until the job is done.
 * \returns with a callback: always true (job queued). Without callback: the
 *          result of IsConnected() after the job completed.
 */
bool CConnectTCPAsio::Connect( ICommand *callback )
{
	sem_t semaphore;

	CAsyncCommand *commando = new CAsyncCommand(CAsyncCommand::CONNECT, callback);

	// if callback is NULL, fallback to synchronous operation.
	if (!callback) {
		sem_init(&semaphore, 0, 0);
		commando->SetSemaphore(&semaphore);
	}

	PushWork(commando);

	if (!callback) {
		wait_for_completion(&semaphore);
		sem_destroy(&semaphore);
		LOGTRACE(logger, "destroying CAsyncCommando " << commando );
		delete commando;
		return IsConnected();
	}

	return true;
}

/* Disconnect
 *
 * The disconnection is done by the async task.
 *
 * (If to be done synchronous, it is also dispatched to the worker thread and
 * directly waited for completion.)
 *
 * Always returns true.
 * */
bool CConnectTCPAsio::Disconnect( ICommand *callback )
{
	sem_t semaphore;

	CAsyncCommand *commando = new CAsyncCommand(CAsyncCommand::DISCONNECT,
		callback);
	// if callback is NULL, fallback to synchronous operation.
	// (we will do the job asynchronous, but wait for completion here)
	if (!callback) {
		sem_init(&semaphore, 0, 0);
		commando->SetSemaphore(&semaphore);
	}

	PushWork(commando);

	if (!callback) {
		// wait for async job completion
		wait_for_completion(&semaphore);
		sem_destroy(&semaphore);
		LOGTRACE(logger, "destroying CAsyncCommando " << commando );
		delete commando;
	}
	return true;
}

/** Send a block of bytes.
 *
 * The bytes are copied into a std::string that is attached to the command
 * under ICONN_TOKEN_SEND_STRING; the worker loop does the actual write.
 *
 * \param tosend pointer to the bytes to send
 * \param len number of bytes
 * \param callback command to be notified on completion. If NULL, the call
 *        blocks until the job is done.
 * \returns with a callback: always true (job queued). Without callback: true
 *          if the job reported a non-negative ICMD_ERRNO, false otherwise.
 */
bool CConnectTCPAsio::Send( const char *tosend, unsigned int len,
	ICommand *callback )
{
	sem_t semaphore;
	bool ret;
	std::string s;
	if (tosend && len) {
		s.assign(tosend, len);
	}

	CAsyncCommand *commando = new CAsyncCommand(CAsyncCommand::SEND, callback);
	// Attach the payload via the command's callback object; "callback" itself
	// is NULL for synchronous operation and must not be dereferenced.
	// NOTE(review): this relies on CAsyncCommand providing a callback object
	// of its own when none was passed. The synchronous result lookup below
	// already depends on that -- confirm against CAsyncCommand.
	commando->callback->addData(ICONN_TOKEN_SEND_STRING, s);

	// if callback is NULL, fallback to synchronous operation.
	if (!callback) {
		sem_init(&semaphore, 0, 0);
		commando->SetSemaphore(&semaphore);
	}

	PushWork(commando);

	if (!callback) {
		// sync: wait for async job completion and check result.
		wait_for_completion(&semaphore);
		sem_destroy(&semaphore);
		try {
			int err = boost::any_cast<int>(commando->callback->findData(
				ICMD_ERRNO));
			ret = (err >= 0);
		} catch (const std::invalid_argument &e) {
			LOGDEBUG(logger,"ERR: unexpected exception while "
				"send-handling: Invalid argument " << e.what());
			ret = false;
		} catch (const boost::bad_any_cast &e) {
			LOGDEBUG(logger,"ERR: unexpected exception while "
				"send-handling: Bad cast " << e.what());
			ret = false;
		}

		LOGTRACE(logger, "destroying CAsyncCommando " << commando );
		delete commando;
		return ret;
	}
	return true;
}

/** Send a std::string; forwards to Send(const char*, unsigned int, ICommand*). */
bool CConnectTCPAsio::Send( const string & tosend, ICommand *callback )
{
	return Send(tosend.c_str(), tosend.length(), callback);
}

/* Receive bytes from the stream -- asynced.
 *
 * The job is done by the worker loop; the received data is attached to the
 * callback under ICONN_TOKEN_RECEIVE_STRING (see HandleReceive).
 *
 * Synchronous operation is not implemented for receive, so a callback is
 * mandatory.
 *
 * Returns true if the job was queued, false if no callback was given.
 */
bool CConnectTCPAsio::Receive( ICommand *callback )
{
	assert(callback);
	if (!callback) {
		// builds without assertions: refuse instead of queueing a job
		// nobody would ever wait for.
		return false;
	}

	CAsyncCommand *commando = new CAsyncCommand(CAsyncCommand::RECEIVE, callback);
	PushWork(commando);
	return true;
}

/** Returns true if the socket is open (checked while holding the mutex). */
bool CConnectTCPAsio::IsConnected( void )
{
	mutex.lock();
	bool ret = sockt->is_open();
	mutex.unlock();
	return ret;
}

/** Check the configuration ("tcpadr", "tcpport" and "tcptimeout") and, if it
 * is fine, start the worker thread.
 *
 * \returns true if the configuration is ok, false otherwise.
 */
bool CConnectTCPAsio::CheckConfig( void )
{
	string setting;
	bool fail = false;

	CConfigHelper cfghelper(ConfigurationPath);

	fail |= !cfghelper.CheckConfig("tcpadr", libconfig::Setting::TypeString);
	fail |= !cfghelper.CheckConfig("tcpport", libconfig::Setting::TypeString);
	fail |= !cfghelper.CheckConfig("tcptimeout", libconfig::Setting::TypeInt,
		false);

	if (!fail) {
		StartWorkerThread();
		return true;
	}

	return false;
}

/** Worker loop: waits on the command semaphore and executes the command at
 * the front of the queue until termination is requested.
 */
void CConnectTCPAsio::_main( void )
{
	LOGTRACE(logger, "Starting helper thread");

	while (!IsTermRequested()) {
		int syscallret;

		// wait for work or signals.
		// FIXME Test this if this works ;-)
		LOGTRACE(logger, "Waiting for work");
		syscallret = sem_wait(&cmdsemaphore);
		if (syscallret != 0) {
			// interrupted -- re-check the termination request.
			continue;
		}

		// semaphore had work for us. process it.
		// safety check: really some work?
		mutex.lock();
		if (cmds.empty()) {
			mutex.unlock();
			continue;
		}

		CAsyncCommand *donow = cmds.front();
		// cache info if to delete the object later: for synchronous commands
		// the waiting caller deletes the object as soon as the handler has
		// signalled completion, so donow must not be dereferenced after the
		// handler returned.
		bool delete_cmd = donow->IsAsynchronous();
		mutex.unlock();

		LOGTRACE(logger, "Received command " << donow << " with callback " << donow->callback );

		bool done = false;
		switch (donow->c) {
		case CAsyncCommand::CONNECT:
			done = HandleConnect(donow);
			break;

		case CAsyncCommand::DISCONNECT:
			done = HandleDisConnect(donow);
			break;

		case CAsyncCommand::RECEIVE:
			done = HandleReceive(donow);
			break;

		case CAsyncCommand::SEND:
			done = HandleSend(donow);
			break;

		default:
		{
			LOGFATAL(logger, "Unknown command received.");
			abort();
			break;
		}
		}

		if (done) {
			mutex.lock();
			cmds.pop_front();
			// asynchronous commands are owned by us, synchronous ones
			// are deleted by the caller.
			if (delete_cmd) {
				LOGTRACE(logger, "Deleting " << donow);
				delete donow;
			}
			mutex.unlock();
		}
	}

	IConnect::_main();
}

/** Queue a command for the worker loop and wake it up.
 *
 * \returns always true.
 */
bool CConnectTCPAsio::PushWork( CAsyncCommand *cmd )
{
	LOGTRACE(logger, "Pushing command " << cmd << " with callback " << cmd->callback );
	mutex.lock();
	cmds.push_back(cmd);
	mutex.unlock();
	sem_post(&cmdsemaphore);
	workerthread.interrupt();
	return true;
}

/** Worker: resolve "tcpadr"/"tcpport" and connect to the first endpoint that
 * accepts the connection.
 *
 * Result reported via the callback: ICMD_ERRNO is 0 on success (also when
 * already connected) or -ECONNREFUSED on failure, in which case
 * ICMD_ERRNO_STR carries the asio error message if there is one.
 *
 * \returns always true (command finished).
 */
bool CConnectTCPAsio::HandleConnect( CAsyncCommand *cmd )
{
	string strhost, port;
	unsigned long timeout = TCP_ASIO_DEFAULT_TIMEOUT;

	// if connected, ignore the commmand, pretend success.
	if (IsConnected()) {
		cmd->callback->addData(ICMD_ERRNO, 0);
		cmd->HandleCompletion();
		return true;
	}

	CConfigHelper cfghelper(ConfigurationPath);

#warning timeouts should be only configured in the calling objects! \
	Otherwise it is hard to differenicate between commands! \
	So depreciate tcptimeout and configure this in the inverter class!

#warning rework: Should be only needed from the configuration, as the \
	calling object needs not be aware of these issues (should be transparent)
	// same token type as used by HandleReceive / HandleSend.
	try {
		timeout = boost::any_cast<unsigned long>(cmd->callback->findData(
			ICONN_TOKEN_TIMEOUT));
	} catch (const std::invalid_argument &) {
		cfghelper.GetConfig("tcptimeout", timeout, TCP_ASIO_DEFAULT_TIMEOUT);
	} catch (const boost::bad_any_cast &) {
		LOGDEBUG(logger,
			"BUG: Handling Connect: Bad cast for " << ICONN_TOKEN_TIMEOUT);
		timeout = TCP_ASIO_DEFAULT_TIMEOUT;
	}
	// not applied yet, the connect below is still a blocking one.
	(void) timeout;

	cfghelper.GetConfig("tcpadr", strhost);
	cfghelper.GetConfig("tcpport", port);

	boost::system::error_code ec, ignored;
	ip::tcp::resolver resolver(*ioservice);
	ip::tcp::resolver::query query(strhost.c_str(), port);

	// returns on error a default constructed iterator ...
	ip::tcp::resolver::iterator iter = resolver.resolve(query, ec);
	ip::tcp::resolver::iterator end; // ... which is a "End marker" itself.

	// An empty endpoint list must not be reported as success.
	if (!ec) {
		ec = boost::asio::error::host_not_found;
	}

#warning TODO Change to async connect for better timeout handling.
	while (ec && iter != end) {
		ip::tcp::endpoint endpoint = *iter++;
		LOGDEBUG(logger, "Connecting to " << endpoint );
		// connect() opens the socket automatically and leaves it open when
		// the attempt fails, so start every attempt from a closed socket.
		sockt->close(ignored);
		sockt->connect(endpoint, ec);
	}

	// preset name, but only needed if we gonna log on these levels.
	if (logger.IsEnabled(ILogger::LL_ERROR) || logger.IsEnabled(ILogger::LL_DEBUG))
		cfghelper.GetConfig("name", strhost);

	if (ec) {
		// make sure IsConnected() does not report a half-opened socket.
		sockt->close(ignored);
		cmd->callback->addData(ICMD_ERRNO, -ECONNREFUSED);
		if (!ec.message().empty())
			cmd->callback->addData(ICMD_ERRNO_STR, ec.message());
		cmd->HandleCompletion();
		return true;
	}

	LOGDEBUG(logger, "Connected to " << strhost );
	// Signal success.
	cmd->callback->addData(ICMD_ERRNO, 0);
	cmd->HandleCompletion();
	return true;
}

/** Worker: shut down and close the socket.
 *
 * Result reported via the callback: ICMD_ERRNO is 0 on success (also when
 * not connected) or -EIO if shutdown or close failed; in the latter case
 * ICMD_ERRNO_STR carries the message(s) of the failed step(s).
 *
 * \returns always true (command finished).
 */
bool CConnectTCPAsio::HandleDisConnect( CAsyncCommand *cmd )
{
	boost::system::error_code ec, ec2;
	std::string message;
	int error = 0;

	if (!IsConnected()) {
		cmd->callback->addData(ICMD_ERRNO, 0);
		cmd->HandleCompletion();
		return true;
	}

	// according boost:asio documentation, one should call shutdown before
	// close
	// (http://www.boost.org/doc/libs/1_36_0/doc/html/boost_asio/reference/basic_stream_socket/close/overload2.html)
	sockt->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);

	sockt->close(ec2);

	if (ec) {
		error = -EIO;
		message = ec.message();
	}

	if (ec2) {
		error = -EIO;
		if (!message.empty())
			message = message + " ";
		message = message + ec2.message();
	}

	cmd->callback->addData(ICMD_ERRNO, error);
	if (!message.empty()) {
		// report the collected message, not only the one of shutdown().
		cmd->callback->addData(ICMD_ERRNO_STR, message);
	}
	cmd->HandleCompletion();
	return true;
}

/** Helper bound as timer handler: stores value at *store (if store is set). */
static void boosthelper_set_result( int* store, int value )
{
	if (store)
		*store = value;
}

/** Worker: receive data from the socket.
 *
 * Waits -- limited by the timeout taken from ICONN_TOKEN_TIMEOUT or, if that
 * is not set, from the "tcptimeout" configuration -- for the first byte and
 * then reads everything the socket reports as available.
 *
 * Result reported via the callback:
 *  - ICMD_ERRNO: 0 on success, -ETIMEDOUT on timeout, -ENOTCONN on eof,
 *    -EIO on other errors
 *  - ICMD_ERRNO_STR: error description (not set in every error case)
 *  - ICONN_TOKEN_RECEIVE_STRING: the received bytes; also set if an error
 *    occurred after the first byte had been read.
 *
 * \returns always true (command finished).
 */
bool CConnectTCPAsio::HandleReceive( CAsyncCommand *cmd )
{
	boost::system::error_code ec, handlerec;

	int result_timer = 0;
	size_t bytes = 0;
	unsigned long timeout = TCP_ASIO_DEFAULT_TIMEOUT;
	char buf[2];
	struct asyncASIOCompletionHandler read_handler(&bytes, &handlerec);
	// timeout setup
	ioservice->reset();

	try {
		timeout = boost::any_cast<unsigned long>(cmd->callback->findData(
			ICONN_TOKEN_TIMEOUT));
	} catch (const std::invalid_argument &) {
		CConfigHelper cfghelper(ConfigurationPath);
		cfghelper.GetConfig("tcptimeout", timeout, TCP_ASIO_DEFAULT_TIMEOUT);
	} catch (const boost::bad_any_cast &e) {
		LOGDEBUG(logger, "Unexpected exception in HandleReceive: Bad cast" << e.what());
		timeout = TCP_ASIO_DEFAULT_TIMEOUT;
	}

	deadline_timer timer(*(this->ioservice));
	boost::posix_time::time_duration td = boost::posix_time::millisec(timeout);
	timer.expires_from_now(td);
	timer.async_wait(boost::bind(&boosthelper_set_result, &result_timer, 1));

	// socket preparation
	// async_read. However, boost:asio seems not to allow auto-buffers,
	// so we will just read one byte and when this is available, we'll
	// check for if there are some others left
	sockt->async_read_some(boost::asio::buffer(&buf, 1), read_handler);

	// runs either the timer handler or the read handler.
	size_t num = ioservice->run_one(ec);

	// ioservice error or timeout
	if (num == 0 || result_timer) {
		timer.cancel(ec);
		sockt->cancel(ec);
		LOGTRACE(logger,"Async read timeout");
		cmd->callback->addData(ICMD_ERRNO_STR, std::string("Read timeout"));
		cmd->callback->addData(ICMD_ERRNO, -ETIMEDOUT);
		cmd->HandleCompletion();
		// let the cancelled handlers run.
		ioservice->poll(ec);
		return true;
	}

	// cancel the timer, and let its (cancelled) handler run.
	timer.cancel(ec);
	ioservice->poll(ec);

	if (handlerec) {
		if (handlerec != boost::asio::error::eof) {
			LOGDEBUG(logger,"Async read failed with ec=" << handlerec
				<< " msg="<< handlerec.message());
			cmd->callback->addData(ICMD_ERRNO, -EIO);
			cmd->callback->addData(ICMD_ERRNO_STR, handlerec.message());

		} else {
			cmd->callback->addData(ICMD_ERRNO, -ENOTCONN);
			LOGTRACE(logger, "Received eof on socket read");
		}
		cmd->HandleCompletion();
		return true;
	}

	if (1 != bytes) {
		LOGDEBUG(logger,"Received "
			<< bytes << " but expected only 1 byte");
		cmd->callback->addData(ICMD_ERRNO, -EIO);
		cmd->HandleCompletion();
		return true;
	}

	// The first byte is already here; fetch whatever else is pending.
	std::string receivestr(1, buf[0]);
	size_t avail = sockt->available(ec);

	if (!ec) {
		size_t numrecvd = 1;

		LOGTRACE(logger, "There are " << avail << " bytes ready to read");
		receivestr.resize(numrecvd + avail);
		while (avail > 0) {
			size_t tmp = sockt->read_some(
				asio::buffer(&receivestr[numrecvd], avail), ec);
			LOGTRACE(logger, "Read " << tmp << " of these");
			avail -= tmp;
			numrecvd += tmp;
			// check if error occured.
			if (ec)
				break;
		}
		// cut off the part that was announced but not read.
		receivestr.resize(numrecvd);
	}

	if (ec) {
		int error = (ec == boost::asio::error::eof) ? -ENOTCONN : -EIO;

		// give everything we got before the error.
		cmd->callback->addData(ICONN_TOKEN_RECEIVE_STRING, receivestr);
		cmd->callback->addData(ICMD_ERRNO_STR, ec.message());
		cmd->callback->addData(ICMD_ERRNO, error);
		cmd->HandleCompletion();
		return true;
	}

	cmd->callback->addData(ICONN_TOKEN_RECEIVE_STRING, receivestr);
	cmd->callback->addData(ICMD_ERRNO, 0);
	cmd->HandleCompletion();

	return true;
}

/** Worker: write the string stored under ICONN_TOKEN_SEND_STRING to the
 * socket.
 *
 * The write is limited by the timeout taken from ICONN_TOKEN_TIMEOUT or, if
 * that is not set, from the "tcptimeout" configuration.
 *
 * Result reported via the callback:
 *  - ICMD_ERRNO: 0 on success, -EINVAL if no string to send was attached,
 *    -ETIMEDOUT on timeout, -ENOTCONN on eof, -EIO on other errors or if
 *    not all bytes were written
 *  - ICMD_ERRNO_STR: error description (not set in every error case)
 *
 * \returns always true (command finished).
 */
bool CConnectTCPAsio::HandleSend( CAsyncCommand *cmd )
{
	boost::system::error_code ec, handlerec;
	int result_timer = 0;
	size_t bytes = 0;
	unsigned long timeout = TCP_ASIO_DEFAULT_TIMEOUT;
	struct asyncASIOCompletionHandler write_handler(&bytes, &handlerec);
	// timeout setup
	ioservice->reset();
	std::string s;
	bool have_data = true;

	try {
		s = boost::any_cast<std::string>(cmd->callback->findData(ICONN_TOKEN_SEND_STRING));
	} catch (const std::invalid_argument &) {
		LOGDEBUG(logger, "BUG: required " << ICONN_TOKEN_SEND_STRING << " argument not set");
		have_data = false;
	} catch (const boost::bad_any_cast &e) {
		LOGDEBUG(logger, "Unexpected exception in HandleSend: Bad cast" << e.what());
		have_data = false;
	}

	if (!have_data) {
		// nothing sensible to send: report this instead of "successfully"
		// writing an empty string.
		cmd->callback->addData(ICMD_ERRNO, -EINVAL);
		cmd->HandleCompletion();
		return true;
	}

	try {
		timeout = boost::any_cast<unsigned long>(cmd->callback->findData(
			ICONN_TOKEN_TIMEOUT));
	} catch (const std::invalid_argument &) {
		CConfigHelper cfghelper(ConfigurationPath);
		cfghelper.GetConfig("tcptimeout", timeout, TCP_ASIO_DEFAULT_TIMEOUT);
	} catch (const boost::bad_any_cast &e) {
		LOGDEBUG(logger, "Unexpected exception in HandleSend: Bad cast" << e.what());
		timeout = TCP_ASIO_DEFAULT_TIMEOUT;
	}

	deadline_timer timer(*(this->ioservice));
	boost::posix_time::time_duration td = boost::posix_time::millisec(timeout);
	timer.expires_from_now(td);
	timer.async_wait(boost::bind(&boosthelper_set_result, &result_timer, 1));

	// socket preparation
	// async_write the whole std::string
	boost::asio::async_write(*sockt, boost::asio::buffer(s), write_handler);

	// run one of the scheduled services
	size_t num = ioservice->run_one(ec);

	// ioservice error or timeout
	if (num == 0 || result_timer) {
		timer.cancel(ec);
		sockt->cancel(ec);
		LOGTRACE(logger,"Async write timeout");
		cmd->callback->addData(ICMD_ERRNO, -ETIMEDOUT);
		cmd->HandleCompletion();
		// let the cancelled handlers run.
		ioservice->poll(ec);
		return true;
	}

	// cancel the timer, and catch the completion handler
	timer.cancel(ec);
	ioservice->poll(ec);

	if (handlerec) {
		if (handlerec != boost::asio::error::eof) {
			LOGDEBUG(logger,"Async write failed with ec=" << handlerec
				<< " msg="<< handlerec.message());
			cmd->callback->addData(ICMD_ERRNO, -EIO);
			cmd->callback->addData(ICMD_ERRNO_STR, handlerec.message());
		} else {
			cmd->callback->addData(ICMD_ERRNO, -ENOTCONN);
			LOGTRACE(logger, "Received eof on socket write");
		}
		cmd->HandleCompletion();
		return true;
	}

	if (s.length() != bytes) {
		LOGDEBUG(logger,"Sent "
			<< bytes << " but expected "<< s.length() );
		cmd->callback->addData(ICMD_ERRNO, -EIO);
		cmd->HandleCompletion();
		return true;
	}

	cmd->callback->addData(ICMD_ERRNO, 0);
	cmd->HandleCompletion();
	return true;
}

#endif /* HAVE_COMMS_ASIOTCPIO */