From 1fc3a78943f8edcd08401ac59bb1accbaba1487b Mon Sep 17 00:00:00 2001 From: Jack Garrard Date: Sat, 29 Oct 2022 17:17:38 -0700 Subject: [PATCH] Shutdown send/recv threads before socket reinit --- include/server/SocketClient.hpp | 3 ++- source/server/Client.cpp | 10 +++++-- source/server/SocketClient.cpp | 47 ++++++++++++++++++++++----------- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/include/server/SocketClient.hpp b/include/server/SocketClient.hpp index ff41816..26f1191 100644 --- a/include/server/SocketClient.hpp +++ b/include/server/SocketClient.hpp @@ -29,12 +29,13 @@ class SocketClient : public SocketBase { bool startThreads(); void endThreads(); + void waitForThreads(); bool send(Packet* packet); bool recv(); bool queuePacket(Packet *packet); - void trySendQueue(); + bool trySendQueue(); void sendFunc(); void recvFunc(); diff --git a/source/server/Client.cpp b/source/server/Client.cpp index ffe444e..4422e43 100644 --- a/source/server/Client.cpp +++ b/source/server/Client.cpp @@ -109,6 +109,7 @@ bool Client::startThread() { */ void Client::restartConnection() { + Logger::log("Restarting connection.\n"); if (!sInstance) { Logger::log("Static Instance is null!\n"); return; @@ -134,8 +135,12 @@ void Client::restartConnection() { Logger::log("Sucessfully Closed Socket.\n"); } + Logger::log("Waiting for send/recv threads to finish.\n"); + sInstance->mSocket->waitForThreads(); + sInstance->mConnectCount = 0; + Logger::log("Reinitializing connection\n"); sInstance->mIsConnectionActive = sInstance->mSocket->init(sInstance->mServerIP.cstr(), sInstance->mServerPort).isSuccess(); if(sInstance->mSocket->getLogState() == SOCKET_LOG_CONNECTED) { @@ -409,8 +414,9 @@ void Client::readFunc() { mHeap->free(curPacket); - }else { // if false, socket has errored or disconnected, so close the socket and end this thread. - Logger::log("Client Socket Encountered an Error! Errno: 0x%x\n", mSocket->socket_errno); + }else { // if false, socket has errored or disconnected, so restart the connection + Logger::log("Client Socket Encountered an Error, restarting connection! Errno: 0x%x\n", mSocket->socket_errno); + this->restartConnection(); } } diff --git a/source/server/SocketClient.cpp b/source/server/SocketClient.cpp index 01d0f33..7c734ec 100644 --- a/source/server/SocketClient.cpp +++ b/source/server/SocketClient.cpp @@ -162,7 +162,7 @@ const char* SocketClient::getUdpStateChar() { } bool SocketClient::send(Packet *packet) { - if (this->socket_log_state != SOCKET_LOG_CONNECTED) + if (this->socket_log_state != SOCKET_LOG_CONNECTED || packet == nullptr) return false; char* buffer = reinterpret_cast(packet); @@ -190,7 +190,7 @@ bool SocketClient::send(Packet *packet) { } else { Logger::log("Failed to Fully Send Packet! Result: %d Type: %s Packet Size: %d\n", valread, packetNames[packet->mType], packet->mPacketSize); this->socket_errno = nn::socket::GetLastErrno(); - this->tryReconnect(); + this->closeSocket(); return false; } return true; @@ -201,7 +201,8 @@ bool SocketClient::recv() { if (this->socket_log_state != SOCKET_LOG_CONNECTED) { Logger::log("Unable To Receive! Socket Not Connected.\n"); this->socket_errno = nn::socket::GetLastErrno(); - return this->tryReconnect(); + this->closeSocket(); + return false; } int headerSize = sizeof(Packet); @@ -209,9 +210,13 @@ bool SocketClient::recv() { const int fd_count = 2; struct pollfd pfds[fd_count] = {{0}, {0}}; + + // TCP Connection pfds[0].fd = this->socket_log_socket; pfds[0].events = 1; pfds[0].revents = 0; + + // UDP Connection pfds[1].fd = this->mUdpSocket; pfds[1].events = 1; pfds[1].revents = 0; @@ -289,7 +294,8 @@ bool SocketClient::recv() { return true; } else { Logger::log("Header Read Failed! Value: %d Total Read: %d\n", result, valread); - return this->tryReconnect(); // if we sucessfully reconnect, we dont want + this->closeSocket(); + return false; } } } @@ -330,7 +336,8 @@ bool SocketClient::recv() { } else { mHeap->free(packetBuf); Logger::log("Packet Read Failed! Value: %d\nPacket Size: %d\nPacket Type: %s\n", result, header->mPacketSize, packetNames[header->mType]); - return this->tryReconnect(); + this->closeSocket(); + return false; } } @@ -356,7 +363,8 @@ bool SocketClient::recv() { } else { // if we error'd, close the socket Logger::log("valread was zero! Disconnecting.\n"); this->socket_errno = nn::socket::GetLastErrno(); - return this->tryReconnect(); + this->closeSocket(); + return false; } } @@ -454,14 +462,18 @@ void SocketClient::endThreads() { mSendThread->mDelegateThread->destroy(); } +void SocketClient::waitForThreads() { + while (!mRecvThread->isDone()){} + while (!mSendThread->isDone()){} +} + void SocketClient::sendFunc() { Logger::log("Starting Send Thread.\n"); - while (true) { - trySendQueue(); - } + while (trySendQueue() || socket_log_state != SOCKET_LOG_DISCONNECTED) {} + Logger::log("Sending packet failed!\n"); Logger::log("Ending Send Thread.\n"); } @@ -471,12 +483,13 @@ void SocketClient::recvFunc() { Logger::log("Starting Recv Thread.\n"); - while (true) { - if (!recv()) { - Logger::log("Receiving Packet Failed!\n"); - } - } + while (recv() || socket_log_state != SOCKET_LOG_DISCONNECTED) {} + // Free up all blocked threads + mSendQueue.push(0, sead::MessageQueue::BlockType::NonBlocking); + mRecvQueue.push(0, sead::MessageQueue::BlockType::NonBlocking); + + Logger::log("Receiving Packet Failed!\n"); Logger::log("Ending Recv Thread.\n"); } @@ -492,13 +505,15 @@ bool SocketClient::queuePacket(Packet* packet) { } } -void SocketClient::trySendQueue() { +bool SocketClient::trySendQueue() { Packet* curPacket = (Packet*)mSendQueue.pop(sead::MessageQueue::BlockType::Blocking); - send(curPacket); + bool successful = send(curPacket); mHeap->free(curPacket); + + return successful; } Packet* SocketClient::tryGetPacket(sead::MessageQueue::BlockType blockType) {