make server slots continuously read data on sockets during game

This commit is contained in:
Mark Vejvoda 2013-06-04 00:31:41 +00:00
parent 819622f40e
commit 56774bc087
8 changed files with 197 additions and 44 deletions

View File

@ -86,6 +86,7 @@ bool AiInterfaceThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}

View File

@ -64,6 +64,7 @@ bool ClientInterfaceThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}

View File

@ -41,6 +41,9 @@ ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() {
//this->event = NULL;
eventList.clear();
eventList.reserve(100);
triggerGameStarted = new Mutex();
gameStarted = false;
}
ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() {
@ -51,11 +54,17 @@ ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slot
uniqueID = "ConnectionSlotThread";
//this->event = NULL;
eventList.clear();
triggerGameStarted = new Mutex();
gameStarted = false;
}
ConnectionSlotThread::~ConnectionSlotThread() {
delete triggerIdMutex;
triggerIdMutex = NULL;
delete triggerGameStarted;
triggerGameStarted = NULL;
}
void ConnectionSlotThread::setQuitStatus(bool value) {
@ -75,6 +84,9 @@ void ConnectionSlotThread::signalUpdate(ConnectionSlotEvent *event) {
eventList.push_back(*event);
safeMutex.ReleaseLock();
}
if(getGameStarted() == true && getQuitStatus() == true) {
return;
}
semTaskSignalled.signal();
}
@ -127,6 +139,7 @@ bool ConnectionSlotThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}
@ -176,12 +189,34 @@ void ConnectionSlotThread::signalSlave(void *userdata) {
signalUpdate(&event);
}
bool ConnectionSlotThread::getGameStarted() {
MutexSafeWrapper safeMutexGameStarted(triggerGameStarted,CODE_AT_LINE);
return gameStarted;
}
void ConnectionSlotThread::setGameStarted(bool value) {
MutexSafeWrapper safeMutexGameStarted(triggerGameStarted,CODE_AT_LINE);
if(gameStarted != value) {
gameStarted = value;
if(gameStarted == true) {
//printf("Signal game has started for slot: %d\n",slotIndex);
semTaskSignalled.signal();
}
else {
//printf("Signal game has NOT started for slot: %d\n",slotIndex);
}
}
}
void ConnectionSlotThread::execute() {
RunningStatusSafeWrapper runningStatus(this);
try {
//setRunningStatus(true);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
//printf("Starting client SLOT thread: %d\n",slotIndex);
//unsigned int idx = 0;
for(;this->slotInterface != NULL;) {
if(getQuitStatus() == true) {
@ -231,54 +266,109 @@ void ConnectionSlotThread::execute() {
//}
}
else {
semTaskSignalled.waitTillSignalled();
static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
MutexSafeWrapper safeMutex(triggerIdMutex,CODE_AT_LINE);
int eventCount = eventList.size();
//printf("Slot thread slotIndex: %d eventCount: %d\n",slotIndex,eventCount);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] Slot thread slotIndex: %d eventCount: %d\n",__FILE__,__FUNCTION__,__LINE__,slotIndex,eventCount);
if(eventCount > 0) {
ConnectionSlotEvent eventCopy;
eventCopy.eventId = -1;
for(int i = 0; i < eventList.size(); ++i) {
ConnectionSlotEvent &slotEvent = eventList[i];
if(slotEvent.eventCompleted == false) {
eventCopy = slotEvent;
break;
}
}
safeMutex.ReleaseLock();
if(getGameStarted() == true) {
//printf("#A Checking action for slot: %d\n",slotIndex);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
if(eventCopy.eventId > 0) {
ExecutingTaskSafeWrapper safeExecutingTaskMutex(this);
ExecutingTaskSafeWrapper safeExecutingTaskMutex(this);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",__FILE__,__FUNCTION__,__LINE__,slotIndex,eventCount,(int)eventCopy.eventId);
//printf("#1 Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",slotIndex,eventCount,(int)eventCopy.eventId);
//this->slotInterface->slotUpdateTask(&eventCopy);
this->slotUpdateTask(&eventCopy);
setTaskCompleted(eventCopy.eventId);
//printf("#2 Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",slotIndex,eventCount,(int)eventCopy.eventId);
// If the slot or socket are NULL the connection was lost
// so exit the thread
ConnectionSlot *slot = this->slotInterface->getSlot(slotIndex);
if(slot == NULL) {
break;
}
Socket *socket = slot->getSocket(true);
if(socket == NULL) {
break;
}
ConnectionSlotEvent eventCopy;
eventCopy.eventType = eReceiveSocketData;
eventCopy.connectionSlot = slot;
eventCopy.eventId = slotIndex;
eventCopy.socketTriggered = socket->hasDataToReadWithWait(5000);
//eventCopy.socketTriggered = true;
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
//printf("#C Checking action for slot: %d\n",slotIndex);
this->slotUpdateTask(&eventCopy);
//printf("#D Checking action for slot: %d\n",slotIndex);
}
else {
safeMutex.ReleaseLock();
//printf("#1 Checking action for slot: %d\n",slotIndex);
if(getGameStarted() == true) {
continue;
}
//printf("#2 Checking action for slot: %d\n",slotIndex);
semTaskSignalled.waitTillSignalled();
//printf("#3 Checking action for slot: %d\n",slotIndex);
if(getGameStarted() == true) {
continue;
}
//printf("#4 Checking action for slot: %d\n",slotIndex);
static string masterSlaveOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MasterSlaveThreadControllerSafeWrapper safeMasterController(masterController,20000,masterSlaveOwnerId);
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
MutexSafeWrapper safeMutex(triggerIdMutex,CODE_AT_LINE);
int eventCount = eventList.size();
//printf("Slot thread slotIndex: %d eventCount: %d\n",slotIndex,eventCount);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] Slot thread slotIndex: %d eventCount: %d\n",__FILE__,__FUNCTION__,__LINE__,slotIndex,eventCount);
if(eventCount > 0) {
ConnectionSlotEvent eventCopy;
eventCopy.eventId = -1;
for(int i = 0; i < eventList.size(); ++i) {
ConnectionSlotEvent &slotEvent = eventList[i];
if(slotEvent.eventCompleted == false) {
eventCopy = slotEvent;
break;
}
}
safeMutex.ReleaseLock();
if(getQuitStatus() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
if(eventCopy.eventId > 0) {
ExecutingTaskSafeWrapper safeExecutingTaskMutex(this);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",__FILE__,__FUNCTION__,__LINE__,slotIndex,eventCount,(int)eventCopy.eventId);
//printf("#1 Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",slotIndex,eventCount,(int)eventCopy.eventId);
//this->slotInterface->slotUpdateTask(&eventCopy);
this->slotUpdateTask(&eventCopy);
setTaskCompleted(eventCopy.eventId);
//printf("#2 Slot thread slotIndex: %d eventCount: %d eventCopy.eventId: %d\n",slotIndex,eventCount,(int)eventCopy.eventId);
}
}
else {
safeMutex.ReleaseLock();
}
}
}
@ -288,6 +378,8 @@ void ConnectionSlotThread::execute() {
}
}
//printf("Ending client SLOT thread: %d\n",slotIndex);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
catch(const exception &ex) {
@ -364,11 +456,24 @@ ConnectionSlot::~ConnectionSlot() {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(BaseThread::shutdownAndWait(slotThreadWorker) == true) {
//printf("#1 Ending client SLOT: %d slotThreadWorker: %p\n",playerIndex,slotThreadWorker);
if(slotThreadWorker != NULL && slotThreadWorker->getRunningStatus() == false) {
//printf("#2 Ending client SLOT: %d\n",playerIndex);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
delete slotThreadWorker;
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
//else if(BaseThread::shutdownAndWait(slotThreadWorker) == true) {
if(slotThreadWorker != NULL && slotThreadWorker->canShutdown(true) == true) {
//printf("#3 Ending client SLOT: %d\n",playerIndex);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
delete slotThreadWorker;
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
}
//printf("#4 Ending client SLOT: %d\n",playerIndex);
slotThreadWorker = NULL;
delete socketSynchAccessor;
@ -386,6 +491,19 @@ ConnectionSlot::~ConnectionSlot() {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END\n",__FILE__,__FUNCTION__);
}
bool ConnectionSlot::getGameStarted() {
bool result = false;
if(this->slotThreadWorker != NULL) {
result = this->slotThreadWorker->getGameStarted();
}
return result;
}
void ConnectionSlot::setGameStarted(bool value) {
if(this->slotThreadWorker != NULL) {
this->slotThreadWorker->setGameStarted(value);
}
}
void ConnectionSlot::setPlayerIndex(int value) {
playerIndex = value;
@ -558,6 +676,8 @@ void ConnectionSlot::update(bool checkForNewClients,int lockedSlotIndex) {
bool waitForLaggingClient = false;
bool waitedForLaggingClient = false;
//printf("Update slot: %d this->hasDataToRead(): %d\n",this->playerIndex,this->hasDataToRead());
for(;waitForLaggingClient == true ||
(this->hasDataToRead() == true &&
(gotTextMsg == true || gotCellMarkerMsg == true));) {
@ -740,6 +860,7 @@ void ConnectionSlot::update(bool checkForNewClients,int lockedSlotIndex) {
vctPendingNetworkCommandList.push_back(*networkMessageCommandList.getCommand(i));
}
safeMutexSlot.ReleaseLock();
//printf("Got commands from client frame: %d count: %d\n",currentFrameCount,vctPendingNetworkCommandList.size());
//printf("#2 Server slot got currentFrameCount = %d\n",currentFrameCount);
}
@ -1239,6 +1360,7 @@ void ConnectionSlot::update(bool checkForNewClients,int lockedSlotIndex) {
if(joinGameInProgress == true) {
NetworkMessageReady networkMessageReady(0);
this->sendMessage(&networkMessageReady);
this->setGameStarted(true);
this->currentFrameCount = serverInterface->getCurrentFrameCount();
//printf("#2 Server slot got currentFrameCount = %d\n",currentFrameCount);

View File

@ -85,9 +85,11 @@ protected:
int slotIndex;
MasterSlaveThreadController *masterController;
Mutex *triggerGameStarted;
bool gameStarted;
virtual void setQuitStatus(bool value);
virtual void setTaskCompleted(int eventId);
virtual bool canShutdown(bool deleteSelfIfShutdownDelayed=false);
void slotUpdateTask(ConnectionSlotEvent *event);
@ -96,6 +98,9 @@ public:
ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex);
virtual ~ConnectionSlotThread();
bool getGameStarted();
void setGameStarted(bool value);
virtual void setMasterController(MasterSlaveThreadController *master) { masterController = master; }
virtual void signalSlave(void *userdata);
@ -109,6 +114,8 @@ public:
void purgeCompletedEvents();
void purgeAllEvents();
void setAllEventsCompleted();
virtual bool canShutdown(bool deleteSelfIfShutdownDelayed=false);
};
// =====================================================
@ -157,6 +164,9 @@ public:
ConnectionSlot(ServerInterface* serverInterface, int playerIndex);
~ConnectionSlot();
bool getGameStarted();
void setGameStarted(bool value);
bool getStartInGameConnectionLaunch() const { return startInGameConnectionLaunch; }
void setStartInGameConnectionLaunch(bool value) { startInGameConnectionLaunch = value; }

View File

@ -1164,6 +1164,8 @@ void ServerInterface::executeNetworkCommandsFromClients() {
NetworkCommand &cmd = pendingList[idx];
this->requestCommand(&cmd);
}
//printf("Executed: %d commands from slot: %d\n",pendingList.size(),i);
}
}
}
@ -1375,7 +1377,16 @@ void ServerInterface::update() {
//printf("START Server update #1\n");
std::map<int,ConnectionSlotEvent> eventList;
bool hasData = Socket::hasDataToRead(socketTriggeredList);
//bool hasData = Socket::hasDataToRead(socketTriggeredList);
bool hasData = false;
if(gameHasBeenInitiated == false) {
hasData = Socket::hasDataToRead(socketTriggeredList);
}
else {
hasData = true;
}
//if(this->getGameHasBeenInitiated() == true &&
// this->getAllowInGameConnections() == true) {
@ -1394,7 +1405,7 @@ void ServerInterface::update() {
std::map<int,bool> mapSlotSignalledList;
// Step #1 tell all connection slot worker threads to receive socket data
signalClientsToRecieveData(socketTriggeredList, eventList, mapSlotSignalledList);
if(gameHasBeenInitiated == false) signalClientsToRecieveData(socketTriggeredList, eventList, mapSlotSignalledList);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #2\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
@ -1407,7 +1418,7 @@ void ServerInterface::update() {
//printf("START Server update #3\n");
// Step #2 check all connection slot worker threads for completed status
checkForCompletedClients(mapSlotSignalledList,errorMsgList, eventList);
if(gameHasBeenInitiated == false) checkForCompletedClients(mapSlotSignalledList,errorMsgList, eventList);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #3\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
@ -1417,7 +1428,7 @@ void ServerInterface::update() {
//printf("In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
// Step #3 check clients for any lagging scenarios and try to deal with them
checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList);
if(gameHasBeenInitiated == false) checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList);
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #4\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,chrono.getMillis());
@ -1815,6 +1826,7 @@ void ServerInterface::waitUntilReady(Checksum *checksum) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s] networkMessageType==nmtReady\n",__FUNCTION__);
connectionSlot->setReady();
connectionSlot->setGameStarted(true);
}
else if(networkMessageType != nmtInvalid) {
string sErr = "Unexpected network message: " + intToStr(networkMessageType);
@ -1983,6 +1995,8 @@ void ServerInterface::waitUntilReady(Checksum *checksum) {
if(connectionSlot != NULL && connectionSlot->isConnected() == true) {
NetworkMessageReady networkMessageReady(checksum->getSum());
connectionSlot->sendMessage(&networkMessageReady);
connectionSlot->setGameStarted(true);
}
}

View File

@ -264,6 +264,7 @@ bool FactionThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}

View File

@ -47,6 +47,7 @@ bool FileCRCPreCacheThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}
@ -421,6 +422,7 @@ bool SimpleTaskThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}
@ -662,6 +664,7 @@ bool LogFileThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}

View File

@ -2808,6 +2808,7 @@ bool BroadCastSocketThread::canShutdown(bool deleteSelfIfShutdownDelayed) {
bool ret = (getExecutingTask() == false);
if(ret == false && deleteSelfIfShutdownDelayed == true) {
setDeleteSelfOnExecutionDone(deleteSelfIfShutdownDelayed);
deleteSelfIfRequired();
signalQuit();
}