diff --git a/source/glest_game/ai/ai_interface.cpp b/source/glest_game/ai/ai_interface.cpp index ae42aad0..ae5e95b6 100644 --- a/source/glest_game/ai/ai_interface.cpp +++ b/source/glest_game/ai/ai_interface.cpp @@ -37,6 +37,7 @@ namespace Glest{ namespace Game{ // ===================================================== AiInterfaceThread::AiInterfaceThread(AiInterface *aiIntf) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->aiIntf = aiIntf; } @@ -124,6 +125,9 @@ void AiInterfaceThread::execute() { semTaskSignalled.waitTillSignalled(); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; @@ -144,50 +148,6 @@ void AiInterfaceThread::execute() { this->aiIntf->update(); -/* - World *world = faction->getWorld(); - - //Config &config= Config::getInstance(); - //bool sortedUnitsAllowed = config.getBool("AllowGroupedUnitCommands","true"); - bool sortedUnitsAllowed = false; - if(sortedUnitsAllowed) { - faction->sortUnitsByCommandGroups(); - } - - MutexSafeWrapper safeMutex(faction->getUnitMutex(),string(__FILE__) + "_" + intToStr(__LINE__)); - - //if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); - if(minorDebugPerformance) chrono.start(); - - int unitCount = faction->getUnitCount(); - for(int j = 0; j < unitCount; ++j) { - Unit *unit = faction->getUnit(j); - if(unit == NULL) { - throw megaglest_runtime_error("unit == NULL"); - } - - int64 elapsed1 = 0; - if(minorDebugPerformance) elapsed1 = chrono.getMillis(); - - bool update = unit->needToUpdate(); - - if(minorDebugPerformance && (chrono.getMillis() - elapsed1) >= 1) printf("Faction [%d - %s] #1-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed1); - - //update = true; - if(update == true) { - - int64 elapsed2 = 0; - if(minorDebugPerformance) elapsed2 = chrono.getMillis(); - - world->getUnitUpdater()->updateUnitCommand(unit,frameIndex.first); - - if(minorDebugPerformance && (chrono.getMillis() - elapsed2) >= 1) printf("Faction [%d - %s] #2-unit threaded updates on frame: %d for [%d] unit # %d, unitCount = %d, took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),j,unitCount,(long long int)chrono.getMillis() - elapsed2); - } - } - - if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis()); -*/ - safeMutex.ReleaseLock(); setTaskCompleted(frameIndex.first); diff --git a/source/glest_game/ai/ai_interface.h b/source/glest_game/ai/ai_interface.h index 9c035d84..8214bc5b 100644 --- a/source/glest_game/ai/ai_interface.h +++ b/source/glest_game/ai/ai_interface.h @@ -31,13 +31,14 @@ namespace Glest{ namespace Game{ /// The AI will interact with the game through this interface // ===================================================== -class AiInterfaceThread : public BaseThread { +class AiInterfaceThread : public BaseThread, public SlaveThreadControllerInterface { protected: AiInterface *aiIntf; Semaphore semTaskSignalled; Mutex *triggerIdMutex; std::pair frameIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int frameIndex); @@ -49,6 +50,10 @@ public: virtual void execute(); void signal(int frameIndex); bool isSignalCompleted(int frameIndex); + + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata) { signal(*((int *)(userdata))); } + }; class AiInterface { @@ -87,6 +92,7 @@ public: void signalWorkerThread(int frameIndex); bool isWorkerThreadSignalCompleted(int frameIndex); + AiInterfaceThread *getWorkerThread() { return workerThread; } //get int getTimer() const {return timer;} diff --git a/source/glest_game/game/game.cpp b/source/glest_game/game/game.cpp index ca48c6dc..2b9e5b34 100644 --- a/source/glest_game/game/game.cpp +++ b/source/glest_game/game/game.cpp @@ -378,6 +378,7 @@ Game::~Game() { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); aiInterfaces.clear(); @@ -1133,8 +1134,10 @@ void Game::init(bool initForPreviewOnly) { bool isNetworkGame = this->gameSettings.isNetworkGame(); role = networkManager.getNetworkRole(); + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); + std::vector slaveThreadList; aiInterfaces.resize(world.getFactionCount()); for(int i=0; i < world.getFactionCount(); ++i) { Faction *faction= world.getFaction(i); @@ -1146,15 +1149,19 @@ void Game::init(bool initForPreviewOnly) { char szBuf[8096]=""; snprintf(szBuf,8096,Lang::getInstance().get("LogScreenGameLoadingCreatingAIFaction","",true).c_str(),i); logger.add(szBuf, true); + + slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread()); } else { aiInterfaces[i]= NULL; } } + if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + masterController.setSlaves(slaveThreadList); + } if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); - // give CPU time to update other things to avoid apperance of hanging sleep(0); Shared::Platform::Window::handleEvent(); @@ -1525,55 +1532,63 @@ void Game::update() { } */ - // Signal the faction threads to do any pre-processing - bool hasAIPlayer = false; - for(int j = 0; j < world.getFactionCount(); ++j) { - Faction *faction = world.getFaction(j); + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + int currentFrameCount = world.getFrameCount(); + masterController.signalSlaves(¤tFrameCount); + bool slavesCompleted = masterController.waitTillSlavesTrigger(20000); + } + else { + // Signal the faction threads to do any pre-processing + bool hasAIPlayer = false; + for(int j = 0; j < world.getFactionCount(); ++j) { + Faction *faction = world.getFaction(j); - //printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled()); + //printf("Faction Index = %d enableServerControlledAI = %d, isNetworkGame = %d, role = %d isCPU player = %d scriptManager.getPlayerModifiers(j)->getAiEnabled() = %d\n",j,enableServerControlledAI,isNetworkGame,role,faction->getCpuControl(enableServerControlledAI,isNetworkGame,role),scriptManager.getPlayerModifiers(j)->getAiEnabled()); - if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && - scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { + if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && + scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis()); - aiInterfaces[j]->signalWorkerThread(world.getFrameCount()); - hasAIPlayer = true; + if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] [i = %d] faction = %d, factionCount = %d, took msecs: %lld [before AI updates]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,i,j,world.getFactionCount(),chrono.getMillis()); + aiInterfaces[j]->signalWorkerThread(world.getFrameCount()); + hasAIPlayer = true; + } } - } - if(showPerfStats) { - sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis()); - perfList.push_back(perfBuf); - } + if(showPerfStats) { + sprintf(perfBuf,"In [%s::%s] Line: %d took msecs: " MG_I64_SPECIFIER "\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chronoPerf.getMillis()); + perfList.push_back(perfBuf); + } - if(hasAIPlayer == true) { - //sleep(0); + if(hasAIPlayer == true) { + //sleep(0); - bool workThreadsFinished = false; - Chrono chronoAI; - chronoAI.start(); + bool workThreadsFinished = false; + Chrono chronoAI; + chronoAI.start(); - const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; - for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { - workThreadsFinished = true; - for(int j = 0; j < world.getFactionCount(); ++j) { - Faction *faction = world.getFaction(j); - if(faction == NULL) { - throw megaglest_runtime_error("faction == NULL"); - } - if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && - scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { - if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) { - workThreadsFinished = false; - break; + const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; + for(;chronoAI.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { + workThreadsFinished = true; + for(int j = 0; j < world.getFactionCount(); ++j) { + Faction *faction = world.getFaction(j); + if(faction == NULL) { + throw megaglest_runtime_error("faction == NULL"); + } + if( faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true && + scriptManager.getPlayerModifiers(j)->getAiEnabled() == true) { + if(aiInterfaces[j]->isWorkerThreadSignalCompleted(world.getFrameCount()) == false) { + workThreadsFinished = false; + break; + } } } - } - if(workThreadsFinished == false) { - //sleep(0); - } - else { - break; + if(workThreadsFinished == false) { + //sleep(0); + } + else { + break; + } } } } @@ -1824,6 +1839,8 @@ void Game::update() { world.clearTileset(); SoundRenderer::getInstance().stopAllSounds(); + + masterController.clearSlaves(true); deleteValues(aiInterfaces.begin(), aiInterfaces.end()); aiInterfaces.clear(); gui.end(); //selection must be cleared before deleting units @@ -2092,6 +2109,7 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo Logger &logger= Logger::getInstance(); ServerInterface *server = NetworkManager::getInstance().getServerInterface(); + bool newAIPlayerCreated = false; for(int i = 0; i < world.getFactionCount(); ++i) { Faction *faction = world.getFaction(i); if( faction->getFactionDisconnectHandled() == false && @@ -2123,6 +2141,8 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo snprintf(szBuf,8096,msg.c_str(),i+1,this->gameSettings.getNetworkPlayerName(i).c_str()); commander.tryNetworkPlayerDisconnected(i); + + newAIPlayerCreated = true; } else { string msg = "Player #%d [%s] has disconnected, but player was only an observer!"; @@ -2140,6 +2160,21 @@ void Game::ReplaceDisconnectedNetworkPlayersWithAI(bool isNetworkGame, NetworkRo } } } + + if(newAIPlayerCreated == true && Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + bool enableServerControlledAI = this->gameSettings.getEnableServerControlledAI(); + + masterController.clearSlaves(true); + + std::vector slaveThreadList; + for(int i=0; i < world.getFactionCount(); ++i) { + Faction *faction= world.getFaction(i); + if(faction->getCpuControl(enableServerControlledAI,isNetworkGame,role) == true) { + slaveThreadList.push_back(aiInterfaces[i]->getWorkerThread()); + } + } + masterController.setSlaves(slaveThreadList); + } } } diff --git a/source/glest_game/game/game.h b/source/glest_game/game/game.h index 48ee2575..beaf3049 100644 --- a/source/glest_game/game/game.h +++ b/source/glest_game/game/game.h @@ -196,6 +196,8 @@ private: std::map unitHighlightList; + MasterSlaveThreadController masterController; + public: Game(); Game(Program *program, const GameSettings *gameSettings, bool masterserverMode); diff --git a/source/glest_game/network/connection_slot.cpp b/source/glest_game/network/connection_slot.cpp index dfe7538c..5482622d 100644 --- a/source/glest_game/network/connection_slot.cpp +++ b/source/glest_game/network/connection_slot.cpp @@ -33,6 +33,7 @@ namespace Glest{ namespace Game{ // ===================================================== ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->slotIndex = slotIndex; this->slotInterface = NULL; @@ -42,6 +43,7 @@ ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() { } ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() { + this->masterController = NULL; this->triggerIdMutex = new Mutex(); this->slotIndex = slotIndex; this->slotInterface = slotInterface; @@ -164,6 +166,14 @@ void ConnectionSlotThread::slotUpdateTask(ConnectionSlotEvent *event) { } } +void ConnectionSlotThread::signalSlave(void *userdata) { + + //ConnectionSlotEvent &event = eventList[i]; + std::map *eventList = (std::map *)userdata; + ConnectionSlotEvent &event = (*eventList)[slotIndex]; + signalUpdate(&event); +} + void ConnectionSlotThread::execute() { RunningStatusSafeWrapper runningStatus(this); try { @@ -179,6 +189,9 @@ void ConnectionSlotThread::execute() { semTaskSignalled.waitTillSignalled(); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; diff --git a/source/glest_game/network/connection_slot.h b/source/glest_game/network/connection_slot.h index 9de12648..29c136c6 100644 --- a/source/glest_game/network/connection_slot.h +++ b/source/glest_game/network/connection_slot.h @@ -70,7 +70,7 @@ public: virtual ~ConnectionSlotCallbackInterface() {} }; -class ConnectionSlotThread : public BaseThread +class ConnectionSlotThread : public BaseThread, public SlaveThreadControllerInterface { protected: @@ -79,6 +79,7 @@ protected: Mutex *triggerIdMutex; vector eventList; int slotIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int eventId); @@ -91,6 +92,9 @@ public: ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex); virtual ~ConnectionSlotThread(); + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata); + virtual void execute(); void signalUpdate(ConnectionSlotEvent *event); bool isSignalCompleted(ConnectionSlotEvent *event); @@ -140,6 +144,8 @@ public: ConnectionSlot(ServerInterface* serverInterface, int playerIndex); ~ConnectionSlot(); + ConnectionSlotThread *getWorkerThread() { return slotThreadWorker; } + void update(bool checkForNewClients,int lockedSlotIndex); void setPlayerIndex(int value) { playerIndex = value; } int getPlayerIndex() const {return playerIndex;} diff --git a/source/glest_game/network/server_interface.cpp b/source/glest_game/network/server_interface.cpp index 108ace7f..71093d0b 100644 --- a/source/glest_game/network/server_interface.cpp +++ b/source/glest_game/network/server_interface.cpp @@ -243,6 +243,8 @@ ServerInterface::~ServerInterface() { //printf("===> Destructor for ServerInterface\n"); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + masterController.clearSlaves(true); exitServer = true; for(int i= 0; i < GameConstants::maxPlayers; ++i) { if(slots[i] != NULL) { @@ -826,25 +828,6 @@ std::pair ServerInterface::clientLagCheck(ConnectionSlot *connectionS return clientLagExceededOrWarned; } -bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot, int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) { - bool slotSignalled = false; - - event.eventType = eReceiveSocketData; - event.networkMessage = NULL; - event.connectionSlot = connectionSlot; - event.socketTriggered = socketTriggered; - event.triggerId = slotIndex; - event.eventId = getNextEventId(); - - if(connectionSlot != NULL) { - if(socketTriggered == true || connectionSlot->isConnected() == false) { - connectionSlot->signalUpdate(&event); - slotSignalled = true; - } - } - return slotSignalled; -} - void ServerInterface::updateSocketTriggeredList(std::map & socketTriggeredList) { for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); @@ -868,51 +851,97 @@ void ServerInterface::validateConnectedClients() { } } +bool ServerInterface::signalClientReceiveCommands(ConnectionSlot *connectionSlot, + int slotIndex, bool socketTriggered, ConnectionSlotEvent & event) { + bool slotSignalled = false; + + event.eventType = eReceiveSocketData; + event.networkMessage = NULL; + event.connectionSlot = connectionSlot; + event.socketTriggered = socketTriggered; + event.triggerId = slotIndex; + event.eventId = getNextEventId(); + + if(connectionSlot != NULL) { + if(socketTriggered == true || connectionSlot->isConnected() == false) { + connectionSlot->signalUpdate(&event); + slotSignalled = true; + } + } + return slotSignalled; +} + void ServerInterface::signalClientsToRecieveData(std::map &socketTriggeredList, std::map &eventList, std::map & mapSlotSignalledList) { - //bool checkForNewClients = true; - for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); - ConnectionSlot* connectionSlot = slots[i]; + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + masterController.clearSlaves(true); + std::vector slaveThreadList; + for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot = slots[i]; - bool socketTriggered = false; + bool socketTriggered = false; - if(connectionSlot != NULL) { - PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); - if(Socket::isSocketValid(&clientSocket)) { - socketTriggered = socketTriggeredList[clientSocket]; + if(connectionSlot != NULL) { + PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); + if(Socket::isSocketValid(&clientSocket)) { + socketTriggered = socketTriggeredList[clientSocket]; + } + } + ConnectionSlotEvent &event = eventList[i]; + if(connectionSlot != NULL) { + if(socketTriggered == true || connectionSlot->isConnected() == false) { + ConnectionSlotEvent &event = eventList[i]; + event.eventType = eReceiveSocketData; + event.networkMessage = NULL; + event.connectionSlot = connectionSlot; + event.socketTriggered = socketTriggered; + event.triggerId = i; + event.eventId = getNextEventId(); + + slaveThreadList.push_back(connectionSlot->getWorkerThread()); + mapSlotSignalledList[i] = true; + } } } - ConnectionSlotEvent &event = eventList[i]; - mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event); + masterController.setSlaves(slaveThreadList); + masterController.signalSlaves(&eventList); + } + else { + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot = slots[i]; + + bool socketTriggered = false; + + if(connectionSlot != NULL) { + PLATFORM_SOCKET clientSocket = connectionSlot->getSocketId(); + if(Socket::isSocketValid(&clientSocket)) { + socketTriggered = socketTriggeredList[clientSocket]; + } + } + ConnectionSlotEvent &event = eventList[i]; + mapSlotSignalledList[i] = signalClientReceiveCommands(connectionSlot,i,socketTriggered,event); + } } } void ServerInterface::checkForCompletedClients(std::map & mapSlotSignalledList, std::vector &errorMsgList, std::map &eventList) { - time_t waitForThreadElapsed = time(NULL); - std::map slotsCompleted; - for(bool threadsDone = false; - exitServer == false && threadsDone == false && - difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) { - threadsDone = true; - // Examine all threads for completion of delegation - for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - //printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]); + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + bool slavesCompleted = masterController.waitTillSlavesTrigger(1000 * MAX_SLOT_THREAD_WAIT_TIME); + for(unsigned int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); - //printf("===> IN slot %d - About to checkForCompletedClients\n",i); - ConnectionSlot* connectionSlot = slots[i]; - if(connectionSlot != NULL && mapSlotSignalledList[i] == true && - slotsCompleted.find(i) == slotsCompleted.end()) { + if(connectionSlot != NULL && mapSlotSignalledList[i] == true) { try { - - std::vector errorList = connectionSlot->getThreadErrorList(); // Collect any collected errors from threads if(errorList.empty() == false) { @@ -924,16 +953,6 @@ void ServerInterface::checkForCompletedClients(std::map & mapSlotSigna } connectionSlot->clearThreadErrorList(); } - - // Not done waiting for data yet - bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true); - if(updateFinished == false) { - threadsDone = false; - break; - } - else { - slotsCompleted[i] = true; - } } catch(const exception &ex) { SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); @@ -942,8 +961,63 @@ void ServerInterface::checkForCompletedClients(std::map & mapSlotSigna errorMsgList.push_back(ex.what()); } } + } + masterController.clearSlaves(true); + } + else { + time_t waitForThreadElapsed = time(NULL); + std::map slotsCompleted; + for(bool threadsDone = false; + exitServer == false && threadsDone == false && + difftime((long int)time(NULL),waitForThreadElapsed) < MAX_SLOT_THREAD_WAIT_TIME;) { + threadsDone = true; + // Examine all threads for completion of delegation + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { - //printf("===> END slot %d - About to checkForCompletedClients\n",i); + //printf("===> START slot %d [%p] - About to checkForCompletedClients\n",i,slots[i]); + + MutexSafeWrapper safeMutexSlot(slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + + //printf("===> IN slot %d - About to checkForCompletedClients\n",i); + + ConnectionSlot* connectionSlot = slots[i]; + if(connectionSlot != NULL && mapSlotSignalledList[i] == true && + slotsCompleted.find(i) == slotsCompleted.end()) { + try { + + + std::vector errorList = connectionSlot->getThreadErrorList(); + // Collect any collected errors from threads + if(errorList.empty() == false) { + for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) { + string &sErr = errorList[iErrIdx]; + if(sErr != "") { + errorMsgList.push_back(sErr); + } + } + connectionSlot->clearThreadErrorList(); + } + + // Not done waiting for data yet + bool updateFinished = (connectionSlot != NULL ? connectionSlot->updateCompleted(&eventList[i]) : true); + if(updateFinished == false) { + threadsDone = false; + break; + } + else { + slotsCompleted[i] = true; + } + } + catch(const exception &ex) { + SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,ex.what()); + + errorMsgList.push_back(ex.what()); + } + } + + //printf("===> END slot %d - About to checkForCompletedClients\n",i); + } } } } diff --git a/source/glest_game/network/server_interface.h b/source/glest_game/network/server_interface.h index 4be5e813..257a5273 100644 --- a/source/glest_game/network/server_interface.h +++ b/source/glest_game/network/server_interface.h @@ -93,6 +93,7 @@ private: map > badClientConnectIPList; ServerSocket *serverSocketAdmin; + MasterSlaveThreadController masterController; public: ServerInterface(bool publishEnabled); diff --git a/source/glest_game/type_instances/faction.cpp b/source/glest_game/type_instances/faction.cpp index 1b980ea9..5e02f6ae 100644 --- a/source/glest_game/type_instances/faction.cpp +++ b/source/glest_game/type_instances/faction.cpp @@ -215,6 +215,7 @@ void Faction::sortUnitsByCommandGroups() { FactionThread::FactionThread(Faction *faction) : BaseThread() { this->triggerIdMutex = new Mutex(); this->faction = faction; + this->masterController = NULL; } FactionThread::~FactionThread() { @@ -300,6 +301,11 @@ void FactionThread::execute() { semTaskSignalled.waitTillSignalled(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); + MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; @@ -313,6 +319,8 @@ void FactionThread::execute() { safeMutex.ReleaseLock(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(executeTask == true) { ExecutingTaskSafeWrapper safeExecutingTaskMutex(this); @@ -330,6 +338,8 @@ void FactionThread::execute() { //if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); if(minorDebugPerformance) chrono.start(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + int unitCount = faction->getUnitCount(); for(int j = 0; j < unitCount; ++j) { Unit *unit = faction->getUnit(j); @@ -358,11 +368,19 @@ void FactionThread::execute() { if(minorDebugPerformance && chrono.getMillis() >= 1) printf("Faction [%d - %s] threaded updates on frame: %d for [%d] units took [%lld] msecs\n",faction->getStartLocationIndex(),faction->getType()->getName().c_str(),frameIndex.first,faction->getUnitPathfindingListCount(),(long long int)chrono.getMillis()); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + safeMutex.ReleaseLock(); + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + setTaskCompleted(frameIndex.first); + + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); } + //printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + if(getQuitStatus() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); break; diff --git a/source/glest_game/type_instances/faction.h b/source/glest_game/type_instances/faction.h index 2659f7d9..0dd87e8c 100644 --- a/source/glest_game/type_instances/faction.h +++ b/source/glest_game/type_instances/faction.h @@ -76,13 +76,14 @@ struct CommandGroupUnitSorterId { bool operator()(const int l, const int r); }; -class FactionThread : public BaseThread { +class FactionThread : public BaseThread, public SlaveThreadControllerInterface { protected: Faction *faction; Semaphore semTaskSignalled; Mutex *triggerIdMutex; std::pair frameIndex; + MasterSlaveThreadController *masterController; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int frameIndex); @@ -92,6 +93,10 @@ public: FactionThread(Faction *faction); virtual ~FactionThread(); virtual void execute(); + + virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; } + virtual void signalSlave(void *userdata) { signalPathfinder(*((int *)(userdata))); } + void signalPathfinder(int frameIndex); bool isSignalPathfinderCompleted(int frameIndex); }; @@ -320,8 +325,11 @@ public: inline World * getWorld() { return world; } int getFrameCount(); + void signalWorkerThread(int frameIndex); bool isWorkerThreadSignalCompleted(int frameIndex); + FactionThread *getWorkerThread() { return workerThread; } + void limitResourcesToStore(); void sortUnitsByCommandGroups(); diff --git a/source/glest_game/world/world.cpp b/source/glest_game/world/world.cpp index 9ff32186..d73026b0 100644 --- a/source/glest_game/world/world.cpp +++ b/source/glest_game/world/world.cpp @@ -100,6 +100,7 @@ void World::cleanup() { factions[i]->end(); } + masterController.clearSlaves(true); if(SystemFlags::getSystemSettingType(SystemFlags::debugSystem).enabled) SystemFlags::OutputDebug(SystemFlags::debugSystem,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); for(int i= 0; iend(); } + + masterController.clearSlaves(true); for(int i= 0; iclearAproxCanMoveSoonCached(); } - // Signal the faction threads to do any pre-processing - for(int i = 0; i < factionCount; ++i) { - Faction *faction = getFaction(i); - //if(faction == NULL) { - // throw megaglest_runtime_error("faction == NULL"); - //} - faction->signalWorkerThread(frameCount); - } - - //sleep(0); - bool workThreadsFinished = false; Chrono chrono; chrono.start(); - const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; - for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { - workThreadsFinished = true; + const bool newThreadManager = Config::getInstance().getBool("EnableNewThreadManager","false"); + if(newThreadManager == true) { + masterController.signalSlaves(&frameCount); + bool slavesCompleted = masterController.waitTillSlavesTrigger(20000); + + if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d slavesCompleted = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount,slavesCompleted); + } + else { + // Signal the faction threads to do any pre-processing for(int i = 0; i < factionCount; ++i) { Faction *faction = getFaction(i); //if(faction == NULL) { // throw megaglest_runtime_error("faction == NULL"); //} - if(faction->isWorkerThreadSignalCompleted(frameCount) == false) { - workThreadsFinished = false; + faction->signalWorkerThread(frameCount); + } + + //sleep(0); + bool workThreadsFinished = false; + Chrono chrono; + chrono.start(); + + const int MAX_FACTION_THREAD_WAIT_MILLISECONDS = 20000; + for(;chrono.getMillis() < MAX_FACTION_THREAD_WAIT_MILLISECONDS;) { + workThreadsFinished = true; + for(int i = 0; i < factionCount; ++i) { + Faction *faction = getFaction(i); + //if(faction == NULL) { + // throw megaglest_runtime_error("faction == NULL"); + //} + if(faction->isWorkerThreadSignalCompleted(frameCount) == false) { + workThreadsFinished = false; + break; + } + } + if(workThreadsFinished == false) { + //sleep(0); + } + else { break; } } - if(workThreadsFinished == false) { - //sleep(0); - } - else { - break; - } - } - if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount); + if(SystemFlags::VERBOSE_MODE_ENABLED && chrono.getMillis() >= 10) printf("In [%s::%s Line: %d] *** Faction thread preprocessing took [%lld] msecs for %d factions for frameCount = %d.\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),factionCount,frameCount); + } //units for(int i = 0; i < factionCount; ++i) { @@ -1704,6 +1719,15 @@ void World::initFactionTypes(GameSettings *gs) { } } + if(Config::getInstance().getBool("EnableNewThreadManager","false") == true) { + std::vector slaveThreadList; + for(unsigned int i = 0; i < factions.size(); ++i) { + Faction *faction = factions[i]; + slaveThreadList.push_back(faction->getWorkerThread()); + } + masterController.setSlaves(slaveThreadList); + } + if(loadWorldNode != NULL) { stats.loadGame(loadWorldNode); random.setLastNumber(loadWorldNode->getAttribute("random")->getIntValue()); diff --git a/source/glest_game/world/world.h b/source/glest_game/world/world.h index 685ae55d..30697ee6 100644 --- a/source/glest_game/world/world.h +++ b/source/glest_game/world/world.h @@ -143,6 +143,8 @@ private: const XmlNode *loadWorldNode; + MasterSlaveThreadController masterController; + public: World(); ~World(); diff --git a/source/shared_lib/include/platform/sdl/thread.h b/source/shared_lib/include/platform/sdl/thread.h index 03db9808..41d7bd29 100644 --- a/source/shared_lib/include/platform/sdl/thread.h +++ b/source/shared_lib/include/platform/sdl/thread.h @@ -94,6 +94,8 @@ public: void p(); void v(); int getRefCount() const { return refCount; } + + SDL_mutex* getMutex() { return mutex; } }; class MutexSafeWrapper { @@ -190,8 +192,10 @@ public: ~Semaphore(); void signal(); int waitTillSignalled(int waitMilliseconds=-1); + bool tryDecrement(); uint32 getSemValue(); + void resetSemValue(Uint32 initialValue); }; @@ -313,6 +317,86 @@ public: } }; +const bool debugMasterSlaveThreadController = false; +// ===================================================== +// class Trigger +// ===================================================== + +class Trigger { +private: + SDL_cond* trigger; + Mutex *mutex; + +public: + Trigger(Mutex *mutex); + ~Trigger(); + void signal(bool allThreads=false); + int waitTillSignalled(Mutex *mutex, int waitMilliseconds=-1); +}; + +class MasterSlaveThreadController; + +class SlaveThreadControllerInterface { +public: + virtual void setMasterController(MasterSlaveThreadController *master) = 0; + virtual void signalSlave(void *userdata) = 0; +}; + +class MasterSlaveThreadController { +private: + static const int triggerBaseCount = 1; + + Mutex *mutex; + Semaphore *slaveTriggerSem; + int slaveTriggerCounter; + + std::vector slaveThreadList; + + void init(std::vector &newSlaveThreadList); +public: + + MasterSlaveThreadController(); + MasterSlaveThreadController(std::vector &slaveThreadList); + ~MasterSlaveThreadController(); + + void setSlaves(std::vector &slaveThreadList); + void clearSlaves(bool clearListOnly=false); + + void signalSlaves(void *userdata); + void triggerMaster(int waitMilliseconds=-1); + bool waitTillSlavesTrigger(int waitMilliseconds=-1); + +}; + +class MasterSlaveThreadControllerSafeWrapper { +protected: + MasterSlaveThreadController *master; + string ownerId; + int waitMilliseconds; + +public: + + MasterSlaveThreadControllerSafeWrapper(MasterSlaveThreadController *master, int waitMilliseconds=-1, string ownerId="") { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + this->master = master; + this->waitMilliseconds = waitMilliseconds; + this->ownerId = ownerId; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + } + ~MasterSlaveThreadControllerSafeWrapper() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + if(master != NULL) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + + master->triggerMaster(this->waitMilliseconds); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); + } +}; }}//end namespace diff --git a/source/shared_lib/sources/platform/sdl/thread.cpp b/source/shared_lib/sources/platform/sdl/thread.cpp index 27b5418a..bc4bf8d9 100644 --- a/source/shared_lib/sources/platform/sdl/thread.cpp +++ b/source/shared_lib/sources/platform/sdl/thread.cpp @@ -59,7 +59,7 @@ void Thread::start() { assert(thread != NULL); if(thread == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } } @@ -73,7 +73,7 @@ int Thread::beginExecution(void* data) { assert(thread != NULL); if(thread == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] thread == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } thread->execute(); @@ -140,7 +140,7 @@ Mutex::Mutex(string ownerId) { assert(mutex != NULL); if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } deleteownerId = ""; @@ -150,13 +150,13 @@ Mutex::~Mutex() { SDLMutexSafeWrapper safeMutex(&mutexAccessor,true); if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); //printf("%s\n",szBuf); } else if(refCount >= 1) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] about to destroy mutex refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } @@ -170,7 +170,7 @@ Mutex::~Mutex() { void Mutex::p() { if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } SDL_mutexP(mutex); @@ -180,7 +180,7 @@ void Mutex::p() { void Mutex::v() { if(mutex == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",__FILE__,__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL refCount = %d owner [%s] deleteownerId [%s]",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,refCount,ownerId.c_str(),deleteownerId.c_str()); throw megaglest_runtime_error(szBuf); } refCount--; @@ -195,7 +195,7 @@ Semaphore::Semaphore(Uint32 initialValue) { semaphore = SDL_CreateSemaphore(initialValue); if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } } @@ -203,7 +203,7 @@ Semaphore::Semaphore(Uint32 initialValue) { Semaphore::~Semaphore() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } SDL_DestroySemaphore(semaphore); @@ -213,7 +213,7 @@ Semaphore::~Semaphore() { void Semaphore::signal() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } SDL_SemPost(semaphore); @@ -222,7 +222,7 @@ void Semaphore::signal() { int Semaphore::waitTillSignalled(int waitMilliseconds) { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } int semValue = 0; @@ -235,16 +235,43 @@ int Semaphore::waitTillSignalled(int waitMilliseconds) { return semValue; } +bool Semaphore::tryDecrement() { + if(semaphore == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + int semValue = SDL_SemTryWait(semaphore); + return (semValue == 0); +} + uint32 Semaphore::getSemValue() { if(semaphore == NULL) { char szBuf[1024]=""; - snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); throw megaglest_runtime_error(szBuf); } return SDL_SemValue(semaphore); } +void Semaphore::resetSemValue(Uint32 initialValue) { + if(semaphore == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + uint32 currentValue = SDL_SemValue(semaphore); + for(unsigned int i = currentValue; i < initialValue; ++i) { + SDL_SemPost(semaphore); + } +} + +// ===================================================== +// class ReadWriteMutex +// ===================================================== + ReadWriteMutex::ReadWriteMutex(int maxReaders) : semaphore(maxReaders) { this->maxReadersCount = maxReaders; } @@ -273,5 +300,232 @@ int ReadWriteMutex::maxReaders() { return this->maxReadersCount; } +// ===================================================== +// class Trigger +// ===================================================== + +Trigger::Trigger(Mutex *mutex) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + this->mutex = mutex; + this->trigger = SDL_CreateCond(); + if(this->trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +Trigger::~Trigger() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + SDL_DestroyCond(trigger); + trigger = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + this->mutex = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void Trigger::signal(bool allThreads) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + //MutexSafeWrapper safeMutex(mutex); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(allThreads == false) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = SDL_CondSignal(trigger); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result); + } + else { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = SDL_CondBroadcast(trigger); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] result = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,result); + } + + //safeMutex.ReleaseLock(); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +int Trigger::waitTillSignalled(Mutex *mutex, int waitMilliseconds) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(trigger == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] trigger == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + if(mutex == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] mutex == NULL",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + throw megaglest_runtime_error(szBuf); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + int result = 0; + if(waitMilliseconds >= 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = SDL_CondWaitTimeout(trigger,mutex->getMutex(),waitMilliseconds); + } + else { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = SDL_CondWait(trigger, mutex->getMutex()); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + return result; +} + +MasterSlaveThreadController::MasterSlaveThreadController() { + std::vector empty; + init(empty); +} + +MasterSlaveThreadController::MasterSlaveThreadController(std::vector &slaveThreadList) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] ==========================================================\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + init(slaveThreadList); +} + +void MasterSlaveThreadController::init(std::vector &newSlaveThreadList) { + static string masterSlaveOwnerId = string(__FILE__) + string("_MasterSlaveThreadController"); + this->mutex = new Mutex(masterSlaveOwnerId); + this->slaveTriggerSem = new Semaphore(0); + this->slaveTriggerCounter = newSlaveThreadList.size() + triggerBaseCount; + setSlaves(newSlaveThreadList); +} + +void MasterSlaveThreadController::clearSlaves(bool clearListOnly) { + if(this->slaveThreadList.empty() == false) { + if(clearListOnly == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->setMasterController(NULL); + } + } + this->slaveThreadList.clear(); + } +} + +MasterSlaveThreadController::~MasterSlaveThreadController() { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + clearSlaves(); + + delete slaveTriggerSem; + slaveTriggerSem = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] mutex->getRefCount() = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,mutex->getRefCount()); + + delete mutex; + mutex = NULL; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void MasterSlaveThreadController::setSlaves(std::vector &slaveThreadList) { + this->slaveThreadList = slaveThreadList; + + if(this->slaveThreadList.empty() == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->setMasterController(this); + } + } +} + +void MasterSlaveThreadController::signalSlaves(void *userdata) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + slaveTriggerCounter = this->slaveThreadList.size() + triggerBaseCount; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + if(this->slaveThreadList.empty() == false) { + for(unsigned int i = 0; i < this->slaveThreadList.size(); ++i) { + SlaveThreadControllerInterface *slave = this->slaveThreadList[i]; + slave->signalSlave(userdata); + } + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +void MasterSlaveThreadController::triggerMaster(int waitMilliseconds) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + MutexSafeWrapper safeMutex(mutex); + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] semVal = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + slaveTriggerCounter--; + int newCount = slaveTriggerCounter; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %u\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + safeMutex.ReleaseLock(); + + if(newCount <= triggerBaseCount) { + slaveTriggerSem->signal(); + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); +} + +bool MasterSlaveThreadController::waitTillSlavesTrigger(int waitMilliseconds) { + bool result = true; + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + if(this->slaveThreadList.empty() == false) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + int slaveResult = slaveTriggerSem->waitTillSignalled(waitMilliseconds); + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d slaveResult = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter,slaveResult); + + if(slaveResult != 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + result = false; + } + else if(slaveResult == 0) { + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d] slaveTriggerCounter = %d\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,slaveTriggerCounter); + + result = true; + } + } + + if(debugMasterSlaveThreadController) printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); + + return result; +} }}//end namespace