| /* |
| * Copyright 2008, 2011 Free Software Foundation, Inc. |
| * |
| * This software is distributed under the terms of the GNU Affero Public License. |
| * See the COPYING file in the main directory for details. |
| * |
| * This use of this software may be subject to additional restrictions. |
| * See the LEGAL file in the main directory for details. |
| |
| This program is free software: you can redistribute it and/or modify |
| it under the terms of the GNU Affero General Public License as published by |
| the Free Software Foundation, either version 3 of the License, or |
| (at your option) any later version. |
| |
| This program is distributed in the hope that it will be useful, |
| but WITHOUT ANY WARRANTY; without even the implied warranty of |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU Affero General Public License for more details. |
| |
| You should have received a copy of the GNU Affero General Public License |
| along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| */ |
| |
| |
| #ifndef INTERTHREAD_H |
| #define INTERTHREAD_H |
| |
| #include "Timeval.h" |
| #include "Threads.h" |
| #include "LinkedLists.h" |
| #include <map> |
| #include <vector> |
| #include <queue> |
| |
| |
| |
| |
| |
| /**@defgroup Templates for interthread mechanisms. */ |
| //@{ |
| |
| |
| /** Pointer FIFO for interthread operations. */ |
| // (pat) The elements in the queue are type T*, and |
| // the Fifo class implements the underlying queue. |
| // The default is class PointerFIFO, which does not place any restrictions on the type of T, |
| // and is implemented by allocating auxilliary structures for the queue, |
| // or SingleLinkedList, which implements the queue using an internal pointer in type T, |
| // which must implement the functional interface of class SingleLinkListNode, |
| // namely: functions T*next() and void setNext(T*). |
| template <class T, class Fifo=PointerFIFO> class InterthreadQueue { |
| |
| protected: |
| |
| Fifo mQ; |
| mutable Mutex mLock; |
| mutable Signal mWriteSignal; |
| |
| public: |
| |
| /** Delete contents. */ |
| void clear() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) delete (T*)mQ.get(); |
| } |
| |
| /** Empty the queue, but don't delete. */ |
| void flushNoDelete() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) mQ.get(); |
| } |
| |
| |
| ~InterthreadQueue() |
| { clear(); } |
| |
| |
| size_t size() const |
| { |
| ScopedLock lock(mLock); |
| return mQ.size(); |
| } |
| |
| size_t totalSize() const // pat added |
| { |
| ScopedLock lock(mLock); |
| return mQ.totalSize(); |
| } |
| |
| /** |
| Blocking read. |
| @return Pointer to object (will not be NULL). |
| */ |
| T* read() |
| { |
| ScopedLock lock(mLock); |
| T* retVal = (T*)mQ.get(); |
| while (retVal==NULL) { |
| mWriteSignal.wait(mLock); |
| retVal = (T*)mQ.get(); |
| } |
| return retVal; |
| } |
| |
| /** Non-blocking peek at the first element; returns NULL if empty. */ |
| T* front() |
| { |
| ScopedLock lock(mLock); |
| return (T*) mQ.front(); |
| } |
| |
| /** |
| Blocking read with a timeout. |
| @param timeout The read timeout in ms. |
| @return Pointer to object or NULL on timeout. |
| */ |
| T* read(unsigned timeout) |
| { |
| if (timeout==0) return readNoBlock(); |
| Timeval waitTime(timeout); |
| ScopedLock lock(mLock); |
| while ((mQ.size()==0) && (!waitTime.passed())) |
| mWriteSignal.wait(mLock,waitTime.remaining()); |
| T* retVal = (T*)mQ.get(); |
| return retVal; |
| } |
| |
| /** |
| Non-blocking read. |
| @return Pointer to object or NULL if FIFO is empty. |
| */ |
| T* readNoBlock() |
| { |
| ScopedLock lock(mLock); |
| return (T*)mQ.get(); |
| } |
| |
| /** Non-blocking write. */ |
| void write(T* val) |
| { |
| ScopedLock lock(mLock); |
| mQ.put(val); |
| mWriteSignal.signal(); |
| } |
| |
| /** Non-block write to the front of the queue. */ |
| void write_front(T* val) // pat added |
| { |
| ScopedLock lock(mLock); |
| mQ.push_front(val); |
| mWriteSignal.signal(); |
| } |
| }; |
| |
| // (pat) Identical to above but with the threading problem fixed. |
| template <class T, class Fifo=PointerFIFO> class InterthreadQueue2 { |
| |
| protected: |
| |
| Fifo mQ; |
| mutable Mutex mLock; |
| mutable Signal mWriteSignal; |
| |
| public: |
| |
| /** Delete contents. */ |
| void clear() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) delete (T*)mQ.get(); |
| } |
| |
| /** Empty the queue, but don't delete. */ |
| void flushNoDelete() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) mQ.get(); |
| } |
| |
| |
| ~InterthreadQueue2() |
| { clear(); } |
| |
| |
| size_t size() const |
| { |
| ScopedLock lock(mLock); |
| return mQ.size(); |
| } |
| |
| size_t totalSize() const // pat added |
| { |
| ScopedLock lock(mLock); |
| return mQ.totalSize(); |
| } |
| |
| /** |
| Blocking read. |
| @return Pointer to object (will not be NULL). |
| */ |
| T* read() |
| { |
| ScopedLock lock(mLock); |
| T* retVal = (T*)mQ.get(); |
| while (retVal==NULL) { |
| mWriteSignal.wait(mLock); |
| retVal = (T*)mQ.get(); |
| } |
| return retVal; |
| } |
| |
| /** Non-blocking peek at the first element; returns NULL if empty. */ |
| T* front() |
| { |
| ScopedLock lock(mLock); |
| return (T*) mQ.front(); |
| } |
| |
| /** |
| Blocking read with a timeout. |
| @param timeout The read timeout in ms. |
| @return Pointer to object or NULL on timeout. |
| */ |
| T* read(unsigned timeout) |
| { |
| if (timeout==0) return readNoBlock(); |
| Timeval waitTime(timeout); |
| ScopedLock lock(mLock); |
| while ((mQ.size()==0) && (!waitTime.passed())) |
| mWriteSignal.wait(mLock,waitTime.remaining()); |
| T* retVal = (T*)mQ.get(); |
| return retVal; |
| } |
| |
| /** |
| Non-blocking read. |
| @return Pointer to object or NULL if FIFO is empty. |
| */ |
| T* readNoBlock() |
| { |
| ScopedLock lock(mLock); |
| return (T*)mQ.get(); |
| } |
| |
| /** Non-blocking write. */ |
| void write(T* val) |
| { |
| // (pat) The Mutex mLock must be released before signaling the mWriteSignal condition. |
| // This is an implicit requirement of pthread_cond_wait() called from signal(). |
| // If you do not do that, the InterthreadQueue read() function cannot start |
| // because the mutex is still locked by the thread calling the write(), |
| // so the read() thread yields its immediate execution opportunity. |
| // This recurs (and the InterthreadQueue fills up with data) |
| // until the read thread's accumulated temporary priority causes it to |
| // get a second pre-emptive activation over the writing thread, |
| // resulting in bursts of activity by the read thread. |
| { ScopedLock lock(mLock); |
| mQ.put(val); |
| } |
| mWriteSignal.signal(); |
| } |
| |
| /** Non-block write to the front of the queue. */ |
| void write_front(T* val) // pat added |
| { |
| // (pat) See comments above. |
| { ScopedLock lock(mLock); |
| mQ.push_front(val); |
| } |
| mWriteSignal.signal(); |
| } |
| }; |
| |
| |
| |
| /** Pointer FIFO for interthread operations. */ |
| template <class T> class InterthreadQueueWithWait { |
| |
| protected: |
| |
| PointerFIFO mQ; |
| mutable Mutex mLock; |
| mutable Signal mWriteSignal; |
| mutable Signal mReadSignal; |
| |
| virtual void freeElement(T* element) const { delete element; }; |
| |
| public: |
| |
| /** Delete contents. */ |
| void clear() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) freeElement((T*)mQ.get()); |
| mReadSignal.signal(); |
| } |
| |
| |
| |
| virtual ~InterthreadQueueWithWait() |
| { clear(); } |
| |
| |
| size_t size() const |
| { |
| ScopedLock lock(mLock); |
| return mQ.size(); |
| } |
| |
| /** |
| Blocking read. |
| @return Pointer to object (will not be NULL). |
| */ |
| T* read() |
| { |
| ScopedLock lock(mLock); |
| T* retVal = (T*)mQ.get(); |
| while (retVal==NULL) { |
| mWriteSignal.wait(mLock); |
| retVal = (T*)mQ.get(); |
| } |
| mReadSignal.signal(); |
| return retVal; |
| } |
| |
| /** |
| Blocking read with a timeout. |
| @param timeout The read timeout in ms. |
| @return Pointer to object or NULL on timeout. |
| */ |
| T* read(unsigned timeout) |
| { |
| if (timeout==0) return readNoBlock(); |
| Timeval waitTime(timeout); |
| ScopedLock lock(mLock); |
| while ((mQ.size()==0) && (!waitTime.passed())) |
| mWriteSignal.wait(mLock,waitTime.remaining()); |
| T* retVal = (T*)mQ.get(); |
| if (retVal!=NULL) mReadSignal.signal(); |
| return retVal; |
| } |
| |
| /** |
| Non-blocking read. |
| @return Pointer to object or NULL if FIFO is empty. |
| */ |
| T* readNoBlock() |
| { |
| ScopedLock lock(mLock); |
| T* retVal = (T*)mQ.get(); |
| if (retVal!=NULL) mReadSignal.signal(); |
| return retVal; |
| } |
| |
| /** Non-blocking write. */ |
| void write(T* val) |
| { |
| // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. |
| ScopedLock lock(mLock); |
| mQ.put(val); |
| mWriteSignal.signal(); |
| } |
| |
| /** Wait until the queue falls below a low water mark. */ |
| // (pat) This function suffers from the same problem as documented |
| // at InterthreadQueue.write(), but I am not fixing it because I cannot test it. |
| // The caller of this function will eventually get to run, just not immediately |
| // after the mReadSignal condition is fulfilled. |
| void wait(size_t sz=0) |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>sz) mReadSignal.wait(mLock); |
| } |
| |
| }; |
| |
| |
| |
| |
| |
| /** Thread-safe map of pointers to class D, keyed by class K. */ |
| template <class K, class D > class InterthreadMap { |
| |
| protected: |
| |
| typedef std::map<K,D*> Map; |
| Map mMap; |
| mutable Mutex mLock; |
| Signal mWriteSignal; |
| |
| public: |
| |
| void clear() |
| { |
| // Delete everything in the map. |
| ScopedLock lock(mLock); |
| typename Map::iterator iter = mMap.begin(); |
| while (iter != mMap.end()) { |
| delete iter->second; |
| ++iter; |
| } |
| mMap.clear(); |
| } |
| |
| ~InterthreadMap() { clear(); } |
| |
| /** |
| Non-blocking write. |
| @param key The index to write to. |
| @param wData Pointer to data, not to be deleted until removed from the map. |
| */ |
| void write(const K &key, D * wData) |
| { |
| ScopedLock lock(mLock); |
| typename Map::iterator iter = mMap.find(key); |
| if (iter!=mMap.end()) { |
| delete iter->second; |
| iter->second = wData; |
| } else { |
| mMap[key] = wData; |
| } |
| mWriteSignal.broadcast(); |
| } |
| |
| /** |
| Non-blocking read with element removal. |
| @param key Key to read from. |
| @return Pointer at key or NULL if key not found, to be deleted by caller. |
| */ |
| D* getNoBlock(const K& key) |
| { |
| ScopedLock lock(mLock); |
| typename Map::iterator iter = mMap.find(key); |
| if (iter==mMap.end()) return NULL; |
| D* retVal = iter->second; |
| mMap.erase(iter); |
| return retVal; |
| } |
| |
| /** |
| Blocking read with a timeout and element removal. |
| @param key The key to read from. |
| @param timeout The blocking timeout in ms. |
| @return Pointer at key or NULL on timeout, to be deleted by caller. |
| */ |
| D* get(const K &key, unsigned timeout) |
| { |
| if (timeout==0) return getNoBlock(key); |
| Timeval waitTime(timeout); |
| ScopedLock lock(mLock); |
| typename Map::iterator iter = mMap.find(key); |
| while ((iter==mMap.end()) && (!waitTime.passed())) { |
| mWriteSignal.wait(mLock,waitTime.remaining()); |
| iter = mMap.find(key); |
| } |
| if (iter==mMap.end()) return NULL; |
| D* retVal = iter->second; |
| mMap.erase(iter); |
| return retVal; |
| } |
| |
| /** |
| Blocking read with and element removal. |
| @param key The key to read from. |
| @return Pointer at key, to be deleted by caller. |
| */ |
| D* get(const K &key) |
| { |
| ScopedLock lock(mLock); |
| typename Map::iterator iter = mMap.find(key); |
| while (iter==mMap.end()) { |
| mWriteSignal.wait(mLock); |
| iter = mMap.find(key); |
| } |
| D* retVal = iter->second; |
| mMap.erase(iter); |
| return retVal; |
| } |
| |
| |
| /** |
| Remove an entry and delete it. |
| @param key The key of the entry to delete. |
| @return True if it was actually found and deleted. |
| */ |
| bool remove(const K &key ) |
| { |
| D* val = getNoBlock(key); |
| if (!val) return false; |
| delete val; |
| return true; |
| } |
| |
| |
| /** |
| Non-blocking read. |
| @param key Key to read from. |
| @return Pointer at key or NULL if key not found. |
| */ |
| D* readNoBlock(const K& key) const |
| { |
| D* retVal=NULL; |
| ScopedLock lock(mLock); |
| typename Map::const_iterator iter = mMap.find(key); |
| if (iter!=mMap.end()) retVal = iter->second; |
| return retVal; |
| } |
| |
| /** |
| Blocking read with a timeout. |
| @param key The key to read from. |
| @param timeout The blocking timeout in ms. |
| @return Pointer at key or NULL on timeout. |
| */ |
| D* read(const K &key, unsigned timeout) const |
| { |
| if (timeout==0) return readNoBlock(key); |
| ScopedLock lock(mLock); |
| Timeval waitTime(timeout); |
| typename Map::const_iterator iter = mMap.find(key); |
| while ((iter==mMap.end()) && (!waitTime.passed())) { |
| mWriteSignal.wait(mLock,waitTime.remaining()); |
| iter = mMap.find(key); |
| } |
| if (iter==mMap.end()) return NULL; |
| D* retVal = iter->second; |
| return retVal; |
| } |
| |
| /** |
| Blocking read. |
| @param key The key to read from. |
| @return Pointer at key. |
| */ |
| D* read(const K &key) const |
| { |
| ScopedLock lock(mLock); |
| typename Map::const_iterator iter = mMap.find(key); |
| while (iter==mMap.end()) { |
| mWriteSignal.wait(mLock); |
| iter = mMap.find(key); |
| } |
| D* retVal = iter->second; |
| return retVal; |
| } |
| |
| }; |
| |
| |
| |
| |
| |
| |
| |
| /** This class is used to provide pointer-based comparison in priority_queues. */ |
| template <class T> class PointerCompare { |
| |
| public: |
| |
| /** Compare the objects pointed to, not the pointers themselves. */ |
| bool operator()(const T *v1, const T *v2) |
| { return (*v1)>(*v2); } |
| |
| }; |
| |
| |
| |
| /** |
| Priority queue for interthread operations. |
| Passes pointers to objects. |
| */ |
| template <class T, class C = std::vector<T*>, class Cmp = PointerCompare<T> > class InterthreadPriorityQueue { |
| |
| protected: |
| |
| std::priority_queue<T*,C,Cmp> mQ; |
| mutable Mutex mLock; |
| mutable Signal mWriteSignal; |
| |
| public: |
| |
| |
| /** Clear the FIFO. */ |
| void clear() |
| { |
| ScopedLock lock(mLock); |
| while (mQ.size()>0) { |
| T* ptr = mQ.top(); |
| mQ.pop(); |
| delete ptr; |
| } |
| } |
| |
| |
| ~InterthreadPriorityQueue() |
| { |
| clear(); |
| } |
| |
| size_t size() const |
| { |
| ScopedLock lock(mLock); |
| return mQ.size(); |
| } |
| |
| |
| /** Non-blocking read. */ |
| T* readNoBlock() |
| { |
| ScopedLock lock(mLock); |
| T* retVal = NULL; |
| if (mQ.size()!=0) { |
| retVal = mQ.top(); |
| mQ.pop(); |
| } |
| return retVal; |
| } |
| |
| /** Blocking read. */ |
| T* read() |
| { |
| ScopedLock lock(mLock); |
| T* retVal; |
| while (mQ.size()==0) mWriteSignal.wait(mLock); |
| retVal = mQ.top(); |
| mQ.pop(); |
| return retVal; |
| } |
| |
| /** Non-blocking write. */ |
| void write(T* val) |
| { |
| // (pat) 8-14: Taking out the threading problem fix temporarily for David to use in the field. |
| ScopedLock lock(mLock); |
| mQ.push(val); |
| mWriteSignal.signal(); |
| } |
| |
| }; |
| |
| |
| |
| |
| |
| class Semaphore { |
| |
| private: |
| |
| bool mFlag; |
| Signal mSignal; |
| mutable Mutex mLock; |
| |
| public: |
| |
| Semaphore() |
| :mFlag(false) |
| { } |
| |
| void post() |
| { |
| ScopedLock lock(mLock); |
| mFlag=true; |
| mSignal.signal(); |
| } |
| |
| void get() |
| { |
| ScopedLock lock(mLock); |
| while (!mFlag) mSignal.wait(mLock); |
| mFlag=false; |
| } |
| |
| bool semtry() |
| { |
| ScopedLock lock(mLock); |
| bool retVal = mFlag; |
| mFlag = false; |
| return retVal; |
| } |
| |
| }; |
| |
| |
| |
| |
| |
| //@} |
| |
| |
| |
| |
| #endif |
| // vim: ts=4 sw=4 |