diff --git a/rts/src/main/java/eta/runtime/concurrent/Concurrent.java b/rts/src/main/java/eta/runtime/concurrent/Concurrent.java index 7481eeba..3f3d69d9 100644 --- a/rts/src/main/java/eta/runtime/concurrent/Concurrent.java +++ b/rts/src/main/java/eta/runtime/concurrent/Concurrent.java @@ -75,6 +75,7 @@ public static Closure takeMVar(StgContext context, MVar mvar) { cap.blockedLoop(); val = mvar.tryTake(); } while (val == null); + cap.lastBlockCounter = 0; } finally { tso.whyBlocked = NotBlocked; tso.blockInfo = null; @@ -101,6 +102,7 @@ public static Closure readMVar(StgContext context, MVar mvar) { cap.blockedLoop(); val = mvar.tryRead(); } while (val == null); + cap.lastBlockCounter = 0; } finally { tso.whyBlocked = NotBlocked; tso.blockInfo = null; @@ -127,6 +129,7 @@ public static void putMVar(StgContext context, MVar mvar, Closure val) { cap.blockedLoop(); success = mvar.tryPut(val); } while (!success); + cap.lastBlockCounter = 0; } finally { tso.blockInfo = null; tso.whyBlocked = NotBlocked; @@ -180,7 +183,7 @@ public static void yield(StgContext context) { TSO tso = context.currentTSO; tso.whyBlocked = BlockedOnYield; tso.blockInfo = null; - cap.blockedLoop(); + cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos()); } /* In Eta, all the threads are bound, so this always returns true. */ @@ -288,6 +291,7 @@ public static Closure threadWaitFuture(StgContext context, Future future) { } cap.blockedLoop(); } while (!future.isDone()); + cap.lastBlockCounter = 0; Object exception = null; Object result = null; if (tso.blockInfo != null) { @@ -357,7 +361,7 @@ public static void threadWaitIO(StgContext context, Channel channel, int ops) { tso.whyBlocked = blocked; tso.blockInfo = selectKey; do { - cap.blockedLoop(); + cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos()); } while (selectKey.isValid()); } catch (ClosedChannelException e) { throw new RuntimeException("threadWaitIO: ClosedChannelException", e); diff --git a/rts/src/main/java/eta/runtime/exception/Exception.java b/rts/src/main/java/eta/runtime/exception/Exception.java index d85a8160..100bcee9 100644 --- a/rts/src/main/java/eta/runtime/exception/Exception.java +++ b/rts/src/main/java/eta/runtime/exception/Exception.java @@ -115,6 +115,7 @@ public static void killThread(StgContext context, TSO target, Closure exception) do { cap.blockedLoop(); } while (msg.isValid()); + cap.lastBlockCounter = 0; } } } diff --git a/rts/src/main/java/eta/runtime/stg/Capability.java b/rts/src/main/java/eta/runtime/stg/Capability.java index 06d93994..755a6506 100644 --- a/rts/src/main/java/eta/runtime/stg/Capability.java +++ b/rts/src/main/java/eta/runtime/stg/Capability.java @@ -12,6 +12,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadLocalRandom; import java.lang.ref.WeakReference; @@ -101,6 +102,8 @@ public static void setNumCapabilities(int n) { public Deque runQueue = new LinkedList(); public int lastWorkSize; public long lastBlockCheck; + public int lastBlockCounter = 0; + private ThreadLocalRandom tlr = ThreadLocalRandom.current(); public Deque inbox = new ConcurrentLinkedDeque(); /* MemoryManager related stuff */ @@ -163,8 +166,9 @@ public final Closure schedule(TSO tso) throws java.lang.Exception { } do { - blockedLoop(Runtime.getMinWorkerCapabilityIdleTimeNanos()); + blockedLoop(); } while (blockedCapabilities.contains(this)); + lastBlockCounter = 0; continue; } } @@ -595,7 +599,8 @@ public final void detectSTMDeadlock(WhyBlocked whyBlocked, Object blockInfo) { /* Blocked Loop */ public final void blockedLoop() { - blockedLoop(Runtime.getMaxTSOBlockTimeNanos()); + lastBlockCounter = lastBlockCounter % 10 + 1; /* should be plenty */ + blockedLoop(Runtime.getMaxTSOBlockTimeNanos() * tlr.nextInt(0, 1 << lastBlockCounter)); } public final void blockedLoop(long nanos) { diff --git a/rts/src/main/java/eta/runtime/stm/STM.java b/rts/src/main/java/eta/runtime/stm/STM.java index d96216fd..764e8891 100644 --- a/rts/src/main/java/eta/runtime/stm/STM.java +++ b/rts/src/main/java/eta/runtime/stm/STM.java @@ -139,6 +139,7 @@ public static Closure atomically(StgContext context, Closure code) { cap.blockedLoop(); valid = trec.reWait(tso); } while (valid); + cap.lastBlockCounter = 0; } /* If the transaction is invalid, retry. */ trec = TransactionRecord.start(null); diff --git a/rts/src/main/java/eta/runtime/thunk/Thunk.java b/rts/src/main/java/eta/runtime/thunk/Thunk.java index 8ae412d6..c3cab284 100644 --- a/rts/src/main/java/eta/runtime/thunk/Thunk.java +++ b/rts/src/main/java/eta/runtime/thunk/Thunk.java @@ -110,7 +110,7 @@ public final void handleBlackHole(StgContext context) { tso.whyBlocked = BlockedOnBlackHole; tso.blockInfo = this; } - cap.blockedLoop(); + cap.blockedLoop(Runtime.getMaxTSOBlockTimeNanos()); } }