VinaLC: Parallel Molecular Docking Program

Biochemical and Biophysical Systems Group
VinaLC version: 1.1.2

parallel.h
Go to the documentation of this file.
1 /*
2 
3  Copyright (c) 2006-2010, The Scripps Research Institute
4 
5  Licensed under the Apache License, Version 2.0 (the "License");
6  you may not use this file except in compliance with the License.
7  You may obtain a copy of the License at
8 
9  http://www.apache.org/licenses/LICENSE-2.0
10 
11  Unless required by applicable law or agreed to in writing, software
12  distributed under the License is distributed on an "AS IS" BASIS,
13  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  See the License for the specific language governing permissions and
15  limitations under the License.
16 
17  Author: Dr. Oleg Trott <ot14@columbia.edu>,
18  The Olson Lab,
19  The Scripps Research Institute
20 
21 */
22 
23 #ifndef VINA_PARALLEL_H
24 #define VINA_PARALLEL_H
25 
26 #include <vector>
27 
28 #include "common.h"
29 
30 #include <boost/optional.hpp>
31 #include <boost/thread/thread.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/thread/condition.hpp>
34 
35 
36 template<typename F, bool Sync = false>
37 struct parallel_for : private boost::thread_group {
38  parallel_for(const F* f, sz num_threads) : m_f(f), destructing(false), size(0), thread_finished(num_threads, true), count_finished(0), num_threads(num_threads) {
39  VINA_FOR(i, num_threads)
40  create_thread(aux(i, this));
41  }
42  void run(sz size_) {
43  boost::mutex::scoped_lock self_lk(self);
44  size = size_;
45  count_finished = 0;
47  thread_finished[i] = false;
48  cond.notify_all(); // many things modified
49  while(count_finished < num_threads) // wait until processing of all elements is thread_finished
50  busy.wait(self_lk);
51  }
52  virtual ~parallel_for() {
53  {
54  boost::mutex::scoped_lock self_lk(self);
55  destructing = true;
56  cond.notify_all(); // destructing modified
57  }
58  join_all();
59  }
60 private:
61  void loop(sz offset) {
62  while(boost::optional<sz> sz_option = get_size(offset)) {
63  sz s = sz_option.get();
64  for(sz i = offset; i < s; i += num_threads)
65  (*m_f)(i);
66  {
67  boost::mutex::scoped_lock self_lk(self);
68  thread_finished[offset] = true;
70  busy.notify_one();
71  }
72  }
73  }
74  struct aux {
77  aux(sz offset, parallel_for* par) : offset(offset), par(par) {}
78  void operator()() const { par->loop(offset); }
79  };
80  const F* m_f; // does not keep a local copy!
81  boost::condition cond;
82  boost::condition busy;
83  bool destructing; // dtor called
84  sz size; // size of the vector given to run()
85  std::vector<bool> thread_finished;
88  boost::mutex self; // any modification or reading of mutables should lock this first
89  boost::optional<sz> get_size(sz offset) {
90  boost::mutex::scoped_lock self_lk(self);
91  while(!destructing && thread_finished[offset])
92  cond.wait(self_lk);
93  if(destructing) return boost::optional<sz>(); // wrap it up!
94  return size;
95  }
96 };
97 
98 template<typename F>
99 struct parallel_for<F, true> : private boost::thread_group {
100  parallel_for(const F* f, sz num_threads) : m_f(f), destructing(false), size(0), started(0), finished(0) {
101  a.par = this; // VC8 warning workaround
102  VINA_FOR(i, num_threads)
103  create_thread(boost::ref(a));
104  }
105  void run(sz size_) {
106  boost::mutex::scoped_lock self_lk(self);
107  size = size_;
108  finished = 0;
109  started = 0;
110  cond.notify_all(); // many things modified
111  while(finished < size) // wait until processing of all elements is finished
112  busy.wait(self_lk);
113  }
114  virtual ~parallel_for() {
115  {
116  boost::mutex::scoped_lock self_lk(self);
117  destructing = true;
118  cond.notify_all(); // destructing modified
119  }
120  join_all();
121  }
122 private:
123  void loop() {
124  while(boost::optional<sz> i = get_next()) {
125  (*m_f)(i.get());
126  {
127  boost::mutex::scoped_lock self_lk(self);
128  ++finished;
129  busy.notify_one();
130  }
131  }
132  }
133  struct aux {
135  aux() : par(NULL) {}
136  void operator()() const { par->loop(); }
137  };
138  aux a;
139  const F* m_f; // does not keep a local copy!
140  boost::condition cond;
141  boost::condition busy;
142  bool destructing; // dtor called
143  sz size; // size of the vector given to run() // FIXME?
144  sz started; // the number of jobs given to run() the work started on
145  sz finished; // the number of jobs given to run() the work finished on
146  boost::mutex self; // any modification or reading of mutables should lock this first
147  boost::optional<sz> get_next() {
148  boost::mutex::scoped_lock self_lk(self);
149  while(!destructing && started >= size)
150  cond.wait(self_lk);
151  if(destructing) return boost::optional<sz>(); // NOTHING
152  sz tmp = started;
153  ++started;
154  return tmp;
155  }
156 };
157 
158 
159 template<typename F, typename Container, typename Input, bool Sync = false>
160 struct parallel_iter {
161  parallel_iter(const F* f, sz num_threads) : a(f), pf(&a, num_threads) {}
162  void run(Container& v) {
163  a.v = &v;
164  pf.run(v.size());
165  }
166 private:
167  struct aux {
168  const F* f;
169  Container* v;
170  aux(const F* f) : f(f), v(NULL) {}
171  void operator()(sz i) const {
172  VINA_CHECK(v);
173  (*f)((*v)[i]);
174  }
175  };
178 };
179 
180 #endif