Commit f1f2bc2c authored by Franksen, Benjamin's avatar Franksen, Benjamin
Browse files

lowcal: start timeout counter when sending requests, not when enqueueing them


The timeout property of a lowcal variable is redefined so as to refer to the
time between sending a request and receiving a response. In other words, the
time spent inside the send queue is not counted. Implementation-wise, this
means we set the timeout counter when dequeueing the request, not when we
enqueue it.

This change avoids spurious errors (PROT_UMSG or PROT_2LATE) that happen
when a varset has many multiplexers enqueued and a pending request times out
due to a send error. In this case, enqueued requests will be sent (because
the writer has higher priority than the timer) and then promptly times out;
the response, when it arrives, will then cause one of the two error message
above.

Another advantage of the new scheme is that the timeout can now be
configured without knowing how many multiplexers there are.

Finally, the implementation is simplified, since we no longer need to remove
elements from the queue, except the head. The functions unqueue and
varsetq_remove are no longer used and have been removed. We could even go
back to using a singly linked list for the send queue.
parent 92647dd5
......@@ -637,14 +637,6 @@ static Buf_Entry *varsetq_dequeue(Varset *varset_p)
return slot_p;
}
static void varsetq_remove(Varset *varset_p, Buf_Entry *slot_p)
{
if (!varsetq_is_empty(varset_p)) {
dlist_remove(varset_p->queue, node, slot_p);
assert_varsetq_invariants(varset_p);
}
}
static void varsetq_dump_elems(Varset *varset_p)
{
Buf_Entry *slot_p;
......@@ -812,7 +804,6 @@ static void enqueue(Buf_Entry *slot_p)
int varsetq_was_empty = varsetq_is_empty(slot_p->varset_p);
if (slot_p->prop.confirmed_service) {
slot_p->to_cnt = slot_p->to_init; /* Set timeout counter */
#ifdef LCAL_STAT
slot_p->stat.enqueue_time = alm_get_stamp();
#endif
......@@ -832,6 +823,9 @@ static Buf_Entry *dequeue(void)
if (varset_p != NULL) {
slot_p = varsetq_dequeue(varset_p);
assert(slot_p != NULL); /* G1 */
if (slot_p->prop.confirmed_service) {
slot_p->to_cnt = slot_p->to_init; /* Set timeout counter */
}
/* if we have not emptied the queue, we must re-enqueue
the varset into the global send queue */
if (!varsetq_is_empty(varset_p)) {
......@@ -841,25 +835,6 @@ static Buf_Entry *dequeue(void)
return slot_p;
}
/* Precondition: slot_p is enqueued */
static void unqueue(Buf_Entry *slot_p)
{
Varset *varset_p = slot_p->varset_p;
varsetq_remove(varset_p, slot_p);
if (varsetq_is_empty(varset_p)) {
if (sendq.last_due == varset_p) {
/* We are removing last_due. In order to preserve
invariant G2 we must move last_due to the previous
varset (or NULL if it was the first one). */
sendq.last_due = sendq.last_due->node.prev; /* establish G2 */
}
/* second precondition has just been established */
global_sendq_remove(varset_p);
}
}
static void queue_dump_elems(void)
{
Varset *varset_p;
......@@ -2294,20 +2269,15 @@ prot_Return lowcal_time (mcan_Obj_Handle* obj_handle_p,
slot_p->flags.timeout = TRUE;
slot_p->to_cnt = 0;
if (slot_p->flags.state == LCAL_PENDING) {
/* A pending slot is never enqueued (by definition), so no need
to unqueue in this case.
TODO: make this an assertion here. */
slot_p->varset_p->pending = FALSE;
if (glbl_lowcal_use_handshake) {
/* In handshake mode make sure we get a chance to send another
slot from the same varset now. */
writer_notify(now, now);
}
} else if (slot_p->flags.state == LCAL_ENQUEUED) {
/* take it out of the queue */
unqueue(slot_p);
/* A slot with a non-zero to_cnt is by definition in the pending state. */
assert(slot_p->flags.state == LCAL_PENDING);
slot_p->varset_p->pending = FALSE;
if (glbl_lowcal_use_handshake) {
/* In handshake mode make sure we get a chance to send another
slot from the same varset now. */
writer_notify(now, now);
}
/* Set state */
slot_p->flags.state = LCAL_INACTIVE;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment