< Summary

Information
Class: CounterpointCollective.Collections.CoalescingQueue
Assembly: CounterpointCollective.CoalescingQueue
File(s): /builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueue.cs
Line coverage
85%
Covered lines: 46
Uncovered lines: 8
Coverable lines: 54
Total lines: 512
Line coverage: 85.1%
Branch coverage
83%
Covered branches: 5
Total branches: 6
Branch coverage: 83.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

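| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor() | 100% | 1 | 1 | 100% |
| get_Completion() | 100% | 1 | 1 | 100% |
| Complete() | 100% | 1 | 1 | 100% |
| StartProcessPending() | 100% | 1 | 1 | 100% |
| ProcessPendingAsync(...) | 100% | 1 | 1 | 100% |
| AwaitQueueProcessed() | 100% | 1 | 1 | 100% |
| add_OnNodeCompleted(...) | 100% | 1 | 1 | 100% |
| remove_OnNodeCompleted(...) | 100% | 2 | 1 | 0% |
| CreateHandle(...) | 100% | 1 | 1 | 100% |
| CreateHandle(...) | 100% | 1 | 1 | 100% |
| CreateHandle(...) | 100% | 2 | 1 | 0% |
| CreateHandle(...) | 100% | 1 | 1 | 100% |
| ReturnIfNotNull(...) | 100% | 2 | 2 | 100% |
| GetValueProvider(...) | 75% | 4 | 4 | 88.23% |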

File(s)

/builds/counterpointcollective/prestoprimitives/CoalescingQueue/Collections/CoalescingQueue.cs

#LineLine coverage
 1using System;
 2using System.Diagnostics;
 3using System.Runtime.CompilerServices;
 4
 5namespace CounterpointCollective.Collections
 6{
 7    public sealed class CoalescingQueue<TContainer> : ICoalescingQueue where TContainer: struct
 8    {
 9        private volatile bool completionRequested;
 10
 11        private object Lock { get; } = new();
 12        private readonly StructQueue<QueueEntry<TContainer>> _queue = new();
 13        private StructQueue<QueueEntry<TContainer>>.Handle? lastInQueue;
 14        private List<Exception> _exceptionsByDispatchers = [];
 15        private volatile int isProcessingPending;
 16#pragma warning disable CA1003 // Use generic event handler instances
 17        public event Action? OnNodeCompleted;
 18#pragma warning restore CA1003 // Use generic event handler instances
 19        private readonly TaskCompletionSource tcsCompletion = new();
 20
 21
 22
 23        public Task Completion => tcsCompletion.Task;
 24
 25
 26        private readonly ICoalescingQueueHandle<TContainer, Action?> _queueProcessedSentinelNodeHandle;
 27
 28
 29        public CoalescingQueue()
 30        {
 31            _queueProcessedSentinelNodeHandle =
 32                CreateHandle(
 33                    false,
 34                    rawValueProvider : (ref QueueEntry<TContainer> v) => ref v.Callback,
 35                    static (in e) => e.GetValue()?.Invoke()
 36                );
 37        }
 38
 39        internal interface IHandle
 40        {
 41            public bool IsAsync { get; }
 42            public void Dispatch(in StructQueue<QueueEntry<TContainer>>.Handle h);
 43            public Task DispatchAsync(in StructQueue<QueueEntry<TContainer>>.Handle h);
 44            void OnNodeCompleted();
 45
 46            public bool CoalesceConsecutiveItems { get; }
 47
 48            void Dispose(in StructQueue<QueueEntry<TContainer>>.Handle p);
 49        }
 50
 51        private sealed class Handle<TValue>(
 52            CoalescingQueue<TContainer> outer,
 53            bool coalesceConsecutiveItems,
 54            RawValueProvider<TContainer, TValue> valueProvider
 55        )
 56            : IHandle, ICoalescingQueueHandle<TContainer, TValue>
 57        {
 58
 59            public InAction<TContainer>? Disposer { get; set; }
 60
 61            public bool CoalesceConsecutiveItems => coalesceConsecutiveItems;
 62
 63            public bool IsAsync => AsyncDispatcher != null;
 64
 65            public void Dispatch(in StructQueue<QueueEntry<TContainer>>.Handle h)
 66                => Dispatcher?.Invoke(new(h, ValueProvider));
 67
 68            public Task DispatchAsync(in StructQueue<QueueEntry<TContainer>>.Handle h)
 69                => AsyncDispatcher!.Invoke(new(h, ValueProvider));
 70
 71            public CoalescingQueueNode<TContainer, TValue> Enqueue()
 72                => outer.Enqueue(this).Open();
 73
 74            public HeapSafeCoalescingQueueNode<TContainer, TValue> EnqueueHeapSafe()
 75                => outer.Enqueue(this);
 76
 77            public void OnNodeCompleted()
 78                => outer.OnNodeCompleted?.Invoke();
 79
 80            public RawValueProvider<TContainer, TValue> ValueProvider => valueProvider;
 81
 82            public Func<CompletedCoalescingQueueNode<TContainer, TValue>, Task>? AsyncDispatcher
 83            {
 84                get;
 85                internal set
 86                {
 87                    if (value != null && Dispatcher != null)
 88                    {
 89                        Dispatcher = null;
 90                    }
 91                    field = value;
 92                }
 93            }
 94
 95            public InAction<CompletedCoalescingQueueNode<TContainer, TValue>>? Dispatcher
 96            {
 97                get;
 98                internal set
 99                {
 100                    if (value != null && AsyncDispatcher != null)
 101                    {
 102                        AsyncDispatcher = null;
 103                    }
 104                    field = value;
 105                }
 106            }
 107
 108            public void Dispose(in StructQueue<QueueEntry<TContainer>>.Handle h)
 109            {
 110                if (Disposer != null)
 111                {
 112                    Disposer(in h.GetRef().Container);
 113                }
 114                h.Dispose();
 115            }
 116        }
 117
 118        public ICoalescingQueueHandle<TContainer, TValue> CreateHandle<TValue>(
 119            bool coalesceConsecutiveItems,
 120            ValueProvider<TContainer, TValue> valueProvider,
 121            InAction<CompletedCoalescingQueueNode<TContainer, TValue>> dispatcher,
 122            InAction<TContainer>? disposer = null
 123        )
 124        => CreateHandle(coalesceConsecutiveItems, valueProvider.ToRaw(), dispatcher, disposer);
 125
 126        private Handle<TValue> CreateHandle<TValue>(
 127            bool coalesceConsecutiveItems,
 128            RawValueProvider<TContainer, TValue> rawValueProvider,
 129            InAction<CompletedCoalescingQueueNode<TContainer, TValue>> dispatcher,
 130            InAction<TContainer>? disposer = null
 131        )
 132        => new(this, coalesceConsecutiveItems, rawValueProvider)
 133        {
 134            Dispatcher = dispatcher,
 135            Disposer = disposer
 136        };
 137
 138        public ICoalescingQueueHandle<TContainer, TValue> CreateHandle<TValue>(
 139            bool coalesceConsecutiveItems,
 140            ValueProvider<TContainer, TValue> valueProvider,
 141            Func<CompletedCoalescingQueueNode<TContainer, TValue>, Task> asyncDispatcher,
 142            InAction<TContainer>? disposer = null
 143        )
 144        {
 145            var h = new Handle<TValue>(this, coalesceConsecutiveItems, valueProvider.ToRaw())
 146            {
 147                AsyncDispatcher = asyncDispatcher,
 148                Disposer = disposer
 149            };
 150            return h;
 151        }
 152
 153        private HeapSafeCoalescingQueueNode<TContainer, TValue> Enqueue<TValue>(Handle<TValue> handle)
 154        {
 155            lock (Lock)
 156            {
 157                if (completionRequested)
 158                {
 159                    throw new InvalidOperationException("Cannot enqueue items after Complete has been called.");
 160                }
 161                else if (lastInQueue.HasValue && handle.CoalesceConsecutiveItems)
 162                {
 163                    ref var entry = ref lastInQueue.Value.GetRef();
 164                    if (entry.Handle == handle)
 165                    {
 166                        //Reuse last node in the queue. It was enqueued via the same handle and coalescing is enabled.
 167                        entry.Claim();
 168                        return new(lastInQueue.Value, handle.ValueProvider);
 169                    }
 170                }
 171
 172                {
 173                    //Add a new last node.
 174                    _queue.Enqueue(out var h);
 175                    ref var entry = ref h.GetRef();
 176                    entry.Handle = handle;
 177                    entry.Claim();
 178                    lastInQueue = h;
 179                    return new(lastInQueue.Value, handle.ValueProvider);
 180                }
 181            }
 182        }
 183
 184        private bool FirstNodeIsCompleted
 185        {
 186            get
 187            {
 188                Debug.Assert(Monitor.IsEntered(Lock));
 189                return _queue.TryPeek(out var n) && n.GetRef().IsCompleted;
 190            }
 191        }
 192
 193        private enum DequeueCompletedResult
 194        {
 195            None,
 196            Dequeued,
 197            DequeuedAndTheresMore
 198        }
 199
 200        private DequeueCompletedResult TryDequeueCompleted(out StructQueue<QueueEntry<TContainer>>.Handle node)
 201        {
 202            lock (Lock)
 203            {
 204                if (FirstNodeIsCompleted)
 205                {
 206                    _queue.Dequeue(out node);
 207                    if (_queue.Count == 0)
 208                    {
 209                        lastInQueue = null;
 210                        return DequeueCompletedResult.Dequeued;
 211                    } else
 212                    {
 213                        return DequeueCompletedResult.DequeuedAndTheresMore;
 214                    }
 215                }
 216                else
 217                {
 218                    node = default;
 219                    return DequeueCompletedResult.None;
 220                }
 221            }
 222        }
 223
 224        public async ValueTask<bool> ProcessPendingAsync(int maxToDispatch = -1)
 225        {
 226            if (Interlocked.Increment(ref isProcessingPending) > 1)
 227                return false; // someone else is already processing
 228
 229            int dispatched = 0;
 230            var maximumWasHit = false;
 231
 232            DequeueCompletedResult result;
 233
 234            do
 235            {
 236                while ((result = TryDequeueCompleted(out var p)) != DequeueCompletedResult.None)
 237                {
 238                    ref var entry = ref p.GetRef();
 239                    var handle = entry.Handle;
 240                    if (entry.MarkedForDispatch)
 241                    {
 242                        maximumWasHit = maxToDispatch > 0 && ++dispatched == maxToDispatch;
 243                        try
 244                        {
 245                            if (handle.IsAsync)
 246                            {
 247                                await handle.DispatchAsync(p);
 248                            }
 249                            else
 250                            {
 251                                handle.Dispatch(p);
 252                            }
 253                        }
 254                        catch (Exception ex)
 255                        {
 256                            lock (Lock)
 257                            {
 258                                _exceptionsByDispatchers.Add(ex);
 259                                completionRequested = true;
 260                            }
 261                        }
 262                    }
 263
 264                    handle.Dispose(in p);
 265
 266                    if (result != DequeueCompletedResult.DequeuedAndTheresMore)
 267                    {
 268                        break;
 269                    }
 270                }
 271            } while (!maximumWasHit && Interlocked.Decrement(ref isProcessingPending) > 0);
 272
 273            //^ we consciously leave our isProcessPendingToken to not leave the critical section yet if maximumWasHit.
 274
 275            if (maximumWasHit)
 276            {
 277                //And then we drain *all* processes pending atomically if maximumWasHit.
 278                Interlocked.Exchange(ref isProcessingPending, 0);
 279                //Only now a next loop is allowed to enter. We will do a final check.
 280                lock (Lock)
 281                {
 282                    if (FirstNodeIsCompleted)
 283                    {
 284                        return true;
 285                    }
 286                }
 287            }
 288
 289            //We can peak without the lock, because a concurrent call to Complete() will also try to complete and we alr
 290            if (completionRequested)
 291            {
 292                lock (Lock)
 293                {
 294                    CompleteIfPossible();
 295                }
 296            }
 297            return false;
 298        }
 299
 300        public void StartProcessPending()
 301        {
 302            var vt = ProcessPendingAsync();
 303            if (vt.IsCompletedSuccessfully) return;
 304
 305            _ = RunAsync(vt);
 306
 307            static async Task RunAsync(ValueTask<bool> vt)
 308            {
 309                try { await vt; } catch { }
 310            }
 311        }
 312
 313        public void Complete()
 314        {
 315            lock (Lock)
 316            {
 317                completionRequested = true;
 318                CompleteIfPossible();
 319            }
 320        }
 321
 322        private void CompleteIfPossible()
 323        {
 324            if (lastInQueue == null && isProcessingPending == 0)
 325            {
 326                if (_exceptionsByDispatchers.Count > 0)
 327                {
 328                    tcsCompletion.TrySetException(new AggregateException(_exceptionsByDispatchers));
 329                    _exceptionsByDispatchers.Clear();
 330                }
 331                else
 332                {
 333                    tcsCompletion.TrySetResult();
 334                }
 335            }
 336        }
 337
 338        public async ValueTask AwaitQueueProcessed()
 339        {
 340            Task t = null!;
 341            CoalescingQueueNode<TContainer, Action?> queueProcessedSentinelNode = default;
 342            var enqueuedSentinel = false;
 343            lock (Lock)
 344            {
 345                if (_queue.Count == 0 && isProcessingPending == 0)
 346                {
 347                    //Happy path, return immediately, without allocation.
 348                    return;
 349                }
 350                else if (completionRequested)
 351                {
 352                    t = Completion;
 353                }
 354                else
 355                {
 356                    //There are pending items. Allocate a TaskCompletionSource
 357                    //and enqueue a QueueProcessedSentinel that completes it
 358                    //once dispatched.
 359                    queueProcessedSentinelNode = _queueProcessedSentinelNodeHandle.Enqueue();
 360                    enqueuedSentinel = true;
 361                }
 362            }
 363
 364            if (enqueuedSentinel)
 365            {
 366                var tcs = new TaskCompletionSource();
 367                queueProcessedSentinelNode.MarkForDispatch();
 368                queueProcessedSentinelNode.Value = tcs.SetResult;
 369                queueProcessedSentinelNode.Dispose();
 370                t = tcs.Task;
 371            }
 372            await t;
 373        }
 374    }
 375
 376    /// <summary>
 377    /// A specialized coalescing queue that stores and dispatches instances of reference types.
 378    /// This concrete implementation is intended for scenarios where all values handled by the queue are reference types
 379    /// For higher efficiency it is usually preferable to use
 380    /// the generic <see cref="CoalescingQueue{TContainer}"/> directly with a struct container type, as this avoids heap
 381    /// allocations and boxing.
 382    /// </summary>
 383    public sealed class CoalescingQueue : ICoalescingQueue
 384    {
 8385        private CoalescingQueue<ObjectContainer> _inner = new();
 386
 3387        public Task Completion => _inner.Completion;
 388
 2389        public void Complete() => _inner.Complete();
 390
 1000000391        public void StartProcessPending() => _inner.StartProcessPending();
 392
 8393        public ValueTask<bool> ProcessPendingAsync(int maxToDispatch = -1) => _inner.ProcessPendingAsync(maxToDispatch);
 394
 1395        public ValueTask AwaitQueueProcessed() => _inner.AwaitQueueProcessed();
 396
 397
 398        public event Action? OnNodeCompleted
 399        {
 2400            add { _inner.OnNodeCompleted += value; }
 0401            remove { _inner.OnNodeCompleted -= value; }
 402        }
 403
 404        public struct ObjectContainer
 405        {
 406            public object? Data;
 407        }
 408
 409        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 410            bool coalesceConsecutiveItems,
 411            Action<TValue> dispatcher
 412        ) where TValue : class, new()
 1413            => CreateHandle(
 1414                coalesceConsecutiveItems,
 1415                dispatcher,
 1416                new NoopPool<TValue>()
 1417            );
 418
 419        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 420            bool coalesceConsecutiveItems,
 421            Action<TValue> dispatcher,
 422            IPool<TValue> pool
 423        ) where TValue : class =>
 7424            _inner.CreateHandle(
 7425                coalesceConsecutiveItems,
 7426                GetValueProvider(pool),
 1000007427                (in e) => dispatcher(e.GetValue()),
 7428                ReturnIfNotNull(pool)
 7429            );
 430
 431        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 432            bool coalesceConsecutiveItems,
 433            Func<TValue, Task> asyncDispatcher
 434        ) where TValue : class, new()
 0435            => CreateHandle(
 0436                coalesceConsecutiveItems,
 0437                asyncDispatcher,
 0438                new NoopPool<TValue>()
 0439            );
 440
 441        public ICoalescingQueueHandle<ObjectContainer, TValue> CreateHandle<TValue>(
 442            bool coalesceConsecutiveItems,
 443            Func<TValue, Task> asyncDispatcher,
 444            IPool<TValue> pool
 445            ) where TValue : class =>
 1446            _inner.CreateHandle(
 1447                coalesceConsecutiveItems,
 1448                GetValueProvider(pool),
 1449                e => asyncDispatcher(e.GetValue()),
 1450                ReturnIfNotNull(pool)
 1451            );
 452
 453        private static InAction<ObjectContainer> ReturnIfNotNull<TValue>(IPool<TValue> pool) where TValue : class
 8454        => (in v) =>
 8455        {
 1000008456            if (v.Data != null)
 8457            {
 1000008458                pool.Return(v.Data);
 8459            }
 1000016460        };
 461
 462        private static ValueProvider<ObjectContainer, TValue> GetValueProvider<TValue>(IPool<TValue> pool) where TValue 
 8463        => (ref wrapper) =>
 8464        {
 2000014465            var v = Volatile.Read(ref wrapper.Data);
 2000014466            if (v == null)
 8467            {
 8468                //Get a new instance from the valuePool to use as initial value.
 1000008469                v = pool.Get();
 1000008470                var v1 = Interlocked.CompareExchange(ref wrapper.Data, v, null);
 1000008471                if (v1 != null)
 8472                {
 8473                    //If another thread already set value, we return the claimed instance.
 0474                    pool.Return(v);
 0475                    v = v1;
 8476                }
 8477            }
 2000014478            return ref Unsafe.As<object, TValue>(ref wrapper.Data!);
 8479        };
 480    }
 481
 482    internal struct QueueEntry<TContainer> where TContainer : struct
 483    {
 484        //Pointers first to keep the struct size compact. The compiler will align pointers on 64-bit intervals.
 485        public CoalescingQueue<TContainer>.IHandle Handle { get; set; }
 486        public Action? Callback;
 487
 488        //TContainer may contain a pointer, so we put it next.
 489        public TContainer Container;
 490
 491        //Now the definitely non-pointer fields.
 492        private int pending;
 493        public bool MarkedForDispatch { get; set; }
 494
 495        public bool IsCompleted => Volatile.Read(ref pending) == 0;
 496
 497        public void Claim() => Interlocked.Increment(ref pending);
 498
 499        public int UnRef() => Interlocked.Decrement(ref pending);
 500    }
 501
 502
 503    internal delegate ref TValue RawValueProvider<TContainer, TValue>(ref QueueEntry<TContainer> value) where TContainer
 504
 505    public delegate ref TValue ValueProvider<TContainer, TValue>(ref TContainer container);
 506
 507    public static class ValueProviderExtensions
 508    {
 509        internal static RawValueProvider<TContainer, TValue> ToRaw<TContainer, TValue>(this ValueProvider<TContainer, TV
 510        => (ref v) => ref vp(ref v.Container);
 511    }
 512}