| | | 1 | | using CounterpointCollective.Threading; |
| | | 2 | | |
| | | 3 | | namespace CounterpointCollective.Collections |
| | | 4 | | { |
| | | 5 | | public static class CoalescingQueueDispatchExtensions |
| | | 6 | | { |
| | | 7 | | /// <summary> |
| | | 8 | | /// Configures the coalescing queue to process completed nodes synchronously on the calling thread. |
| | | 9 | | /// </summary> |
| | | 10 | | /// <remarks> |
| | | 11 | | /// This dispatch mode is useful when: |
| | | 12 | | /// <list type="bullet"> |
| | | 13 | | /// <item>Node processing is lightweight and you want to avoid the overhead of scheduling work on the thread poo |
| | | 14 | | /// <item>You want to minimize latency for processing completed nodes.</item> |
| | | 15 | | /// </list> |
| | | 16 | | /// If a callback contains <c>await</c> expressions, the processing executes synchronously up to the first yield |
| | | 17 | | /// after that, its continuation and the dispatching of any other nodes that complete while processing is ongoin |
| | | 18 | | /// will be scheduled on the thread pool. |
| | | 19 | | /// </remarks> |
| | | 20 | | public static void UseSynchronousDispatch(this ICoalescingQueue queue) |
| | | 21 | | { |
| | 2 | 22 | | queue.OnNodeCompleted += queue.StartProcessPending; |
| | 2 | 23 | | } |
| | | 24 | | |
| | | 25 | | /// <summary> |
| | | 26 | | /// Configures the coalescing queue to process completed nodes asynchronously on the thread pool. |
| | | 27 | | /// </summary> |
| | | 28 | | /// <remarks> |
| | | 29 | | /// When using this dispatch mode: |
| | | 30 | | /// <list type="bullet"> |
| | | 31 | | /// <item>Node callbacks are never executed on the thread that completes the node.</item> |
| | | 32 | | /// <item>All processing is scheduled on the thread pool, allowing the calling thread to continue immediately.</ |
| | | 33 | | /// <item>This mode is useful when node processing may be long-running or when you want to avoid blocking the pr |
| | | 34 | | /// </list> |
| | | 35 | | /// </remarks> |
| | | 36 | | public static void UseAsynchronousDispatch(this ICoalescingQueue queue) |
| | | 37 | | { |
| | 0 | 38 | | var sem = new AsyncAutoResetEventSlim(false); |
| | | 39 | | |
| | 0 | 40 | | queue.OnNodeCompleted += () => sem.Set(); |
| | 0 | 41 | | queue.Completion.ContinueWith(_ => sem.Terminate()); |
| | | 42 | | |
| | 0 | 43 | | Task.Run(async () => |
| | 0 | 44 | | { |
| | 0 | 45 | | while (true) |
| | 0 | 46 | | { |
| | 0 | 47 | | await Task.WhenAny(sem.WaitOneAsync()); |
| | 0 | 48 | | if (sem.Terminated) |
| | 0 | 49 | | { |
| | 0 | 50 | | break; |
| | 0 | 51 | | } |
| | 0 | 52 | | await queue.ProcessPendingAsync(); |
| | 0 | 53 | | } |
| | 0 | 54 | | }); |
| | 0 | 55 | | } |
| | | 56 | | |
| | | 57 | | /// <summary> |
| | | 58 | | /// Configures the coalescing queue to process the first completed node synchronously on the calling thread, |
| | | 59 | | /// and then process any remaining completed nodes asynchronously on the thread pool. |
| | | 60 | | /// </summary> |
| | | 61 | | /// <remarks> |
| | | 62 | | /// This dispatch mode is useful when: |
| | | 63 | | /// <list type="bullet"> |
| | | 64 | | /// <item>You want the first completed node to be handled immediately to reduce latency.</item> |
| | | 65 | | /// <item>You want to avoid long-running synchronous processing blocking the caller when multiple nodes complete |
| | | 66 | | /// </list> |
| | | 67 | | /// If the callback for the first node contains <c>await</c> expressions, its synchronous execution proceeds up |
| | | 68 | | /// after that, its continuation will happen on the thread pool. |
| | | 69 | | /// </remarks> |
| | | 70 | | public static void UseInlineFirstThenAsyncDispatch(this ICoalescingQueue queue) |
| | | 71 | | { |
| | 0 | 72 | | var sem = new AsyncAutoResetEventSlim(false); |
| | | 73 | | |
| | 0 | 74 | | queue.OnNodeCompleted += async () => |
| | 0 | 75 | | { |
| | 0 | 76 | | var more = await queue.ProcessPendingAsync(1); |
| | 0 | 77 | | if (more) |
| | 0 | 78 | | { |
| | 0 | 79 | | sem.Set(); |
| | 0 | 80 | | } |
| | 0 | 81 | | }; |
| | 0 | 82 | | queue.Completion.ContinueWith(_ => sem.Terminate()); |
| | | 83 | | |
| | 0 | 84 | | Task.Run(async () => |
| | 0 | 85 | | { |
| | 0 | 86 | | while (true) |
| | 0 | 87 | | { |
| | 0 | 88 | | await Task.WhenAny(sem.WaitOneAsync()); |
| | 0 | 89 | | if (sem.Terminated) |
| | 0 | 90 | | { |
| | 0 | 91 | | break; |
| | 0 | 92 | | } |
| | 0 | 93 | | await queue.ProcessPendingAsync(); |
| | 0 | 94 | | } |
| | 0 | 95 | | }); |
| | 0 | 96 | | } |
| | | 97 | | } |
| | | 98 | | } |