- added a new thread manager to try to see if it takes less CPU cycles while the thread controller waits for slaves to do their work. To enable the experimental thread manager set: EnableNewThreadManager=true

This commit is contained in:
Mark Vejvoda 2013-01-10 21:16:28 +00:00
parent ef069abdc8
commit 5d2066fdbb
14 changed files with 666 additions and 179 deletions

View File

@ -37,6 +37,7 @@ namespace Glest{ namespace Game{
// =====================================================
AiInterfaceThread::AiInterfaceThread(AiInterface *aiIntf) : BaseThread() {
this->masterController = NULL;
this->triggerIdMutex = new Mutex();
this->aiIntf = aiIntf;
}
@ -124,6 +125,9 @@ void AiInterfaceThread::execute() {
semTaskSignalled.waitTillSignalled();
static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
@ -144,50 +148,6 @@ void AiInterfaceThread::execute() {
this->aiIntf->update();
/*
World *world = faction->getWorld();
//Config &config= Config::getInstance();
//bool sortedUnitsAllowed = config.getBool("AllowGroupedUnitCommands","true");
bool sortedUnitsAllowed = false;
if(sortedUnitsAllowed) {
faction->sortUnitsByCommandGroups();
}
MutexSafeWrapper safeMutex(faction->getUnitMutex(),string(__FILE__) + "_" + intToStr(__LINE__));
//if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start();
if(minorDebugPerformance) chrono.start();
int unitCount = faction->getUnitCount();
for(int j = 0; j < unitCount; ++j) {
Unit *unit = faction->getUnit(j);
if(unit == NULL) {
throw megaglest_runtime_error("unit == NULL");
}
int64 elapsed1 = 0;
if(minorDebugPerformance) elapsed1 = chrono.getMillis();
bool update = unit->needToUpdate();
if(minorDebugPerformance && (chrono.getMillis() - elapsed1) >= 1) printf("Faction [%d - %s] #1-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed1);
//update = true;
if(update == true) {
int64 elapsed2 = 0;
if(minorDebugPerformance) elapsed2 = chrono.getMillis();
world->getUnitUpdater()->updateUnitCommand(unit,frameIndex.first);
if(minorDebugPerformance && (chrono.getMillis() - elapsed2) >= 1) printf("Faction [%d - %s] #2-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed2);
}
}
if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis());
*/
safeMutex.ReleaseLock();
setTaskCompleted(frameIndex.first);

View File

@ -31,13 +31,14 @@ namespace Glest{ namespace Game{
/// The AI will interact with the game through this interface
// =====================================================
class AiInterfaceThread : public BaseThread {
class AiInterfaceThread : public BaseThread, public SlaveThreadControllerInterface {
protected:
AiInterface *aiIntf;
Semaphore semTaskSignalled;
Mutex *triggerIdMutex;
std::pair<int,bool> frameIndex;
MasterSlaveThreadController *masterController;
virtual void setQuitStatus(bool value);
virtual void setTaskCompleted(int frameIndex);
@ -49,6 +50,10 @@ public:
virtual void execute();
void signal(int frameIndex);
bool isSignalCompleted(int frameIndex);
virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; }
virtual void signalSlave(void *userdata) { signal(*((int *)(userdata))); }
};
class AiInterface {
@ -87,6 +92,7 @@ public:
void signalWorkerThread(int frameIndex);
bool isWorkerThreadSignalCompleted(int frameIndex);
AiInterfaceThread *getWorkerThread() { return workerThread; }
//get
int getTimer() const {return timer;}

View File

@ -378,6 +378,7 @@ Game::~Game() {
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
masterController.clearSlaves(true);
deleteValues(aiInterfaces.begin(), aiInterfaces.end());
aiInterfaces.clear();
@ -1133,8 +1134,10 @@ void Game::init(bool initForPreviewOnly) {
bool isNetworkGame = this->gameSettings.isNetworkGame();
role = networkManager.getNetworkRole();
masterController.clearSlaves(true);
deleteValues(aiInterfaces.begin(), aiInterfaces.end());
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
aiInterfaces.resize(world.getFactionCount());
for(int i=0; i < world.getFactionCount(); ++i) {
Faction *faction= world.getFaction(i);
@ -1146,15 +1149,19 @@ void Game::init(bool initForPreviewOnly) {
char szBuf[8096]="";
snprintf(szBuf,8096,Lang::getInstance().get("LogScreenGameLoadingCreatingAIFaction","",true).c_str(),i);
logger.add(szBuf, true);
slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread());
}
else {
aiInterfaces[i]= NULL;
}
}
if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) {
masterController.setSlaves(slaveThreadList);
}
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
// give CPU time to update other things to avoid apperance of hanging
sleep(0);
Shared::Platform::Window::handleEvent();
@ -1525,55 +1532,63 @@ void Game::update() {
}
*/
// Signal the faction threads to do any pre-processing
bool hasAIPlayer = false;
for(int j = 0; j < world.getFactionCount(); ++j) {
Faction *faction = world.getFaction(j);
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
int currentFrameCount = world.getFrameCount();
masterController.signalSlaves(&currentFrameCount);
bool slavesCompleted = masterController.waitTillSlavesTrigger(20000);
}
else {
// Signal the faction threads to do any pre-processing
bool hasAIPlayer = false;
for(int j = 0; j < world.getFactionCount(); ++j) {
Faction *faction = world.getFaction(j);
//printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled());
//printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled());
if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true &&
scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) {
if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true &&
scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis());
aiInterfaces[j]->signalWorkerThread(world.getFrameCount());
hasAIPlayer = true;
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis());
aiInterfaces[j]->signalWorkerThread(world.getFrameCount());
hasAIPlayer = true;
}
}
}
if(showPerfStats) {
sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis());
perfList.push_back(perfBuf);
}
if(showPerfStats) {
sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis());
perfList.push_back(perfBuf);
}
if(hasAIPlayer == true) {
//sleep(0);
if(hasAIPlayer == true) {
//sleep(0);
bool workThreadsFinished = false;
Chrono chronoAI;
chronoAI.start();
bool workThreadsFinished = false;
Chrono chronoAI;
chronoAI.start();
const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000;
for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) {
workThreadsFinished = true;
for(int j = 0; j < world.getFactionCount(); ++j) {
Faction *faction = world.getFaction(j);
if(faction == NULL) {
throw megaglest_runtime_error("faction == NULL");
}
if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true &&
scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) {
if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) {
workThreadsFinished = false;
break;
const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000;
for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) {
workThreadsFinished = true;
for(int j = 0; j < world.getFactionCount(); ++j) {
Faction *faction = world.getFaction(j);
if(faction == NULL) {
throw megaglest_runtime_error("faction == NULL");
}
if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true &&
scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) {
if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) {
workThreadsFinished = false;
break;
}
}
}
}
if(workThreadsFinished == false) {
//sleep(0);
}
else {
break;
if(workThreadsFinished == false) {
//sleep(0);
}
else {
break;
}
}
}
}
@ -1824,6 +1839,8 @@ void Game::update() {
world.clearTileset();
SoundRenderer::getInstance().stopAllSounds();
masterController.clearSlaves(true);
deleteValues(aiInterfaces.begin(), aiInterfaces.end());
aiInterfaces.clear();
gui.end(); //selection must be cleared before deleting units
@ -2092,6 +2109,7 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo
Logger &logger= Logger::getInstance();
ServerInterface *server = NetworkManager::getInstance().getServerInterface();
bool newAIPlayerCreated = false;
for(int i = 0; i < world.getFactionCount(); ++i) {
Faction *faction = world.getFaction(i);
if( faction->getFactionDisconnectHandled() == false &&
@ -2123,6 +2141,8 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo
snprintf(szBuf,8096,msg.c_str(),i+1,this->gameSettings.getNetworkPlayerName(i).c_str());
commander.tryNetworkPlayerDisconnected(i);
newAIPlayerCreated = true;
}
else {
string msg = "Player #%d [%s] has disconnected, but player was only an observer!";
@ -2140,6 +2160,21 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo
}
}
}
if(newAIPlayerCreated == true && Config::getInstance().getBool("EnableNewThreadManager","false") == true) {
bool enableServerControlledAI = this->gameSettings.getEnableServerControlledAI();
masterController.clearSlaves(true);
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
for(int i=0; i < world.getFactionCount(); ++i) {
Faction *faction= world.getFaction(i);
if(faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true) {
slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread());
}
}
masterController.setSlaves(slaveThreadList);
}
}
}

