27#include <boost/algorithm/string/replace.hpp>
28#include <boost/date_time/posix_time/posix_time.hpp>
29#include <boost/filesystem.hpp>
30#include <boost/format.hpp>
43template <
typename JobContainer>
48 Job *jobToProc =
nullptr;
51 <<
"Requesting next job" << std::flush;
60 <<
"Sync did not yield any new jobs." << std::flush;
65 if (nextjit_ == jobsToProc_.end()) {
66 if (maxJobs_ == startJobsCount_) {
68 <<
"Next job: ID = - (reached maximum for this process)"
72 <<
"Next job: ID = - (none available)" << std::flush;
77 jobToProc = *nextjit_;
80 <<
"Next job: ID = " << jobToProc->getId() << std::flush;
83 if (!thread.isMaverick() && jobToProc !=
nullptr) {
84 Index idx = jobToProc->getId();
85 Index frac = (jobs_.size() >= 10) ? 10 : jobs_.size();
86 Index rounded =
Index(
double(jobs_.size()) /
double(frac)) * frac;
87 Index tenth = rounded / frac;
88 if (idx % tenth == 0) {
89 double percent = double(idx) / double(rounded) * 100 + 0.5;
90 std::cout << (format(
"=> [%1$2.0f%%] ") % percent).str() << std::flush;
98template <
typename JobContainer>
103 <<
"Reporting job results" << std::flush;
105 job.UpdateFromResult(res);
110 if (!thread.isMaverick()) {
111 std::cout << std::endl << thread.getLogger() << std::flush;
117template <
typename JobContainer>
120 (void)gethostname(host,
sizeof host);
121 pid_t pid = getpid();
122 return (format(
"%1$s:%2$d") % host % pid).str();
125template <
typename JobContainer>
127 boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
128 return (format(
"%1$s") % now.time_of_day()).str();
131template <
typename JobContainer>
138 std::string progBackFile =
progFile_ +
"~";
142 <<
"Update internal structures from job file" << std::flush;
143 JobContainer jobs_ext =
LOAD_JOBS(progFile);
148 <<
"Create job-file back-up" << std::flush;
153 <<
"Assign jobs from stack" << std::flush;
162 bool startJob =
false;
191template <
typename JobContainer>
193 flock_ = std::unique_ptr<boost::interprocess::file_lock>(
194 new boost::interprocess::file_lock(
lockFile_.c_str()));
197 <<
"Imposed lock on " <<
lockFile_ << std::flush;
199 <<
"Sleep ... " <<
lockFile_ << std::flush;
201 <<
"Wake up ... " <<
lockFile_ << std::flush;
204template <
typename JobContainer>
206 flock_->unlock_sharable();
208 <<
"Releasing " <<
lockFile_ <<
". " << std::flush;
211template <
typename JobContainer>
213 const boost::program_options::variables_map &optsMap) {
215 lockFile_ = optsMap[
"file"].as<std::string>();
218 std::string restartPattern = optsMap[
"restart"].as<std::string>();
221 boost::algorithm::replace_all(restartPattern,
" ",
"");
222 if (restartPattern ==
"") {
228 std::vector<std::string> patterns =
231 std::string category =
"";
232 for (
const std::string &pattern : patterns) {
234 if (pattern ==
"host" || pattern ==
"stat") {
237 }
else if (category ==
"host") {
239 }
else if (category ==
"stat") {
240 if (pattern ==
"ASSIGNED" || pattern ==
"COMPLETE") {
241 std::cout <<
"Restart if status == " << pattern
242 <<
"? Not necessarily a good idea." << std::endl;
248 throw std::runtime_error(
249 "Restart pattern ill-defined, format is"
250 "[host([HOSTNAME:PID])] [stat([STATUS])]");
256template <
typename JobContainer>
266 <<
"lock file = '" <<
lockFile_ <<
"', ";
268 <<
"cache size = " <<
cacheSize_ << std::flush;
271 <<
"Initialize jobs from " << progFile << std::flush;
285 <<
"Registered " <<
jobs_.size() <<
" jobs." << std::flush;
286 if (
jobs_.size() > 0) {
294 std::string infostr =
"Restart if host == ";
295 for (
const std::pair<const std::string, bool> &host :
restart_hosts_) {
296 infostr += host.first +
" ";
301 std::string infostr =
"Restart if stat == ";
302 for (
const std::pair<const std::string, bool> &host :
restart_hosts_) {
303 infostr += host.first +
" ";
void LockProgFile(QMThread &thread)
typename JobContainer::value_type Job
std::vector< Job * > jobsToProc_
void InitFromProgFile(std::string progFile, QMThread &thread)
void ReportJobDone(Job &job, Result &res, QMThread &thread)
ProgObserver::Job * RequestNextJob(QMThread &thread)
std::string GenerateTime()
typename Job::JobResult Result
std::map< std::string, bool > restart_stats_
std::map< std::string, bool > restart_hosts_
std::string GenerateHost()
void ReleaseProgFile(QMThread &thread)
void SyncWithProgFile(QMThread &thread)
void InitCmdLineOpts(const boost::program_options::variables_map &optsMap)
std::unique_ptr< boost::interprocess::file_lock > flock_
#define XTP_LOG(level, log)
Charge transport classes.
void WRITE_JOBS(const std::vector< Job > &jobs, const std::string &job_file)
void UPDATE_JOBS(const std::vector< Job > &from, std::vector< Job > &to, const std::string &thisHost)
std::vector< Job > LOAD_JOBS(const std::string &xml_file)
base class for all analysis tools