블로그 이미지
010-9967-0955 보미아빠

카테고리

보미아빠, 석이 (500)
밥벌이 (16)
싸이클 (1)
일상 (1)
Total
Today
Yesterday

달력

« » 2024.5
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31

공지사항

최근에 올라온 글

using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTest2
{
    // Runs workers in parallel, up to a specified maximum degree of parallelism.
    // A running job can be canceled.
    // A timeout can be set for the whole job.
    // When a task throws an exception, all workers can be stopped.

    /// <summary>
    /// Console entry point: starts the manager, wraps its task with a timeout,
    /// polls it until it finishes, and reports any failure on the console.
    /// </summary>
    internal class Program
    {
        // Shared by the manager, the workers (through its token) and the timeout wrapper.
        static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        /// <summary>
        /// Runs the sample and prints "main program end" when done.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args)
        {
            // Simulation of job cancellation time (external factor): 1000 seconds from now.
            // UTC is used so a local clock/DST shift cannot move the deadline.
            DateTime endTime = DateTime.UtcNow.Add(new TimeSpan(0, 0, 1000));

            Manager manager = new Manager(cancellationTokenSource);
            var task = manager.Run(maxDegreeOfParallelism: 10, workerCnt: 10, dummyWorkerIterationCnt: 10, errorPosition: 1);

            // Stop() only needs to be issued once; without this flag it would be
            // repeated on every 50 ms poll after the deadline.
            bool stopRequested = false;
            try
            {
                // The whole job must complete within this time, otherwise it is canceled
                // and the wrapped task fails with a TimeoutException.
                task = AsyncExtensions.TimeoutAfter(task, TimeSpan.FromSeconds(100), cancellationTokenSource);
                while (!task.Wait(50))
                {
                    Console.Write(".");
                    if (!stopRequested && DateTime.UtcNow > endTime)
                    {
                        stopRequested = true;
                        manager.Stop();
                    }
                }
            }
            catch (AggregateException aex)
            {
                // Task.Wait wraps every task failure in an AggregateException, so the
                // timeout has to be recognised here rather than in its own catch clause.
                foreach (var ex in aex.InnerExceptions)
                {
                    if (ex is TimeoutException)
                    {
                        Console.WriteLine($@"main timeout {ex.Message}");
                    }
                    else
                    {
                        Console.WriteLine($@"main inner exception {ex.Message}");
                    }
                }
            }
            catch (TimeoutException ex)
            {
                // Defensive fallback for a TimeoutException thrown directly (not via the task).
                Console.WriteLine($@"main timeout {ex.Message}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($@"main exception ex {ex.Message}");
            }
            Console.WriteLine("main program end");
        }
    }

    /// <summary>
    /// Creates the worker factories and runs them through
    /// <c>AsyncExtensions.InvokeAsync</c>, using the token of the
    /// cancellation source it was constructed with.
    /// </summary>
    internal class Manager
    {
        private CancellationTokenSource cancellationTokenSource;

        /// <summary>Stores the cancellation source used by <see cref="Stop"/> and <see cref="Run"/>.</summary>
        /// <param name="cancellationTokenSource">Source whose token is handed to every worker.</param>
        public Manager(CancellationTokenSource cancellationTokenSource)
        {
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>Requests cancellation on the stored source and logs the request.</summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
            Console.WriteLine("cancel requested");
        }

        /// <summary>
        /// Queues <paramref name="workerCnt"/> worker factories and invokes them with
        /// bounded parallelism. Exceptions from the invocation are logged here and
        /// not rethrown; "end" is always printed last.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Passed through to <c>InvokeAsync</c>.</param>
        /// <param name="workerCnt">Number of worker factories to queue.</param>
        /// <param name="dummyWorkerIterationCnt">Iteration count given to each worker.</param>
        /// <param name="errorPosition">Iteration index given to each worker as its error position; may be null.</param>
        public async Task Run(int maxDegreeOfParallelism, int workerCnt, int dummyWorkerIterationCnt, int? errorPosition)
        {
            CancellationToken cancelToken = cancellationTokenSource.Token;
            var workerFactories = new List<Func<Task>>();

            int queued = 0;
            while (queued < workerCnt)
            {
                // Workers are created lazily: the factory runs only when InvokeAsync calls it.
                workerFactories.Add(() => new DummyWorker(cancelToken).DoWork(iteration: dummyWorkerIterationCnt, errorPosition: errorPosition));
                queued++;

                // The check comes after the add, so the factory just queued stays in the list.
                if (cancelToken.IsCancellationRequested)
                {
                    Console.WriteLine($@"run token cancel requested");
                    break;
                }
            }

            try
            {
                await AsyncExtensions.InvokeAsync(workerFactories, maxDegreeOfParallelism, cancelToken);
            }
            catch (Exception ex)
            {
                Console.WriteLine($@"run method exception {ex.Message}");
            }

            Console.WriteLine("end");
        }
    }

    /// <summary>
    /// Simulated worker: loops a given number of times with a 500 ms delay per
    /// iteration, optionally throwing at a chosen iteration.
    /// </summary>
    internal class DummyWorker
    {
        private CancellationToken token;

        /// <summary>Stores the token that is polled at the start of every iteration.</summary>
        /// <param name="token">Cancellation token observed by the work loop.</param>
        public DummyWorker(CancellationToken token)
        {
            this.token = token;
        }

        /// <summary>
        /// Starts the work loop via <c>Task.Run</c> and returns the resulting task.
        /// </summary>
        /// <param name="iteration">Number of loop iterations to perform.</param>
        /// <param name="errorPosition">Iteration index at which an exception is thrown; null disables it.</param>
        /// <returns>The task representing the running loop.</returns>
        public Task DoWork(int iteration, int? errorPosition)
        {
            Func<Task> body = () => RunIterationsAsync(iteration, errorPosition);
            return Task.Run(body, token);
        }

        // Work loop: ends early when cancellation is requested, throws at errorPosition.
        private async Task RunIterationsAsync(int iteration, int? errorPosition)
        {
            int step = 0;
            while (step < iteration)
            {
                if (token.IsCancellationRequested)
                {
                    Console.WriteLine($@"{GetType()} cancel requested");
                    return;
                }

                if (errorPosition.HasValue && errorPosition.Value == step)
                {
                    throw new Exception("simulation exception : fire!");
                }

                // working...
                await Task.Delay(500);

                Console.WriteLine($@"working...iteration {step}");
                step++;
            }
        }
    }

    /// <summary>
    /// Helpers for applying a timeout to a task and for running task factories
    /// with a bounded number of tasks in flight.
    /// </summary>
    public static class AsyncExtensions
    {
        /// <summary>
        /// Awaits <paramref name="task"/> and returns its result, or throws if it does not
        /// finish within <paramref name="timeout"/>.
        /// </summary>
        /// <param name="task">Task to observe.</param>
        /// <param name="timeout">Maximum time to wait.</param>
        /// <param name="cancellationTokenSource">Optional source that is canceled when the timeout elapses.</param>
        /// <returns>The result of <paramref name="task"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="TimeoutException">The timeout elapsed before the task completed.</exception>
        public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
        {
            if (task == null) throw new ArgumentNullException(nameof(task));

            if (await CompletesWithinAsync(task, timeout))
                return await task;

            cancellationTokenSource?.Cancel();
            throw new TimeoutException();
        }

        /// <summary>
        /// Awaits <paramref name="task"/>, or throws if it does not finish within
        /// <paramref name="timeout"/>.
        /// </summary>
        /// <param name="task">Task to observe.</param>
        /// <param name="timeout">Maximum time to wait.</param>
        /// <param name="cancellationTokenSource">Optional source that is canceled when the timeout elapses.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="TimeoutException">The timeout elapsed before the task completed.</exception>
        public static async Task TimeoutAfter(this Task task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
        {
            if (task == null) throw new ArgumentNullException(nameof(task));

            if (await CompletesWithinAsync(task, timeout))
            {
                await task; // Propagate the task's exception or cancellation, if any.
                return;
            }

            cancellationTokenSource?.Cancel();
            throw new TimeoutException();
        }

        /// <summary>
        /// Invokes the factories in order, keeping at most
        /// <paramref name="maxDegreeOfParallelism"/> tasks in flight.
        /// </summary>
        /// <remarks>
        /// When <paramref name="token"/> is canceled the method returns without starting
        /// further factories and without awaiting tasks still in flight. When a task
        /// fails, its exception is rethrown immediately and the remaining in-flight
        /// tasks are not awaited.
        /// </remarks>
        /// <param name="taskFactories">Factories that each start one task.</param>
        /// <param name="maxDegreeOfParallelism">Maximum number of concurrently running tasks; must be positive.</param>
        /// <param name="token">Token polled before starting a factory and before each wait.</param>
        /// <exception cref="ArgumentNullException"><paramref name="taskFactories"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is zero or negative.</exception>
        public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism, CancellationToken token)
        {
            if (taskFactories == null) throw new ArgumentNullException(nameof(taskFactories));
            if (maxDegreeOfParallelism <= 0)
            {
                // ArgumentOutOfRangeException derives from ArgumentException, so existing
                // catch blocks keep working while ParamName is now populated correctly.
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), maxDegreeOfParallelism, "The degree of parallelism must be greater than zero.");
            }

            // Defensive copy. Similar to what Task.WhenAll/WhenAny does.
            Func<Task>[] queue = taskFactories.ToArray();

            if (queue.Length == 0)
            {
                return;
            }

            // Never more tasks in flight than there are factories, so cap the capacity.
            List<Task> tasksInFlight = new List<Task>(Math.Min(maxDegreeOfParallelism, queue.Length));
            int index = 0;

            do
            {
                // Top up the in-flight set until the limit is reached or the queue is drained.
                while (tasksInFlight.Count < maxDegreeOfParallelism && index < queue.Length)
                {
                    if (token.IsCancellationRequested)
                        return;

                    Func<Task> taskFactory = queue[index++];

                    tasksInFlight.Add(taskFactory());
                }

                if (token.IsCancellationRequested)
                    return;

                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

                // Awaiting the finished task surfaces its exception or cancellation.
                await completedTask.ConfigureAwait(false);

                tasksInFlight.Remove(completedTask);
            }
            while (index < queue.Length || tasksInFlight.Count != 0);
        }

        // Returns true when the task finishes before the timeout; the delay timer is
        // canceled in that case so it does not linger for the rest of the timeout.
        private static async Task<bool> CompletesWithinAsync(Task task, TimeSpan timeout)
        {
            using (var delayCancellation = new CancellationTokenSource())
            {
                Task delay = Task.Delay(timeout, delayCancellation.Token);
                Task winner = await Task.WhenAny(task, delay);
                if (winner != task)
                    return false;

                delayCancellation.Cancel();
                return true;
            }
        }
    }
}
Posted by 보미아빠
, |

최근에 달린 댓글

최근에 받은 트랙백

글 보관함