From 5d2066fdbb65fc49a86bc749113f33268804f435 Mon Sep 17 00:00:00 2001 From: Mark Vejvoda Date: Thu, 10 Jan 2013 21:16:28 +0000 Subject: [PATCH] - added a new thread manager to try to see if it takes less CPU cycles while the thread controller waits for slaves to do their work. To enable the experimental thread manager set: EnableNewThreadManager=true --- source/glest_game/ai/ai_interface.cpp | 48 +-- source/glest_game/ai/ai_interface.h | 8 +- source/glest_game/game/game.cpp | 115 +++++--- source/glest_game/game/game.h | 2 + source/glest_game/network/connection_slot.cpp | 13 + source/glest_game/network/connection_slot.h | 8 +- .../glest_game/network/server_interface.cpp | 186 ++++++++---- source/glest_game/network/server_interface.h | 1 + source/glest_game/type_instances/faction.cpp | 18 ++ source/glest_game/type_instances/faction.h | 10 +- source/glest_game/world/world.cpp | 72 +++-- source/glest_game/world/world.h | 2 + .../shared_lib/include/platform/sdl/thread.h | 84 ++++++ .../sources/platform/sdl/thread.cpp | 278 +++++++++++++++++- 14 files changed, 666 insertions(+), 179 deletions(-) diff --git a/source/glest_game/ai/ai_interface.cpp b/source/glest_game/ai/ai_interface.cpp index ae42aad0..ae5e95b6 100644 --- a/source/glest_game/ai/ai_interface.cpp +++ b/source/glest_game/ai/ai_interface.cpp @@ -37,6 +37,7 @@ namespace Glest{ namespace Game{ // ===================================================== AiInterfaceThread::AiInterfaceThread(AiInterface *aiIntf) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->aiIntf = aiIntf; } @@ -124,6 +125,9 @@ void AiInterfaceThread::execute() { semTaskSignalled.waitTillSignalled(); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; @@ -144,50 +148,6 @@ void AiInterfaceThread::execute() { this->aiIntf->update(); -/* - World *world = faction->getWorld(); - - //Config &config= Config::getInstance(); - //bool sortedUnitsAllowed = config.getBool("AllowGroupedUnitCommands","true"); - bool sortedUnitsAllowed = false; - if(sortedUnitsAllowed) { - faction->sortUnitsByCommandGroups(); - } - - MutexSafeWrapper safeMutex(faction->getUnitMutex(),string(__FILE__) + "_" + intToStr(__LINE__)); - - //if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); - if(minorDebugPerformance) chrono.start(); - - int unitCount = faction->getUnitCount(); - for(int j = 0; j < unitCount; ++j) { - Unit *unit = faction->getUnit(j); - if(unit == NULL) { - throw megaglest_runtime_error("unit == NULL"); - } - - int64 elapsed1 = 0; - if(minorDebugPerformance) elapsed1 = chrono.getMillis(); - - bool update = unit->needToUpdate(); - - if(minorDebugPerformance && (chrono.getMillis() - elapsed1) >= 1) printf("Faction [%d - %s] #1-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed1); - - //update = true; - if(update == true) { - - int64 elapsed2 = 0; - if(minorDebugPerformance) elapsed2 = chrono.getMillis(); - - world->getUnitUpdater()->updateUnitCommand(unit,frameIndex.first); - - if(minorDebugPerformance && (chrono.getMillis() - elapsed2) >= 1) printf("Faction [%d - %s] #2-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed2); - } - } - - if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis()); -*/ - safeMutex.ReleaseLock(); setTaskCompleted(frameIndex.first); diff --git a/source/glest_game/ai/ai_interface.h b/source/glest_game/ai/ai_interface.h index 9c035d84..8214bc5b 100644 --- a/source/glest_game/ai/ai_interface.h +++ b/source/glest_game/ai/ai_interface.h @@ -31,13 +31,14 @@ namespace Glest{ namespace Game{ /// The AI will interact with the game through this interface // ===================================================== -class AiInterfaceThread : public BaseThread { +class AiInterfaceThread : public BaseThread, public SlaveThreadControllerInterface { protected: AiInterface *aiIntf; Semaphore semTaskSignalled; Mutex *triggerIdMutex; std::pair frameIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int frameIndex); @@ -49,6 +50,10 @@ public: virtual void execute(); void signal(int frameIndex); bool isSignalCompleted(int frameIndex); + + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata) { signal(*((int *)(userdata))); } + }; class AiInterface { @@ -87,6 +92,7 @@ public: void signalWorkerThread(int frameIndex); bool isWorkerThreadSignalCompleted(int frameIndex); + AiInterfaceThread *getWorkerThread() { return workerThread; } //get int getTimer() const {return timer;} diff --git a/source/glest_game/game/game.cpp b/source/glest_game/game/game.cpp index ca48c6dc..2b9e5b34 100644 --- a/source/glest_game/game/game.cpp +++ b/source/glest_game/game/game.cpp @@ -378,6 +378,7 @@ Game::~Game() { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); aiInterfaces.clear(); @@ -1133,8 +1134,10 @@ void Game::init(bool initForPreviewOnly) { bool isNetworkGame = this->gameSettings.isNetworkGame(); role = networkManager.getNetworkRole(); + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); + std::vector slaveThreadList; aiInterfaces.resize(world.getFactionCount()); for(int i=0; i < world.getFactionCount(); ++i) { Faction *faction= world.getFaction(i); @@ -1146,15 +1149,19 @@ void Game::init(bool initForPreviewOnly) { char szBuf[8096]=""; snprintf(szBuf,8096,Lang::getInstance().get("LogScreenGameLoadingCreatingAIFaction","",true).c_str(),i); logger.add(szBuf, true); + + slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread()); } else { aiInterfaces[i]= NULL; } } + if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + masterController.setSlaves(slaveThreadList); + } if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); - // give CPU time to update other things to avoid apperance of hanging sleep(0); Shared::Platform::Window::handleEvent(); @@ -1525,55 +1532,63 @@ void Game::update() { } */ - // Signal the faction threads to do any pre-processing - bool hasAIPlayer = false; - for(int j = 0; j < world.getFactionCount(); ++j) { - Faction *faction = world.getFaction(j); + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + int currentFrameCount = world.getFrameCount(); + masterController.signalSlaves(¤tFrameCount); + bool slavesCompleted = masterController.waitTillSlavesTrigger(20000); + } + else { + // Signal the faction threads to do any pre-processing + bool hasAIPlayer = false; + for(int j = 0; j < world.getFactionCount(); ++j) { + Faction *faction = world.getFaction(j); - //printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled()); + //printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled()); - if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && - scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { + if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && + scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis()); - aiInterfaces[j]->signalWorkerThread(world.getFrameCount()); - hasAIPlayer = true; + if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis()); + aiInterfaces[j]->signalWorkerThread(world.getFrameCount()); + hasAIPlayer = true; + } } - } - if(showPerfStats) { - sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis()); - perfList.push_back(perfBuf); - } + if(showPerfStats) { + sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis()); + perfList.push_back(perfBuf); + } - if(hasAIPlayer == true) { - //sleep(0); + if(hasAIPlayer == true) { + //sleep(0); - bool workThreadsFinished = false; - Chrono chronoAI; - chronoAI.start(); + bool workThreadsFinished = false; + Chrono chronoAI; + chronoAI.start(); - const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; - for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { - workThreadsFinished = true; - for(int j = 0; j < world.getFactionCount(); ++j) { - Faction *faction = world.getFaction(j); - if(faction == NULL) { - throw megaglest_runtime_error("faction == NULL"); - } - if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && - scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { - if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) { - workThreadsFinished = false; - break; + const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; + for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { + workThreadsFinished = true; + for(int j = 0; j < world.getFactionCount(); ++j) { + Faction *faction = world.getFaction(j); + if(faction == NULL) { + throw megaglest_runtime_error("faction == NULL"); + } + if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && + scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { + if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) { + workThreadsFinished = false; + break; + } } } - } - if(workThreadsFinished == false) { - //sleep(0); - } - else { - break; + if(workThreadsFinished == false) { + //sleep(0); + } + else { + break; + } } } } @@ -1824,6 +1839,8 @@ void Game::update() { world.clearTileset(); SoundRenderer::getInstance().stopAllSounds(); + + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); aiInterfaces.clear(); gui.end(); //selection must be cleared before deleting units @@ -2092,6 +2109,7 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo Logger &logger= Logger::getInstance(); ServerInterface *server = NetworkManager::getInstance().getServerInterface(); + bool newAIPlayerCreated = false; for(int i = 0; i < world.getFactionCount(); ++i) { Faction *faction = world.getFaction(i); if( faction->getFactionDisconnectHandled() == false && @@ -2123,6 +2141,8 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo snprintf(szBuf,8096,msg.c_str(),i+1,this->gameSettings.getNetworkPlayerName(i).c_str()); commander.tryNetworkPlayerDisconnected(i); + + newAIPlayerCreated = true; } else { string msg = "Player #%d [%s] has disconnected, but player was only an observer!"; @@ -2140,6 +2160,21 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo } } } + + if(newAIPlayerCreated == true && Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + bool enableServerControlledAI = this->gameSettings.getEnableServerControlledAI(); + + masterController.clearSlaves(true); + + std::vector slaveThreadList; + for(int i=0; i < world.getFactionCount(); ++i) { + Faction *faction= world.getFaction(i); + if(faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true) { + slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread()); + } + } + masterController.setSlaves(slaveThreadList); + } } } diff --git a/source/glest_game/game/game.h b/source/glest_game/game/game.h index 48ee2575..beaf3049 100644 --- a/source/glest_game/game/game.h +++ b/source/glest_game/game/game.h @@ -196,6 +196,8 @@ private: std::map unitHighlightList; + MasterSlaveThreadController masterController; + public: Game(); Game(Program *program, const GameSettings *gameSettings, bool masterserverMode); diff --git a/source/glest_game/network/connection_slot.cpp b/source/glest_game/network/connection_slot.cpp index dfe7538c..5482622d 100644 --- a/source/glest_game/network/connection_slot.cpp +++ b/source/glest_game/network/connection_slot.cpp @@ -33,6 +33,7 @@ namespace Glest{ namespace Game{ // ===================================================== ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->slotIndex = slotIndex; this->slotInterface = NULL; @@ -42,6 +43,7 @@ ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() { } ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->slotIndex = slotIndex; this->slotInterface = slotInterface; @@ -164,6 +166,14 @@ void ConnectionSlotThread::slotUpdateTask(ConnectionSlotEvent *event) { } } +void ConnectionSlotThread::signalSlave(void *userdata) { + + //ConnectionSlotEvent &event = eventList[i]; + std::map *eventList = (std::map *)userdata; + ConnectionSlotEvent &event = (*eventList)[slotIndex]; + signalUpdate(&event); +} + void ConnectionSlotThread::execute() { RunningStatusSafeWrapper runningStatus(this); try { @@ -179,6 +189,9 @@ void ConnectionSlotThread::execute() { semTaskSignalled.waitTillSignalled(); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; diff --git a/source/glest_game/network/connection_slot.h b/source/glest_game/network/connection_slot.h index 9de12648..29c136c6 100644 --- a/source/glest_game/network/connection_slot.h +++ b/source/glest_game/network/connection_slot.h @@ -70,7 +70,7 @@ public: virtual ~ConnectionSlotCallbackInterface() {} }; -class ConnectionSlotThread : public BaseThread +class ConnectionSlotThread : public BaseThread, public SlaveThreadControllerInterface { protected: @@ -79,6 +79,7 @@ protected: Mutex *triggerIdMutex; vector eventList; int slotIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int eventId); @@ -91,6 +92,9 @@ public: ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex); virtual ~ConnectionSlotThread(); + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata); + virtual void execute(); void signalUpdate(ConnectionSlotEvent *event); bool isSignalCompleted(ConnectionSlotEvent *event); @@ -140,6 +144,8 @@ public: ConnectionSlot(ServerInterface* serverInterface, int playerIndex); ~ConnectionSlot(); + ConnectionSlotThread *getWorkerThread() { return slotThreadWorker; } + void update(bool checkForNewClients,int lockedSlotIndex); void setPlayerIndex(int value) { playerIndex = value; } int getPlayerIndex() const {return playerIndex;} diff --git a/source/glest_game/network/server_interface.cpp b/source/glest_game/network/server_interface.cpp index 108ace7f..71093d0b 100644 --- a/source/glest_game/network/server_interface.cpp +++ b/source/glest_game/network/server_interface.cpp @@ -243,6 +243,8 @@ ServerInterface::~ServerInterface() { //printf("===> Destructor for ServerInterface\n"); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + masterController.clearSlaves(true); exitServer = true; for(int i= 0; i < GameConstants::maxPlayers; ++i) { if(slots[i] != NULL) { @@ -826,25 +828,6 @@ std::pair ServerInterface::clientLagCheck(ConnectionSlot *connectionS return clientLagExceededOrWarned; } -bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot, int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) { - bool slotSignalled = false; - - event.eventType = eReceiveSocketData; - event.networkMessage = NULL; - event.connectionSlot = connectionSlot; - event.socketTriggered = socketTriggered; - event.triggerId = slotIndex; - event.eventId = getNextEventId(); - - if(connectionSlot != NULL) { - if(socketTriggered == true || connectionSlot->isConnected() == false) { - connectionSlot->signalUpdate(&event); - slotSignalled = true; - } - } - return slotSignalled; -} - void ServerInterface::updateSocketTriggeredList(std::map & socketTriggeredList) { for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); @@ -868,51 +851,97 @@ void ServerInterface::validateConnectedClients() { } } +bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot, + int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) { + bool slotSignalled = false; + + event.eventType = eReceiveSocketData; + event.networkMessage = NULL; + event.connectionSlot = connectionSlot; + event.socketTriggered = socketTriggered; + event.triggerId = slotIndex; + event.eventId = getNextEventId(); + + if(connectionSlot != NULL) { + if(socketTriggered == true || connectionSlot->isConnected() == false) { + connectionSlot->signalUpdate(&event); + slotSignalled = true; + } + } + return slotSignalled; +} + void ServerInterface::signalClientsToRecieveData(std::map &socketTriggeredList, std::map &eventList, std::map & mapSlotSignalledList) { - //bool checkForNewClients = true; - for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); - ConnectionSlot* connectionSlot = slots[i]; + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + masterController.clearSlaves(true); + std::vector slaveThreadList; + for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot = slots[i]; - bool socketTriggered = false; + bool socketTriggered = false; - if(connectionSlot != NULL) { - PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); - if(Socket::isSocketValid(&clientSocket)) { - socketTriggered = socketTriggeredList[clientSocket]; + if(connectionSlot != NULL) { + PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); + if(Socket::isSocketValid(&clientSocket)) { + socketTriggered = socketTriggeredList[clientSocket]; + } + } + ConnectionSlotEvent &event = eventList[i]; + if(connectionSlot != NULL) { + if(socketTriggered == true || connectionSlot->isConnected() == false) { + ConnectionSlotEvent &event = eventList[i]; + event.eventType = eReceiveSocketData; + event.networkMessage = NULL; + event.connectionSlot = connectionSlot; + event.socketTriggered = socketTriggered; + event.triggerId = i; + event.eventId = getNextEventId(); + + slaveThreadList.push_back(connectionSlot->getWorkerThread()); + mapSlotSignalledList[i] = true; + } } } - ConnectionSlotEvent &event = eventList[i]; - mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event); + masterController.setSlaves(slaveThreadList); + masterController.signalSlaves(&eventList); + } + else { + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot = slots[i]; + + bool socketTriggered = false; + + if(connectionSlot != NULL) { + PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); + if(Socket::isSocketValid(&clientSocket)) { + socketTriggered = socketTriggeredList[clientSocket]; + } + } + ConnectionSlotEvent &event = eventList[i]; + mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event); + } } } void ServerInterface::checkForCompletedClients(std::map & mapSlotSignalledList, std::vector &errorMsgList, std::map &eventList) { - time_t waitForThreadElapsed = time(NULL); - std::map slotsCompleted; - for(bool threadsDone = false; - exitServer == false && threadsDone == false && - difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) { - threadsDone = true; - // Examine all threads for completion of delegation - for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - //printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]); + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + bool slavesCompleted = masterController.waitTillSlavesTrigger(1000 * MAX_SLOT_THREAD_WAIT_TIME); + for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); - //printf("===> IN slot %d - About to checkForCompletedClients\n",i); - ConnectionSlot* connectionSlot = slots[i]; - if(connectionSlot != NULL && mapSlotSignalledList[i] == true && - slotsCompleted.find(i) == slotsCompleted.end()) { + if(connectionSlot != NULL && mapSlotSignalledList[i] == true) { try { - - std::vector errorList = connectionSlot->getThreadErrorList(); // Collect any collected errors from threads if(errorList.empty() == false) { @@ -924,16 +953,6 @@ void ServerInterface::checkForCompletedClients(std::map & mapSlotSigna } connectionSlot->clearThreadErrorList(); } - - // Not done waiting for data yet - bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true); - if(updateFinished == false) { - threadsDone = false; - break; - } - else { - slotsCompleted[i] = true; - } } catch(const exception &ex) { SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); @@ -942,8 +961,63 @@ void ServerInterface::checkForCompletedClients(std::map & mapSlotSigna errorMsgList.push_back(ex.what()); } } + } + masterController.clearSlaves(true); + } + else { + time_t waitForThreadElapsed = time(NULL); + std::map slotsCompleted; + for(bool threadsDone = false; + exitServer == false && threadsDone == false && + difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) { + threadsDone = true; + // Examine all threads for completion of delegation + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - //printf("===> END slot %d - About to checkForCompletedClients\n",i); + //printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]); + + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + + //printf("===> IN slot %d - About to checkForCompletedClients\n",i); + + ConnectionSlot* connectionSlot = slots[i]; + if(connectionSlot != NULL && mapSlotSignalledList[i] == true && + slotsCompleted.find(i) == slotsCompleted.end()) { + try { + + + std::vector errorList = connectionSlot->getThreadErrorList(); + // Collect any collected errors from threads + if(errorList.empty() == false) { + for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) { + string &sErr = errorList[iErrIdx]; + if(sErr != "") { + errorMsgList.push_back(sErr); + } + } + connectionSlot->clearThreadErrorList(); + } + + // Not done waiting for data yet + bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true); + if(updateFinished == false) { + threadsDone = false; + break; + } + else { + slotsCompleted[i] = true; + } + } + catch(const exception &ex) { + SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); + + errorMsgList.push_back(ex.what()); + } + } + + //printf("===> END slot %d - About to checkForCompletedClients\n",i); + } } } } diff --git a/source/glest_game/network/server_interface.h b/source/glest_game/network/server_interface.h index 4be5e813..257a5273 100644 --- a/source/glest_game/network/server_interface.h +++ b/source/glest_game/network/server_interface.h @@ -93,6 +93,7 @@ private: map > badClientConnectIPList; ServerSocket *serverSocketAdmin; + MasterSlaveThreadController masterController; public: ServerInterface(bool publishEnabled); diff --git a/source/glest_game/type_instances/faction.cpp b/source/glest_game/type_instances/faction.cpp index 1b980ea9..5e02f6ae 100644 --- a/source/glest_game/type_instances/faction.cpp +++ b/source/glest_game/type_instances/faction.cpp @@ -215,6 +215,7 @@ void Faction::sortUnitsByCommandGroups() { FactionThread::FactionThread(Faction *faction) : BaseThread() { this->triggerIdMutex = new Mutex(); this->faction = faction; + this->masterController = NULL; } FactionThread::~FactionThread() { @@ -300,6 +301,11 @@ void FactionThread::execute() { semTaskSignalled.waitTillSignalled(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; @@ -313,6 +319,8 @@ void FactionThread::execute() { safeMutex.ReleaseLock(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(executeTask == true) { ExecutingTaskSafeWrapper safeExecutingTaskMutex(this); @@ -330,6 +338,8 @@ void FactionThread::execute() { //if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); if(minorDebugPerformance) chrono.start(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + int unitCount = faction->getUnitCount(); for(int j = 0; j < unitCount; ++j) { Unit *unit = faction->getUnit(j); @@ -358,11 +368,19 @@ void FactionThread::execute() { if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis()); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + safeMutex.ReleaseLock(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + setTaskCompleted(frameIndex.first); + + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); } + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; diff --git a/source/glest_game/type_instances/faction.h b/source/glest_game/type_instances/faction.h index 2659f7d9..0dd87e8c 100644 --- a/source/glest_game/type_instances/faction.h +++ b/source/glest_game/type_instances/faction.h @@ -76,13 +76,14 @@ struct CommandGroupUnitSorterId { bool operator()(const int l, const int r); }; -class FactionThread : public BaseThread { +class FactionThread : public BaseThread, public SlaveThreadControllerInterface { protected: Faction *faction; Semaphore semTaskSignalled; Mutex *triggerIdMutex; std::pair frameIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int frameIndex); @@ -92,6 +93,10 @@ public: FactionThread(Faction *faction); virtual ~FactionThread(); virtual void execute(); + + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata) { signalPathfinder(*((int *)(userdata))); } + void signalPathfinder(int frameIndex); bool isSignalPathfinderCompleted(int frameIndex); }; @@ -320,8 +325,11 @@ public: inline World * getWorld() { return world; } int getFrameCount(); + void signalWorkerThread(int frameIndex); bool isWorkerThreadSignalCompleted(int frameIndex); + FactionThread *getWorkerThread() { return workerThread; } + void limitResourcesToStore(); void sortUnitsByCommandGroups(); diff --git a/source/glest_game/world/world.cpp b/source/glest_game/world/world.cpp index 9ff32186..d73026b0 100644 --- a/source/glest_game/world/world.cpp +++ b/source/glest_game/world/world.cpp @@ -100,6 +100,7 @@ void World::cleanup() { factions[i]->end(); } + masterController.clearSlaves(true); if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); for(int i= 0; iend(); } + + masterController.clearSlaves(true); for(int i= 0; iclearAproxCanMoveSoonCached(); } - // Signal the faction threads to do any pre-processing - for(int i = 0; i < factionCount; ++i) { - Faction *faction = getFaction(i); - //if(faction == NULL) { - // throw megaglest_runtime_error("faction == NULL"); - //} - faction->signalWorkerThread(frameCount); - } - - //sleep(0); - bool workThreadsFinished = false; Chrono chrono; chrono.start(); - const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; - for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { - workThreadsFinished = true; + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + masterController.signalSlaves(&frameCount); + bool slavesCompleted = masterController.waitTillSlavesTrigger(20000); + + if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d slavesCompleted = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount,slavesCompleted); + } + else { + // Signal the faction threads to do any pre-processing for(int i = 0; i < factionCount; ++i) { Faction *faction = getFaction(i); //if(faction == NULL) { // throw megaglest_runtime_error("faction == NULL"); //} - if(faction->isWorkerThreadSignalCompleted(frameCount) == false) { - workThreadsFinished = false; + faction->signalWorkerThread(frameCount); + } + + //sleep(0); + bool workThreadsFinished = false; + Chrono chrono; + chrono.start(); + + const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; + for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { + workThreadsFinished = true; + for(int i = 0; i < factionCount; ++i) { + Faction *faction = getFaction(i); + //if(faction == NULL) { + // throw megaglest_runtime_error("faction == NULL"); + //} + if(faction->isWorkerThreadSignalCompleted(frameCount) == false) { + workThreadsFinished = false; + break; + } + } + if(workThreadsFinished == false) { + //sleep(0); + } + else { break; } } - if(workThreadsFinished == false) { - //sleep(0); - } - else { - break; - } - } - if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount); + if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount); + } //units for(int i = 0; i < factionCount; ++i) { @@ -1704,6 +1719,15 @@ void World::initFactionTypes(GameSettings *gs) { } } + if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + std::vector slaveThreadList; + for(unsigned int i = 0; i < factions.size(); ++i) { + Faction *faction = factions[i]; + slaveThreadList.push_back(faction->getWorkerThread()); + } + masterController.setSlaves(slaveThreadList); + } + if(loadWorldNode != NULL) { stats.loadGame(loadWorldNode); random.setLastNumber(loadWorldNode->getAttribute("random")->getIntValue()); diff --git a/source/glest_game/world/world.h b/source/glest_game/world/world.h index 685ae55d..30697ee6 100644 --- a/source/glest_game/world/world.h +++ b/source/glest_game/world/world.h @@ -143,6 +143,8 @@ private: const XmlNode *loadWorldNode; + MasterSlaveThreadController masterController; + public: World(); ~World(); diff --git a/source/shared_lib/include/platform/sdl/thread.h b/source/shared_lib/include/platform/sdl/thread.h index 03db9808..41d7bd29 100644 --- a/source/shared_lib/include/platform/sdl/thread.h +++ b/source/shared_lib/include/platform/sdl/thread.h @@ -94,6 +94,8 @@ public: void p(); void v(); int getRefCount() const { return refCount; } + + SDL_mutex* getMutex() { return mutex; } }; class MutexSafeWrapper { @@ -190,8 +192,10 @@ public: ~Semaphore(); void signal(); int waitTillSignalled(int waitMilliseconds=-1); + bool tryDecrement(); uint32 getSemValue(); + void resetSemValue(Uint32 initialValue); }; @@ -313,6 +317,86 @@ public: } }; +const bool debugMasterSlaveThreadController = false; +// ===================================================== +// class Trigger +// ===================================================== + +class Trigger { +private: + SDL_cond* trigger; + Mutex *mutex; + +public: + Trigger(Mutex *mutex); + ~Trigger(); + void signal(bool allThreads=false); + int waitTillSignalled(Mutex *mutex, int waitMilliseconds=-1); +}; + +class MasterSlaveThreadController; + +class SlaveThreadControllerInterface { +public: + virtual void setMasterController(MasterSlaveThreadController *master) = 0; + virtual void signalSlave(void *userdata) = 0; +}; + +class MasterSlaveThreadController { +private: + static const int triggerBaseCount = 1; + + Mutex *mutex; + Semaphore *slaveTriggerSem; + int slaveTriggerCounter; + + std::vector slaveThreadList; + + void init(std::vector &newSlaveThreadList); +public: + + MasterSlaveThreadController(); + MasterSlaveThreadController(std::vector &slaveThreadList); + ~MasterSlaveThreadController(); + + void setSlaves(std::vector &slaveThreadList); + void clearSlaves(bool clearListOnly=false); + + void signalSlaves(void *userdata); + void triggerMaster(int waitMilliseconds=-1); + bool waitTillSlavesTrigger(int waitMilliseconds=-1); + +}; + +class MasterSlaveThreadControllerSafeWrapper { +protected: + MasterSlaveThreadController *master; + string ownerId; + int waitMilliseconds; + +public: + + MasterSlaveThreadControllerSafeWrapper(MasterSlaveThreadController *master, int waitMilliseconds=-1, string ownerId="") { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + this->master = master; + this->waitMilliseconds = waitMilliseconds; + this->ownerId = ownerId; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + } + ~MasterSlaveThreadControllerSafeWrapper() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + if(master != NULL) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + master->triggerMaster(this->waitMilliseconds); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + } +}; }}//end namespace diff --git a/source/shared_lib/sources/platform/sdl/thread.cpp b/source/shared_lib/sources/platform/sdl/thread.cpp index 27b5418a..bc4bf8d9 100644 --- a/source/shared_lib/sources/platform/sdl/thread.cpp +++ b/source/shared_lib/sources/platform/sdl/thread.cpp @@ -59,7 +59,7 @@ void Thread::start() { assert(thread != NULL); if(thread == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } } @@ -73,7 +73,7 @@ int Thread::beginExecution(void* data) { assert(thread != NULL); if(thread == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } thread->execute(); @@ -140,7 +140,7 @@ Mutex::Mutex(string ownerId) { assert(mutex != NULL); if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } deleteownerId = ""; @@ -150,13 +150,13 @@ Mutex::~Mutex() { SDLMutexSafeWrapper safeMutex(&mutexAccessor,true); if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); //printf("%s\n",szBuf); } else if(refCount >= 1) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } @@ -170,7 +170,7 @@ Mutex::~Mutex() { void Mutex::p() { if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } SDL_mutexP(mutex); @@ -180,7 +180,7 @@ void Mutex::p() { void Mutex::v() { if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } refCount--; @@ -195,7 +195,7 @@ Semaphore::Semaphore(Uint32 initialValue) { semaphore = SDL_CreateSemaphore(initialValue); if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } } @@ -203,7 +203,7 @@ Semaphore::Semaphore(Uint32 initialValue) { Semaphore::~Semaphore() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } SDL_DestroySemaphore(semaphore); @@ -213,7 +213,7 @@ Semaphore::~Semaphore() { void Semaphore::signal() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } SDL_SemPost(semaphore); @@ -222,7 +222,7 @@ void Semaphore::signal() { int Semaphore::waitTillSignalled(int waitMilliseconds) { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } int semValue = 0; @@ -235,16 +235,43 @@ int Semaphore::waitTillSignalled(int waitMilliseconds) { return semValue; } +bool Semaphore::tryDecrement() { + if(semaphore == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + int semValue = SDL_SemTryWait(semaphore); + return (semValue == 0); +} + uint32 Semaphore::getSemValue() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } return SDL_SemValue(semaphore); } +void Semaphore::resetSemValue(Uint32 initialValue) { + if(semaphore == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + uint32 currentValue = SDL_SemValue(semaphore); + for(unsigned int i = currentValue; i < initialValue; ++i) { + SDL_SemPost(semaphore); + } +} + +// ===================================================== +// class ReadWriteMutex +// ===================================================== + ReadWriteMutex::ReadWriteMutex(int maxReaders) : semaphore(maxReaders) { this->maxReadersCount = maxReaders; } @@ -273,5 +300,232 @@ int ReadWriteMutex::maxReaders() { return this->maxReadersCount; } +// ===================================================== +// class Trigger +// ===================================================== + +Trigger::Trigger(Mutex *mutex) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + this->mutex = mutex; + this->trigger = SDL_CreateCond(); + if(this->trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +Trigger::~Trigger() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + SDL_DestroyCond(trigger); + trigger = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + this->mutex = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void Trigger::signal(bool allThreads) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + //MutexSafeWrapper safeMutex(mutex); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(allThreads == false) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = SDL_CondSignal(trigger); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result); + } + else { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = SDL_CondBroadcast(trigger); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result); + } + + //safeMutex.ReleaseLock(); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +int Trigger::waitTillSignalled(Mutex *mutex, int waitMilliseconds) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + if(mutex == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = 0; + if(waitMilliseconds >= 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = SDL_CondWaitTimeout(trigger,mutex->getMutex(),waitMilliseconds); + } + else { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = SDL_CondWait(trigger, mutex->getMutex()); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + return result; +} + +MasterSlaveThreadController::MasterSlaveThreadController() { + std::vector empty; + init(empty); +} + +MasterSlaveThreadController::MasterSlaveThreadController(std::vector &slaveThreadList) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] ==========================================================\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + init(slaveThreadList); +} + +void MasterSlaveThreadController::init(std::vector &newSlaveThreadList) { + static string masterSlaveOwnerId = string(__FILE__) + string("_MasterSlaveThreadController"); + this->mutex = new Mutex(masterSlaveOwnerId); + this->slaveTriggerSem = new Semaphore(0); + this->slaveTriggerCounter = newSlaveThreadList.size() + triggerBaseCount; + setSlaves(newSlaveThreadList); +} + +void MasterSlaveThreadController::clearSlaves(bool clearListOnly) { + if(this->slaveThreadList.empty() == false) { + if(clearListOnly == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->setMasterController(NULL); + } + } + this->slaveThreadList.clear(); + } +} + +MasterSlaveThreadController::~MasterSlaveThreadController() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + clearSlaves(); + + delete slaveTriggerSem; + slaveTriggerSem = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] mutex->getRefCount() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,mutex->getRefCount()); + + delete mutex; + mutex = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void MasterSlaveThreadController::setSlaves(std::vector &slaveThreadList) { + this->slaveThreadList = slaveThreadList; + + if(this->slaveThreadList.empty() == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->setMasterController(this); + } + } +} + +void MasterSlaveThreadController::signalSlaves(void *userdata) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + slaveTriggerCounter = this->slaveThreadList.size() + triggerBaseCount; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(this->slaveThreadList.empty() == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->signalSlave(userdata); + } + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void MasterSlaveThreadController::triggerMaster(int waitMilliseconds) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + MutexSafeWrapper safeMutex(mutex); + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + slaveTriggerCounter--; + int newCount = slaveTriggerCounter; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + safeMutex.ReleaseLock(); + + if(newCount <= triggerBaseCount) { + slaveTriggerSem->signal(); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +bool MasterSlaveThreadController::waitTillSlavesTrigger(int waitMilliseconds) { + bool result = true; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + if(this->slaveThreadList.empty() == false) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + int slaveResult = slaveTriggerSem->waitTillSignalled(waitMilliseconds); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d slaveResult = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter,slaveResult); + + if(slaveResult != 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = false; + } + else if(slaveResult == 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + result = true; + } + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + return result; +} }}//end namespace