View File

@ -196,6 +196,8 @@ private:
std::map<int,HighlightSpecialUnitInfo> unitHighlightList;
MasterSlaveThreadController masterController;
public:
Game();
Game(Program *program, const GameSettings *gameSettings, bool masterserverMode);

View File

@ -33,6 +33,7 @@ namespace Glest{ namespace Game{
// =====================================================
ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() {
this->masterController = NULL;
this->triggerIdMutex = new Mutex();
this->slotIndex = slotIndex;
this->slotInterface = NULL;
@ -42,6 +43,7 @@ ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() {
}
ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() {
this->masterController = NULL;
this->triggerIdMutex = new Mutex();
this->slotIndex = slotIndex;
this->slotInterface = slotInterface;
@ -164,6 +166,14 @@ void ConnectionSlotThread::slotUpdateTask(ConnectionSlotEvent *event) {
}
}
void ConnectionSlotThread::signalSlave(void *userdata) {
//ConnectionSlotEvent &event = eventList[i];
std::map<int,ConnectionSlotEvent> *eventList = (std::map<int,ConnectionSlotEvent> *)userdata;
ConnectionSlotEvent &event = (*eventList)[slotIndex];
signalUpdate(&event);
}
void ConnectionSlotThread::execute() {
RunningStatusSafeWrapper runningStatus(this);
try {
@ -179,6 +189,9 @@ void ConnectionSlotThread::execute() {
semTaskSignalled.waitTillSignalled();
static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;

View File

@ -70,7 +70,7 @@ public:
virtual ~ConnectionSlotCallbackInterface() {}
};
class ConnectionSlotThread : public BaseThread
class ConnectionSlotThread : public BaseThread, public SlaveThreadControllerInterface
{
protected:
@ -79,6 +79,7 @@ protected:
Mutex *triggerIdMutex;
vector<ConnectionSlotEvent> eventList;
int slotIndex;
MasterSlaveThreadController *masterController;
virtual void setQuitStatus(bool value);
virtual void setTaskCompleted(int eventId);
@ -91,6 +92,9 @@ public:
ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex);
virtual ~ConnectionSlotThread();
virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; }
virtual void signalSlave(void *userdata);
virtual void execute();
void signalUpdate(ConnectionSlotEvent *event);
bool isSignalCompleted(ConnectionSlotEvent *event);
@ -140,6 +144,8 @@ public:
ConnectionSlot(ServerInterface* serverInterface, int playerIndex);
~ConnectionSlot();
ConnectionSlotThread *getWorkerThread() { return slotThreadWorker; }
void update(bool checkForNewClients,int lockedSlotIndex);
void setPlayerIndex(int value) { playerIndex = value; }
int getPlayerIndex() const {return playerIndex;}

View File

@ -243,6 +243,8 @@ ServerInterface::~ServerInterface() {
//printf("===> Destructor for ServerInterface\n");
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
masterController.clearSlaves(true);
exitServer = true;
for(int i= 0; i < GameConstants::maxPlayers; ++i) {
if(slots[i] != NULL) {
@ -826,25 +828,6 @@ std::pair<bool,bool> ServerInterface::clientLagCheck(ConnectionSlot *connectionS
return clientLagExceededOrWarned;
}
bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot, int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) {
bool slotSignalled = false;
event.eventType = eReceiveSocketData;
event.networkMessage = NULL;
event.connectionSlot = connectionSlot;
event.socketTriggered = socketTriggered;
event.triggerId = slotIndex;
event.eventId = getNextEventId();
if(connectionSlot != NULL) {
if(socketTriggered == true || connectionSlot->isConnected() == false) {
connectionSlot->signalUpdate(&event);
slotSignalled = true;
}
}
return slotSignalled;
}
void ServerInterface::updateSocketTriggeredList(std::map<PLATFORM_SOCKET,bool> & socketTriggeredList) {
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
@ -868,51 +851,97 @@ void ServerInterface::validateConnectedClients() {
}
}
bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot,
int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) {
bool slotSignalled = false;
event.eventType = eReceiveSocketData;
event.networkMessage = NULL;
event.connectionSlot = connectionSlot;
event.socketTriggered = socketTriggered;
event.triggerId = slotIndex;
event.eventId = getNextEventId();
if(connectionSlot != NULL) {
if(socketTriggered == true || connectionSlot->isConnected() == false) {
connectionSlot->signalUpdate(&event);
slotSignalled = true;
}
}
return slotSignalled;
}
void ServerInterface::signalClientsToRecieveData(std::map<PLATFORM_SOCKET,bool> &socketTriggeredList,
std::map<int,ConnectionSlotEvent> &eventList,
std::map<int,bool> & mapSlotSignalledList) {
//bool checkForNewClients = true;
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
masterController.clearSlaves(true);
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
bool socketTriggered = false;
bool socketTriggered = false;
if(connectionSlot != NULL) {
PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId();
if(Socket::isSocketValid(&clientSocket)) {
socketTriggered = socketTriggeredList[clientSocket];
if(connectionSlot != NULL) {
PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId();
if(Socket::isSocketValid(&clientSocket)) {
socketTriggered = socketTriggeredList[clientSocket];
}
}
ConnectionSlotEvent &event = eventList[i];
if(connectionSlot != NULL) {
if(socketTriggered == true || connectionSlot->isConnected() == false) {
ConnectionSlotEvent &event = eventList[i];
event.eventType = eReceiveSocketData;
event.networkMessage = NULL;
event.connectionSlot = connectionSlot;
event.socketTriggered = socketTriggered;
event.triggerId = i;
event.eventId = getNextEventId();
slaveThreadList.push_back(connectionSlot->getWorkerThread());
mapSlotSignalledList[i] = true;
}
}
}
ConnectionSlotEvent &event = eventList[i];
mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event);
masterController.setSlaves(slaveThreadList);
masterController.signalSlaves(&eventList);
}
else {
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
ConnectionSlot* connectionSlot = slots[i];
bool socketTriggered = false;
if(connectionSlot != NULL) {
PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId();
if(Socket::isSocketValid(&clientSocket)) {
socketTriggered = socketTriggeredList[clientSocket];
}
}
ConnectionSlotEvent &event = eventList[i];
mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event);
}
}
}
void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSignalledList,
std::vector <string> &errorMsgList,
std::map<int,ConnectionSlotEvent> &eventList) {
time_t waitForThreadElapsed = time(NULL);
std::map<int,bool> slotsCompleted;
for(bool threadsDone = false;
exitServer == false && threadsDone == false &&
difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) {
threadsDone = true;
// Examine all threads for completion of delegation
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
//printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]);
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
bool slavesCompleted = masterController.waitTillSlavesTrigger(1000 * MAX_SLOT_THREAD_WAIT_TIME);
for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
//printf("===> IN slot %d - About to checkForCompletedClients\n",i);
ConnectionSlot* connectionSlot = slots[i];
if(connectionSlot != NULL && mapSlotSignalledList[i] == true &&
slotsCompleted.find(i) == slotsCompleted.end()) {
if(connectionSlot != NULL && mapSlotSignalledList[i] == true) {
try {
std::vector<std::string> errorList = connectionSlot->getThreadErrorList();
// Collect any collected errors from threads
if(errorList.empty() == false) {
@ -924,16 +953,6 @@ void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSigna
}
connectionSlot->clearThreadErrorList();
}
// Not done waiting for data yet
bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true);
if(updateFinished == false) {
threadsDone = false;
break;
}
else {
slotsCompleted[i] = true;
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
@ -942,8 +961,63 @@ void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSigna
errorMsgList.push_back(ex.what());
}
}
}
masterController.clearSlaves(true);
}
else {
time_t waitForThreadElapsed = time(NULL);
std::map<int,bool> slotsCompleted;
for(bool threadsDone = false;
exitServer == false && threadsDone == false &&
difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) {
threadsDone = true;
// Examine all threads for completion of delegation
for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) {
//printf("===> END slot %d - About to checkForCompletedClients\n",i);
//printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]);
MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i));
//printf("===> IN slot %d - About to checkForCompletedClients\n",i);
ConnectionSlot* connectionSlot = slots[i];
if(connectionSlot != NULL && mapSlotSignalledList[i] == true &&
slotsCompleted.find(i) == slotsCompleted.end()) {
try {
std::vector<std::string> errorList = connectionSlot->getThreadErrorList();
// Collect any collected errors from threads
if(errorList.empty() == false) {
for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) {
string &sErr = errorList[iErrIdx];
if(sErr != "") {
errorMsgList.push_back(sErr);
}
}
connectionSlot->clearThreadErrorList();
}
// Not done waiting for data yet
bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true);
if(updateFinished == false) {
threadsDone = false;
break;
}
else {
slotsCompleted[i] = true;
}
}
catch(const exception &ex) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what());
errorMsgList.push_back(ex.what());
}
}
//printf("===> END slot %d - About to checkForCompletedClients\n",i);
}
}
}
}

