blob: f94bb7d54e447f1cd2507c3989b3cc8f7bf9d2d8 [file] [log] [blame]
/*
* Copyright 2008 Free Software Foundation, Inc.
* Copyright 2013 Alexander Chemeris <Alexander.Chemeris@fairwaves.ru>
*
*
* This software is distributed under the terms of the GNU Affero Public License.
* See the COPYING file in the main directory for details.
*
* This use of this software may be subject to additional restrictions.
* See the LEGAL file in the main directory for details.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Threads.h"
#include "Timeval.h"
#include "Logger.h"
#include <pthread.h>
#include <sys/types.h>
#include <errno.h> // for ETIMEDOUT
#include <sys/syscall.h> // for SYS_gettid
#include <sys/prctl.h> // Linux specific, for prctl(PR_SET_NAME)
// Make sure we get MCL_CURRENT and MCL_FUTURE (for mlockall) on OS X 10.3
#define _P1003_1B_VISIBLE
#include <sys/mman.h>
#undef _P1003_1B_VISIBLE
using namespace std;
#define POSIX_OK 0
#define POSIX_NO_WAIT 0
#define POSIX_WAIT_FOREVER (-1)
static inline int gettid() {return syscall(SYS_gettid);}
Mutex gStreamLock; ///< Global lock to control access to cout and cerr.
void lockCout()
{
gStreamLock.lock();
Timeval entryTime;
cout << entryTime << " " << pthread_self() << ": ";
}
void unlockCout()
{
cout << dec << endl << flush;
gStreamLock.unlock();
}
void lockCerr()
{
gStreamLock.lock();
Timeval entryTime;
cerr << entryTime << " " << pthread_self() << ": ";
}
void unlockCerr()
{
cerr << dec << endl << flush;
gStreamLock.unlock();
}
Mutex::Mutex()
{
bool res;
res = pthread_mutexattr_init(&mAttribs);
assert(!res);
res = pthread_mutexattr_settype(&mAttribs,PTHREAD_MUTEX_RECURSIVE);
assert(!res);
res = pthread_mutex_init(&mMutex,&mAttribs);
assert(!res);
}
Mutex::~Mutex()
{
pthread_mutex_destroy(&mMutex);
bool res = pthread_mutexattr_destroy(&mAttribs);
assert(!res);
}
/** Block for the signal up to the cancellation timeout. */
int Signal::wait(Mutex& wMutex, unsigned timeout) const
{
Timeval then(timeout);
struct timespec waitTime = then.timespec();
return pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime);
}
Thread::Thread(const string &name, size_t stackSize)
: mThreadId((pthread_t)0)
, mThreadName(name)
, mStackSize(stackSize)
, mThreadState(THREAD_STATE_IDLE)
, mThreadData(NULL)
{
}
Thread::~Thread()
{
stopThread();
}
void *Thread::threadAdaptor(void *data)
{
Thread *pThread = (Thread*)data;
// If we ever receive a thread cancel request, it means that the Thread
// object is in the process of being destroyed. To avoid the situation
// where a thread attempts to run after its containing Thread object has
// been freed, we set the thread up so that the cancel takes effect
// immediately (as opposed to waiting until the next thread cancellation
// point).
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
// =====================================================================
// Synchronize with the start() in the parent thread.
{
// 1. Lock synchronization mutex.
ScopedLock lock(pThread->mThreadStartupMutex);
// 2. Btw, set the thread name, while we're inside the mutex.
// FIXME: This works on Linux with glibc >= 2.12. Under *BSD and MacOS X
// this function has different arguments.
// pthread_setname_np(pThread->mThreadId, pThread->mThreadName.c_str());
// FIXME: For some reason the previous call doesn't work on my Ubuntu 12.04,
// so we use this one which works.
prctl(PR_SET_NAME, pThread->mThreadName.c_str());
// 3. Signal that we've started.
pThread->mThreadStartStopEvent.signal();
// 4. Wait until start() finishes its initialization.
//
// The actual thread is created and started with pthread_create(), then
// start() does its housekeeping and sets mThreadState=THREAD_STATE_RUNNING.
// If we allow Thread::run() to start before this initialization completes,
// callers might think (among other things) that the thread is not started
// while it's actually started.
pThread->mThreadInitializedEvent.wait(pThread->mThreadStartupMutex);
}
// Synchronization with the parent thread is finished.
// =====================================================================
// Log Thread ID for debugging purposes
LOG(INFO) << "Thread started: " << pThread->mThreadName
<< " with lwp=" << gettid() << ", pid=" << getpid();
// Keep all memory locked into physical mem, to guarantee realtime-behaviour
int res = mlockall(MCL_CURRENT|MCL_FUTURE);
if (res != POSIX_OK) {
LOG(WARNING) << "Failed to lock memory for thread: " << pThread->mThreadName;
}
// Run the actual code
pThread->runThread();
// Huh, we're done. Signal to a (potentially) waiting stop()'s.
{
ScopedLock lock(pThread->mThreadStateMutex);
pThread->mThreadState = THREAD_STATE_IDLE;
pThread->mThreadStartStopEvent.broadcast();
}
return NULL;
}
Thread::ReturnStatus Thread::startThread(void *data)
{
pthread_attr_t attrib;
// timeval threadStartTime;
// timespec threadStartTimeout;
bool res;
// Lock startup synchronization mutex. It will be used in conjunction with
// mThreadInitializedEvent and mThreadStartStopEvent conditional variables.
ScopedLock lock(mThreadStartupMutex);
{
ScopedLock lock(mThreadStateMutex);
if (mThreadState != THREAD_STATE_IDLE)
return ALREADY_STARTED;
mThreadState = THREAD_STATE_STARTING;
}
// Save thread data pointer
mThreadData = data;
LOG(DEBUG) << "Starting thread " << mThreadName << " (" << this << ")";
// construct thread attribute
res = pthread_attr_init(&attrib);
if (res != POSIX_OK) {
LOG(ALERT) << "pthread_attr_init failed, returned " << res
<< " in " << mThreadName << " (" << this << ")";
}
// Set the thread stack size
res = pthread_attr_setstacksize(&attrib, mStackSize);
if (res != POSIX_OK)
{
LOG(ALERT) << "pthread_attr_setstacksize failed, returned " << res
<< " in " << mThreadName << " (" << this << ")";
}
// Create the thread detached
res = pthread_attr_setdetachstate(&attrib, PTHREAD_CREATE_DETACHED);
if (res != POSIX_OK)
{
LOG(ALERT) << "pthread_attr_setdetachstate failed, returned " << res
<< " in " << mThreadName << " (" << this << ")";
}
// =====================================================================
// Start the thread and synchronize with it
// Start the thread!
res = pthread_create(&mThreadId, &attrib, threadAdaptor, (void *)this);
// Attributes are no longer needed.
pthread_attr_destroy(&attrib);
if (res != POSIX_OK)
{
LOG(ALERT) << "pthread_create failed, returned " << res
<< " in " << mThreadName << " (" << this << ")";
return PTHREAD_ERROR;
}
// Wait for the thread to startup.
res = mThreadStartStopEvent.wait(mThreadStartupMutex, THREAD_STARTUP_TIMEOUT*1000);
// If the thread does not start in THREAD_STARTUP_TIMEOUT seconds,
// then something is terribly wrong here.
if (res == ETIMEDOUT)
{
LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't started up in "
<< THREAD_STARTUP_TIMEOUT << " seconds. Bailing out.";
return RETURN_TIMEOUT;
}
// We're done with the initialization.
ackThreadStart();
// ToDo: Add other initialization here, e.g. adding this thread to a list of all threads.
// Startup initialization finished. Signal this to started thread, so
// it could go on.
mThreadInitializedEvent.signal();
return RETURN_OK;
}
Thread::ReturnStatus Thread::stopThread()
{
int res;
LOG(DEBUG) << "Stopping thread " << mThreadName << " (" << this << ")";
while (1) {
ScopedLock lock(mThreadStateMutex);
switch (mThreadState) {
case THREAD_STATE_IDLE:
// Nothing to do.
return RETURN_OK;
case THREAD_STATE_STARTING:
// Something is wrong in thi world.
assert(mThreadState != THREAD_STATE_STARTING);
LOG(ALERT) << "Trying to stop thread " << mThreadName
<< " (" << this << ") while it's trying to start.";
return WRONG_STATE;
case THREAD_STATE_RUNNING:
// Request shudown
mThreadState = THREAD_STATE_STOPPING;
// no "break" here to fall through to the next case
case THREAD_STATE_STOPPING:
// Wait for the thread to stop.
LOG(DEBUG) << "Waiting for thread " << mThreadName << " (" << this << ") to stop.";
res = mThreadStartStopEvent.wait(mThreadStateMutex, THREAD_STOP_TIMEOUT*1000);
LOG(DEBUG) << "Thread " << mThreadName << " (" << this << ") signalled stop "
<< "with res=" << res << " and mThreadState=" << mThreadState;
// If the thread does not stop in THREAD_STOP_TIMEOUT seconds,
// return error. It may be waiting for something.
if (res == ETIMEDOUT)
{
LOG(ALERT) << "thread " << mThreadName << " (" << this << ") hasn't stopped in "
<< THREAD_STARTUP_TIMEOUT << " seconds. Bailing out.";
return RETURN_TIMEOUT;
}
// Conditional variable could return in case of a signal, so we should
// double check that the thread has indeed stopped.
if (mThreadState == THREAD_STATE_IDLE)
return RETURN_OK;
else
// Try again...
break;
}
}
// We should never reach this line
assert(false);
return RETURN_OK;
}
// vim: ts=4 sw=4