blob: 42e6f7fb1c83a7cd81edd6838bd8d00f447d9781 [file] [log] [blame]
dburgess82c46ff2011-10-07 02:40:51 +00001/*
2* Copyright 2008, 2011 Free Software Foundation, Inc.
3*
4* This software is distributed under the terms of the GNU Affero Public License.
5* See the COPYING file in the main directory for details.
6*
7* This use of this software may be subject to additional restrictions.
8* See the LEGAL file in the main directory for details.
9
10 This program is free software: you can redistribute it and/or modify
11 it under the terms of the GNU Affero General Public License as published by
12 the Free Software Foundation, either version 3 of the License, or
13 (at your option) any later version.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU Affero General Public License for more details.
19
20 You should have received a copy of the GNU Affero General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23*/
24
25
26#ifndef INTERTHREAD_H
27#define INTERTHREAD_H
28
29#include "Timeval.h"
30#include "Threads.h"
31#include "LinkedLists.h"
32#include <map>
33#include <vector>
34#include <queue>
35
36
37
38
39
40/**@defgroup Templates for interthread mechanisms. */
41//@{
42
43
44/** Pointer FIFO for interthread operations. */
kurtis.heimerl5a872472013-05-31 21:47:25 +000045// (pat) The elements in the queue are type T*, and
46// the Fifo class implements the underlying queue.
47// The default is class PointerFIFO, which does not place any restrictions on the type of T,
48// and is implemented by allocating auxilliary structures for the queue,
49// or SingleLinkedList, which implements the queue using an internal pointer in type T,
50// which must implement the functional interface of class SingleLinkListNode,
51// namely: functions T*next() and void setNext(T*).
52template <class T, class Fifo=PointerFIFO> class InterthreadQueue {
dburgess82c46ff2011-10-07 02:40:51 +000053
54 protected:
55
kurtis.heimerl5a872472013-05-31 21:47:25 +000056 Fifo mQ;
dburgess82c46ff2011-10-07 02:40:51 +000057 mutable Mutex mLock;
58 mutable Signal mWriteSignal;
59
dburgess82c46ff2011-10-07 02:40:51 +000060 public:
61
62 /** Delete contents. */
63 void clear()
64 {
65 ScopedLock lock(mLock);
66 while (mQ.size()>0) delete (T*)mQ.get();
67 }
68
69 /** Empty the queue, but don't delete. */
70 void flushNoDelete()
71 {
72 ScopedLock lock(mLock);
73 while (mQ.size()>0) mQ.get();
74 }
75
76
77 ~InterthreadQueue()
78 { clear(); }
79
80
81 size_t size() const
82 {
83 ScopedLock lock(mLock);
84 return mQ.size();
85 }
86
kurtis.heimerl5a872472013-05-31 21:47:25 +000087 size_t totalSize() const // pat added
88 {
89 ScopedLock lock(mLock);
90 return mQ.totalSize();
91 }
92
dburgess82c46ff2011-10-07 02:40:51 +000093 /**
94 Blocking read.
95 @return Pointer to object (will not be NULL).
96 */
97 T* read()
98 {
99 ScopedLock lock(mLock);
100 T* retVal = (T*)mQ.get();
101 while (retVal==NULL) {
102 mWriteSignal.wait(mLock);
103 retVal = (T*)mQ.get();
104 }
105 return retVal;
106 }
107
kurtis.heimerl5a872472013-05-31 21:47:25 +0000108 /** Non-blocking peek at the first element; returns NULL if empty. */
109 T* front()
110 {
111 ScopedLock lock(mLock);
112 return (T*) mQ.front();
113 }
114
dburgess82c46ff2011-10-07 02:40:51 +0000115 /**
116 Blocking read with a timeout.
dburgess82c46ff2011-10-07 02:40:51 +0000117 @param timeout The read timeout in ms.
118 @return Pointer to object or NULL on timeout.
119 */
120 T* read(unsigned timeout)
121 {
122 if (timeout==0) return readNoBlock();
123 Timeval waitTime(timeout);
124 ScopedLock lock(mLock);
125 while ((mQ.size()==0) && (!waitTime.passed()))
126 mWriteSignal.wait(mLock,waitTime.remaining());
127 T* retVal = (T*)mQ.get();
128 return retVal;
129 }
130
131 /**
132 Non-blocking read.
133 @return Pointer to object or NULL if FIFO is empty.
134 */
135 T* readNoBlock()
136 {
137 ScopedLock lock(mLock);
138 return (T*)mQ.get();
139 }
140
141 /** Non-blocking write. */
142 void write(T* val)
143 {
144 ScopedLock lock(mLock);
145 mQ.put(val);
146 mWriteSignal.signal();
147 }
148
kurtis.heimerl5a872472013-05-31 21:47:25 +0000149 /** Non-block write to the front of the queue. */
150 void write_front(T* val) // pat added
151 {
152 ScopedLock lock(mLock);
153 mQ.push_front(val);
154 mWriteSignal.signal();
155 }
156};
dburgess82c46ff2011-10-07 02:40:51 +0000157
kurtis.heimerl5a872472013-05-31 21:47:25 +0000158// (pat) Identical to above but with the threading problem fixed.
159template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 {
160
161 protected:
162
163 Fifo mQ;
164 mutable Mutex mLock;
165 mutable Signal mWriteSignal;
166
167 public:
168
169 /** Delete contents. */
170 void clear()
171 {
172 ScopedLock lock(mLock);
173 while (mQ.size()>0) delete (T*)mQ.get();
174 }
175
176 /** Empty the queue, but don't delete. */
177 void flushNoDelete()
178 {
179 ScopedLock lock(mLock);
180 while (mQ.size()>0) mQ.get();
181 }
182
183
184 ~InterthreadQueue2()
185 { clear(); }
186
187
188 size_t size() const
189 {
190 ScopedLock lock(mLock);
191 return mQ.size();
192 }
193
194 size_t totalSize() const // pat added
195 {
196 ScopedLock lock(mLock);
197 return mQ.totalSize();
198 }
199
200 /**
201 Blocking read.
202 @return Pointer to object (will not be NULL).
203 */
204 T* read()
205 {
206 ScopedLock lock(mLock);
207 T* retVal = (T*)mQ.get();
208 while (retVal==NULL) {
209 mWriteSignal.wait(mLock);
210 retVal = (T*)mQ.get();
211 }
212 return retVal;
213 }
214
215 /** Non-blocking peek at the first element; returns NULL if empty. */
216 T* front()
217 {
218 ScopedLock lock(mLock);
219 return (T*) mQ.front();
220 }
221
222 /**
223 Blocking read with a timeout.
224 @param timeout The read timeout in ms.
225 @return Pointer to object or NULL on timeout.
226 */
227 T* read(unsigned timeout)
228 {
229 if (timeout==0) return readNoBlock();
230 Timeval waitTime(timeout);
231 ScopedLock lock(mLock);
232 while ((mQ.size()==0) && (!waitTime.passed()))
233 mWriteSignal.wait(mLock,waitTime.remaining());
234 T* retVal = (T*)mQ.get();
235 return retVal;
236 }
237
238 /**
239 Non-blocking read.
240 @return Pointer to object or NULL if FIFO is empty.
241 */
242 T* readNoBlock()
243 {
244 ScopedLock lock(mLock);
245 return (T*)mQ.get();
246 }
247
248 /** Non-blocking write. */
249 void write(T* val)
250 {
251 // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition.
252 // This is an implicit requirement of pthread_cond_wait() called from signal().
253 // If you do not do that, the InterthreadQueue read() function cannot start
254 // because the mutex is still locked by the thread calling the write(),
255 // so the read() thread yields its immediate execution opportunity.
256 // This recurs (and the InterthreadQueue fills up with data)
257 // until the read thread's accumulated temporary priority causes it to
258 // get a second pre-emptive activation over the writing thread,
259 // resulting in bursts of activity by the read thread.
260 { ScopedLock lock(mLock);
261 mQ.put(val);
262 }
263 mWriteSignal.signal();
264 }
265
266 /** Non-block write to the front of the queue. */
267 void write_front(T* val) // pat added
268 {
269 // (pat) See comments above.
270 { ScopedLock lock(mLock);
271 mQ.push_front(val);
272 }
273 mWriteSignal.signal();
274 }
dburgess82c46ff2011-10-07 02:40:51 +0000275};
276
277
278
279/** Pointer FIFO for interthread operations. */
280template <class T> class InterthreadQueueWithWait {
281
282 protected:
283
284 PointerFIFO mQ;
285 mutable Mutex mLock;
286 mutable Signal mWriteSignal;
287 mutable Signal mReadSignal;
288
289 virtual void freeElement(T* element) const { delete element; };
290
291 public:
292
293 /** Delete contents. */
294 void clear()
295 {
296 ScopedLock lock(mLock);
297 while (mQ.size()>0) freeElement((T*)mQ.get());
298 mReadSignal.signal();
299 }
300
301
302
303 virtual ~InterthreadQueueWithWait()
304 { clear(); }
305
306
307 size_t size() const
308 {
309 ScopedLock lock(mLock);
310 return mQ.size();
311 }
312
313 /**
314 Blocking read.
315 @return Pointer to object (will not be NULL).
316 */
317 T* read()
318 {
319 ScopedLock lock(mLock);
320 T* retVal = (T*)mQ.get();
321 while (retVal==NULL) {
322 mWriteSignal.wait(mLock);
323 retVal = (T*)mQ.get();
324 }
325 mReadSignal.signal();
326 return retVal;
327 }
328
329 /**
330 Blocking read with a timeout.
331 @param timeout The read timeout in ms.
332 @return Pointer to object or NULL on timeout.
333 */
334 T* read(unsigned timeout)
335 {
336 if (timeout==0) return readNoBlock();
337 Timeval waitTime(timeout);
338 ScopedLock lock(mLock);
339 while ((mQ.size()==0) && (!waitTime.passed()))
340 mWriteSignal.wait(mLock,waitTime.remaining());
341 T* retVal = (T*)mQ.get();
342 if (retVal!=NULL) mReadSignal.signal();
343 return retVal;
344 }
345
346 /**
347 Non-blocking read.
348 @return Pointer to object or NULL if FIFO is empty.
349 */
350 T* readNoBlock()
351 {
352 ScopedLock lock(mLock);
353 T* retVal = (T*)mQ.get();
354 if (retVal!=NULL) mReadSignal.signal();
355 return retVal;
356 }
357
358 /** Non-blocking write. */
359 void write(T* val)
360 {
kurtis.heimerl5a872472013-05-31 21:47:25 +0000361 // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
dburgess82c46ff2011-10-07 02:40:51 +0000362 ScopedLock lock(mLock);
363 mQ.put(val);
364 mWriteSignal.signal();
365 }
366
367 /** Wait until the queue falls below a low water mark. */
kurtis.heimerl5a872472013-05-31 21:47:25 +0000368 // (pat) This function suffers from the same problem as documented
369 // at InterthreadQueue.write(), but I am not fixing it because I cannot test it.
370 // The caller of this function will eventually get to run, just not immediately
371 // after the mReadSignal condition is fulfilled.
dburgess82c46ff2011-10-07 02:40:51 +0000372 void wait(size_t sz=0)
373 {
374 ScopedLock lock(mLock);
375 while (mQ.size()>sz) mReadSignal.wait(mLock);
376 }
377
378};
379
380
381
382
383
384/** Thread-safe map of pointers to class D, keyed by class K. */
385template <class K, class D > class InterthreadMap {
386
387protected:
388
389 typedef std::map<K,D*> Map;
390 Map mMap;
391 mutable Mutex mLock;
392 Signal mWriteSignal;
393
394public:
395
396 void clear()
397 {
398 // Delete everything in the map.
399 ScopedLock lock(mLock);
400 typename Map::iterator iter = mMap.begin();
401 while (iter != mMap.end()) {
402 delete iter->second;
403 ++iter;
404 }
405 mMap.clear();
406 }
407
408 ~InterthreadMap() { clear(); }
409
410 /**
411 Non-blocking write.
412 @param key The index to write to.
413 @param wData Pointer to data, not to be deleted until removed from the map.
414 */
415 void write(const K &key, D * wData)
416 {
417 ScopedLock lock(mLock);
418 typename Map::iterator iter = mMap.find(key);
419 if (iter!=mMap.end()) {
420 delete iter->second;
421 iter->second = wData;
422 } else {
423 mMap[key] = wData;
424 }
425 mWriteSignal.broadcast();
426 }
427
428 /**
429 Non-blocking read with element removal.
430 @param key Key to read from.
431 @return Pointer at key or NULL if key not found, to be deleted by caller.
432 */
433 D* getNoBlock(const K& key)
434 {
435 ScopedLock lock(mLock);
436 typename Map::iterator iter = mMap.find(key);
437 if (iter==mMap.end()) return NULL;
438 D* retVal = iter->second;
439 mMap.erase(iter);
440 return retVal;
441 }
442
443 /**
444 Blocking read with a timeout and element removal.
445 @param key The key to read from.
446 @param timeout The blocking timeout in ms.
447 @return Pointer at key or NULL on timeout, to be deleted by caller.
448 */
449 D* get(const K &key, unsigned timeout)
450 {
451 if (timeout==0) return getNoBlock(key);
452 Timeval waitTime(timeout);
453 ScopedLock lock(mLock);
454 typename Map::iterator iter = mMap.find(key);
455 while ((iter==mMap.end()) && (!waitTime.passed())) {
456 mWriteSignal.wait(mLock,waitTime.remaining());
457 iter = mMap.find(key);
458 }
459 if (iter==mMap.end()) return NULL;
460 D* retVal = iter->second;
461 mMap.erase(iter);
462 return retVal;
463 }
464
465 /**
466 Blocking read with and element removal.
467 @param key The key to read from.
468 @return Pointer at key, to be deleted by caller.
469 */
470 D* get(const K &key)
471 {
472 ScopedLock lock(mLock);
473 typename Map::iterator iter = mMap.find(key);
474 while (iter==mMap.end()) {
475 mWriteSignal.wait(mLock);
476 iter = mMap.find(key);
477 }
478 D* retVal = iter->second;
479 mMap.erase(iter);
480 return retVal;
481 }
482
483
484 /**
485 Remove an entry and delete it.
486 @param key The key of the entry to delete.
487 @return True if it was actually found and deleted.
488 */
489 bool remove(const K &key )
490 {
491 D* val = getNoBlock(key);
492 if (!val) return false;
493 delete val;
494 return true;
495 }
496
497
498 /**
499 Non-blocking read.
500 @param key Key to read from.
501 @return Pointer at key or NULL if key not found.
502 */
503 D* readNoBlock(const K& key) const
504 {
505 D* retVal=NULL;
506 ScopedLock lock(mLock);
507 typename Map::const_iterator iter = mMap.find(key);
508 if (iter!=mMap.end()) retVal = iter->second;
509 return retVal;
510 }
511
512 /**
513 Blocking read with a timeout.
514 @param key The key to read from.
515 @param timeout The blocking timeout in ms.
516 @return Pointer at key or NULL on timeout.
517 */
518 D* read(const K &key, unsigned timeout) const
519 {
520 if (timeout==0) return readNoBlock(key);
521 ScopedLock lock(mLock);
522 Timeval waitTime(timeout);
523 typename Map::const_iterator iter = mMap.find(key);
524 while ((iter==mMap.end()) && (!waitTime.passed())) {
525 mWriteSignal.wait(mLock,waitTime.remaining());
526 iter = mMap.find(key);
527 }
528 if (iter==mMap.end()) return NULL;
529 D* retVal = iter->second;
530 return retVal;
531 }
532
533 /**
534 Blocking read.
535 @param key The key to read from.
536 @return Pointer at key.
537 */
538 D* read(const K &key) const
539 {
540 ScopedLock lock(mLock);
541 typename Map::const_iterator iter = mMap.find(key);
542 while (iter==mMap.end()) {
543 mWriteSignal.wait(mLock);
544 iter = mMap.find(key);
545 }
546 D* retVal = iter->second;
547 return retVal;
548 }
549
550};
551
552
553
554
555
556
557
558/** This class is used to provide pointer-based comparison in priority_queues. */
559template <class T> class PointerCompare {
560
561 public:
562
563 /** Compare the objects pointed to, not the pointers themselves. */
564 bool operator()(const T *v1, const T *v2)
565 { return (*v1)>(*v2); }
566
567};
568
569
570
571/**
572 Priority queue for interthread operations.
573 Passes pointers to objects.
574*/
575template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {
576
577 protected:
578
579 std::priority_queue<T*,C,Cmp> mQ;
580 mutable Mutex mLock;
581 mutable Signal mWriteSignal;
582
583 public:
584
585
586 /** Clear the FIFO. */
587 void clear()
588 {
589 ScopedLock lock(mLock);
590 while (mQ.size()>0) {
591 T* ptr = mQ.top();
592 mQ.pop();
593 delete ptr;
594 }
595 }
596
597
598 ~InterthreadPriorityQueue()
599 {
600 clear();
601 }
602
603 size_t size() const
604 {
605 ScopedLock lock(mLock);
606 return mQ.size();
607 }
608
609
610 /** Non-blocking read. */
611 T* readNoBlock()
612 {
613 ScopedLock lock(mLock);
614 T* retVal = NULL;
615 if (mQ.size()!=0) {
616 retVal = mQ.top();
617 mQ.pop();
618 }
619 return retVal;
620 }
621
622 /** Blocking read. */
623 T* read()
624 {
625 ScopedLock lock(mLock);
626 T* retVal;
627 while (mQ.size()==0) mWriteSignal.wait(mLock);
628 retVal = mQ.top();
629 mQ.pop();
630 return retVal;
631 }
632
633 /** Non-blocking write. */
634 void write(T* val)
635 {
kurtis.heimerl5a872472013-05-31 21:47:25 +0000636 // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
dburgess82c46ff2011-10-07 02:40:51 +0000637 ScopedLock lock(mLock);
638 mQ.push(val);
639 mWriteSignal.signal();
640 }
641
642};
643
644
645
646
647
648class Semaphore {
649
650 private:
651
652 bool mFlag;
653 Signal mSignal;
654 mutable Mutex mLock;
655
656 public:
657
658 Semaphore()
659 :mFlag(false)
660 { }
661
662 void post()
663 {
664 ScopedLock lock(mLock);
665 mFlag=true;
666 mSignal.signal();
667 }
668
669 void get()
670 {
671 ScopedLock lock(mLock);
672 while (!mFlag) mSignal.wait(mLock);
673 mFlag=false;
674 }
675
676 bool semtry()
677 {
678 ScopedLock lock(mLock);
679 bool retVal = mFlag;
680 mFlag = false;
681 return retVal;
682 }
683
684};
685
686
687
688
689
690//@}
691
692
693
694
695#endif
696// vim: ts=4 sw=4