27#include <boost/algorithm/string/replace.hpp>
28#include <boost/date_time/posix_time/posix_time.hpp>
29#include <boost/filesystem.hpp>
30#include <boost/format.hpp>
// Job-request fragment (presumably the body of RequestNextJob(QMThread &) —
// the signature and several statements are absent from this listing; TODO
// confirm). Visible logic: pick the job that the iterator nextjit_ points at
// in jobsToProc_, re-syncing with the job file when the iterator has reached
// the end, and print a coarse progress percentage to std::cout.
43template <
typename JobContainer>
// Stays nullptr unless a job is selected further down.
48 Job *jobToProc =
nullptr;
51 <<
"Requesting next job" << std::flush;
// Iterator is at the end but more jobs may exist: sync with the job file and
// restart from the first entry of jobsToProc_.
54 if (nextjit_ == jobsToProc_.end() && moreJobsAvailable_) {
55 SyncWithProgFile(thread);
56 nextjit_ = jobsToProc_.begin();
// Still nothing after the sync: remember that no further jobs are available.
57 if (nextjit_ == jobsToProc_.end()) {
58 moreJobsAvailable_ =
false;
60 <<
"Sync did not yield any new jobs." << std::flush;
// Nothing to hand out: the log message distinguishes "per-process maximum
// reached" (maxJobs_ == startJobsCount_) from "none available".
65 if (nextjit_ == jobsToProc_.end()) {
66 if (maxJobs_ == startJobsCount_) {
68 <<
"Next job: ID = - (reached maximum for this process)"
72 <<
"Next job: ID = - (none available)" << std::flush;
// A job is available: take the one the iterator points at.
77 jobToProc = *nextjit_;
80 <<
"Next job: ID = " << jobToProc->getId() << std::flush;
// Progress output, only for threads where isMaverick() is false and only when
// a job was selected.
83 if (!thread.
isMaverick() && jobToProc !=
nullptr) {
84 Index idx = jobToProc->getId();
// frac: number of progress steps, 10 or the job count if there are fewer jobs.
85 Index frac = (jobs_.size() >= 10) ? 10 : jobs_.size();
// rounded: job count truncated down to a multiple of frac (the quotient is
// converted to Index before it is multiplied by frac).
86 Index rounded =
Index(
double(jobs_.size()) /
double(frac)) * frac;
// tenth: number of job ids per progress step.
87 Index tenth = rounded / frac;
// Print only when the job id is a multiple of the step width.
88 if (idx % tenth == 0) {
// 0.5 is added before the value is formatted with zero decimals.
89 double percent = double(idx) / double(rounded) * 100 + 0.5;
90 std::cout << (format(
"=> [%1$2.0f%%] ") % percent).str() << std::flush;
// Result-reporting fragment (presumably the body of
// ReportJobDone(Job &job, Result &res, QMThread &thread) — the signature is
// absent from this listing; TODO confirm). Visible logic: copy the result into
// the job, stamp the job with the current time and host, optionally dump the
// thread's logger to std::cout, then unlock lockThread_.
98template <
typename JobContainer>
103 <<
"Reporting job results" << std::flush;
// Transfer the result into the job and record when/where it was reported.
105 job.UpdateFromResult(res);
106 job.setTime(GenerateTime());
107 job.setHost(GenerateHost());
// Threads for which isMaverick() is false print their logger to std::cout.
110 if (!thread.isMaverick()) {
111 std::cout << std::endl << thread.getLogger() << std::flush;
// NOTE(review): the matching acquisition of lockThread_ is not visible in this
// listing — confirm where it is taken.
113 lockThread_.Unlock();
// Host-identifier fragment (presumably the body of GenerateHost() — the
// signature and the declaration of `host` are absent from this listing; TODO
// confirm). Returns a string of the form "<hostname>:<pid>".
117template <
typename JobContainer>
// The return value of gethostname is deliberately discarded.
// NOTE(review): POSIX leaves null-termination unspecified when the name is
// truncated; confirm that `host` is zero-initialised or terminated explicitly.
120 (void)gethostname(host,
sizeof host);
121 pid_t pid = getpid();
// %1$s is the host buffer, %2$d the process id.
122 return (format(
"%1$s:%2$d") % host % pid).str();
// Time-stamp fragment (presumably the body of GenerateTime() — the signature
// is absent from this listing; TODO confirm). Returns the time-of-day part of
// the current local time as a string; the date is not included.
125template <
typename JobContainer>
// second_clock yields the local time at one-second resolution.
127 boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
128 return (format(
"%1$s") % now.time_of_day()).str();
// Job-file synchronisation fragment (presumably the body of
// SyncWithProgFile(QMThread &thread) — the signature and several statements
// are absent from this listing; TODO confirm). Visible logic: lock the job
// file, load its contents, assign further jobs from jobs_ to jobsToProc_, and
// release the lock again.
131template <
typename JobContainer>
// Lock is taken here and released by ReleaseProgFile at the end of the fragment.
135 this->LockProgFile(thread);
137 std::string progFile = progFile_;
// Back-up path: the job-file name with "~" appended (its use is not visible here).
138 std::string progBackFile = progFile_ +
"~";
142 <<
"Update internal structures from job file" << std::flush;
// Load the job list currently stored in the job file.
143 JobContainer jobs_ext =
LOAD_JOBS(progFile);
148 <<
"Create job-file back-up" << std::flush;
153 <<
"Assign jobs from stack" << std::flush;
// Keep assigning until jobsToProc_ holds cacheSize entries.
// NOTE(review): the size is narrowed to int before it is compared with an Index.
156 Index cacheSize = cacheSize_;
157 while (
int(jobsToProc_.size()) < cacheSize) {
// Condition: all of jobs_ has been visited, or the number of started jobs
// equals maxJobs_. The branch body is not part of this listing.
158 if (metajit_ == jobs_.end() || startJobsCount_ == maxJobs_) {
162 bool startJob =
false;
// A job qualifies if it reports itself available, or — in restart mode — if
// its status string or its host is registered in restart_stats_ /
// restart_hosts_.
165 if ((metajit_->isAvailable()) ||
166 (restartMode_ && restart_stats_.count(metajit_->getStatusStr())) ||
167 (restartMode_ && restart_hosts_.count(metajit_->getHost()))) {
// Mark the job as assigned and stamp it with this host and the current time.
173 metajit_->setStatus(
"ASSIGNED");
174 metajit_->setHost(GenerateHost());
175 metajit_->setTime(GenerateTime());
// NOTE(review): stores a raw pointer to the element inside jobs_; it stays
// valid only while jobs_ does not relocate its elements — confirm.
176 jobsToProc_.push_back(&*metajit_);
177 startJobsCount_ += 1;
187 this->ReleaseProgFile(thread);
// File-locking fragment (presumably the body of LockProgFile(QMThread &thread)
// — the signature is absent from this listing; TODO confirm). Replaces flock_
// with a new boost::interprocess::file_lock on lockFile_ and acquires sharable
// ownership of it.
191template <
typename JobContainer>
193 flock_ = std::unique_ptr<boost::interprocess::file_lock>(
194 new boost::interprocess::file_lock(lockFile_.c_str()));
// NOTE(review): sharable ownership may be held by several lockers at once; if
// the protected section rewrites the job file, exclusive lock() may be what is
// intended — confirm.
195 flock_->lock_sharable();
197 <<
"Imposed lock on " << lockFile_ << std::flush;
// "Sleep"/"Wake up" are logged, but no sleep call is visible in this listing.
199 <<
"Sleep ... " << lockFile_ << std::flush;
201 <<
"Wake up ... " << lockFile_ << std::flush;
// Lock-release fragment (presumably the body of
// ReleaseProgFile(QMThread &thread) — the signature is absent from this
// listing; TODO confirm). Gives up the sharable ownership held through flock_.
204template <
typename JobContainer>
// flock_ is dereferenced without a null check, so it must already hold a lock object.
206 flock_->unlock_sharable();
208 <<
"Releasing " << lockFile_ <<
". " << std::flush;
// Command-line initialisation fragment of
// InitCmdLineOpts(const boost::program_options::variables_map &optsMap); parts
// of the signature and several statements are absent from this listing.
// Visible logic: read the lock file, cache size, job limit and restart pattern
// from the option map, then parse the restart pattern into restart_hosts_ and
// restart_stats_.
// Throws std::runtime_error when the restart pattern is ill-defined (the
// condition that leads to the throw is not part of this listing).
211template <
typename JobContainer>
213 const boost::program_options::variables_map &optsMap) {
// Options read: "file", "cache", "maxjobs" and "restart".
215 lockFile_ = optsMap[
"file"].as<std::string>();
216 cacheSize_ = optsMap[
"cache"].as<
Index>();
217 maxJobs_ = optsMap[
"maxjobs"].as<
Index>();
218 std::string restartPattern = optsMap[
"restart"].as<std::string>();
// Strip all blanks from the pattern before it is inspected.
221 boost::algorithm::replace_all(restartPattern,
" ",
"");
// An empty pattern switches restart mode off.
222 if (restartPattern ==
"") {
223 restartMode_ =
false;
228 std::vector<std::string> patterns =
// `category` is compared against "host" / "stat" below; where it is assigned
// is not part of this listing.
231 std::string category =
"";
232 for (
const std::string &pattern : patterns) {
// The tokens "host" and "stat" are keywords rather than values.
234 if (pattern ==
"host" || pattern ==
"stat") {
237 }
else if (category ==
"host") {
// Register the token as a host whose jobs may be restarted.
238 restart_hosts_[pattern] =
true;
239 }
else if (category ==
"stat") {
// Restarting on these two statuses is allowed, but the user is warned.
240 if (pattern ==
"ASSIGNED" || pattern ==
"COMPLETE") {
241 std::cout <<
"Restart if status == " << pattern
242 <<
"? Not necessarily a good idea." << std::endl;
// Register the token as a status for which jobs may be restarted.
244 restart_stats_[pattern] =
true;
// The two literals are concatenated by the compiler; the trailing blank keeps
// "format is" and "[host(" apart in the resulting message.
248 throw std::runtime_error(
249 "Restart pattern ill-defined, format is "
250 "[host([HOSTNAME:PID])] [stat([STATUS])]");
// Job-file initialisation fragment (presumably the body of
// InitFromProgFile(std::string progFile, QMThread &thread) — the signature and
// several statements are absent from this listing; TODO confirm). Visible
// logic: remember the job-file name, log the configuration, lock the job file,
// point metajit_ at the first job, record whether any jobs exist, summarise
// the restart settings, and release the lock.
256template <
typename JobContainer>
260 progFile_ = progFile;
264 <<
"Job file = '" << progFile_ <<
"', ";
266 <<
"lock file = '" << lockFile_ <<
"', ";
268 <<
"cache size = " << cacheSize_ << std::flush;
271 <<
"Initialize jobs from " << progFile << std::flush;
// Lock is taken here and released by ReleaseProgFile at the end of the fragment.
275 this->LockProgFile(thread);
// Start handing out jobs from the first entry of jobs_.
282 metajit_ = jobs_.begin();
285 <<
"Registered " << jobs_.size() <<
" jobs." << std::flush;
// moreJobsAvailable_ is true exactly when at least one job was registered.
286 if (jobs_.size() > 0) {
287 moreJobsAvailable_ =
true;
289 moreJobsAvailable_ =
false;
// Summary of the hosts that trigger a restart.
293 if (restartMode_ && restart_hosts_.size()) {
294 std::string infostr =
"Restart if host == ";
295 for (
const std::pair<const std::string, bool> &host : restart_hosts_) {
296 infostr += host.first +
" ";
// Summary of the statuses that trigger a restart. This loop must walk
// restart_stats_; iterating restart_hosts_ here would list host names under
// the "stat" heading.
300 if (restartMode_ && restart_stats_.size()) {
301 std::string infostr =
"Restart if stat == ";
302 for (
const std::pair<const std::string, bool> &stat : restart_stats_) {
303 infostr += stat.first +
" ";
309 this->ReleaseProgFile(thread);
void LockProgFile(QMThread &thread)
typename JobContainer::value_type Job
void InitFromProgFile(std::string progFile, QMThread &thread)
void ReportJobDone(Job &job, Result &res, QMThread &thread)
ProgObserver::Job * RequestNextJob(QMThread &thread)
std::string GenerateTime()
typename Job::JobResult Result
std::string GenerateHost()
void ReleaseProgFile(QMThread &thread)
void SyncWithProgFile(QMThread &thread)
void InitCmdLineOpts(const boost::program_options::variables_map &optsMap)
#define XTP_LOG(level, log)
void WRITE_JOBS(const std::vector< Job > &jobs, const std::string &job_file)
void UPDATE_JOBS(const std::vector< Job > &from, std::vector< Job > &to, const std::string &thisHost)
std::vector< Job > LOAD_JOBS(const std::string &xml_file)
base class for all analysis tools