< Summary

Information
Class: CounterpointCollective.Collections.CoalescingQueueDispatchExtensions
Assembly: CounterpointCollective.CoalescingQueue
File(s): /builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueueDispatchExtensions.cs
Line coverage
4%
Covered lines: 2
Uncovered lines: 39
Coverable lines: 41
Total lines: 98
Line coverage: 4.8%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
UseSynchronousDispatch(...) | 100% | 1 | 1 | 100%
UseAsynchronousDispatch(...) | 100% | 2 | 1 | 0%
UseInlineFirstThenAsyncDispatch(...) | 100% | 2 | 1 | 0%

File(s)

/builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueueDispatchExtensions.cs

#LineLine coverage
 1using CounterpointCollective.Threading;
 2
 3namespace CounterpointCollective.Collections
 4{
 5    public static class CoalescingQueueDispatchExtensions
 6    {
 7        /// <summary>
 8        /// Configures the coalescing queue to process completed nodes synchronously on the calling thread.
 9        /// </summary>
 10        /// <remarks>
 11        /// This dispatch mode is useful when:
 12        /// <list type="bullet">
 13        /// <item>Node processing is lightweight and you want to avoid the overhead of scheduling work on the thread poo
 14        /// <item>You want to minimize latency for processing completed nodes.</item>
 15        /// </list>
 16        /// If a callback contains <c>await</c> expressions, the processing executes synchronously up to the first yield
 17        /// after that, its continuation and the dispatching of any other nodes that complete while processing is ongoin
 18        /// will be scheduled on the thread pool.
 19        /// </remarks>
 20        public static void UseSynchronousDispatch(this ICoalescingQueue queue)
 21        {
 222            queue.OnNodeCompleted += queue.StartProcessPending;
 223        }
 24
 25        /// <summary>
 26        /// Configures the coalescing queue to process completed nodes asynchronously on the thread pool.
 27        /// </summary>
 28        /// <remarks>
 29        /// When using this dispatch mode:
 30        /// <list type="bullet">
 31        /// <item>Node callbacks are never executed on the thread that completes the node.</item>
 32        /// <item>All processing is scheduled on the thread pool, allowing the calling thread to continue immediately.</
 33        /// <item>This mode is useful when node processing may be long-running or when you want to avoid blocking the pr
 34        /// </list>
 35        /// </remarks>
 36        public static void UseAsynchronousDispatch(this ICoalescingQueue queue)
 37        {
 038            var sem = new AsyncAutoResetEventSlim(false);
 39
 040            queue.OnNodeCompleted += () => sem.Set();
 041            queue.Completion.ContinueWith(_ => sem.Terminate());
 42
 043            Task.Run(async () =>
 044            {
 045                while (true)
 046                {
 047                    await Task.WhenAny(sem.WaitOneAsync());
 048                    if (sem.Terminated)
 049                    {
 050                        break;
 051                    }
 052                    await queue.ProcessPendingAsync();
 053                }
 054            });
 055        }
 56
 57        /// <summary>
 58        /// Configures the coalescing queue to process the first completed node synchronously on the calling thread,
 59        /// and then process any remaining completed nodes asynchronously on the thread pool.
 60        /// </summary>
 61        /// <remarks>
 62        /// This dispatch mode is useful when:
 63        /// <list type="bullet">
 64        /// <item>You want the first completed node to be handled immediately to reduce latency.</item>
 65        /// <item>You want to avoid long-running synchronous processing blocking the caller when multiple nodes complete
 66        /// </list>
 67        /// If the callback for the first node contains <c>await</c> expressions, its synchronous execution proceeds up 
 68        /// after that, its continuation will happen on the thread pool.
 69        /// </remarks>
 70        public static void UseInlineFirstThenAsyncDispatch(this ICoalescingQueue queue)
 71        {
 072            var sem = new AsyncAutoResetEventSlim(false);
 73
 074            queue.OnNodeCompleted += async () =>
 075            {
 076                var more = await queue.ProcessPendingAsync(1);
 077                if (more)
 078                {
 079                    sem.Set();
 080                }
 081            };
 082            queue.Completion.ContinueWith(_ => sem.Terminate());
 83
 084            Task.Run(async () =>
 085            {
 086                while (true)
 087                {
 088                    await Task.WhenAny(sem.WaitOneAsync());
 089                    if (sem.Terminated)
 090                    {
 091                        break;
 092                    }
 093                    await queue.ProcessPendingAsync();
 094                }
 095            });
 096        }
 97    }
 98}