View File

@ -93,6 +93,7 @@ private:
map<string,pair<uint64,time_t> > badClientConnectIPList;
ServerSocket *serverSocketAdmin;
MasterSlaveThreadController masterController;
public:
ServerInterface(bool publishEnabled);

View File

@ -215,6 +215,7 @@ void Faction::sortUnitsByCommandGroups() {
FactionThread::FactionThread(Faction *faction) : BaseThread() {
this->triggerIdMutex = new Mutex();
this->faction = faction;
this->masterController = NULL;
}
FactionThread::~FactionThread() {
@ -300,6 +301,11 @@ void FactionThread::execute() {
semTaskSignalled.waitTillSignalled();
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId);
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
@ -313,6 +319,8 @@ void FactionThread::execute() {
safeMutex.ReleaseLock();
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(executeTask == true) {
ExecutingTaskSafeWrapper safeExecutingTaskMutex(this);
@ -330,6 +338,8 @@ void FactionThread::execute() {
//if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start();
if(minorDebugPerformance) chrono.start();
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
int unitCount = faction->getUnitCount();
for(int j = 0; j < unitCount; ++j) {
Unit *unit = faction->getUnit(j);
@ -358,11 +368,19 @@ void FactionThread::execute() {
if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis());
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
safeMutex.ReleaseLock();
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
setTaskCompleted(frameIndex.first);
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
//printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;

View File

@ -76,13 +76,14 @@ struct CommandGroupUnitSorterId {
bool operator()(const int l, const int r);
};
class FactionThread : public BaseThread {
class FactionThread : public BaseThread, public SlaveThreadControllerInterface {
protected:
Faction *faction;
Semaphore semTaskSignalled;
Mutex *triggerIdMutex;
std::pair<int,bool> frameIndex;
MasterSlaveThreadController *masterController;
virtual void setQuitStatus(bool value);
virtual void setTaskCompleted(int frameIndex);
@ -92,6 +93,10 @@ public:
FactionThread(Faction *faction);
virtual ~FactionThread();
virtual void execute();
virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; }
virtual void signalSlave(void *userdata) { signalPathfinder(*((int *)(userdata))); }
void signalPathfinder(int frameIndex);
bool isSignalPathfinderCompleted(int frameIndex);
};
@ -320,8 +325,11 @@ public:
inline World * getWorld() { return world; }
int getFrameCount();
void signalWorkerThread(int frameIndex);
bool isWorkerThreadSignalCompleted(int frameIndex);
FactionThread *getWorkerThread() { return workerThread; }
void limitResourcesToStore();
void sortUnitsByCommandGroups();

View File

@ -100,6 +100,7 @@ void World::cleanup() {
factions[i]->end();
}
masterController.clearSlaves(true);
if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
for(int i= 0; i<factions.size(); ++i){
delete factions[i];
@ -171,6 +172,8 @@ void World::end(){
for(int i= 0; i<factions.size(); ++i){
factions[i]->end();
}
masterController.clearSlaves(true);
for(int i= 0; i<factions.size(); ++i){
delete factions[i];
}
@ -422,42 +425,54 @@ void World::updateAllFactionUnits() {
faction->clearAproxCanMoveSoonCached();
}
// Signal the faction threads to do any pre-processing
for(int i = 0; i < factionCount; ++i) {
Faction *faction = getFaction(i);
//if(faction == NULL) {
// throw megaglest_runtime_error("faction == NULL");
//}
faction->signalWorkerThread(frameCount);
}
//sleep(0);
bool workThreadsFinished = false;
Chrono chrono;
chrono.start();
const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000;
for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) {
workThreadsFinished = true;
const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false");
if(newThreadManager == true) {
masterController.signalSlaves(&frameCount);
bool slavesCompleted = masterController.waitTillSlavesTrigger(20000);
if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d slavesCompleted = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount,slavesCompleted);
}
else {
// Signal the faction threads to do any pre-processing
for(int i = 0; i < factionCount; ++i) {
Faction *faction = getFaction(i);
//if(faction == NULL) {
// throw megaglest_runtime_error("faction == NULL");
//}
if(faction->isWorkerThreadSignalCompleted(frameCount) == false) {
workThreadsFinished = false;
faction->signalWorkerThread(frameCount);
}
//sleep(0);
bool workThreadsFinished = false;
Chrono chrono;
chrono.start();
const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000;
for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) {
workThreadsFinished = true;
for(int i = 0; i < factionCount; ++i) {
Faction *faction = getFaction(i);
//if(faction == NULL) {
// throw megaglest_runtime_error("faction == NULL");
//}
if(faction->isWorkerThreadSignalCompleted(frameCount) == false) {
workThreadsFinished = false;
break;
}
}
if(workThreadsFinished == false) {
//sleep(0);
}
else {
break;
}
}
if(workThreadsFinished == false) {
//sleep(0);
}
else {
break;
}
}
if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount);
if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount);
}
//units
for(int i = 0; i < factionCount; ++i) {
@ -1704,6 +1719,15 @@ void World::initFactionTypes(GameSettings *gs) {
}
}
if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) {
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
for(unsigned int i = 0; i < factions.size(); ++i) {
Faction *faction = factions[i];
slaveThreadList.push_back(faction->getWorkerThread());
}
masterController.setSlaves(slaveThreadList);
}
if(loadWorldNode != NULL) {
stats.loadGame(loadWorldNode);
random.setLastNumber(loadWorldNode->getAttribute("random")->getIntValue());

View File

@ -143,6 +143,8 @@ private:
const XmlNode *loadWorldNode;
MasterSlaveThreadController masterController;
public:
World();
~World();

View File

@ -94,6 +94,8 @@ public:
void p();
void v();
int getRefCount() const { return refCount; }
SDL_mutex* getMutex() { return mutex; }
};
class MutexSafeWrapper {
@ -190,8 +192,10 @@ public:
~Semaphore();
void signal();
int waitTillSignalled(int waitMilliseconds=-1);
bool tryDecrement();
uint32 getSemValue();
void resetSemValue(Uint32 initialValue);
};
@ -313,6 +317,86 @@ public:
}
};
const bool debugMasterSlaveThreadController = false;
// =====================================================
// class Trigger
// =====================================================
class Trigger {
private:
SDL_cond* trigger;
Mutex *mutex;
public:
Trigger(Mutex *mutex);
~Trigger();
void signal(bool allThreads=false);
int waitTillSignalled(Mutex *mutex, int waitMilliseconds=-1);
};
class MasterSlaveThreadController;
class SlaveThreadControllerInterface {
public:
virtual void setMasterController(MasterSlaveThreadController *master) = 0;
virtual void signalSlave(void *userdata) = 0;
};
class MasterSlaveThreadController {
private:
static const int triggerBaseCount = 1;
Mutex *mutex;
Semaphore *slaveTriggerSem;
int slaveTriggerCounter;
std::vector<SlaveThreadControllerInterface *> slaveThreadList;
void init(std::vector<SlaveThreadControllerInterface *> &newSlaveThreadList);
public:
MasterSlaveThreadController();
MasterSlaveThreadController(std::vector<SlaveThreadControllerInterface *> &slaveThreadList);
~MasterSlaveThreadController();
void setSlaves(std::vector<SlaveThreadControllerInterface *> &slaveThreadList);
void clearSlaves(bool clearListOnly=false);
void signalSlaves(void *userdata);
void triggerMaster(int waitMilliseconds=-1);
bool waitTillSlavesTrigger(int waitMilliseconds=-1);
};
class MasterSlaveThreadControllerSafeWrapper {
protected:
MasterSlaveThreadController *master;
string ownerId;
int waitMilliseconds;
public:
MasterSlaveThreadControllerSafeWrapper(MasterSlaveThreadController *master, int waitMilliseconds=-1, string ownerId="") {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
this->master = master;
this->waitMilliseconds = waitMilliseconds;
this->ownerId = ownerId;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
~MasterSlaveThreadControllerSafeWrapper() {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(master != NULL) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
master->triggerMaster(this->waitMilliseconds);
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
};
}}//end namespace

View File

@ -59,7 +59,7 @@ void Thread::start() {
assert(thread != NULL);
if(thread == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
}
@ -73,7 +73,7 @@ int Thread::beginExecution(void* data) {
assert(thread != NULL);
if(thread == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
thread->execute();
@ -140,7 +140,7 @@ Mutex::Mutex(string ownerId) {
assert(mutex != NULL);
if(mutex == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
deleteownerId = "";
@ -150,13 +150,13 @@ Mutex::~Mutex() {
SDLMutexSafeWrapper safeMutex(&mutexAccessor,true);
if(mutex == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
throw megaglest_runtime_error(szBuf);
//printf("%s\n",szBuf);
}
else if(refCount >= 1) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
throw megaglest_runtime_error(szBuf);
}
@ -170,7 +170,7 @@ Mutex::~Mutex() {
void Mutex::p() {
if(mutex == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
throw megaglest_runtime_error(szBuf);
}
SDL_mutexP(mutex);
@ -180,7 +180,7 @@ void Mutex::p() {
void Mutex::v() {
if(mutex == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str());
throw megaglest_runtime_error(szBuf);
}
refCount--;
@ -195,7 +195,7 @@ Semaphore::Semaphore(Uint32 initialValue) {
semaphore = SDL_CreateSemaphore(initialValue);
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
}
@ -203,7 +203,7 @@ Semaphore::Semaphore(Uint32 initialValue) {
Semaphore::~Semaphore() {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
SDL_DestroySemaphore(semaphore);
@ -213,7 +213,7 @@ Semaphore::~Semaphore() {
void Semaphore::signal() {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
SDL_SemPost(semaphore);
@ -222,7 +222,7 @@ void Semaphore::signal() {
int Semaphore::waitTillSignalled(int waitMilliseconds) {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
int semValue = 0;
@ -235,16 +235,43 @@ int Semaphore::waitTillSignalled(int waitMilliseconds) {
return semValue;
}
bool Semaphore::tryDecrement() {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
int semValue = SDL_SemTryWait(semaphore);
return (semValue == 0);
}
uint32 Semaphore::getSemValue() {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
return SDL_SemValue(semaphore);
}
void Semaphore::resetSemValue(Uint32 initialValue) {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
uint32 currentValue = SDL_SemValue(semaphore);
for(unsigned int i = currentValue; i < initialValue; ++i) {
SDL_SemPost(semaphore);
}
}
// =====================================================
// class ReadWriteMutex
// =====================================================
ReadWriteMutex::ReadWriteMutex(int maxReaders) : semaphore(maxReaders) {
this->maxReadersCount = maxReaders;
}
@ -273,5 +300,232 @@ int ReadWriteMutex::maxReaders() {
return this->maxReadersCount;
}
// =====================================================
// class Trigger
// =====================================================
Trigger::Trigger(Mutex *mutex) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
this->mutex = mutex;
this->trigger = SDL_CreateCond();
if(this->trigger == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
Trigger::~Trigger() {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(trigger == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
SDL_DestroyCond(trigger);
trigger = NULL;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
this->mutex = NULL;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
void Trigger::signal(bool allThreads) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(trigger == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
//MutexSafeWrapper safeMutex(mutex);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(allThreads == false) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
int result = SDL_CondSignal(trigger);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result);
}
else {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
int result = SDL_CondBroadcast(trigger);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result);
}
//safeMutex.ReleaseLock();
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
int Trigger::waitTillSignalled(Mutex *mutex, int waitMilliseconds) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(trigger == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
if(mutex == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
throw megaglest_runtime_error(szBuf);
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
int result = 0;
if(waitMilliseconds >= 0) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
result = SDL_CondWaitTimeout(trigger,mutex->getMutex(),waitMilliseconds);
}
else {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
result = SDL_CondWait(trigger, mutex->getMutex());
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
return result;
}
MasterSlaveThreadController::MasterSlaveThreadController() {
std::vector<SlaveThreadControllerInterface *> empty;
init(empty);
}
MasterSlaveThreadController::MasterSlaveThreadController(std::vector<SlaveThreadControllerInterface *> &slaveThreadList) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] ==========================================================\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
init(slaveThreadList);
}
void MasterSlaveThreadController::init(std::vector<SlaveThreadControllerInterface *> &newSlaveThreadList) {
static string masterSlaveOwnerId = string(__FILE__) + string("_MasterSlaveThreadController");
this->mutex = new Mutex(masterSlaveOwnerId);
this->slaveTriggerSem = new Semaphore(0);
this->slaveTriggerCounter = newSlaveThreadList.size() + triggerBaseCount;
setSlaves(newSlaveThreadList);
}
void MasterSlaveThreadController::clearSlaves(bool clearListOnly) {
if(this->slaveThreadList.empty() == false) {
if(clearListOnly == false) {
for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) {
SlaveThreadControllerInterface *slave = this->slaveThreadList[i];
slave->setMasterController(NULL);
}
}
this->slaveThreadList.clear();
}
}
MasterSlaveThreadController::~MasterSlaveThreadController() {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
clearSlaves();
delete slaveTriggerSem;
slaveTriggerSem = NULL;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] mutex->getRefCount() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,mutex->getRefCount());
delete mutex;
mutex = NULL;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
void MasterSlaveThreadController::setSlaves(std::vector<SlaveThreadControllerInterface *> &slaveThreadList) {
this->slaveThreadList = slaveThreadList;
if(this->slaveThreadList.empty() == false) {
for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) {
SlaveThreadControllerInterface *slave = this->slaveThreadList[i];
slave->setMasterController(this);
}
}
}
void MasterSlaveThreadController::signalSlaves(void *userdata) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
slaveTriggerCounter = this->slaveThreadList.size() + triggerBaseCount;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(this->slaveThreadList.empty() == false) {
for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) {
SlaveThreadControllerInterface *slave = this->slaveThreadList[i];
slave->signalSlave(userdata);
}
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
void MasterSlaveThreadController::triggerMaster(int waitMilliseconds) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
MutexSafeWrapper safeMutex(mutex);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
slaveTriggerCounter--;
int newCount = slaveTriggerCounter;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
safeMutex.ReleaseLock();
if(newCount <= triggerBaseCount) {
slaveTriggerSem->signal();
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
bool MasterSlaveThreadController::waitTillSlavesTrigger(int waitMilliseconds) {
bool result = true;
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
if(this->slaveThreadList.empty() == false) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
int slaveResult = slaveTriggerSem->waitTillSignalled(waitMilliseconds);
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d slaveResult = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter,slaveResult);
if(slaveResult != 0) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
result = false;
}
else if(slaveResult == 0) {
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter);
result = true;
}
}
if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
return result;
}
}}//end namespace