Shutdown send/recv threads before socket reinit

This commit is contained in:
Jack Garrard 2022-10-29 17:17:38 -07:00
parent 627e07cd65
commit 1fc3a78943
3 changed files with 41 additions and 19 deletions

View file

@ -29,12 +29,13 @@ class SocketClient : public SocketBase {
bool startThreads(); bool startThreads();
void endThreads(); void endThreads();
void waitForThreads();
bool send(Packet* packet); bool send(Packet* packet);
bool recv(); bool recv();
bool queuePacket(Packet *packet); bool queuePacket(Packet *packet);
void trySendQueue(); bool trySendQueue();
void sendFunc(); void sendFunc();
void recvFunc(); void recvFunc();

View file

@ -109,6 +109,7 @@ bool Client::startThread() {
*/ */
void Client::restartConnection() { void Client::restartConnection() {
Logger::log("Restarting connection.\n");
if (!sInstance) { if (!sInstance) {
Logger::log("Static Instance is null!\n"); Logger::log("Static Instance is null!\n");
return; return;
@ -134,8 +135,12 @@ void Client::restartConnection() {
Logger::log("Sucessfully Closed Socket.\n"); Logger::log("Sucessfully Closed Socket.\n");
} }
Logger::log("Waiting for send/recv threads to finish.\n");
sInstance->mSocket->waitForThreads();
sInstance->mConnectCount = 0; sInstance->mConnectCount = 0;
Logger::log("Reinitializing connection\n");
sInstance->mIsConnectionActive = sInstance->mSocket->init(sInstance->mServerIP.cstr(), sInstance->mServerPort).isSuccess(); sInstance->mIsConnectionActive = sInstance->mSocket->init(sInstance->mServerIP.cstr(), sInstance->mServerPort).isSuccess();
if(sInstance->mSocket->getLogState() == SOCKET_LOG_CONNECTED) { if(sInstance->mSocket->getLogState() == SOCKET_LOG_CONNECTED) {
@ -409,8 +414,9 @@ void Client::readFunc() {
mHeap->free(curPacket); mHeap->free(curPacket);
}else { // if false, socket has errored or disconnected, so close the socket and end this thread. }else { // if false, socket has errored or disconnected, so restart the connection
Logger::log("Client Socket Encountered an Error! Errno: 0x%x\n", mSocket->socket_errno); Logger::log("Client Socket Encountered an Error, restarting connection! Errno: 0x%x\n", mSocket->socket_errno);
this->restartConnection();
} }
} }

View file

@ -162,7 +162,7 @@ const char* SocketClient::getUdpStateChar() {
} }
bool SocketClient::send(Packet *packet) { bool SocketClient::send(Packet *packet) {
if (this->socket_log_state != SOCKET_LOG_CONNECTED) if (this->socket_log_state != SOCKET_LOG_CONNECTED || packet == nullptr)
return false; return false;
char* buffer = reinterpret_cast<char*>(packet); char* buffer = reinterpret_cast<char*>(packet);
@ -190,7 +190,7 @@ bool SocketClient::send(Packet *packet) {
} else { } else {
Logger::log("Failed to Fully Send Packet! Result: %d Type: %s Packet Size: %d\n", valread, packetNames[packet->mType], packet->mPacketSize); Logger::log("Failed to Fully Send Packet! Result: %d Type: %s Packet Size: %d\n", valread, packetNames[packet->mType], packet->mPacketSize);
this->socket_errno = nn::socket::GetLastErrno(); this->socket_errno = nn::socket::GetLastErrno();
this->tryReconnect(); this->closeSocket();
return false; return false;
} }
return true; return true;
@ -201,7 +201,8 @@ bool SocketClient::recv() {
if (this->socket_log_state != SOCKET_LOG_CONNECTED) { if (this->socket_log_state != SOCKET_LOG_CONNECTED) {
Logger::log("Unable To Receive! Socket Not Connected.\n"); Logger::log("Unable To Receive! Socket Not Connected.\n");
this->socket_errno = nn::socket::GetLastErrno(); this->socket_errno = nn::socket::GetLastErrno();
return this->tryReconnect(); this->closeSocket();
return false;
} }
int headerSize = sizeof(Packet); int headerSize = sizeof(Packet);
@ -209,9 +210,13 @@ bool SocketClient::recv() {
const int fd_count = 2; const int fd_count = 2;
struct pollfd pfds[fd_count] = {{0}, {0}}; struct pollfd pfds[fd_count] = {{0}, {0}};
// TCP Connection
pfds[0].fd = this->socket_log_socket; pfds[0].fd = this->socket_log_socket;
pfds[0].events = 1; pfds[0].events = 1;
pfds[0].revents = 0; pfds[0].revents = 0;
// UDP Connection
pfds[1].fd = this->mUdpSocket; pfds[1].fd = this->mUdpSocket;
pfds[1].events = 1; pfds[1].events = 1;
pfds[1].revents = 0; pfds[1].revents = 0;
@ -289,7 +294,8 @@ bool SocketClient::recv() {
return true; return true;
} else { } else {
Logger::log("Header Read Failed! Value: %d Total Read: %d\n", result, valread); Logger::log("Header Read Failed! Value: %d Total Read: %d\n", result, valread);
return this->tryReconnect(); // if we sucessfully reconnect, we dont want this->closeSocket();
return false;
} }
} }
} }
@ -330,7 +336,8 @@ bool SocketClient::recv() {
} else { } else {
mHeap->free(packetBuf); mHeap->free(packetBuf);
Logger::log("Packet Read Failed! Value: %d\nPacket Size: %d\nPacket Type: %s\n", result, header->mPacketSize, packetNames[header->mType]); Logger::log("Packet Read Failed! Value: %d\nPacket Size: %d\nPacket Type: %s\n", result, header->mPacketSize, packetNames[header->mType]);
return this->tryReconnect(); this->closeSocket();
return false;
} }
} }
@ -356,7 +363,8 @@ bool SocketClient::recv() {
} else { // if we error'd, close the socket } else { // if we error'd, close the socket
Logger::log("valread was zero! Disconnecting.\n"); Logger::log("valread was zero! Disconnecting.\n");
this->socket_errno = nn::socket::GetLastErrno(); this->socket_errno = nn::socket::GetLastErrno();
return this->tryReconnect(); this->closeSocket();
return false;
} }
} }
@ -454,14 +462,18 @@ void SocketClient::endThreads() {
mSendThread->mDelegateThread->destroy(); mSendThread->mDelegateThread->destroy();
} }
void SocketClient::waitForThreads() {
while (!mRecvThread->isDone()){}
while (!mSendThread->isDone()){}
}
void SocketClient::sendFunc() { void SocketClient::sendFunc() {
Logger::log("Starting Send Thread.\n"); Logger::log("Starting Send Thread.\n");
while (true) { while (trySendQueue() || socket_log_state != SOCKET_LOG_DISCONNECTED) {}
trySendQueue();
}
Logger::log("Sending packet failed!\n");
Logger::log("Ending Send Thread.\n"); Logger::log("Ending Send Thread.\n");
} }
@ -471,12 +483,13 @@ void SocketClient::recvFunc() {
Logger::log("Starting Recv Thread.\n"); Logger::log("Starting Recv Thread.\n");
while (true) { while (recv() || socket_log_state != SOCKET_LOG_DISCONNECTED) {}
if (!recv()) {
Logger::log("Receiving Packet Failed!\n");
}
}
// Free up all blocked threads
mSendQueue.push(0, sead::MessageQueue::BlockType::NonBlocking);
mRecvQueue.push(0, sead::MessageQueue::BlockType::NonBlocking);
Logger::log("Receiving Packet Failed!\n");
Logger::log("Ending Recv Thread.\n"); Logger::log("Ending Recv Thread.\n");
} }
@ -492,13 +505,15 @@ bool SocketClient::queuePacket(Packet* packet) {
} }
} }
void SocketClient::trySendQueue() { bool SocketClient::trySendQueue() {
Packet* curPacket = (Packet*)mSendQueue.pop(sead::MessageQueue::BlockType::Blocking); Packet* curPacket = (Packet*)mSendQueue.pop(sead::MessageQueue::BlockType::Blocking);
send(curPacket); bool successful = send(curPacket);
mHeap->free(curPacket); mHeap->free(curPacket);
return successful;
} }
Packet* SocketClient::tryGetPacket(sead::MessageQueue::BlockType blockType) { Packet* SocketClient::tryGetPacket(sead::MessageQueue::BlockType blockType) {