VinaLC: Parallel Molecular Docking Program

Biochemical and Biophysical Systems Group
VinaLC version: 1.1.2

23 #ifndef VINA_PARALLEL_H
24 #define VINA_PARALLEL_H
26 #include <vector>
28 #include "common.h"
30 #include <boost/optional.hpp>
31 #include <boost/thread/thread.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/thread/condition.hpp>
36 template<typename F, bool Sync = false>
37 struct parallel_for : private boost::thread_group {
38  parallel_for(const F* f, sz num_threads) : m_f(f), destructing(false), size(0), thread_finished(num_threads, true), count_finished(0), num_threads(num_threads) {
39  VINA_FOR(i, num_threads)
40  create_thread(aux(i, this));
41  }
42  void run(sz size_) {
43  boost::mutex::scoped_lock self_lk(self);
44  size = size_;
45  count_finished = 0;
47  thread_finished[i] = false;
48  cond.notify_all(); // many things modified
49  while(count_finished < num_threads) // wait until processing of all elements is thread_finished
50  busy.wait(self_lk);
51  }
52  virtual ~parallel_for() {
53  {
54  boost::mutex::scoped_lock self_lk(self);
55  destructing = true;
56  cond.notify_all(); // destructing modified
57  }
58  join_all();
59  }
60 private:
61  void loop(sz offset) {
62  while(boost::optional<sz> sz_option = get_size(offset)) {
63  sz s = sz_option.get();
64  for(sz i = offset; i < s; i += num_threads)
65  (*m_f)(i);
66  {
67  boost::mutex::scoped_lock self_lk(self);
68  thread_finished[offset] = true;
70  busy.notify_one();
71  }
72  }
73  }
74  struct aux {
77  aux(sz offset, parallel_for* par) : offset(offset), par(par) {}
78  void operator()() const { par->loop(offset); }
79  };
80  const F* m_f; // does not keep a local copy!
81  boost::condition cond;
82  boost::condition busy;
83  bool destructing; // dtor called
84  sz size; // size of the vector given to run()
85  std::vector<bool> thread_finished;
88  boost::mutex self; // any modification or reading of mutables should lock this first
89  boost::optional<sz> get_size(sz offset) {
90  boost::mutex::scoped_lock self_lk(self);
91  while(!destructing && thread_finished[offset])
92  cond.wait(self_lk);
93  if(destructing) return boost::optional<sz>(); // wrap it up!
94  return size;
95  }
96 };
98 template<typename F>
99 struct parallel_for<F, true> : private boost::thread_group {
100  parallel_for(const F* f, sz num_threads) : m_f(f), destructing(false), size(0), started(0), finished(0) {
101  a.par = this; // VC8 warning workaround
102  VINA_FOR(i, num_threads)
103  create_thread(boost::ref(a));
104  }
105  void run(sz size_) {
106  boost::mutex::scoped_lock self_lk(self);
107  size = size_;
108  finished = 0;
109  started = 0;
110  cond.notify_all(); // many things modified
111  while(finished < size) // wait until processing of all elements is finished
112  busy.wait(self_lk);
113  }
114  virtual ~parallel_for() {
115  {
116  boost::mutex::scoped_lock self_lk(self);
117  destructing = true;
118  cond.notify_all(); // destructing modified
119  }
120  join_all();
121  }
122 private:
123  void loop() {
124  while(boost::optional<sz> i = get_next()) {
125  (*m_f)(i.get());
126  {
127  boost::mutex::scoped_lock self_lk(self);
128  ++finished;
129  busy.notify_one();
130  }
131  }
132  }
133  struct aux {
135  aux() : par(NULL) {}
136  void operator()() const { par->loop(); }
137  };
138  aux a;
139  const F* m_f; // does not keep a local copy!
140  boost::condition cond;
141  boost::condition busy;
142  bool destructing; // dtor called
143  sz size; // size of the vector given to run() // FIXME?
144  sz started; // the number of jobs given to run() the work started on
145  sz finished; // the number of jobs given to run() the work finished on
146  boost::mutex self; // any modification or reading of mutables should lock this first
147  boost::optional<sz> get_next() {
148  boost::mutex::scoped_lock self_lk(self);
149  while(!destructing && started >= size)
150  cond.wait(self_lk);
151  if(destructing) return boost::optional<sz>(); // NOTHING
152  sz tmp = started;
153  ++started;
154  return tmp;
155  }
156 };
159 template<typename F, typename Container, typename Input, bool Sync = false>
160 struct parallel_iter {
161  parallel_iter(const F* f, sz num_threads) : a(f), pf(&a, num_threads) {}
162  void run(Container& v) {
163  a.v = &v;
165  }
166 private:
167  struct aux {
168  const F* f;
169  Container* v;
170  aux(const F* f) : f(f), v(NULL) {}
171  void operator()(sz i) const {
172  VINA_CHECK(v);
173  (*f)((*v)[i]);
174  }
175  };
178 };
180 #endif