votca 2024.1-dev
Loading...
Searching...
No Matches
progressobserver.cc
Go to the documentation of this file.
1/*
2 * Copyright 2009-2020 The VOTCA Development Team
3 * (http://www.votca.org)
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License")
6 *
7 * You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
21
22// Standard includes
23#include <fstream>
24#include <unistd.h>
25
26// Third party includes
27#include <boost/algorithm/string/replace.hpp>
28#include <boost/date_time/posix_time/posix_time.hpp>
29#include <boost/filesystem.hpp>
30#include <boost/format.hpp>
31#include <sys/types.h>
32
33// Local VOTCA includes
34#include "votca/xtp/job.h"
36#include "votca/xtp/qmthread.h"
37
38using boost::format;
39
40namespace votca {
41namespace xtp {
42
// Hands the calling thread the next job to process, or nullptr when no job
// can be handed out. The body runs between lockThread_.Lock() and
// lockThread_.Unlock().
//
// NOTE(review): some lines are absent from this listing (the embedded line
// numbers skip), including the function's signature line and the first line
// of the log statements whose visible part starts with `<<`.
43template <typename JobContainer>
46
47 lockThread_.Lock();
48 Job *jobToProc = nullptr;
50 XTP_LOG(Log::error, thread.getLogger())
51 << "Requesting next job" << std::flush;
53 // NEED NEW CHUNK?
// The local cache is exhausted but more jobs may exist: re-sync with the job
// file and restart iteration at the beginning of the refreshed cache.
54 if (nextjit_ == jobsToProc_.end() && moreJobsAvailable_) {
55 SyncWithProgFile(thread);
56 nextjit_ = jobsToProc_.begin();
57 if (nextjit_ == jobsToProc_.end()) {
// An empty cache after syncing disables any further sync attempts.
58 moreJobsAvailable_ = false;
59 XTP_LOG(Log::error, thread.getLogger())
60 << "Sync did not yield any new jobs." << std::flush;
61 }
62 }
63
64 // JOBS EATEN ALL UP?
// Nothing left to hand out: jobToProc stays nullptr; the message distinguishes
// "per-process maximum reached" from "no jobs available".
65 if (nextjit_ == jobsToProc_.end()) {
66 if (maxJobs_ == startJobsCount_) {
68 << "Next job: ID = - (reached maximum for this process)"
69 << std::flush;
70 } else {
72 << "Next job: ID = - (none available)" << std::flush;
73 }
74 }
75 // TAKE A BITE
76 else {
77 jobToProc = *nextjit_;
78 ++nextjit_;
80 << "Next job: ID = " << jobToProc->getId() << std::flush;
81 }
82
// Progress marker on std::cout, only for non-maverick threads that actually
// received a job.
83 if (!thread.isMaverick() && jobToProc != nullptr) {
84 Index idx = jobToProc->getId();
// frac: number of progress steps (10, or the job count when fewer than 10).
85 Index frac = (jobs_.size() >= 10) ? 10 : jobs_.size();
// rounded: job count rounded down to a multiple of frac.
86 Index rounded = Index(double(jobs_.size()) / double(frac)) * frac;
// tenth: number of job IDs per progress step.
87 Index tenth = rounded / frac;
// Print only when the job ID falls on a step boundary.
88 if (idx % tenth == 0) {
// The +0.5 offset is applied before zero-decimal formatting (presumably
// intended as rounding).
89 double percent = double(idx) / double(rounded) * 100 + 0.5;
90 std::cout << (format("=> [%1$2.0f%%] ") % percent).str() << std::flush;
91 }
92 }
93
94 lockThread_.Unlock();
95 return jobToProc;
96}
97
// Records the outcome of a finished job: applies the result to the job, stamps
// it with the current time and host strings, and increments jobsReported_.
// The body runs between lockThread_.Lock() and lockThread_.Unlock().
//
// NOTE(review): the first line of the signature is absent from this listing
// (the embedded line numbers skip from 98 to 100); `job` and `res` used below
// are declared on that missing line.
98template <typename JobContainer>
100 QMThread &thread) {
101 lockThread_.Lock();
102 XTP_LOG(Log::error, thread.getLogger())
103 << "Reporting job results" << std::flush;
104 // RESULTS, TIME, HOST
105 job.UpdateFromResult(res);
106 job.setTime(GenerateTime());
107 job.setHost(GenerateHost());
108 // PRINT PROGRESS BAR
109 jobsReported_ += 1;
// Non-maverick threads write the content of their logger to std::cout.
110 if (!thread.isMaverick()) {
111 std::cout << std::endl << thread.getLogger() << std::flush;
112 }
113 lockThread_.Unlock();
114 return;
115}
116
// Builds the "<hostname>:<pid>" string that identifies this process.
//
// The buffer is zero-initialised and gethostname() is given one byte less than
// its size, so the last byte is never written and the buffer is always
// NUL-terminated, even if gethostname() fails or truncates the name (its
// return value is deliberately ignored).
//
// NOTE(review): the signature line is absent from this listing (the embedded
// line numbers skip from 117 to 119).
117template <typename JobContainer>
119 char host[128] = {};
120 (void)gethostname(host, sizeof host - 1);
121 pid_t pid = getpid();
122 return (format("%1$s:%2$d") % host % pid).str();
123}
124
// Returns the current local time of day (one-second resolution clock),
// formatted as a string via boost::format.
//
// NOTE(review): the signature line is absent from this listing (the embedded
// line numbers skip from 125 to 127).
125template <typename JobContainer>
127 boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
128 return (format("%1$s") % now.time_of_day()).str();
129}
130
// Synchronises the in-memory job list with the shared job file and refills the
// local cache of jobs to process (jobsToProc_).
//
// Steps, all performed between LockProgFile() and ReleaseProgFile():
//   1. load the jobs from progFile_ and pass them to UPDATE_JOBS together with
//      jobs_ and this process's host string;
//   2. write jobs_ to the back-up file (progFile_ + "~");
//   3. scan jobs_ from metajit_ onwards and assign jobs to this process;
//   4. write jobs_ back to progFile_.
//
// NOTE(review): the signature line is absent from this listing (the embedded
// line numbers skip from 131 to 133).
131template <typename JobContainer>
133
134 // INTERPROCESS FILE LOCKING (THREAD LOCK IN ::RequestNextJob)
135 this->LockProgFile(thread);
136
137 std::string progFile = progFile_;
138 std::string progBackFile = progFile_ + "~";
139
140 // LOAD EXTERNAL JOBS FROM SHARED XML & UPDATE INTERNAL JOBS
141 XTP_LOG(Log::info, thread.getLogger())
142 << "Update internal structures from job file" << std::flush;
143 JobContainer jobs_ext = LOAD_JOBS(progFile);
144 UPDATE_JOBS(jobs_ext, jobs_, GenerateHost());
145
146 // GENERATE BACK-UP FOR SHARED XML
147 XTP_LOG(Log::info, thread.getLogger())
148 << "Create job-file back-up" << std::flush;
149 WRITE_JOBS(jobs_, progBackFile);
150
151 // ASSIGN NEW JOBS IF AVAILABLE
152 XTP_LOG(Log::error, thread.getLogger())
153 << "Assign jobs from stack" << std::flush;
154 jobsToProc_.clear();
155
// Fill the cache with at most cacheSize_ jobs. metajit_ is not reset in this
// function, so the scan continues from where it last stopped.
156 Index cacheSize = cacheSize_;
157 while (int(jobsToProc_.size()) < cacheSize) {
// Stop at the end of the job list or once this process has started maxJobs_
// jobs.
158 if (metajit_ == jobs_.end() || startJobsCount_ == maxJobs_) {
159 break;
160 }
161
162 bool startJob = false;
163
164 // Start if job available or restart patterns matched
165 if ((metajit_->isAvailable()) ||
166 (restartMode_ && restart_stats_.count(metajit_->getStatusStr())) ||
167 (restartMode_ && restart_hosts_.count(metajit_->getHost()))) {
168 startJob = true;
169 }
170
// Claim the job for this process: reset it, mark it "ASSIGNED", stamp host and
// time, and queue a pointer to it in the local cache.
171 if (startJob) {
172 metajit_->Reset();
173 metajit_->setStatus("ASSIGNED");
174 metajit_->setHost(GenerateHost());
175 metajit_->setTime(GenerateTime());
176 jobsToProc_.push_back(&*metajit_);
177 startJobsCount_ += 1;
178 }
179
180 ++metajit_;
181 }
182
183 // UPDATE PROGRESS STATUS FILE
184 WRITE_JOBS(jobs_, progFile);
185
186 // RELEASE PROGRESS STATUS FILE
187 this->ReleaseProgFile(thread);
188 return;
189}
190
// Creates a boost::interprocess::file_lock on lockFile_, stores it in flock_
// and acquires it in sharable mode, then logs three messages.
//
// NOTE(review): the lock is taken with lock_sharable(); confirm that a
// sharable (rather than exclusive) lock is intended, given that callers in
// this file rewrite the job file while holding it.
// NOTE(review): the "Sleep ..." and "Wake up ..." messages are logged back to
// back; no sleep happens between them in the visible code.
// NOTE(review): the signature line is absent from this listing (the embedded
// line numbers skip from 191 to 193).
191template <typename JobContainer>
193 flock_ = std::unique_ptr<boost::interprocess::file_lock>(
194 new boost::interprocess::file_lock(lockFile_.c_str()));
195 flock_->lock_sharable();
196 XTP_LOG(Log::warning, thread.getLogger())
197 << "Imposed lock on " << lockFile_ << std::flush;
198 XTP_LOG(Log::warning, thread.getLogger())
199 << "Sleep ... " << lockFile_ << std::flush;
200 XTP_LOG(Log::warning, thread.getLogger())
201 << "Wake up ... " << lockFile_ << std::flush;
202}
203
// Releases the sharable file lock held through flock_ and logs the release.
// flock_ is dereferenced without a null check, so a lock object must already
// have been assigned to it.
//
// NOTE(review): the signature line is absent from this listing (the embedded
// line numbers skip from 204 to 206).
204template <typename JobContainer>
206 flock_->unlock_sharable();
207 XTP_LOG(Log::warning, thread.getLogger())
208 << "Releasing " << lockFile_ << ". " << std::flush;
209}
210
// Initialises the observer from parsed command-line options.
//
// Reads the options "file" (lock file), "cache" (cache size), "maxjobs" and
// "restart" from optsMap. The restart pattern has all spaces removed; restart
// mode is enabled when the remaining string is non-empty. The pattern is then
// split on the delimiters "(", "," and ")" and each token is stored as a host
// pattern or a status pattern, depending on the most recent "host"/"stat"
// keyword.
//
// Throws std::runtime_error when a token appears before any "host" or "stat"
// keyword.
//
// NOTE(review): the first line of the signature is absent from this listing
// (the embedded line numbers skip from 211 to 213).
211template <typename JobContainer>
213 const boost::program_options::variables_map &optsMap) {
214
215 lockFile_ = optsMap["file"].as<std::string>();
216 cacheSize_ = optsMap["cache"].as<Index>();
217 maxJobs_ = optsMap["maxjobs"].as<Index>();
218 std::string restartPattern = optsMap["restart"].as<std::string>();
219
220 // restartPattern = e.g. host(pckr124:1234) stat(FAILED)
221 boost::algorithm::replace_all(restartPattern, " ", "");
222 if (restartPattern == "") {
223 restartMode_ = false;
224 } else {
225 restartMode_ = true;
226 }
227
228 std::vector<std::string> patterns =
229 tools::Tokenizer(restartPattern, "(,)").ToVector();
230
// `category` remembers the last keyword seen ("host" or "stat"); the tokens
// that follow are filed under it.
231 std::string category = "";
232 for (const std::string &pattern : patterns) {
233
234 if (pattern == "host" || pattern == "stat") {
235 category = pattern;
236
237 } else if (category == "host") {
238 restart_hosts_[pattern] = true;
239 } else if (category == "stat") {
// Accepted, but the user is warned on std::cout.
240 if (pattern == "ASSIGNED" || pattern == "COMPLETE") {
241 std::cout << "Restart if status == " << pattern
242 << "? Not necessarily a good idea." << std::endl;
243 }
244 restart_stats_[pattern] = true;
245 }
246
// A token with no preceding keyword cannot be classified.
247 else {
248 throw std::runtime_error(
// The trailing space keeps the two adjacent literals from running together.
249 "Restart pattern ill-defined, format is "
250 "[host([HOSTNAME:PID])] [stat([STATUS])]");
251 }
252 }
253 return;
254}
255
// Initialises the observer from the job file.
//
// Stores the job-file name, resets jobsReported_, then (between
// LockProgFile() and ReleaseProgFile()) loads all jobs from progFile into
// jobs_, points metajit_ at the first job, writes a back-up copy to
// progFile + "~", and sets moreJobsAvailable_ according to whether any jobs
// were loaded. Finally logs the active restart patterns (hosts and statuses).
//
// NOTE(review): the first line of the signature is absent from this listing
// (the embedded line numbers skip from 256 to 258); `progFile` used below is
// declared on that missing line.
256template <typename JobContainer>
258 QMThread &thread) {
259
260 progFile_ = progFile;
261 jobsReported_ = 0;
262
263 XTP_LOG(Log::error, thread.getLogger())
264 << "Job file = '" << progFile_ << "', ";
265 XTP_LOG(Log::info, thread.getLogger())
266 << "lock file = '" << lockFile_ << "', ";
267 XTP_LOG(Log::error, thread.getLogger())
268 << "cache size = " << cacheSize_ << std::flush;
269
270 XTP_LOG(Log::error, thread.getLogger())
271 << "Initialize jobs from " << progFile << std::flush;
272 XTP_LOG(Log::info, thread.getLogger()) << "Lock & load " << std::flush;
273
274 // LOCK, READ INTO XML
275 this->LockProgFile(thread);
276
277 // ... Clear container
278 jobs_.clear();
279
280 // ... Load new, set availability bool
281 jobs_ = LOAD_JOBS(progFile);
282 metajit_ = jobs_.begin();
283 WRITE_JOBS(jobs_, progFile + "~");
284 XTP_LOG(Log::error, thread.getLogger())
285 << "Registered " << jobs_.size() << " jobs." << std::flush;
286 if (jobs_.size() > 0) {
287 moreJobsAvailable_ = true;
288 } else {
289 moreJobsAvailable_ = false;
290 }
291
292 // SUMMARIZE OBSERVER VARIABLES: RESTART PATTERN, CACHE, LOCK FILE
293 if (restartMode_ && restart_hosts_.size()) {
294 std::string infostr = "Restart if host == ";
295 for (const std::pair<const std::string, bool> &host : restart_hosts_) {
296 infostr += host.first + " ";
297 }
298 XTP_LOG(Log::error, thread.getLogger()) << infostr << std::flush;
299 }
300 if (restartMode_ && restart_stats_.size()) {
301 std::string infostr = "Restart if stat == ";
// List the status patterns here (restart_stats_), not the host patterns.
302 for (const auto &stat : restart_stats_) {
303 infostr += stat.first + " ";
304 }
305 XTP_LOG(Log::error, thread.getLogger()) << infostr << std::flush;
306 }
307
308 // RELEASE PROGRESS FILE
309 this->ReleaseProgFile(thread);
310 return;
311}
312
// Explicit instantiation of ProgObserver for std::vector<Job>, so that the
// member templates defined in this file are compiled for that container type.
313// REGISTER
314template class ProgObserver<std::vector<Job> >;
315
316} // namespace xtp
317} // namespace votca
break string into words
Definition tokenizer.h:72
std::vector< T > ToVector()
store all words in a vector of type T, does type conversion.
Definition tokenizer.h:109
void LockProgFile(QMThread &thread)
typename JobContainer::value_type Job
void InitFromProgFile(std::string progFile, QMThread &thread)
void ReportJobDone(Job &job, Result &res, QMThread &thread)
ProgObserver::Job * RequestNextJob(QMThread &thread)
typename Job::JobResult Result
void ReleaseProgFile(QMThread &thread)
void SyncWithProgFile(QMThread &thread)
void InitCmdLineOpts(const boost::program_options::variables_map &optsMap)
bool isMaverick() const
Definition qmthread.h:53
Logger & getLogger()
Definition qmthread.h:55
#define XTP_LOG(level, log)
Definition logger.h:40
void WRITE_JOBS(const std::vector< Job > &jobs, const std::string &job_file)
Definition job.cc:199
void UPDATE_JOBS(const std::vector< Job > &from, std::vector< Job > &to, const std::string &thisHost)
Definition job.cc:215
std::vector< Job > LOAD_JOBS(const std::string &xml_file)
Definition job.cc:184
base class for all analysis tools
Definition basebead.h:33
Eigen::Index Index
Definition types.h:26