< Summary

Information
Class: CounterpointCollective.Collections.CoalescingQueue<T>
Assembly: CounterpointCollective.CoalescingQueue
File(s): /builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueue.cs
Line coverage
89%
Covered lines: 134
Uncovered lines: 16
Coverable lines: 150
Total lines: 512
Line coverage: 89.3%
Branch coverage
80%
Covered branches: 58
Total branches: 72
Branch coverage: 80.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Lock()100%11100%
.ctor()83.33%66100%
get_Completion()100%11100%
.ctor(...)100%11100%
get_Disposer()100%11100%
get_CoalesceConsecutiveItems()100%11100%
get_IsAsync()100%11100%
Dispatch(...)50%22100%
DispatchAsync(...)100%11100%
Enqueue()100%11100%
EnqueueHeapSafe()100%210%
OnNodeCompleted()100%22100%
get_ValueProvider()100%11100%
get_AsyncDispatcher()100%11100%
set_AsyncDispatcher(...)75%4475%
get_Dispatcher()100%11100%
set_Dispatcher(...)75%4475%
Dispose(...)100%22100%
CreateHandle(...)100%11100%
CreateHandle(...)100%11100%
CreateHandle(...)100%11100%
Enqueue(...)87.5%8893.33%
get_FirstNodeIsCompleted()100%22100%
TryDequeueCompleted(...)100%44100%
ProcessPendingAsync()77.27%242282.85%
StartProcessPending()50%3250%
RunAsync()100%210%
Complete()100%11100%
CompleteIfPossible()100%66100%
AwaitQueueProcessed()62.5%8889.47%

File(s)

/builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueue.cs

