using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TaskTest2
{
// Parallel execution can be performed as much as the specified number.
// Job can be canceled.
// You can set the task timeout.
// When a task exception occurs, all workers can be stopped.
internal class Program
{
static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
public static void Main(string[] args)
{
// Simulation of job cancellation time (external factor)
DateTime endTime = DateTime.Now.Add(new TimeSpan(0, 0, 1000));
Manager manager = new Manager(cancellationTokenSource);
var task = manager.Run(maxDegreeOfParallelism: 10, workerCnt: 10, dummyWorkerIterationCnt: 10, errorPosition: 1);
try
{
// Work cancellation The internal work must be completed within this time.
task = AsyncExtensions.TimeoutAfter(task, TimeSpan.FromSeconds(100), cancellationTokenSource);
while (!task.Wait(50))
{
Console.Write(".");
if (DateTime.Now > endTime)
manager.Stop();
}
}
catch (AggregateException aex)
{
foreach (var ex in aex.InnerExceptions)
{
Console.WriteLine($@"main inner exception {ex.Message}");
}
}
catch (TimeoutException ex)
{
Console.WriteLine($@"main timeout {ex.Message}");
}
catch (Exception ex)
{
Console.WriteLine($@"main exception ex {ex.Message}");
}
Console.WriteLine("main program end");
}
}
internal class Manager
{
CancellationTokenSource cancellationTokenSource;
public Manager(CancellationTokenSource cancellationTokenSource)
{
this.cancellationTokenSource = cancellationTokenSource;
}
public void Stop()
{
cancellationTokenSource.Cancel();
Console.WriteLine("cancel requested");
}
public async Task Run(int maxDegreeOfParallelism, int workerCnt, int dummyWorkerIterationCnt, int? errorPosition)
{
CancellationToken token = cancellationTokenSource.Token;
List<Func<Task>> actionList = new List<Func<Task>>();
for (int i = 0; i < workerCnt; i++)
{
int j = i;
actionList.Add(() => new DummyWorker(token).DoWork(iteration: dummyWorkerIterationCnt, errorPosition: errorPosition));
if (token.IsCancellationRequested)
{
Console.WriteLine($@"run token cancel requested");
break;
}
}
try
{
await AsyncExtensions.InvokeAsync(actionList, maxDegreeOfParallelism: maxDegreeOfParallelism, token);
}
catch (Exception ex)
{
Console.WriteLine($@"run method exception {ex.Message}");
}
Console.WriteLine("end");
}
}
internal class DummyWorker
{
CancellationToken token;
public DummyWorker(CancellationToken token)
{
this.token = token;
}
public Task DoWork(int iteration, int? errorPosition)
{
var task = Task.Run(async () =>
{
for (int i = 0; i < iteration; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine($@"{GetType()} cancel requested");
break;
}
if (i == errorPosition)
throw new Exception("simulation exception : fire!");
// working...
await Task.Delay(500);
Console.WriteLine($@"working...iteration {i}");
}
}, token);
return task;
}
}
public static class AsyncExtensions
{
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
{
if (task == await Task.WhenAny(task, Task.Delay(timeout)))
return await task;
else
{
if (cancellationTokenSource != null)
cancellationTokenSource.Cancel();
throw new TimeoutException();
}
}
public static async Task TimeoutAfter(this Task task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
{
if (task == await Task.WhenAny(task, Task.Delay(timeout)))
await task;
else
{
if (cancellationTokenSource != null)
cancellationTokenSource.Cancel();
throw new TimeoutException();
}
}
public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism, CancellationToken token)
{
if (taskFactories == null) throw new ArgumentNullException(nameof(taskFactories));
if (maxDegreeOfParallelism <= 0) throw new ArgumentException(nameof(maxDegreeOfParallelism));
// Defensive copy. Similar to what Task.WhenAll/WhenAny does.
Func<Task>[] queue = taskFactories.ToArray();
if (queue.Length == 0)
{
return;
}
List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);
int index = 0;
do
{
while (tasksInFlight.Count < maxDegreeOfParallelism && index < queue.Length)
{
if (token.IsCancellationRequested)
return;
Func<Task> taskFactory = queue[index++];
tasksInFlight.Add(taskFactory());
}
if (token.IsCancellationRequested)
return;
Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
try
{
await completedTask.ConfigureAwait(false); // Check result.
}
catch
{
throw;
}
tasksInFlight.Remove(completedTask);
}
while (index < queue.Length || tasksInFlight.Count != 0);
}
}
}