Fix longstanding bug removing too few messages from queue on ACK.

Print queue as part of stats.
This commit is contained in:
ehouse 2006-09-27 01:54:53 +00:00
parent 47565a3241
commit 22ae4209ca

View file

@ -534,7 +534,7 @@ comms_send( CommsCtxt* comms, XWStreamCtxt* stream )
MsgQueueElem* newMsgElem;
XWStreamCtxt* msgStream;
XP_DEBUGF( "assigning msgID of " XP_LD " on chnl %d", msgID, channelNo );
XP_DEBUGF( "assigning msgID=" XP_LD " on chnl %d", msgID, channelNo );
#ifdef DEBUG
if ( !!rec ) {
@ -606,7 +606,7 @@ printQueue( CommsCtxt* comms )
for ( elem = comms->msgQueueHead, i = 0; i < comms->queueLen;
elem = elem->next, ++i ) {
XP_STATUSF( "\t%d: channel: %d; msgID: " XP_LD,
XP_STATUSF( "\t%d: channel: %d; msgID=" XP_LD,
i+1, elem->channelNo, elem->msgID );
}
}
@ -620,14 +620,17 @@ static void
removeFromQueue( CommsCtxt* comms, XP_PlayerAddr channelNo, MsgID msgID )
{
MsgQueueElem* elem;
MsgQueueElem* prev;
MsgQueueElem* keep = NULL;
MsgQueueElem* tail = NULL;
MsgQueueElem* keepHead = NULL;
MsgQueueElem* next;
XP_STATUSF( "looking to remove msgs prior or equal to " XP_LD
" for channel %d (queue len now %d)",
msgID, channelNo, comms->queueLen );
for ( prev = (MsgQueueElem*)NULL, elem = comms->msgQueueHead;
!!elem; prev = elem, elem = elem->next ) {
for ( elem = comms->msgQueueHead; !!elem; elem = next ) {
next = elem->next;
/* remove the 0-channel message if we've established a channel number.
Only clients should have any 0-channel messages in the queue, and
@ -641,28 +644,24 @@ removeFromQueue( CommsCtxt* comms, XP_PlayerAddr channelNo, MsgID msgID )
}
if ( elem->msgID <= msgID ) {
if ( !prev ) { /* it's the first element */
comms->msgQueueHead = elem->next;
prev = comms->msgQueueHead; /* so elem=prev below will work */
} else {
prev->next = elem->next;
}
if ( comms->msgQueueTail == elem ) {
comms->msgQueueTail = prev;
}
XP_FREE( comms->mpool, elem->msg );
XP_FREE( comms->mpool, elem );
elem = prev;
--comms->queueLen;
if ( !elem ) {
break;
} else {
if ( !!keepHead ) {
XP_ASSERT( !!keep );
keep->next = elem;
} else {
keepHead = elem;
}
keep = elem;
tail = elem;
}
}
comms->msgQueueHead = keepHead;
comms->msgQueueTail = tail;
XP_STATUSF( "%s: queueLen now %d", __FUNCTION__, comms->queueLen );
XP_ASSERT( comms->queueLen > 0 || comms->msgQueueHead == NULL );
@ -716,7 +715,7 @@ comms_resendAll( CommsCtxt* comms )
if ( result == 0 && oneResult != 0 ) {
result = oneResult;
}
XP_STATUSF( "resend: msgid=" XP_LD "; rslt=%d",
XP_STATUSF( "resend: msgID=" XP_LD "; rslt=%d",
msg->msgID, oneResult );
}
@ -863,7 +862,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
XP_DEBUGF( "rcd: msg " XP_LD " on chnl %d", msgID,
XP_DEBUGF( "rcd: msgID=" XP_LD " on chnl %d", msgID,
channelNo );
removeFromQueue( comms, channelNo, lastMsgRcd );
@ -897,7 +896,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
* channel. */
if ( !!recs ) {
if ( msgID != recs->lastMsgReceived + 1 ) {
XP_DEBUGF( "on channel %d, old msgID " XP_LD
XP_DEBUGF( "on channel %d, old msgID=" XP_LD
" (next should be " XP_LD ")", channelNo,
msgID, recs->lastMsgReceived+1 );
validMessage = XP_FALSE;
@ -970,36 +969,44 @@ comms_getStats( CommsCtxt* comms, XWStreamCtxt* stream )
{
XP_UCHAR buf[100];
AddressRecord* rec;
MsgQueueElem* elem;
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf), (XP_UCHAR*)"msg queue len: %d\n",
comms->queueLen );
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"msg queue len: %d\n", comms->queueLen );
stream_putString( stream, buf );
for ( elem = comms->msgQueueHead; !!elem; elem = elem->next ) {
XP_SNPRINTF( buf, sizeof(buf),
" - channelNo=%d; msgID=" XP_LD "; len=%d\n",
elem->channelNo, elem->msgID, elem->len );
stream_putString( stream, buf );
}
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"channel-less bytes sent: %d\n",
comms->nUniqueBytes );
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
stream_putString( stream, buf );
for ( rec = comms->recs; !!rec; rec = rec->next ) {
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)" Stats for channel: %d\n",
rec->channelNo );
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
stream_putString( stream, buf );
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"Last msg sent: " XP_LD "\n",
rec->nextMsgID );
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
stream_putString( stream, buf );
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"Unique bytes sent: %d\n",
rec->nUniqueBytes );
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
stream_putString( stream, buf );
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"Last message acknowledged: %d\n",
rec->lastACK);
stream_putBytes( stream, buf, (XP_U16)XP_STRLEN( buf ) );
stream_putString( stream, buf );
}
} /* comms_getStats */
#endif