Added ordered packet system

Now packets can be grouped up and sent separately, but still be processed
sequentially regardless of how much they get scrambled over the network.
If a packet group isn't received for 15 seconds, all unprocessed packets
in that group will be dropped. Packet ordering is required for level and
area synchronization information when someone joins the level late.
This commit is contained in:
MysterD 2021-06-11 19:37:32 -07:00
parent b6959dc7ea
commit fde6eaf0a3
8 changed files with 368 additions and 58 deletions

View file

@ -141,6 +141,11 @@ void network_send_to(u8 localIndex, struct Packet* p) {
// remember reliable packets
network_remember_reliable(p);
// set ordered data (MUST BE IMMEDITAELY BEFORE HASING+SENDING)
if (p->orderedGroupId != 0) {
packet_set_ordered_data(p);
}
// save inside packet buffer
u32 hash = packet_hash(p);
memcpy(&p->buffer[p->dataLength], &hash, sizeof(u32));
@ -234,9 +239,10 @@ void network_update(void) {
gNetworkSystem->update();
}
// update reliable packets
// update reliable and ordered packets
if (gNetworkType != NT_NONE) {
network_update_reliable();
packet_ordered_update();
}
}

View file

@ -118,6 +118,7 @@ u8 network_player_connected(enum NetworkPlayerType type, u8 globalIndex) {
if (gNetworkType == NT_SERVER) {
gNetworkPlayerServer = gNetworkPlayerLocal;
}
packet_ordered_clear(globalIndex);
return 0;
}
@ -155,6 +156,7 @@ u8 network_player_connected(enum NetworkPlayerType type, u8 globalIndex) {
if (type == NPT_SERVER) { gNetworkPlayerServer = np; }
else { chat_add_message_ext("player connected", CMT_SYSTEM, get_player_color(np->globalIndex, 0)); }
LOG_INFO("player connected, local %d, global %d", i, np->globalIndex);
packet_ordered_clear(np->globalIndex);
return i;
}
@ -187,6 +189,7 @@ u8 network_player_disconnected(u8 globalIndex) {
for (int j = 0; j < MAX_SYNC_OBJECTS; j++) { gSyncObjects[j].rxEventId[i] = 0; }
LOG_INFO("player disconnected, local %d, global %d", i, globalIndex);
chat_add_message_ext("player disconnected", CMT_SYSTEM, get_player_color(globalIndex, 0));
packet_ordered_clear(globalIndex);
return i;
}
return UNKNOWN_GLOBAL_INDEX;

View file