#LineLine coverage
 1using System;
 2using System.Diagnostics;
 3using System.Runtime.CompilerServices;
 4
 5namespace CounterpointCollective.Collections
 6{
 7    public sealed class CoalescingQueue<TContainer> : ICoalescingQueue where TContainer: struct
 8    {
 9        private volatile bool completionRequested;
 10
 249888011        private object Lock { get; } = new();
 912        private readonly StructQueue<QueueEntry<TContainer>> _queue = new();
 13        private StructQueue<QueueEntry<TContainer>>.Handle? lastInQueue;
 914        private List<Exception> _exceptionsByDispatchers = [];
 15        private volatile int isProcessingPending;
 16#pragma warning disable CA1003 // Use generic event handler instances
 17        public event Action? OnNodeCompleted;
 18#pragma warning restore CA1003 // Use generic event handler instances
 919        private readonly TaskCompletionSource tcsCompletion = new();
 20
 21
 22
 323        public Task Completion => tcsCompletion.Task;
 24
 25
 26        private readonly ICoalescingQueueHandle<TContainer, Action?> _queueProcessedSentinelNodeHandle;
 27
 28
 929        public CoalescingQueue()
 30        {
 931            _queueProcessedSentinelNodeHandle =
 932                CreateHandle(
 933                    false,
 234                    rawValueProvider : (ref QueueEntry<TContainer> v) => ref v.Callback,
 135                    static (in e) => e.GetValue()?.Invoke()
 936                );
 937        }
 38
 39        internal interface IHandle
 40        {
 41            public bool IsAsync { get; }
 42            public void Dispatch(in StructQueue<QueueEntry<TContainer>>.Handle h);
 43            public Task DispatchAsync(in StructQueue<QueueEntry<TContainer>>.Handle h);
 44            void OnNodeCompleted();
 45
 46            public bool CoalesceConsecutiveItems { get; }
 47
 48            void Dispose(in StructQueue<QueueEntry<TContainer>>.Handle p);
 49        }
 50
 1951        private sealed class Handle<TValue>(
 1952            CoalescingQueue<TContainer> outer,
 1953            bool coalesceConsecutiveItems,
 1954            RawValueProvider<TContainer, TValue> valueProvider
 1955        )
 56            : IHandle, ICoalescingQueueHandle<TContainer, TValue>
 57        {
 58
 219985859            public InAction<TContainer>? Disposer { get; set; }
 60
 9920461            public bool CoalesceConsecutiveItems => coalesceConsecutiveItems;
 62
 119983163            public bool IsAsync => AsyncDispatcher != null;
 64
 65            public void Dispatch(in StructQueue<QueueEntry<TContainer>>.Handle h)
 119983066                => Dispatcher?.Invoke(new(h, ValueProvider));
 67
 68            public Task DispatchAsync(in StructQueue<QueueEntry<TContainer>>.Handle h)
 169                => AsyncDispatcher!.Invoke(new(h, ValueProvider));
 70
 71            public CoalescingQueueNode<TContainer, TValue> Enqueue()
 119983472                => outer.Enqueue(this).Open();
 73
 74            public HeapSafeCoalescingQueueNode<TContainer, TValue> EnqueueHeapSafe()
 075                => outer.Enqueue(this);
 76
 77            public void OnNodeCompleted()
 119983178                => outer.OnNodeCompleted?.Invoke();
 79
 239966580            public RawValueProvider<TContainer, TValue> ValueProvider => valueProvider;
 81
 82            public Func<CompletedCoalescingQueueNode<TContainer, TValue>, Task>? AsyncDispatcher
 83            {
 119985084                get;
 85                internal set
 86                {
 187                    if (value != null && Dispatcher != null)
 88                    {
 089                        Dispatcher = null;
 90                    }
 191                    field = value;
 192                }
 93            }
 94
 95            public InAction<CompletedCoalescingQueueNode<TContainer, TValue>>? Dispatcher
 96            {
 119983197                get;
 98                internal set
 99                {
 18100                    if (value != null && AsyncDispatcher != null)
 101                    {
 0102                        AsyncDispatcher = null;
 103                    }
 18104                    field = value;
 18105                }
 106            }
 107
 108            public void Dispose(in StructQueue<QueueEntry<TContainer>>.Handle h)
 109            {
 1199831110                if (Disposer != null)
 111                {
 1000008112                    Disposer(in h.GetRef().Container);
 113                }
 1199831114                h.Dispose();
 1199831115            }
 116        }
 117
 118        public ICoalescingQueueHandle<TContainer, TValue> CreateHandle<TValue>(
 119            bool coalesceConsecutiveItems,
 120            ValueProvider<TContainer, TValue> valueProvider,
 121            InAction<CompletedCoalescingQueueNode<TContainer, TValue>> dispatcher,
 122            InAction<TContainer>? disposer = null
 123        )
 9124        => CreateHandle(coalesceConsecutiveItems, valueProvider.ToRaw(), dispatcher, disposer);
 125
 126        private Handle<TValue> CreateHandle<TValue>(
 127            bool coalesceConsecutiveItems,
 128            RawValueProvider<TContainer, TValue> rawValueProvider,
 129            InAction<CompletedCoalescingQueueNode<TContainer, TValue>> dispatcher,
 130            InAction<TContainer>? disposer = null
 131        )
 18132        => new(this, coalesceConsecutiveItems, rawValueProvider)
 18133        {
 18134            Dispatcher = dispatcher,
 18135            Disposer = disposer
 18136        };
 137
 138        public ICoalescingQueueHandle<TContainer, TValue> CreateHandle<TValue>(
 139            bool coalesceConsecutiveItems,
 140            ValueProvider<TContainer, TValue> valueProvider,
 141            Func<CompletedCoalescingQueueNode<TContainer, TValue>, Task> asyncDispatcher,
 142            InAction<TContainer>? disposer = null
 143        )
 144        {
 1145            var h = new Handle<TValue>(this, coalesceConsecutiveItems, valueProvider.ToRaw())
 1146            {
 1147                AsyncDispatcher = asyncDispatcher,
 1148                Disposer = disposer
 1149            };
 1150            return h;
 151        }
 152
 153        private HeapSafeCoalescingQueueNode<TContainer, TValue> Enqueue<TValue>(Handle<TValue> handle)
 154        {
 1199834155            lock (Lock)
 156            {
 1199834157                if (completionRequested)
 158                {
 0159                    throw new InvalidOperationException("Cannot enqueue items after Complete has been called.");
 160                }
 1199834161                else if (lastInQueue.HasValue && handle.CoalesceConsecutiveItems)
 162                {
 2163                    ref var entry = ref lastInQueue.Value.GetRef();
 2164                    if (entry.Handle == handle)
 165                    {
 166                        //Reuse last node in the queue. It was enqueued via the same handle and coalescing is enabled.
 2167                        entry.Claim();
 2168                        return new(lastInQueue.Value, handle.ValueProvider);
 169                    }
 170                }
 171
 172                {
 173                    //Add a new last node.
 1199832174                    _queue.Enqueue(out var h);
 1199832175                    ref var entry = ref h.GetRef();
 1199832176                    entry.Handle = handle;
 1199832177                    entry.Claim();
 1199832178                    lastInQueue = h;
 1199832179                    return new(lastInQueue.Value, handle.ValueProvider);
 180                }
 181            }
 1199834182        }
 183
 184        private bool FirstNodeIsCompleted
 185        {
 186            get
 187            {
 188                Debug.Assert(Monitor.IsEntered(Lock));
 1299032189                return _queue.TryPeek(out var n) && n.GetRef().IsCompleted;
 190            }
 191        }
 192
 193        private enum DequeueCompletedResult
 194        {
 195            None,
 196            Dequeued,
 197            DequeuedAndTheresMore
 198        }
 199
 200        private DequeueCompletedResult TryDequeueCompleted(out StructQueue<QueueEntry<TContainer>>.Handle node)
 201        {
 1299032202            lock (Lock)
 203            {
 1299032204                if (FirstNodeIsCompleted)
 205                {
 1199831206                    _queue.Dequeue(out node);
 1199831207                    if (_queue.Count == 0)
 208                    {
 1100629209                        lastInQueue = null;
 1100629210                        return DequeueCompletedResult.Dequeued;
 211                    } else
 212                    {
 99202213                        return DequeueCompletedResult.DequeuedAndTheresMore;
 214                    }
 215                }
 216                else
 217                {
 99201218                    node = default;
 99201219                    return DequeueCompletedResult.None;
 220                }
 221            }
 1299032222        }
 223
 224        public async ValueTask<bool> ProcessPendingAsync(int maxToDispatch = -1)
 225        {
 1199830226            if (Interlocked.Increment(ref isProcessingPending) > 1)
 0227                return false; // someone else is already processing
 228
 1199830229            int dispatched = 0;
 1199830230            var maximumWasHit = false;
 231
 232            DequeueCompletedResult result;
 233
 234            do
 235            {
 1299032236                while ((result = TryDequeueCompleted(out var p)) != DequeueCompletedResult.None)
 237                {
 1199831238                    ref var entry = ref p.GetRef();
 1199831239                    var handle = entry.Handle;
 1199831240                    if (entry.MarkedForDispatch)
 241                    {
 1199831242                        maximumWasHit = maxToDispatch > 0 && ++dispatched == maxToDispatch;
 243                        try
 244                        {
 1199831245                            if (handle.IsAsync)
 246                            {
 1247                                await handle.DispatchAsync(p);
 248                            }
 249                            else
 250                            {
 1199830251                                handle.Dispatch(p);
 252                            }
 1199830253                        }
 1254                        catch (Exception ex)
 255                        {
 1256                            lock (Lock)
 257                            {
 1258                                _exceptionsByDispatchers.Add(ex);
 1259                                completionRequested = true;
 1260                            }
 1261                        }
 262                    }
 263
 1199831264                    handle.Dispose(in p);
 265
 1199831266                    if (result != DequeueCompletedResult.DequeuedAndTheresMore)
 267                    {
 268                        break;
 269                    }
 99202270                }
 1199830271            } while (!maximumWasHit && Interlocked.Decrement(ref isProcessingPending) > 0);
 272
 273            //^ we consciously leave our isProcessPendingToken to not leave the critical section yet if maximumWasHit.
 274
 1199830275            if (maximumWasHit)
 276            {
 277                //And then we drain *all* processes pending atomically if maximumWasHit.
 0278                Interlocked.Exchange(ref isProcessingPending, 0);
 279                //Only now a next loop is allowed to enter. We will do a final check.
 0280                lock (Lock)
 281                {
 0282                    if (FirstNodeIsCompleted)
 283                    {
 0284                        return true;
 285                    }
 0286                }
 287            }
 288
 289            //We can peak without the lock, because a concurrent call to Complete() will also try to complete and we alr
 1199830290            if (completionRequested)
 291            {
 1292                lock (Lock)
 293                {
 1294                    CompleteIfPossible();
 1295                }
 296            }
 1199830297            return false;
 1199830298        }
 299
 300        public void StartProcessPending()
 301        {
 1199822302            var vt = ProcessPendingAsync();
 2399644303            if (vt.IsCompletedSuccessfully) return;
 304
 0305            _ = RunAsync(vt);
 306
 307            static async Task RunAsync(ValueTask<bool> vt)
 308            {
 0309                try { await vt; } catch { }
 0310            }
 0311        }
 312
 313        public void Complete()
 314        {
 2315            lock (Lock)
 316            {
 2317                completionRequested = true;
 2318                CompleteIfPossible();
 2319            }
 2320        }
 321
 322        private void CompleteIfPossible()
 323        {
 3324            if (lastInQueue == null && isProcessingPending == 0)
 325            {
 3326                if (_exceptionsByDispatchers.Count > 0)
 327                {
 1328                    tcsCompletion.TrySetException(new AggregateException(_exceptionsByDispatchers));
 1329                    _exceptionsByDispatchers.Clear();
 330                }
 331                else
 332                {
 2333                    tcsCompletion.TrySetResult();
 334                }
 335            }
 2336        }
 337
 338        public async ValueTask AwaitQueueProcessed()
 339        {
 1340            Task t = null!;
 1341            CoalescingQueueNode<TContainer, Action?> queueProcessedSentinelNode = default;
 1342            var enqueuedSentinel = false;
 1343            lock (Lock)
 344            {
 1345                if (_queue.Count == 0 && isProcessingPending == 0)
 346                {
 347                    //Happy path, return immediately, without allocation.
 0348                    return;
 349                }
 1350                else if (completionRequested)
 351                {
 0352                    t = Completion;
 353                }
 354                else
 355                {
 356                    //There are pending items. Allocate a TaskCompletionSource
 357                    //and enqueue a QueueProcessedSentinel that completes it
 358                    //once dispatched.
 1359                    queueProcessedSentinelNode = _queueProcessedSentinelNodeHandle.Enqueue();
 1360                    enqueuedSentinel = true;
 361                }
 1362            }
 363
 1364            if (enqueuedSentinel)
 365            {
 1366                var tcs = new TaskCompletionSource();
 1367                queueProcessedSentinelNode.MarkForDispatch();
 1368                queueProcessedSentinelNode.Value = tcs.SetResult;
 1369                queueProcessedSentinelNode.Dispose();
 1370                t = tcs.Task;
 371            }
 1372            await t;
 1373        }
 374    }
 375
 376    /// <summary>
 377    /// A specialized coalescing queue that stores and dispatches instances of reference types.
 378    /// This concrete implementation is intended for scenarios where all values handled by the queue are reference types
 379    /// For higher efficiency it is usually preferable to use
 380    /// the generic <see cref="CoalescingQueue{TContainer}"/> directly with a struct container type, as this avoids heap
 381    /// allocations and boxing.
 382    /// </summary>
 383    public sealed class CoalescingQueue : ICoalescingQueue
 384    {
 385        private CoalescingQueue<ObjectContainer> _inner = new();
 386
 387        public Task Completion => _inner.Completion;
 388
 389        public void Complete() => _inner.Complete();
 390
 391        public void StartProcessPending() => _inner.StartProcessPending();
 392
 393        public ValueTask<bool> ProcessPendingAsync(int maxToDispatch = -1) => _inner.ProcessPendingAsync(maxToDispatch);
 394
 395        public ValueTask AwaitQueueProcessed() => _inner.AwaitQueueProcessed();
 396
 397
 398        public event Action? OnNodeCompleted
 399        {
 400            add { _inner.OnNodeCompleted += value; }
 401            remove { _inner.OnNodeCompleted -= value; }
 402        }
 403
 404        public struct ObjectContainer
 405        {
 406            public object? Data;
 407        }
 408
 409        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 410            bool coalesceConsecutiveItems,
 411            Action<TValue> dispatcher
 412        ) where TValue : class, new()
 413            => CreateHandle(
 414                coalesceConsecutiveItems,
 415                dispatcher,
 416                new NoopPool<TValue>()
 417            );
 418
 419        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 420            bool coalesceConsecutiveItems,
 421            Action<TValue> dispatcher,
 422            IPool<TValue> pool
 423        ) where TValue : class =>
 424            _inner.CreateHandle(
 425                coalesceConsecutiveItems,
 426                GetValueProvider(pool),
 427                (in e) => dispatcher(e.GetValue()),
 428                ReturnIfNotNull(pool)
 429            );
 430
 431        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 432            bool coalesceConsecutiveItems,
 433            Func<TValue, Task> asyncDispatcher
 434        ) where TValue : class, new()
 435            => CreateHandle(
 436                coalesceConsecutiveItems,
 437                asyncDispatcher,
 438                new NoopPool<TValue>()
 439            );
 440
 441        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 442            bool coalesceConsecutiveItems,
 443            Func<TValue, Task> asyncDispatcher,
 444            IPool<TValue> pool
 445            ) where TValue : class =>
 446            _inner.CreateHandle(
 447                coalesceConsecutiveItems,
 448                GetValueProvider(pool),
 449                e => asyncDispatcher(e.GetValue()),
 450                ReturnIfNotNull(pool)
 451            );
 452
 453        private static InAction<ObjectContainer> ReturnIfNotNull<TValue>(IPool<TValue> pool) where TValue : class
 454        => (in v) =>
 455        {
 456            if (v.Data != null)
 457            {
 458                pool.Return(v.Data);
 459            }
 460        };
 461
 462        private static ValueProvider<ObjectContainer, TValue> GetValueProvider<TValue>(IPool<TValue> pool) where TValue 
 463        => (ref wrapper) =>
 464        {
 465            var v = Volatile.Read(ref wrapper.Data);
 466            if (v == null)
 467            {
 468                //Get a new instance from the valuePool to use as initial value.
 469                v = pool.Get();
 470                var v1 = Interlocked.CompareExchange(ref wrapper.Data, v, null);
 471                if (v1 != null)
 472                {
 473                    //If another thread already set value, we return the claimed instance.
 474                    pool.Return(v);
 475                    v = v1;
 476                }
 477            }
 478            return ref Unsafe.As<object, TValue>(ref wrapper.Data!);
 479        };
 480    }
 481
 482    internal struct QueueEntry<TContainer> where TContainer : struct
 483    {
 484        //Pointers first to keep the struct size compact. The compiler will align pointers on 64-bit intervals.
 485        public CoalescingQueue<TContainer>.IHandle Handle { get; set; }
 486        public Action? Callback;
 487
 488        //TContainer may contain a pointer, so we put it next.
 489        public TContainer Container;
 490
 491        //Now the definitely non-pointer fields.
 492        private int pending;
 493        public bool MarkedForDispatch { get; set; }
 494
 495        public bool IsCompleted => Volatile.Read(ref pending) == 0;
 496
 497        public void Claim() => Interlocked.Increment(ref pending);
 498
 499        public int UnRef() => Interlocked.Decrement(ref pending);
 500    }
 501
 502
 503    internal delegate ref TValue RawValueProvider<TContainer, TValue>(ref QueueEntry<TContainer> value) where TContainer
 504
 505    public delegate ref TValue ValueProvider<TContainer, TValue>(ref TContainer container);
 506
 507    public static class ValueProviderExtensions
 508    {
 509        internal static RawValueProvider<TContainer, TValue> ToRaw<TContainer, TValue>(this ValueProvider<TContainer, TV
 510        => (ref v) => ref vp(ref v.Container);
 511    }
 512}

