]> ruin.nu Git - germs.git/blob - src/threadgenesorter.cpp
Added progress information for the search
[germs.git] / src / threadgenesorter.cpp
1 /***************************************************************************
2  *   Copyright (C) 2008 by Michael Andreen                                 *
3  *   andreen@student.chalmers.se                                           *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) any later version.                                   *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA          *
19  ***************************************************************************/
20
21 #include "threadgenesorter.h"
22 #include "sortaction.h"
23
24 #include "genealgorithms.h"
25
26 #include <sys/time.h>
27 #include <errno.h>
28
29 #include <set>
30 using namespace std;
31
32 /**
33  * Handles locking and unlocking of mutexes.
34  *
35  * RAII style class for mutex acquisition.
36  */
37 class Mutex {
38         pthread_mutex_t* _m;
39         bool _locked;
40         public:
41                 Mutex(pthread_mutex_t* m){
42                         _m = m;
43                         pthread_mutex_lock(_m);
44                         _locked = true;
45                 }
46                 void unlock(){
47                         pthread_mutex_unlock(_m);
48                         _locked = false;
49                 }
50                 ~Mutex(){
51                         if (_locked)
52                                 unlock();
53                 }
54 };
55
56 struct ExecutionStuff {
57         ExecutionStuff(ThreadGeneSorter* so, const GeneOrder& go): _sorter(so)
58                         ,_go(go){};
59         ThreadGeneSorter* _sorter;
60         GeneOrder _go;
61 };
62
63
64 ThreadGeneSorter::ThreadGeneSorter(const Model& m, int threads) : _model(m){
65         _workers = threads;
66         pthread_mutex_init(&_queuelock, NULL);
67         pthread_mutex_init(&_solutionslock, NULL);
68         pthread_cond_init(&_addedSolution,NULL);
69         pthread_cond_init(&_addedTask,NULL);
70         pthread_cond_init(&_waiting,NULL);
71 }
72
73 void ThreadGeneSorter::join(){
74         pthread_join(_tid,NULL);
75 }
76
77 void ThreadGeneSorter::stop(){
78         Mutex m(&_queuelock);
79         _done = true;
80         pthread_cond_signal(&_waiting);
81 }
82
83 size_t ThreadGeneSorter::size(){
84         Mutex m(&_solutionslock);
85         return _solutions.size();
86 }
87
88 bool ThreadGeneSorter::running(){
89         return !_done;
90 }
91
92 double ThreadGeneSorter::wait(time_t time, size_t solutions){
93
94         timeval now;
95         gettimeofday(&now, NULL);
96         timespec timeout;
97         timeout.tv_sec = now.tv_sec + time;
98         timeout.tv_nsec = 0;
99
100         int err = 0;
101
102         Mutex m(&_solutionslock);
103         size_t n = _solutions.size();
104
105         while (!_done && err != ETIMEDOUT
106                         && _solutions.size() - n < solutions){
107                 err = pthread_cond_timedwait(&_addedSolution,&_solutionslock
108                         , &timeout);
109         }
110         if (_solutions.size() == 0)
111                 return 0.0;
112         return _solutions.top().first;
113 }
114
115 ThreadGeneSorter::SolutionsQueue ThreadGeneSorter::solutions(){
116         //TODO: thread safety..
117         return _solutions;
118 }
119
120 void ThreadGeneSorter::start(const GeneOrder& go){
121         pthread_attr_t tattr;
122         pthread_attr_init(&tattr);
123         pthread_attr_setscope(&tattr, PTHREAD_SCOPE_SYSTEM);
124         ExecutionStuff* es = new ExecutionStuff(this,go);
125         pthread_create(&_tid, &tattr, t_sorter, es);
126 }
127
128 void ThreadGeneSorter::sorter(const GeneOrder& go){
129         _queue.push(SortUnit(0.0,go,ActionList()));
130         pthread_attr_t tattr;
131         pthread_attr_init(&tattr);
132         pthread_attr_setscope(&tattr, PTHREAD_SCOPE_SYSTEM);
133         _done = false;
134         _queue = SortQueue();
135         _solutions = SolutionsQueue();
136
137         queue<pthread_t> threads;
138
139         _queue.push(SortUnit(0.0,go,ActionList()));
140
141         int workers = _workers;
142         while (!_done){
143                 if (workers){
144                         --workers;
145                         pthread_t tid;
146                         pthread_create(&tid, &tattr, t_worker, this);
147                         threads.push(tid);
148                 }else{
149                         Mutex m(&_queuelock);
150                         while(_workers != 0 && !_done){
151                                 pthread_cond_wait(&_waiting,&_queuelock);
152                         }
153                         if (_workers == 0)
154                                 _done = true;
155                 }
156         }
157         pthread_cond_broadcast(&_addedTask);
158         while(threads.size() > 0){
159                 pthread_join(threads.front(),NULL);
160                 threads.pop();
161         }
162         pthread_exit((void*)0);
163 }
164
165 void ThreadGeneSorter::worker(){
166         try{
167         while(!_done){
168                 Mutex m(&_queuelock);
169
170                 while (_queue.size() == 0 && !_done){
171                         --_workers;
172                         pthread_cond_signal(&_waiting);
173                         pthread_cond_wait(&_addedTask,&_queuelock);
174                         ++_workers;
175                 }
176                 if (_done)
177                         break;
178
179                 SortUnit su = _queue.top();
180                 _queue.pop();
181                 m.unlock();
182
183                 size_t dist = inversionDistance(su._go);
184                 if (dist == 0){
185                         Mutex m(&_solutionslock);
186                         _solutions.push(pair<double,ActionList>(su._score,su._al));
187                         pthread_cond_broadcast(&_addedSolution);
188                         continue;;
189                 }
190
191                 ActionList act = safeActions(su._go);
192                 if (act.size() > 0){
193                         set<SortAction> safe(act.begin(), act.end());
194                         for (set<SortAction>::iterator sa = safe.begin(); sa != safe.end(); ++sa){
195                                 GeneOrder go(su._go);
196                                 ActionList al(su._al);
197                                 al.push_back(*sa);
198                                 (*sa)(go);
199                                 double score = su._score + _model.score(*sa,su._go);
200                                 Mutex m(&_queuelock);
201                                 _queue.push(SortUnit(score,go,al));
202                         }
203                         pthread_cond_broadcast(&_addedTask);
204                 } //TODO: Hurdles..
205
206         }
207         }catch(const bad_alloc& e){
208                 _done = true;
209         }
210 }
211
212 void* t_sorter(void* arg){
213         ExecutionStuff* es = static_cast<ExecutionStuff*>(arg);
214         es->_sorter->sorter(es->_go);
215         delete es;
216         pthread_exit((void*)0);
217 }
218
219 void* t_worker(void* arg){
220         ThreadGeneSorter* sorter = static_cast<ThreadGeneSorter*>(arg);
221         sorter->worker();
222         pthread_exit((void*)0);
223 }