@ -2,6 +2,49 @@
#include "../network.h"
#include "pc/debuglog.h"
void packet_process(struct Packet* p) {
switch (p->packetType) {
case PACKET_ACK: network_receive_ack(p); break;
case PACKET_PLAYER: network_receive_player(p); break;
case PACKET_OBJECT: network_receive_object(p); break;
case PACKET_SPAWN_OBJECTS: network_receive_spawn_objects(p); break;
case PACKET_SPAWN_STAR: network_receive_spawn_star(p); break;
case PACKET_SPAWN_STAR_NLE: network_receive_spawn_star_nle(p); break;
case PACKET_COLLECT_STAR: network_receive_collect_star(p); break;
case PACKET_COLLECT_COIN: network_receive_collect_coin(p); break;
case PACKET_COLLECT_ITEM: network_receive_collect_item(p); break;
case PACKET_RESERVATION_REQUEST: network_receive_reservation_request(p); break;
case PACKET_RESERVATION: network_receive_reservation(p); break;
case PACKET_JOIN_REQUEST: network_receive_join_request(p); break;
case PACKET_JOIN: network_receive_join(p); break;
case PACKET_CHAT: network_receive_chat(p); break;
case PACKET_KICK: network_receive_kick(p); break;
case PACKET_KEEP_ALIVE: network_receive_keep_alive(p); break;
case PACKET_LEAVING: network_receive_leaving(p); break;
case PACKET_SAVE_FILE: network_receive_save_file(p); break;
case PACKET_NETWORK_PLAYERS: network_receive_network_players(p); break;
case PACKET_DEATH: network_receive_death(p); break;
// location
case PACKET_CHANGE_LEVEL: network_receive_change_level(p); break;
case PACKET_CHANGE_AREA: network_receive_change_area(p); break;
case PACKET_LEVEL_AREA_REQUEST: network_receive_level_area_request(p); break;
case PACKET_LEVEL_REQUEST: network_receive_level_request(p); break;
case PACKET_LEVEL: network_receive_level(p); break;
case PACKET_AREA_REQUEST: network_receive_area_request(p); break;
case PACKET_AREA: network_receive_area(p); break;
case PACKET_SYNC_VALID: network_receive_sync_valid(p); break;
case PACKET_LEVEL_SPAWN_INFO: network_receive_level_spawn_info(p); break;
case PACKET_LEVEL_MACRO: network_receive_level_macro(p); break;
case PACKET_LEVEL_AREA_INFORM: network_receive_level_area_inform(p); break;
case PACKET_LEVEL_RESPAWN_INFO: network_receive_level_respawn_info(p); break;
// custom
case PACKET_CUSTOM: network_receive_custom(p); break;
default: LOG_ERROR("received unknown packet: %d", p->buffer[0]);
}
}
void packet_receive(struct Packet* p) {
u8 packetType = (u8)p->buffer[0];
@ -29,55 +72,19 @@ void packet_receive(struct Packet* p) {
if (np->onRxSeqId >= MAX_RX_SEQ_IDS) { np->onRxSeqId = 0; }
}
// check if we should process the packet
// parse the packet without processing the rest
if (packet_initial_read(p)) {
if (gNetworkType == NT_SERVER && p->destGlobalId != PACKET_DESTINATION_BROADCAST && p->destGlobalId != 0 && packetType != PACKET_ACK) {
// this packet is meant for someone else
struct Packet p2 = { 0 };
packet_duplicate(p, &p2);
network_send_to(p->destGlobalId, &p2);
} else if (p->orderedGroupId != 0) {
// add the packet to the ordered bucket (may process immediately if it's in order)
packet_ordered_add(p);
} else {
// process the packet
switch (packetType) {
case PACKET_ACK: network_receive_ack(p); break;
case PACKET_PLAYER: network_receive_player(p); break;
case PACKET_OBJECT: network_receive_object(p); break;
case PACKET_SPAWN_OBJECTS: network_receive_spawn_objects(p); break;
case PACKET_SPAWN_STAR: network_receive_spawn_star(p); break;
case PACKET_SPAWN_STAR_NLE: network_receive_spawn_star_nle(p); break;
case PACKET_COLLECT_STAR: network_receive_collect_star(p); break;
case PACKET_COLLECT_COIN: network_receive_collect_coin(p); break;
case PACKET_COLLECT_ITEM: network_receive_collect_item(p); break;
case PACKET_RESERVATION_REQUEST: network_receive_reservation_request(p); break;
case PACKET_RESERVATION: network_receive_reservation(p); break;
case PACKET_JOIN_REQUEST: network_receive_join_request(p); break;
case PACKET_JOIN: network_receive_join(p); break;
case PACKET_CHAT: network_receive_chat(p); break;
case PACKET_KICK: network_receive_kick(p); break;
case PACKET_KEEP_ALIVE: network_receive_keep_alive(p); break;
case PACKET_LEAVING: network_receive_leaving(p); break;
case PACKET_SAVE_FILE: network_receive_save_file(p); break;
case PACKET_NETWORK_PLAYERS: network_receive_network_players(p); break;
case PACKET_DEATH: network_receive_death(p); break;
// location
case PACKET_CHANGE_LEVEL: network_receive_change_level(p); break;
case PACKET_CHANGE_AREA: network_receive_change_area(p); break;
case PACKET_LEVEL_AREA_REQUEST: network_receive_level_area_request(p); break;
case PACKET_LEVEL_REQUEST: network_receive_level_request(p); break;
case PACKET_LEVEL: network_receive_level(p); break;
case PACKET_AREA_REQUEST: network_receive_area_request(p); break;
case PACKET_AREA: network_receive_area(p); break;
case PACKET_SYNC_VALID: network_receive_sync_valid(p); break;
case PACKET_LEVEL_SPAWN_INFO: network_receive_level_spawn_info(p); break;
case PACKET_LEVEL_MACRO: network_receive_level_macro(p); break;
case PACKET_LEVEL_AREA_INFORM: network_receive_level_area_inform(p); break;
case PACKET_LEVEL_RESPAWN_INFO: network_receive_level_respawn_info(p); break;
// custom
case PACKET_CUSTOM: network_receive_custom(p); break;
default: LOG_ERROR("received unknown packet: %d", p->buffer[0]);
}
packet_process(p);
}
}

View file

@ -52,6 +52,7 @@ enum PacketType {
};
struct Packet {
enum PacketType packetType;
u8 localIndex;
u16 dataLength;
u16 cursor;
@ -62,6 +63,9 @@ struct Packet {
u8 destGlobalId;
u16 seqId;
bool sent;
u8 orderedFromGlobalId;
u8 orderedGroupId;
u8 orderedSeqId;
u8 buffer[PACKET_LENGTH];
};
@ -71,6 +75,7 @@ enum KickReasonType {
};
// packet.c
void packet_process(struct Packet* p);
void packet_receive(struct Packet* packet);
// packet_read_write.c
@ -85,6 +90,7 @@ u32 packet_hash(struct Packet* packet);
bool packet_check_hash(struct Packet* packet);
void packet_ordered_begin(void);
void packet_ordered_end(void);
void packet_set_ordered_data(struct Packet* packet);
// packet_reliable.c
void network_forget_all_reliable(void);
@ -93,6 +99,12 @@ void network_receive_ack(struct Packet* p);
void network_remember_reliable(struct Packet* p);
void network_update_reliable(void);
// packet_ordered.c
void packet_ordered_add(struct Packet* p);
void packet_ordered_clear_table(u8 globalIndex, u8 groupdId);
void packet_ordered_clear(u8 globalIndex);
void packet_ordered_update(void);
// packet_player.c
void network_update_player(void);
void network_receive_player(struct Packet* p);

View file

@ -1,7 +1,7 @@
#include <stdio.h>
#include "../network.h"
#include "level_table.h"
//#define DISABLE_MODULE_LOG 1
#define DISABLE_MODULE_LOG 1
#include "pc/debuglog.h"
void network_send_level_area_inform(struct NetworkPlayer* np) {

View file

@ -10,7 +10,7 @@
#include "object_fields.h"
#include "behavior_table.h"
#include "model_ids.h"
//#define DISABLE_MODULE_LOG 1
#define DISABLE_MODULE_LOG 1
#include "pc/debuglog.h"
#define ERR_COULD_NOT_FIND_OBJECT ((u16)-1)

View file

@ -0,0 +1,238 @@
#include <stdio.h>
#include "../network.h"
#define DISABLE_MODULE_LOG 1
#include "pc/debuglog.h"
#define PACKET_ORDERED_TIMEOUT 15
struct OrderedPacketList {
struct Packet p;
struct OrderedPacketList* next;
};
struct OrderedPacketTable {
u8 fromGlobalId;
u8 groupId;
u8 processSeqId;
clock_t lastReceived;
struct OrderedPacketList* packets;
struct OrderedPacketTable* next;
};
static struct OrderedPacketTable* orderedPacketTable[MAX_PLAYERS] = { 0 };
static void packet_ordered_check_for_processing(struct OrderedPacketTable* opt) {
// sanity check
assert(opt != NULL);
struct OrderedPacketList* opl = opt->packets;
struct OrderedPacketList* oplLast = opl;
struct Packet* p = NULL;
while (opl != NULL) {
if (opt->processSeqId == opl->p.orderedSeqId) {
// we found the packet we're supposed to process!
p = &opl->p;
break;
}
oplLast = opl;
opl = opl->next;
}
// make sure we found the packet
if (p == NULL) { return; }
// process it
packet_process(p);
LOG_INFO("processed ordered packet (%d, %d, %d)", p->orderedFromGlobalId, p->orderedGroupId, p->orderedSeqId);
// remove from linked list
if (oplLast == opl) {
// we processed the head of the list
opt->packets = opl->next;
} else {
// we processed from after the head
oplLast->next = opl->next;
}
// deallocate
free(opl);
// find the next one we have to process.
opt->processSeqId++;
packet_ordered_check_for_processing(opt);
}
static void packet_ordered_add_to_table(struct OrderedPacketTable* opt, struct Packet* p) {
// sanity check
assert(opt != NULL);
assert(opt->fromGlobalId == p->orderedFromGlobalId);
assert(opt->groupId == p->orderedGroupId);
if (p->orderedSeqId < opt->processSeqId) {
// this packet has already been processed!
LOG_INFO("this packet has already been processed!");
return;
}
if (p->orderedSeqId == opt->processSeqId) {
// this is the packet that the table is waiting on, process it!
packet_process(p);
// find the next one we have to process.
opt->processSeqId++;
packet_ordered_check_for_processing(opt);
return;
}
struct OrderedPacketList* opl = opt->packets;
struct OrderedPacketList* oplLast = opl;
// make sure this packet isn't currently in the list
while (opl != NULL) {
if (opl->p.orderedSeqId == p->orderedSeqId) {
// this packet is already in the list!
LOG_INFO("this packet is already in the list!");
return;
}
// iterate
oplLast = opl;
opl = opl->next;
}
// allocate the packet list
opl = malloc(sizeof(struct OrderedPacketList));
if (oplLast == NULL) {
opt->packets = opl;
} else {
oplLast->next = opl;
}
// copy the packet over to the list
memcpy(&opl->p, p, sizeof(struct Packet));
opl->next = NULL;
LOG_INFO("added to list for (%d, %d, %d)", opt->fromGlobalId, opt->groupId, p->orderedSeqId);
opt->lastReceived = clock();
packet_ordered_check_for_processing(opt);
}
void packet_ordered_add(struct Packet* p) {
u8 globalId = p->orderedFromGlobalId;
struct OrderedPacketTable* opt = orderedPacketTable[globalId];
// try to find a ordered packet table for the packet's group
struct OrderedPacketTable* optLast = opt;
while (opt != NULL) {
if (opt->groupId == p->orderedGroupId) {
// found a matching group
packet_ordered_add_to_table(opt, p);
return;
}
// iterate
optLast = opt;
opt = opt->next;
}
// could not find a matching group, allocate a ordered packet table
opt = malloc(sizeof(struct OrderedPacketTable));
// put the opt in the right place
if (optLast == NULL) {
orderedPacketTable[globalId] = opt;
} else {
optLast->next = opt;
}
// set opt params
opt->fromGlobalId = p->orderedFromGlobalId;
opt->groupId = p->orderedGroupId;
opt->processSeqId = 1;
opt->packets = NULL;
opt->next = NULL;
opt->lastReceived = clock();
LOG_INFO("created table for (%d, %d)", opt->fromGlobalId, opt->groupId);
// add the packet to the table
packet_ordered_add_to_table(opt, p);
}
void packet_ordered_clear_table(u8 globalIndex, u8 groupId) {
LOG_INFO("clearing out ordered packet table for %d (%d)", globalIndex, groupId);
struct OrderedPacketTable* opt = orderedPacketTable[globalIndex];
struct OrderedPacketTable* optLast = opt;
while (opt != NULL) {
if (opt->groupId == groupId) {
// clear opl of table
struct OrderedPacketList* opl = opt->packets;
while (opl != NULL) {
struct OrderedPacketList* oplNext = opl->next;
free(opl);
opl = oplNext;
LOG_INFO("cleared out opl");
}
// remove from linked list
if (optLast == opt) {
orderedPacketTable[globalIndex] = opt->next;
} else {
optLast->next = opt->next;
}
// deallocate table
free(opt);
LOG_INFO("cleared out opt");
return;
}
// goto the next table
optLast = opt;
opt = opt->next;
}
}
void packet_ordered_clear(u8 globalIndex) {
LOG_INFO("clearing out all ordered packet tables for %d", globalIndex);
struct OrderedPacketTable* opt = orderedPacketTable[globalIndex];
while (opt != NULL) {
// clear opl of table
struct OrderedPacketList* opl = opt->packets;
while (opl != NULL) {
struct OrderedPacketList* oplNext = opl->next;
free(opl);
opl = oplNext;
LOG_INFO("cleared out opl");
}
// goto next table and free the current one
struct OrderedPacketTable* optNext = opt->next;
free(opt);
opt = optNext;
LOG_INFO("cleared out opt");
}
orderedPacketTable[globalIndex] = NULL;
}
void packet_ordered_update(void) {
clock_t currentClock = clock();
// check all ordered tables for a time out
for (int i = 0; i < MAX_PLAYERS; i++) {
struct OrderedPacketTable* opt = orderedPacketTable[i];
while (opt != NULL) {
struct OrderedPacketTable* optNext = opt->next;
float elapsed = (currentClock - opt->lastReceived) / (float)CLOCKS_PER_SEC;
if (elapsed > PACKET_ORDERED_TIMEOUT) {
// too much time has elapsed since we last received a packet for this group, forget the table!
packet_ordered_clear_table(i, opt->groupId);
}
opt = optNext;
}
}
}

View file

@ -1,10 +1,17 @@
#include "../network.h"
#include "game/area.h"
#include "pc/debuglog.h"
#define PACKET_FLAG_BUFFER_OFFSET 3
#define PACKET_DESTINATION_BUFFER_OFFSET 4
#define PACKET_ORDERED_SEQ_ID_OFFSET 7
static u16 sNextSeqNum = 1;
static bool sOrderedPackets = false;
static u8 sCurrentOrderedGroupId = 0;
static u8 sCurrentOrderedSeqId = 0;
static u16 nextSeqNum = 1;
void packet_init(struct Packet* packet, enum PacketType packetType, bool reliable, bool levelAreaMustMatch) {
memset(packet->buffer, 0, PACKET_LENGTH);
packet->cursor = 0;
@ -14,15 +21,18 @@ void packet_init(struct Packet* packet, enum PacketType packetType, bool reliabl
packet->levelAreaMustMatch = levelAreaMustMatch;
packet->requestBroadcast = false;
packet->sent = false;
packet->orderedFromGlobalId = sOrderedPackets ? gNetworkPlayerLocal->globalIndex : 0;
packet->orderedGroupId = sOrderedPackets ? sCurrentOrderedGroupId : 0;
packet->orderedSeqId = 0;
packet_write(packet, &packetType, sizeof(u8));
// write seq number
if (reliable) {
packet_write(packet, &nextSeqNum, sizeof(u16));
packet->seqId = nextSeqNum;
nextSeqNum++;
if (nextSeqNum == 0) { nextSeqNum++; }
packet_write(packet, &sNextSeqNum, sizeof(u16));
packet->seqId = sNextSeqNum;
sNextSeqNum++;
if (sNextSeqNum == 0) { sNextSeqNum++; }
} else {
u16 nullSeqNum = 0;
packet_write(packet, &nullSeqNum, sizeof(u16));
@ -37,6 +47,13 @@ void packet_init(struct Packet* packet, enum PacketType packetType, bool reliabl
u8 destination = PACKET_DESTINATION_BROADCAST;
packet_write(packet, &destination, sizeof(u8));
// write ordered packet information
if (sOrderedPackets) {
packet_write(packet, &packet->orderedFromGlobalId, sizeof(u8));
packet_write(packet, &packet->orderedGroupId, sizeof(u8));
packet_write(packet, &packet->orderedSeqId, sizeof(u8));
}
// write location
if (levelAreaMustMatch) {
packet_write(packet, &gCurrCourseNum, sizeof(s16));
@ -56,13 +73,15 @@ void packet_duplicate(struct Packet* srcPacket, struct Packet* dstPacket) {
dstPacket->requestBroadcast = srcPacket->requestBroadcast;
dstPacket->destGlobalId = srcPacket->destGlobalId;
dstPacket->sent = false;
dstPacket->orderedGroupId = srcPacket->orderedGroupId;
dstPacket->orderedSeqId = srcPacket->orderedSeqId;
memcpy(&dstPacket->buffer[0], &srcPacket->buffer[0], srcPacket->dataLength);
if (dstPacket->reliable) {
dstPacket->seqId = nextSeqNum;
nextSeqNum++;
if (nextSeqNum == 0) { nextSeqNum++; }
dstPacket->seqId = sNextSeqNum;
sNextSeqNum++;
if (sNextSeqNum == 0) { sNextSeqNum++; }
}
memcpy(&dstPacket->buffer[1], &dstPacket->seqId, 2);
@ -72,8 +91,9 @@ void packet_duplicate(struct Packet* srcPacket, struct Packet* dstPacket) {
void packet_set_flags(struct Packet* packet) {
u8 flags = 0;
flags |= SET_BIT(packet->levelAreaMustMatch, 0);
flags |= SET_BIT(packet->requestBroadcast, 1);
flags |= SET_BIT(packet->levelAreaMustMatch, 0);
flags |= SET_BIT(packet->requestBroadcast, 1);
flags |= SET_BIT(packet->orderedGroupId != 0, 2);
packet->buffer[PACKET_FLAG_BUFFER_OFFSET] = flags;
}
@ -89,11 +109,15 @@ void packet_write(struct Packet* packet, void* data, u16 length) {
}
u8 packet_initial_read(struct Packet* packet) {
// read packet type
packet->packetType = packet->buffer[0];
// read packet flags
u8 flags = 0;
packet_read(packet, &flags, sizeof(u8));
packet->levelAreaMustMatch = GET_BIT(flags, 0);
packet->requestBroadcast = GET_BIT(flags, 1);
bool packetIsOrdered = GET_BIT(flags, 2);
// read destination
packet_read(packet, &packet->destGlobalId, sizeof(u8));
@ -101,8 +125,8 @@ u8 packet_initial_read(struct Packet* packet) {
if (packet->levelAreaMustMatch) {
s16 currCourseNum, currActNum, currLevelNum, currAreaIndex;
packet_read(packet, &currCourseNum, sizeof(s16));
packet_read(packet, &currActNum, sizeof(s16));
packet_read(packet, &currLevelNum, sizeof(s16));
packet_read(packet, &currActNum, sizeof(s16));
packet_read(packet, &currLevelNum, sizeof(s16));
packet_read(packet, &currAreaIndex, sizeof(s16));
bool levelAreaMismatch =
(currCourseNum != gCurrCourseNum
@ -113,6 +137,13 @@ u8 packet_initial_read(struct Packet* packet) {
if (levelAreaMismatch) { return FALSE; }
}
// read ordered packet information
if (packetIsOrdered) {
packet_read(packet, &packet->orderedFromGlobalId, sizeof(u8));
packet_read(packet, &packet->orderedGroupId, sizeof(u8));
packet_read(packet, &packet->orderedSeqId, sizeof(u8));
}
// don't drop packet
return TRUE;
}
@ -142,9 +173,22 @@ bool packet_check_hash(struct Packet* packet) {
}
void packet_ordered_begin(void) {
// TODO: implement ordered packet streams
if (sOrderedPackets) { return; }
sOrderedPackets = true;
sCurrentOrderedGroupId++;
if (sCurrentOrderedGroupId == 0) { sCurrentOrderedGroupId++; }
sCurrentOrderedSeqId = 1;
}
void packet_ordered_end(void) {
// TODO: implement ordered packet streams
sOrderedPackets = false;
sCurrentOrderedSeqId = 0;
}
void packet_set_ordered_data(struct Packet* packet) {
if (packet->orderedGroupId == 0) { return; }
if (packet->orderedSeqId != 0) { return; }
packet->orderedSeqId = sCurrentOrderedSeqId++;
packet->buffer[PACKET_ORDERED_SEQ_ID_OFFSET] = packet->orderedSeqId;
}