Methods/Properties

get_Lock()
.ctor()
get_Completion()
.ctor(CounterpointCollective.Collections.CoalescingQueue`1<TContainer>,System.Boolean,CounterpointCollective.Collections.RawValueProvider`2<TContainer,TValue>)
get_Disposer()
get_CoalesceConsecutiveItems()
get_IsAsync()
Dispatch(CounterpointCollective.Collections.StructQueue`1/Handle<CounterpointCollective.Collections.QueueEntry`1<TContainer>>&)
DispatchAsync(CounterpointCollective.Collections.StructQueue`1/Handle<CounterpointCollective.Collections.QueueEntry`1<TContainer>>&)
Enqueue()
EnqueueHeapSafe()
OnNodeCompleted()
get_ValueProvider()
get_AsyncDispatcher()
set_AsyncDispatcher(System.Func`2<CounterpointCollective.Collections.CompletedCoalescingQueueNode`2<TContainer,TValue>,System.Threading.Tasks.Task>)
get_Dispatcher()
set_Dispatcher(CounterpointCollective.Collections.InAction`1<CounterpointCollective.Collections.CompletedCoalescingQueueNode`2<TContainer,TValue>>)
Dispose(CounterpointCollective.Collections.StructQueue`1/Handle<CounterpointCollective.Collections.QueueEntry`1<TContainer>>&)
CreateHandle(System.Boolean,CounterpointCollective.Collections.ValueProvider`2<TContainer,TValue>,CounterpointCollective.Collections.InAction`1<CounterpointCollective.Collections.CompletedCoalescingQueueNode`2<TContainer,TValue>>,CounterpointCollective.Collections.InAction`1<TContainer>)
CreateHandle(System.Boolean,CounterpointCollective.Collections.RawValueProvider`2<TContainer,TValue>,CounterpointCollective.Collections.InAction`1<CounterpointCollective.Collections.CompletedCoalescingQueueNode`2<TContainer,TValue>>,CounterpointCollective.Collections.InAction`1<TContainer>)
CreateHandle(System.Boolean,CounterpointCollective.Collections.ValueProvider`2<TContainer,TValue>,System.Func`2<CounterpointCollective.Collections.CompletedCoalescingQueueNode`2<TContainer,TValue>,System.Threading.Tasks.Task>,CounterpointCollective.Collections.InAction`1<TContainer>)
Enqueue(CounterpointCollective.Collections.CoalescingQueue`1/Handle`1<TContainer,TValue>)
get_FirstNodeIsCompleted()
TryDequeueCompleted(CounterpointCollective.Collections.StructQueue`1/Handle<CounterpointCollective.Collections.QueueEntry`1<TContainer>>&)
ProcessPendingAsync()
StartProcessPending()
RunAsync()
Complete()
CompleteIfPossible()
AwaitQueueProcessed()