EXTENDS Integers
CONSTANT DispatchCmd, TableCMD
(* --algorithm tableQueue
variables
dispatchQueue = <<>>;
tableCmdQueue = [emptyQueue |-> FALSE, q |-> <<>>];
dispatchState = [consume |-> 0, produce |-> 0];
consumeLock = 0;
tableQueueLock = 0;
needFireDispatchMap = [produce |-> FALSE];
procedure setNeedDispatch(irole)
begin
\* !bufferQueue.empty
SND_T_N_EMPTY:
if Len(tableCmdQueue.q) /= 0 then
\* bufferState & CONSUME == 0
SND_N_C:
if dispatchState.consume == 0 then
\* bufferState &= CONSUME
dispatchState.consume := 1;
\* need = true
needFireDispatchMap[irole] := TRUE;
end if;
end if;
end procedure;
procedure needFireDispatch(role)
variable locked = FALSE;
begin NEED_FIRE:
needFireDispatchMap[role] := FALSE;
\* bufferState & CONSUME == 0
NF_LOOP:
while dispatchState.consume == 0 do
\* !bufferQueue.isEmpty
NF_T_TQ_N_E:
if Len(tableCmdQueue) /= 0 then
\* !consumeLock.get()
NF_C_N_L:
if consumeLock == 0 then
\* CAS -> consumeLock
TRY_LOCK:
if consumeLock == 0 then
consumeLock := 1;
locked := TRUE;
end if;
\* CAS succeed!
LOCKED:
if locked then
call setNeedDispatch(role);
UNLOCK:
\* consumeLock.set(false)
consumeLock := 0;
\* break;
goto NF_RET;
end if;
end if;
else
goto NF_RET;
end if;
end while;
NF_RET:
skip;
end procedure;
procedure scheduleDispatch()
begin D:
dispatchQueue := Append(dispatchQueue, DispatchCmd);
end procedure;
process consumer \in 1..10
variables
tableQueueCmd;
begin CONSUME:
needFireDispatchMap[self] := FALSE;
C_LOOP:
while TRUE do
Acquire_C:
await dispatchQueue /= <<>>;
tableQueueCmd := Head(dispatchQueue);
dispatchQueue := Tail(dispatchQueue);
\* start to consume
skip;
\* test Dispatch cancelable
if Len(tableCmdQueue.q) == 0 then
dispatchState.consume := 0;
call needFireDispatch(self);
C_TEST_R:
if needFireDispatch[self] then
call scheduleDispatch();
end if;
else
call scheduleDispatch();
end if;
end while;
end process;
process producer = 100
begin Produce:
while TRUE do
\* acquire queue and set In-produce state
Acquire_P:
while TRUE do
if tableQueueLock == 0 then
tableQueueLock := 1;
dispatchState.produce := 1;
goto P_OK;
end if;
end while;
P_OK:
skip;
\* put cmd to CMD_QUEUE
tableCmdQueue.q := Append(tableCmdQueue.q, TableCMD);
\* test Queue size == 1
if Len(tableCmdQueue) == 1 then
\* tryFireDispatch
call needFireDispatch('produce');
P_TEST_R:
if needFireDispatch.produce == 1 then
call scheduleDispatch();
end if;
end if;
\* leave: unset In-produce state
LEAVE:
dispatchState.produce := 0;
end while;
end process;
process idleHandler = 200
variable iLocked = FALSE;
begin IDLE_HANDLE:
while TRUE do
\* fast check any condition volatile
\* bufferQeueu == empty
IDLE_TQ_EMPTY:
if tableCmdQueue.empty then
goto LOOP_END;
end if;
\* bufferState != 0
DQ_IN_CONSUME:
if dispatchState.consume /= 0 then
goto LOOP_END;
end if;
DQ_IN_PRODUCE:
if diispatchState.produce /= 0 then
goto LOOP_END;
end if;
\* bufferQeueue.empty
TQ_N_EMPTY:
if Len(tableCmdQueue.q) /= 0 then
goto LOOP_END;
end if;
\* bufferLock.get()
TQ_LOCKED:
if tableQueueLock /= 0 then
goto LOOP_END;
end if;
\* acquire CMD_QUEUE's lock
AcquireLock_IDLE:
if tableQueueLock == 0 then
tableQueueLock := 1;
iLocked := TRUE;
end if;
if iLocked then
\* test Not In-produce
if dispatchState.produce == 0 then
\* test Queue's empty
if Len(tableCmdQueue.q) == 0 then
\* swap Qeueue if test pass
tableCmdQueue.empty := TRUE ||
tableCmdQueue.q := <<>>;
end if;
end if;
\* leave: release CMD_QUEUE's lock
ReleaseLock_IDLE:
tableQueueLock := 0;
end if;
LOOP_END:
skip;
end while;
end process;
end algorithm; *)