using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TaskTest2
{
// Parallel execution can be performed as much as the specified number.
// Job can be canceled.
// You can set the task timeout.
// When a task exception occurs, all workers can be stopped.
internal class Program
{
static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
public static void Main(string[] args)
{
// Simulation of job cancellation time (external factor)
DateTime endTime = DateTime.Now.Add(new TimeSpan(0, 0, 1000));
Manager manager = new Manager(cancellationTokenSource);
var task = manager.Run(maxDegreeOfParallelism: 10, workerCnt: 10, dummyWorkerIterationCnt: 10, errorPosition: 1);
try
{
// Work cancellation The internal work must be completed within this time.
task = AsyncExtensions.TimeoutAfter(task, TimeSpan.FromSeconds(100), cancellationTokenSource);
while (!task.Wait(50))
{
Console.Write(".");
if (DateTime.Now > endTime)
manager.Stop();
}
}
catch (AggregateException aex)
{
foreach (var ex in aex.InnerExceptions)
{
Console.WriteLine($@"main inner exception {ex.Message}");
}
}
catch (TimeoutException ex)
{
Console.WriteLine($@"main timeout {ex.Message}");
}
catch (Exception ex)
{
Console.WriteLine($@"main exception ex {ex.Message}");
}
Console.WriteLine("main program end");
}
}
internal class Manager
{
CancellationTokenSource cancellationTokenSource;
public Manager(CancellationTokenSource cancellationTokenSource)
{
this.cancellationTokenSource = cancellationTokenSource;
}
public void Stop()
{
cancellationTokenSource.Cancel();
Console.WriteLine("cancel requested");
}
public async Task Run(int maxDegreeOfParallelism, int workerCnt, int dummyWorkerIterationCnt, int? errorPosition)
{
CancellationToken token = cancellationTokenSource.Token;
List<Func<Task>> actionList = new List<Func<Task>>();
for (int i = 0; i < workerCnt; i++)
{
int j = i;
actionList.Add(() => new DummyWorker(token).DoWork(iteration: dummyWorkerIterationCnt, errorPosition: errorPosition));
if (token.IsCancellationRequested)
{
Console.WriteLine($@"run token cancel requested");
break;
}
}
try
{
await AsyncExtensions.InvokeAsync(actionList, maxDegreeOfParallelism: maxDegreeOfParallelism, token);
}
catch (Exception ex)
{
Console.WriteLine($@"run method exception {ex.Message}");
}
Console.WriteLine("end");
}
}
internal class DummyWorker
{
CancellationToken token;
public DummyWorker(CancellationToken token)
{
this.token = token;
}
public Task DoWork(int iteration, int? errorPosition)
{
var task = Task.Run(async () =>
{
for (int i = 0; i < iteration; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine($@"{GetType()} cancel requested");
break;
}
if (i == errorPosition)
throw new Exception("simulation exception : fire!");
// working...
await Task.Delay(500);
Console.WriteLine($@"working...iteration {i}");
}
}, token);
return task;
}
}
public static class AsyncExtensions
{
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
{
if (task == await Task.WhenAny(task, Task.Delay(timeout)))
return await task;
else
{
if (cancellationTokenSource != null)
cancellationTokenSource.Cancel();
throw new TimeoutException();
}
}
public static async Task TimeoutAfter(this Task task, TimeSpan timeout, CancellationTokenSource cancellationTokenSource = null)
{
if (task == await Task.WhenAny(task, Task.Delay(timeout)))
await task;
else
{
if (cancellationTokenSource != null)
cancellationTokenSource.Cancel();
throw new TimeoutException();
}
}
public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism, CancellationToken token)
{
if (taskFactories == null) throw new ArgumentNullException(nameof(taskFactories));
if (maxDegreeOfParallelism <= 0) throw new ArgumentException(nameof(maxDegreeOfParallelism));
// Defensive copy. Similar to what Task.WhenAll/WhenAny does.
Func<Task>[] queue = taskFactories.ToArray();
if (queue.Length == 0)
{
return;
}
List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);
int index = 0;
do
{
while (tasksInFlight.Count < maxDegreeOfParallelism && index < queue.Length)
{
if (token.IsCancellationRequested)
return;
Func<Task> taskFactory = queue[index++];
tasksInFlight.Add(taskFactory());
}
if (token.IsCancellationRequested)
return;
Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
try
{
await completedTask.ConfigureAwait(false); // Check result.
}
catch
{
throw;
}
tasksInFlight.Remove(completedTask);
}
while (index < queue.Length || tasksInFlight.Count != 0);
}
}
}
mssql transaction 처리
if object_id ('tblx') is not null
drop table tblx, tbly, tblz
go
create table tblx (id int, c int)
go
create table tbly (id int, c int)
go
create table tblz (idx bigint identity(1,1),etime datetime, emessage nvarchar(max), )
go
create unique nonclustered index nc_tbly_01 on tbly (id)
go
if object_id ('usp_a') is null
exec ('create proc usp_a as select 1')
go
alter proc usp_a
(
@id int,
@c int
)
as
set implicit_transactions on ; -- 암시적 트랜잭션
begin try -- try catch 로 transaction 처리 할 부분 전체를 감싼다.
insert into tblx (id, c) values (@id, @c)
insert into tbly (id, c) values (@id, @c)
commit tran ;
end try
begin catch
if @@trancount > 0
begin
rollback tran;
set implicit_transactions off ;
-- 추가적인 로깅을 하던지
DECLARE @ErrorMessage NVARCHAR(max) = ERROR_MESSAGE()
insert into tblz (etime, emessage) values (getdate(), @ErrorMessage)
end
end catch
go
-- 아래
-- 테스트 시작
select @@trancount
select * from tblx
select * from tbly
select * from tblz
go
-- 오류없는 테스트
exec usp_a 1,1
exec usp_a 2,2
go
-- 데이터 확인
select * from tblx
select * from tbly
select * from tblz
go
-- 에러상황 테스트
-- tbly 는 unique index 가 있어서 중복값이 들어가지 못하는 상황이다.
-- rollback 했을때 전체가 rollback 되어야 하므로 tblx 에도 1,1 이 중복해 들어가지 않기를 바란다.
exec usp_a 1,1
select * from tblz
go
-- 데이터 확인
-- tbly 에서 에러가 생겨 데이터가 들어가지 않았다.
select * from tblx
select * from tbly
select * from tblz
go
-- 따로는 데이터가 잘 들어간다.
insert into tblx (id, c) values (1,1)
go
-- 데이터 확인
select * from tblx
select * from tbly
select * from tblz
go
multiThread test
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Helpers
{
class ThreadedHelper<I,O>
{
public I input { get; set; }
public ThreadedHelper(Func<I,O> doWork, Action<O> callback, Action<I, Exception> errorCallback)
{
this.doWork = doWork;
this.callback = callback;
this.errorCallback = errorCallback;
t = new Thread(Process);
}
public void Start()
{
t.Start();
}
private void Process()
{
try
{
O retun = doWork(input);
callback(retun);
}
catch (Exception ex)
{
errorCallback(input, ex);
t.Abort();
}
}
private Func<I,O> doWork;
private Action<O> callback;
private Action<I, Exception> errorCallback;
private Thread t;
// example
// using System.Threading;
// using Helpers;
//class Program
//{
// class Input
// {
// public int a { get; set; }
// public int b { get; set; }
// }
// class Output
// {
// public Input input { get; set; }
// public int result { get; set; }
// }
// static void Main(string[] args)
// {
// var p = new Program();
// new ThreadedHelper<Input, Output>(p.DoWork, p.Callback, p.ErrorCallback)
// {
// input = new Input { a = 4, b = 0 }
// }.Start();
// for (int i = 0; i < 30; i++)
// {
// Console.WriteLine("wait");
// Thread.Sleep(100);
// }
// Console.WriteLine("end");
// Console.ReadKey();
// }
// Output DoWork(Input input)
// {
// try
// {
// Thread.Sleep(2000);
// int s = input.a / input.b;
// return new Output { input = input, result = s };
// }
// catch (Exception ex)
// {
// throw ex;
// }
// }
// void Callback(Output output)
// {
// Console.WriteLine($@"output : {output.input.a},{output.input.b},{output.result}");
// }
// void ErrorCallback(Input i, Exception ex)
// {
// Console.WriteLine($@"output error : {i.a}, {i.b}, {ex.Message}, {ex.StackTrace}");
// }
//}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Helpers
{
class ThreadedHelper<I,O>
{
public delegate O DoWork(I inValue);
public delegate void CallBack(O outValue);
public delegate void ErrorCallBack(I input, Exception message);
public I input { get; set; }
public ThreadedHelper(DoWork work, CallBack callback, ErrorCallBack errorCallback)
{
this.work = work;
this.callback = callback;
this.errorCallback = errorCallback;
t = new Thread(Process);
}
public void Start()
{
t.Start();
}
private void Process()
{
try
{
O retun = work(input);
callback(retun);
}
catch (Exception ex)
{
errorCallback(input, ex);
t.Abort();
}
}
private DoWork work;
private CallBack callback;
private ErrorCallBack errorCallback;
private Thread t;
// example
// using System.Threading;
// using Helpers;
//class Program
//{
// class Input
// {
// public int a { get; set; }
// public int b { get; set; }
// }
// class Output
// {
// public Input input { get; set; }
// public int result { get; set; }
// }
// static void Main(string[] args)
// {
// var p = new Program();
// new ThreadedHelper<Input, Output>(p.DoWork, p.Callback, p.ErrorCallback)
// {
// input = new Input { a = 4, b = 0 }
// }.Start();
// for (int i = 0; i < 30; i++)
// {
// Console.WriteLine("wait");
// Thread.Sleep(100);
// }
// Console.WriteLine("end");
// Console.ReadKey();
// }
// Output DoWork(Input input)
// {
// try
// {
// Thread.Sleep(2000);
// int s = input.a / input.b;
// return new Output { input = input, result = s };
// }
// catch (Exception ex)
// {
// throw ex;
// }
// }
// void Callback(Output output)
// {
// Console.WriteLine($@"output : {output.input.a},{output.input.b},{output.result}");
// }
// void ErrorCallback(Input i, Exception ex)
// {
// Console.WriteLine($@"output error : {i.a}, {i.b}, {ex.Message}, {ex.StackTrace}");
// }
//}
}
}