blob: 207ada2144ea8d70b4bdd61551d141394226c4c2 [file] [log] [blame]
dburgess82c46ff2011-10-07 02:40:51 +00001/*
2* Copyright 2008, 2011 Free Software Foundation, Inc.
3*
Pau Espin Pedrol21d03d32019-07-22 12:05:52 +02004* SPDX-License-Identifier: AGPL-3.0+
5*
dburgess82c46ff2011-10-07 02:40:51 +00006* This software is distributed under the terms of the GNU Affero Public License.
7* See the COPYING file in the main directory for details.
8*
9* This use of this software may be subject to additional restrictions.
10* See the LEGAL file in the main directory for details.
11
12 This program is free software: you can redistribute it and/or modify
13 it under the terms of the GNU Affero General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
16
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU Affero General Public License for more details.
21
22 You should have received a copy of the GNU Affero General Public License
23 along with this program. If not, see <http://www.gnu.org/licenses/>.
24
25*/
26
27
28#ifndef INTERTHREAD_H
29#define INTERTHREAD_H
30
31#include "Timeval.h"
32#include "Threads.h"
33#include "LinkedLists.h"
34#include <map>
35#include <vector>
36#include <queue>
37
38
39
40
41
/**@defgroup InterThread Templates for interthread mechanisms. */
43//@{
44
45
46/** Pointer FIFO for interthread operations. */
kurtis.heimerl5a872472013-05-31 21:47:25 +000047// (pat) The elements in the queue are type T*, and
48// the Fifo class implements the underlying queue.
49// The default is class PointerFIFO, which does not place any restrictions on the type of T,
Martin Hauke066fd042019-10-13 19:08:00 +020050// and is implemented by allocating auxiliary structures for the queue,
kurtis.heimerl5a872472013-05-31 21:47:25 +000051// or SingleLinkedList, which implements the queue using an internal pointer in type T,
52// which must implement the functional interface of class SingleLinkListNode,
53// namely: functions T*next() and void setNext(T*).
54template <class T, class Fifo=PointerFIFO> class InterthreadQueue {
dburgess82c46ff2011-10-07 02:40:51 +000055
56 protected:
57
Pau Espin Pedrolbdb970e2019-07-22 12:03:39 +020058 Fifo mQ;
dburgess82c46ff2011-10-07 02:40:51 +000059 mutable Mutex mLock;
60 mutable Signal mWriteSignal;
61
dburgess82c46ff2011-10-07 02:40:51 +000062 public:
63
64 /** Delete contents. */
65 void clear()
66 {
67 ScopedLock lock(mLock);
68 while (mQ.size()>0) delete (T*)mQ.get();
69 }
70
71 /** Empty the queue, but don't delete. */
72 void flushNoDelete()
73 {
74 ScopedLock lock(mLock);
75 while (mQ.size()>0) mQ.get();
76 }
77
78
79 ~InterthreadQueue()
80 { clear(); }
81
82
83 size_t size() const
84 {
85 ScopedLock lock(mLock);
86 return mQ.size();
87 }
88
kurtis.heimerl5a872472013-05-31 21:47:25 +000089 size_t totalSize() const // pat added
90 {
91 ScopedLock lock(mLock);
92 return mQ.totalSize();
93 }
94
dburgess82c46ff2011-10-07 02:40:51 +000095 /**
96 Blocking read.
97 @return Pointer to object (will not be NULL).
98 */
99 T* read()
100 {
101 ScopedLock lock(mLock);
102 T* retVal = (T*)mQ.get();
103 while (retVal==NULL) {
104 mWriteSignal.wait(mLock);
105 retVal = (T*)mQ.get();
106 }
107 return retVal;
108 }
109
kurtis.heimerl5a872472013-05-31 21:47:25 +0000110 /** Non-blocking peek at the first element; returns NULL if empty. */
111 T* front()
112 {
113 ScopedLock lock(mLock);
114 return (T*) mQ.front();
115 }
116
dburgess82c46ff2011-10-07 02:40:51 +0000117 /**
118 Blocking read with a timeout.
dburgess82c46ff2011-10-07 02:40:51 +0000119 @param timeout The read timeout in ms.
120 @return Pointer to object or NULL on timeout.
121 */
122 T* read(unsigned timeout)
123 {
124 if (timeout==0) return readNoBlock();
125 Timeval waitTime(timeout);
126 ScopedLock lock(mLock);
127 while ((mQ.size()==0) && (!waitTime.passed()))
128 mWriteSignal.wait(mLock,waitTime.remaining());
129 T* retVal = (T*)mQ.get();
130 return retVal;
131 }
132
133 /**
134 Non-blocking read.
135 @return Pointer to object or NULL if FIFO is empty.
136 */
137 T* readNoBlock()
138 {
139 ScopedLock lock(mLock);
140 return (T*)mQ.get();
141 }
142
143 /** Non-blocking write. */
144 void write(T* val)
145 {
146 ScopedLock lock(mLock);
147 mQ.put(val);
148 mWriteSignal.signal();
149 }
150
kurtis.heimerl5a872472013-05-31 21:47:25 +0000151 /** Non-block write to the front of the queue. */
152 void write_front(T* val) // pat added
153 {
154 ScopedLock lock(mLock);
155 mQ.push_front(val);
156 mWriteSignal.signal();
157 }
158};
dburgess82c46ff2011-10-07 02:40:51 +0000159
kurtis.heimerl5a872472013-05-31 21:47:25 +0000160// (pat) Identical to above but with the threading problem fixed.
161template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 {
162
163 protected:
164
Pau Espin Pedrolbdb970e2019-07-22 12:03:39 +0200165 Fifo mQ;
kurtis.heimerl5a872472013-05-31 21:47:25 +0000166 mutable Mutex mLock;
167 mutable Signal mWriteSignal;
168
169 public:
170
171 /** Delete contents. */
172 void clear()
173 {
174 ScopedLock lock(mLock);
175 while (mQ.size()>0) delete (T*)mQ.get();
176 }
177
178 /** Empty the queue, but don't delete. */
179 void flushNoDelete()
180 {
181 ScopedLock lock(mLock);
182 while (mQ.size()>0) mQ.get();
183 }
184
185
186 ~InterthreadQueue2()
187 { clear(); }
188
189
190 size_t size() const
191 {
192 ScopedLock lock(mLock);
193 return mQ.size();
194 }
195
196 size_t totalSize() const // pat added
197 {
198 ScopedLock lock(mLock);
199 return mQ.totalSize();
200 }
201
202 /**
203 Blocking read.
204 @return Pointer to object (will not be NULL).
205 */
206 T* read()
207 {
208 ScopedLock lock(mLock);
209 T* retVal = (T*)mQ.get();
210 while (retVal==NULL) {
211 mWriteSignal.wait(mLock);
212 retVal = (T*)mQ.get();
213 }
214 return retVal;
215 }
216
217 /** Non-blocking peek at the first element; returns NULL if empty. */
218 T* front()
219 {
220 ScopedLock lock(mLock);
221 return (T*) mQ.front();
222 }
223
224 /**
225 Blocking read with a timeout.
226 @param timeout The read timeout in ms.
227 @return Pointer to object or NULL on timeout.
228 */
229 T* read(unsigned timeout)
230 {
231 if (timeout==0) return readNoBlock();
232 Timeval waitTime(timeout);
233 ScopedLock lock(mLock);
234 while ((mQ.size()==0) && (!waitTime.passed()))
235 mWriteSignal.wait(mLock,waitTime.remaining());
236 T* retVal = (T*)mQ.get();
237 return retVal;
238 }
239
240 /**
241 Non-blocking read.
242 @return Pointer to object or NULL if FIFO is empty.
243 */
244 T* readNoBlock()
245 {
246 ScopedLock lock(mLock);
247 return (T*)mQ.get();
248 }
249
250 /** Non-blocking write. */
251 void write(T* val)
252 {
253 // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition.
254 // This is an implicit requirement of pthread_cond_wait() called from signal().
255 // If you do not do that, the InterthreadQueue read() function cannot start
256 // because the mutex is still locked by the thread calling the write(),
257 // so the read() thread yields its immediate execution opportunity.
258 // This recurs (and the InterthreadQueue fills up with data)
259 // until the read thread's accumulated temporary priority causes it to
260 // get a second pre-emptive activation over the writing thread,
Pau Espin Pedrolbdb970e2019-07-22 12:03:39 +0200261 // resulting in bursts of activity by the read thread.
kurtis.heimerl5a872472013-05-31 21:47:25 +0000262 { ScopedLock lock(mLock);
263 mQ.put(val);
264 }
265 mWriteSignal.signal();
266 }
267
268 /** Non-block write to the front of the queue. */
269 void write_front(T* val) // pat added
270 {
271 // (pat) See comments above.
272 { ScopedLock lock(mLock);
273 mQ.push_front(val);
274 }
275 mWriteSignal.signal();
276 }
dburgess82c46ff2011-10-07 02:40:51 +0000277};
278
279
280
281/** Pointer FIFO for interthread operations. */
282template <class T> class InterthreadQueueWithWait {
283
284 protected:
285
Pau Espin Pedrolbdb970e2019-07-22 12:03:39 +0200286 PointerFIFO mQ;
dburgess82c46ff2011-10-07 02:40:51 +0000287 mutable Mutex mLock;
288 mutable Signal mWriteSignal;
289 mutable Signal mReadSignal;
290
291 virtual void freeElement(T* element) const { delete element; };
292
293 public:
294
295 /** Delete contents. */
296 void clear()
297 {
298 ScopedLock lock(mLock);
299 while (mQ.size()>0) freeElement((T*)mQ.get());
300 mReadSignal.signal();
301 }
302
303
304
305 virtual ~InterthreadQueueWithWait()
306 { clear(); }
307
308
309 size_t size() const
310 {
311 ScopedLock lock(mLock);
312 return mQ.size();
313 }
314
315 /**
316 Blocking read.
317 @return Pointer to object (will not be NULL).
318 */
319 T* read()
320 {
321 ScopedLock lock(mLock);
322 T* retVal = (T*)mQ.get();
323 while (retVal==NULL) {
324 mWriteSignal.wait(mLock);
325 retVal = (T*)mQ.get();
326 }
327 mReadSignal.signal();
328 return retVal;
329 }
330
331 /**
332 Blocking read with a timeout.
333 @param timeout The read timeout in ms.
334 @return Pointer to object or NULL on timeout.
335 */
336 T* read(unsigned timeout)
337 {
338 if (timeout==0) return readNoBlock();
339 Timeval waitTime(timeout);
340 ScopedLock lock(mLock);
341 while ((mQ.size()==0) && (!waitTime.passed()))
342 mWriteSignal.wait(mLock,waitTime.remaining());
343 T* retVal = (T*)mQ.get();
344 if (retVal!=NULL) mReadSignal.signal();
345 return retVal;
346 }
347
348 /**
349 Non-blocking read.
350 @return Pointer to object or NULL if FIFO is empty.
351 */
352 T* readNoBlock()
353 {
354 ScopedLock lock(mLock);
355 T* retVal = (T*)mQ.get();
356 if (retVal!=NULL) mReadSignal.signal();
357 return retVal;
358 }
359
360 /** Non-blocking write. */
361 void write(T* val)
362 {
kurtis.heimerl5a872472013-05-31 21:47:25 +0000363 // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
dburgess82c46ff2011-10-07 02:40:51 +0000364 ScopedLock lock(mLock);
365 mQ.put(val);
366 mWriteSignal.signal();
367 }
368
369 /** Wait until the queue falls below a low water mark. */
kurtis.heimerl5a872472013-05-31 21:47:25 +0000370 // (pat) This function suffers from the same problem as documented
371 // at InterthreadQueue.write(), but I am not fixing it because I cannot test it.
372 // The caller of this function will eventually get to run, just not immediately
373 // after the mReadSignal condition is fulfilled.
dburgess82c46ff2011-10-07 02:40:51 +0000374 void wait(size_t sz=0)
375 {
376 ScopedLock lock(mLock);
377 while (mQ.size()>sz) mReadSignal.wait(mLock);
378 }
379
380};
381
382
383
384
385
386/** Thread-safe map of pointers to class D, keyed by class K. */
387template <class K, class D > class InterthreadMap {
388
389protected:
390
391 typedef std::map<K,D*> Map;
392 Map mMap;
393 mutable Mutex mLock;
394 Signal mWriteSignal;
395
396public:
397
398 void clear()
399 {
400 // Delete everything in the map.
401 ScopedLock lock(mLock);
402 typename Map::iterator iter = mMap.begin();
403 while (iter != mMap.end()) {
404 delete iter->second;
405 ++iter;
406 }
407 mMap.clear();
408 }
409
410 ~InterthreadMap() { clear(); }
411
412 /**
413 Non-blocking write.
414 @param key The index to write to.
415 @param wData Pointer to data, not to be deleted until removed from the map.
416 */
417 void write(const K &key, D * wData)
418 {
419 ScopedLock lock(mLock);
420 typename Map::iterator iter = mMap.find(key);
421 if (iter!=mMap.end()) {
422 delete iter->second;
423 iter->second = wData;
424 } else {
425 mMap[key] = wData;
426 }
427 mWriteSignal.broadcast();
428 }
429
430 /**
431 Non-blocking read with element removal.
432 @param key Key to read from.
433 @return Pointer at key or NULL if key not found, to be deleted by caller.
434 */
435 D* getNoBlock(const K& key)
436 {
437 ScopedLock lock(mLock);
438 typename Map::iterator iter = mMap.find(key);
439 if (iter==mMap.end()) return NULL;
440 D* retVal = iter->second;
441 mMap.erase(iter);
442 return retVal;
443 }
444
445 /**
446 Blocking read with a timeout and element removal.
447 @param key The key to read from.
448 @param timeout The blocking timeout in ms.
449 @return Pointer at key or NULL on timeout, to be deleted by caller.
450 */
451 D* get(const K &key, unsigned timeout)
452 {
453 if (timeout==0) return getNoBlock(key);
454 Timeval waitTime(timeout);
455 ScopedLock lock(mLock);
456 typename Map::iterator iter = mMap.find(key);
457 while ((iter==mMap.end()) && (!waitTime.passed())) {
458 mWriteSignal.wait(mLock,waitTime.remaining());
459 iter = mMap.find(key);
460 }
461 if (iter==mMap.end()) return NULL;
462 D* retVal = iter->second;
463 mMap.erase(iter);
464 return retVal;
465 }
466
467 /**
468 Blocking read with and element removal.
469 @param key The key to read from.
470 @return Pointer at key, to be deleted by caller.
471 */
472 D* get(const K &key)
473 {
474 ScopedLock lock(mLock);
475 typename Map::iterator iter = mMap.find(key);
476 while (iter==mMap.end()) {
477 mWriteSignal.wait(mLock);
478 iter = mMap.find(key);
479 }
480 D* retVal = iter->second;
481 mMap.erase(iter);
482 return retVal;
483 }
484
485
486 /**
487 Remove an entry and delete it.
488 @param key The key of the entry to delete.
489 @return True if it was actually found and deleted.
490 */
491 bool remove(const K &key )
492 {
493 D* val = getNoBlock(key);
494 if (!val) return false;
495 delete val;
496 return true;
497 }
498
499
500 /**
501 Non-blocking read.
502 @param key Key to read from.
503 @return Pointer at key or NULL if key not found.
504 */
505 D* readNoBlock(const K& key) const
506 {
507 D* retVal=NULL;
508 ScopedLock lock(mLock);
509 typename Map::const_iterator iter = mMap.find(key);
510 if (iter!=mMap.end()) retVal = iter->second;
511 return retVal;
512 }
513
514 /**
515 Blocking read with a timeout.
516 @param key The key to read from.
517 @param timeout The blocking timeout in ms.
518 @return Pointer at key or NULL on timeout.
519 */
Eric5561f112022-07-19 21:18:21 +0200520 D* read(const K &key, unsigned timeout)
dburgess82c46ff2011-10-07 02:40:51 +0000521 {
522 if (timeout==0) return readNoBlock(key);
523 ScopedLock lock(mLock);
524 Timeval waitTime(timeout);
525 typename Map::const_iterator iter = mMap.find(key);
526 while ((iter==mMap.end()) && (!waitTime.passed())) {
527 mWriteSignal.wait(mLock,waitTime.remaining());
528 iter = mMap.find(key);
529 }
530 if (iter==mMap.end()) return NULL;
531 D* retVal = iter->second;
532 return retVal;
533 }
534
535 /**
536 Blocking read.
537 @param key The key to read from.
538 @return Pointer at key.
539 */
Eric5561f112022-07-19 21:18:21 +0200540 D* read(const K &key)
dburgess82c46ff2011-10-07 02:40:51 +0000541 {
542 ScopedLock lock(mLock);
543 typename Map::const_iterator iter = mMap.find(key);
544 while (iter==mMap.end()) {
545 mWriteSignal.wait(mLock);
546 iter = mMap.find(key);
547 }
548 D* retVal = iter->second;
549 return retVal;
550 }
551
552};
553
554
555
556
557
558
559
/**
	This class is used to provide pointer-based comparison in priority_queues.
	Because the comparison is "greater than", a std::priority_queue using it
	keeps the smallest pointed-to object at top().
*/
template <class T> class PointerCompare {

	public:

	/**
		Compare the objects pointed to, not the pointers themselves.
		Const-qualified so the functor can also be invoked through a const
		object or a const reference.
		@param v1 Left operand; dereferenced, so it must not be NULL.
		@param v2 Right operand; dereferenced, so it must not be NULL.
		@return True if *v1 is greater than *v2.
	*/
	bool operator()(const T *v1, const T *v2) const
		{ return (*v1)>(*v2); }

};
570
571
572
573/**
574 Priority queue for interthread operations.
575 Passes pointers to objects.
576*/
577template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue {
578
579 protected:
580
581 std::priority_queue<T*,C,Cmp> mQ;
582 mutable Mutex mLock;
583 mutable Signal mWriteSignal;
584
585 public:
586
587
588 /** Clear the FIFO. */
589 void clear()
590 {
591 ScopedLock lock(mLock);
592 while (mQ.size()>0) {
593 T* ptr = mQ.top();
594 mQ.pop();
595 delete ptr;
596 }
597 }
598
599
600 ~InterthreadPriorityQueue()
601 {
602 clear();
603 }
604
605 size_t size() const
606 {
607 ScopedLock lock(mLock);
608 return mQ.size();
609 }
610
611
612 /** Non-blocking read. */
613 T* readNoBlock()
614 {
615 ScopedLock lock(mLock);
616 T* retVal = NULL;
617 if (mQ.size()!=0) {
618 retVal = mQ.top();
619 mQ.pop();
620 }
621 return retVal;
622 }
623
624 /** Blocking read. */
625 T* read()
626 {
627 ScopedLock lock(mLock);
628 T* retVal;
629 while (mQ.size()==0) mWriteSignal.wait(mLock);
630 retVal = mQ.top();
631 mQ.pop();
632 return retVal;
633 }
634
635 /** Non-blocking write. */
636 void write(T* val)
637 {
kurtis.heimerl5a872472013-05-31 21:47:25 +0000638 // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field.
dburgess82c46ff2011-10-07 02:40:51 +0000639 ScopedLock lock(mLock);
640 mQ.push(val);
641 mWriteSignal.signal();
642 }
643
644};
645
646
647
648
649
650class Semaphore {
651
652 private:
653
654 bool mFlag;
655 Signal mSignal;
656 mutable Mutex mLock;
657
658 public:
659
660 Semaphore()
661 :mFlag(false)
662 { }
663
664 void post()
665 {
666 ScopedLock lock(mLock);
667 mFlag=true;
668 mSignal.signal();
669 }
670
671 void get()
672 {
673 ScopedLock lock(mLock);
674 while (!mFlag) mSignal.wait(mLock);
675 mFlag=false;
676 }
677
678 bool semtry()
679 {
680 ScopedLock lock(mLock);
681 bool retVal = mFlag;
682 mFlag = false;
683 return retVal;
684 }
685
686};
687
688
689
690
691
692//@}
693
694
695
696
697#endif
698// vim: ts=4 sw=4