blob: 023ac148b1b3f137967d9bd4854b6216594442d1 [file] [log] [blame]
dburgess82c46ff2011-10-07 02:40:51 +00001/*
2* Copyright 2008, 2011 Free Software Foundation, Inc.
3*
4* This software is distributed under the terms of the GNU Affero Public License.
5* See the COPYING file in the main directory for details.
6*
7* This use of this software may be subject to additional restrictions.
8* See the LEGAL file in the main directory for details.
9
10 This program is free software: you can redistribute it and/or modify
11 it under the terms of the GNU Affero General Public License as published by
12 the Free Software Foundation, either version 3 of the License, or
13 (at your option) any later version.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU Affero General Public License for more details.
19
20 You should have received a copy of the GNU Affero General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23*/
24
25
26#ifndef INTERTHREAD_H
27#define INTERTHREAD_H
28
29#include "Timeval.h"
30#include "Threads.h"
31#include "LinkedLists.h"
32#include <map>
33#include <vector>
34#include <queue>
35
36
37
38
39
40/**@defgroup Templates for interthread mechanisms. */
41//@{
42
43
44/** Pointer FIFO for interthread operations. */
45template <class T> class InterthreadQueue {
46
47 protected:
48
49 PointerFIFO mQ;
50 mutable Mutex mLock;
51 mutable Signal mWriteSignal;
52
53
54 public:
55
56 /** Delete contents. */
57 void clear()
58 {
59 ScopedLock lock(mLock);
60 while (mQ.size()>0) delete (T*)mQ.get();
61 }
62
63 /** Empty the queue, but don't delete. */
64 void flushNoDelete()
65 {
66 ScopedLock lock(mLock);
67 while (mQ.size()>0) mQ.get();
68 }
69
70
71 ~InterthreadQueue()
72 { clear(); }
73
74
75 size_t size() const
76 {
77 ScopedLock lock(mLock);
78 return mQ.size();
79 }
80
81 /**
82 Blocking read.
83 @return Pointer to object (will not be NULL).
84 */
85 T* read()
86 {
87 ScopedLock lock(mLock);
88 T* retVal = (T*)mQ.get();
89 while (retVal==NULL) {
90 mWriteSignal.wait(mLock);
91 retVal = (T*)mQ.get();
92 }
93 return retVal;
94 }
95
96 /**
97 Blocking read with a timeout.
98 @param timeout The read timeout in ms.
99 @return Pointer to object or NULL on timeout.
100 */
101 T* read(unsigned timeout)
102 {
103 if (timeout==0) return readNoBlock();
104 Timeval waitTime(timeout);
105 ScopedLock lock(mLock);
106 while ((mQ.size()==0) && (!waitTime.passed()))
107 mWriteSignal.wait(mLock,waitTime.remaining());
108 T* retVal = (T*)mQ.get();
109 return retVal;
110 }
111
112 /**
113 Non-blocking read.
114 @return Pointer to object or NULL if FIFO is empty.
115 */
116 T* readNoBlock()
117 {
118 ScopedLock lock(mLock);
119 return (T*)mQ.get();
120 }
121
122 /** Non-blocking write. */
123 void write(T* val)
124 {
125 ScopedLock lock(mLock);
126 mQ.put(val);
127 mWriteSignal.signal();
128 }
129
130
131};
132
133
134
135/** Pointer FIFO for interthread operations. */
136template <class T> class InterthreadQueueWithWait {
137
138 protected:
139
140 PointerFIFO mQ;
141 mutable Mutex mLock;
142 mutable Signal mWriteSignal;
143 mutable Signal mReadSignal;
144
145 virtual void freeElement(T* element) const { delete element; };
146
147 public:
148
149 /** Delete contents. */
150 void clear()
151 {
152 ScopedLock lock(mLock);
153 while (mQ.size()>0) freeElement((T*)mQ.get());
154 mReadSignal.signal();
155 }
156
157
158
159 virtual ~InterthreadQueueWithWait()
160 { clear(); }
161
162
163 size_t size() const
164 {
165 ScopedLock lock(mLock);
166 return mQ.size();
167 }
168
169 /**
170 Blocking read.
171 @return Pointer to object (will not be NULL).
172 */
173 T* read()
174 {
175 ScopedLock lock(mLock);
176 T* retVal = (T*)mQ.get();
177 while (retVal==NULL) {
178 mWriteSignal.wait(mLock);
179 retVal = (T*)mQ.get();
180 }
181 mReadSignal.signal();
182 return retVal;
183 }
184
185 /**
186 Blocking read with a timeout.
187 @param timeout The read timeout in ms.
188 @return Pointer to object or NULL on timeout.
189 */
190 T* read(unsigned timeout)
191 {
192 if (timeout==0) return readNoBlock();
193 Timeval waitTime(timeout);
194 ScopedLock lock(mLock);
195 while ((mQ.size()==0) && (!waitTime.passed()))
196 mWriteSignal.wait(mLock,waitTime.remaining());
197 T* retVal = (T*)mQ.get();
198 if (retVal!=NULL) mReadSignal.signal();
199 return retVal;
200 }
201
202 /**
203 Non-blocking read.
204 @return Pointer to object or NULL if FIFO is empty.
205 */
206 T* readNoBlock()
207 {
208 ScopedLock lock(mLock);
209 T* retVal = (T*)mQ.get();
210 if (retVal!=NULL) mReadSignal.signal();
211 return retVal;
212 }
213
214 /** Non-blocking write. */
215 void write(T* val)
216 {
217 ScopedLock lock(mLock);
218 mQ.put(val);
219 mWriteSignal.signal();
220 }
221
222 /** Wait until the queue falls below a low water mark. */
223 void wait(size_t sz=0)
224 {
225 ScopedLock lock(mLock);
226 while (mQ.size()>sz) mReadSignal.wait(mLock);
227 }
228
229};
230
231
232
233
234
235/** Thread-safe map of pointers to class D, keyed by class K. */
236template <class K, class D > class InterthreadMap {
237
238protected:
239
240 typedef std::map<K,D*> Map;
241 Map mMap;
242 mutable Mutex mLock;
243 Signal mWriteSignal;
244
245public:
246
247 void clear()
248 {
249 // Delete everything in the map.
250 ScopedLock lock(mLock);
251 typename Map::iterator iter = mMap.begin();
252 while (iter != mMap.end()) {
253 delete iter->second;
254 ++iter;
255 }
256 mMap.clear();
257 }
258
259 ~InterthreadMap() { clear(); }
260
261 /**
262 Non-blocking write.
263 @param key The index to write to.
264 @param wData Pointer to data, not to be deleted until removed from the map.
265 */
266 void write(const K &key, D * wData)
267 {
268 ScopedLock lock(mLock);
269 typename Map::iterator iter = mMap.find(key);
270 if (iter!=mMap.end()) {
271 delete iter->second;
272 iter->second = wData;
273 } else {
274 mMap[key] = wData;
275 }
276 mWriteSignal.broadcast();
277 }
278
279 /**
280 Non-blocking read with element removal.
281 @param key Key to read from.
282 @return Pointer at key or NULL if key not found, to be deleted by caller.
283 */
284 D* getNoBlock(const K& key)
285 {
286 ScopedLock lock(mLock);
287 typename Map::iterator iter = mMap.find(key);
288 if (iter==mMap.end()) return NULL;
289 D* retVal = iter->second;
290 mMap.erase(iter);
291 return retVal;
292 }
293
294 /**
295 Blocking read with a timeout and element removal.
296 @param key The key to read from.
297 @param timeout The blocking timeout in ms.
298 @return Pointer at key or NULL on timeout, to be deleted by caller.
299 */
300 D* get(const K &key, unsigned timeout)
301 {
302 if (timeout==0) return getNoBlock(key);
303 Timeval waitTime(timeout);
304 ScopedLock lock(mLock);
305 typename Map::iterator iter = mMap.find(key);
306 while ((iter==mMap.end()) && (!waitTime.passed())) {
307 mWriteSignal.wait(mLock,waitTime.remaining());
308 iter = mMap.find(key);
309 }
310 if (iter==mMap.end()) return NULL;
311 D* retVal = iter->second;
312 mMap.erase(iter);
313 return retVal;
314 }
315
316 /**
317 Blocking read with and element removal.
318 @param key The key to read from.
319 @return Pointer at key, to be deleted by caller.
320 */
321 D* get(const K &key)
322 {
323 ScopedLock lock(mLock);
324 typename Map::iterator iter = mMap.find(key);
325 while (iter==mMap.end()) {
326 mWriteSignal.wait(mLock);
327 iter = mMap.find(key);
328 }
329 D* retVal = iter->second;
330 mMap.erase(iter);
331 return retVal;
332 }
333
334
335 /**
336 Remove an entry and delete it.
337 @param key The key of the entry to delete.
338 @return True if it was actually found and deleted.
339 */
340 bool remove(const K &key )
341 {
342 D* val = getNoBlock(key);
343 if (!val) return false;
344 delete val;
345 return true;
346 }
347
348
349 /**
350 Non-blocking read.
351 @param key Key to read from.
352 @return Pointer at key or NULL if key not found.
353 */
354 D* readNoBlock(const K& key) const
355 {
356 D* retVal=NULL;
357 ScopedLock lock(mLock);
358 typename Map::const_iterator iter = mMap.find(key);
359 if (iter!=mMap.end()) retVal = iter->second;
360 return retVal;
361 }
362
363 /**
364 Blocking read with a timeout.
365 @param key The key to read from.
366 @param timeout The blocking timeout in ms.
367 @return Pointer at key or NULL on timeout.
368 */
369 D* read(const K &key, unsigned timeout) const
370 {
371 if (timeout==0) return readNoBlock(key);
372 ScopedLock lock(mLock);
373 Timeval waitTime(timeout);
374 typename Map::const_iterator iter = mMap.find(key);
375 while ((iter==mMap.end()) && (!waitTime.passed())) {
376 mWriteSignal.wait(mLock,waitTime.remaining());
377 iter = mMap.find(key);
378 }
379 if (iter==mMap.end()) return NULL;
380 D* retVal = iter->second;
381 return retVal;
382 }
383
384 /**
385 Blocking read.
386 @param key The key to read from.
387 @return Pointer at key.
388 */
389 D* read(const K &key) const
390 {
391 ScopedLock lock(mLock);
392 typename Map::const_iterator iter = mMap.find(key);
393 while (iter==mMap.end()) {
394 mWriteSignal.wait(mLock);
395 iter = mMap.find(key);
396 }
397 D* retVal = iter->second;
398 return retVal;
399 }
400
401};
402
403
404
405
406
407
408
409/** This class is used to provide pointer-based comparison in priority_queues. */
410template <class T> class PointerCompare {
411
412 public:
413
414 /** Compare the objects pointed to, not the pointers themselves. */
415 bool operator()(const T *v1, const T *v2)
416 { return (*v1)>(*v2); }
417
418};
419
420
421
422/**
423 Priority queue for interthread operations.
424 Passes pointers to objects.
425*/
426template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {
427
428 protected:
429
430 std::priority_queue<T*,C,Cmp> mQ;
431 mutable Mutex mLock;
432 mutable Signal mWriteSignal;
433
434 public:
435
436
437 /** Clear the FIFO. */
438 void clear()
439 {
440 ScopedLock lock(mLock);
441 while (mQ.size()>0) {
442 T* ptr = mQ.top();
443 mQ.pop();
444 delete ptr;
445 }
446 }
447
448
449 ~InterthreadPriorityQueue()
450 {
451 clear();
452 }
453
454 size_t size() const
455 {
456 ScopedLock lock(mLock);
457 return mQ.size();
458 }
459
460
461 /** Non-blocking read. */
462 T* readNoBlock()
463 {
464 ScopedLock lock(mLock);
465 T* retVal = NULL;
466 if (mQ.size()!=0) {
467 retVal = mQ.top();
468 mQ.pop();
469 }
470 return retVal;
471 }
472
473 /** Blocking read. */
474 T* read()
475 {
476 ScopedLock lock(mLock);
477 T* retVal;
478 while (mQ.size()==0) mWriteSignal.wait(mLock);
479 retVal = mQ.top();
480 mQ.pop();
481 return retVal;
482 }
483
484 /** Non-blocking write. */
485 void write(T* val)
486 {
487 ScopedLock lock(mLock);
488 mQ.push(val);
489 mWriteSignal.signal();
490 }
491
492};
493
494
495
496
497
498class Semaphore {
499
500 private:
501
502 bool mFlag;
503 Signal mSignal;
504 mutable Mutex mLock;
505
506 public:
507
508 Semaphore()
509 :mFlag(false)
510 { }
511
512 void post()
513 {
514 ScopedLock lock(mLock);
515 mFlag=true;
516 mSignal.signal();
517 }
518
519 void get()
520 {
521 ScopedLock lock(mLock);
522 while (!mFlag) mSignal.wait(mLock);
523 mFlag=false;
524 }
525
526 bool semtry()
527 {
528 ScopedLock lock(mLock);
529 bool retVal = mFlag;
530 mFlag = false;
531 return retVal;
532 }
533
534};
535
536
537
538
539
540//@}
541
542
543
544
545#endif
546// vim: ts=4 sw=4