2022.04.24 - [언어/C#] - CLR 스레딩3 - 태스크(Task)
CLR 스레딩3 - 태스크(Task)
2022.04.17 - [언어/C#] - CLR 스레드 단순 계산 작업 태스크 앞선 글에서 계산 중심의 비동기 작업을 주행하기 위해 ThreadPool 의 QueueUserWorkItem을 호출하는 방식을 소개헀다. 그러나 이 방법은 작업 완료
tsyang.tistory.com
Parallel
예전에 책에서 보고 이런 거 까지 쓰겠어? 하고 넘어갔다. 근데 꽤 유용하다.
Parallel은 자체적으로 좋은 로드밸런서, 스케줄러, 파티셔너를 포함하고 있으며 예외 처리도 수월하게 해준다.
Parallel엔 Invoke, For, Foreach 메서드가 존재하는데 얘네는 전부 동기 메서드이다. 즉, 따로 await를 할 필요가 없어 흐름 제어에 용이하다.
Async버전의 For, Foreach가 있지만(유니티에서 못씀 아직) 일단 근본 메서드 3개부터 알아보자..
Invoke
public void Run()
{
Console.WriteLine($"Main Thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
Parallel.Invoke(Method1, Method2, Method3);
}
void Method1()
{
Console.WriteLine($"METHOD 1 by {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
void Method2()
{
Console.WriteLine($"METHOD 2 by {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
void Method3()
{
Console.WriteLine($"METHOD 3 by {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}

Invoke는 간단하고 쉽다. 인자로 메서드들을 넘겨주면 그만이다. Parallel을 호출한 스레드에서도 작업을 수행하고 있음을 확인 할 수 있다.
For
void Work(int idx)
{
Thread.Sleep(100);
Console.WriteLine($"{idx}work by {Thread.CurrentThread.ManagedThreadId}");
}
for (int i = 0; i < 10; ++i)
Work(i);
위와 같은 For문을 Parallel로 적용하면 아래와 같다.
Parallel.For(0, 10, Work);

순차적으로 실행되고 있지는 않다.
Foreach
var items = Enumerable.Range(0, 10);
foreach (var item in items)
Work(item);
위 코드는 아래와 같이 Parallel로 쓸 수 있다.
Parallel.ForEach(items, item => Work(item));

Exception
Parallel 내부에서 발생한 익셉션은 편리하게 외부에서 AggregateException으로 받을 수 있다. Parallel 내부에서 Exception이 발생하면, 추가적으로 Task를 더 수행하지 않는다. (수행하던건 그냥 한다.)
public void Run()
{
Console.WriteLine($"Main Thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
try
{
var po = new ParallelOptions();
po.MaxDegreeOfParallelism = 4; //동시에 4개까지만 수행한다.
Parallel.ForEach(Enumerable.Range(0, 100), po, Work);
}
catch (AggregateException e)
{
foreach(var ex in e.InnerExceptions)
Console.WriteLine(ex);
}
}
void Work(int idx)
{
Thread.Sleep(100);
Console.WriteLine($"{idx}work by {Thread.CurrentThread.ManagedThreadId}");
if(idx % 5 == 0) throw new Exception($"error {idx}");
}
ParallelOption이 뭔지는 아래서 더 알아보고, 일단 동시에 Task를 4개까지만 돌린다고 보자. 0부터 100까지 Work를 실행한다. 5의 배수이면 익셉션이 발생한다.

AggregateException에는 2개의 익셉션이 잡혔다. Work는 0,5가 실행된 이후에도 6,4가 추가로 수행되었음을 볼 수 있다. (즉 이미 실행중이던 Task는 계속 수행한다.)
ParallelOptions
var cts = new CancellationTokenSource();
var po = new ParallelOptions
{
CancellationToken = cts.Token, //작업 취소 (기본 CancellationToken.None)
MaxDegreeOfParallelism = 4, //동시에 수행될 작업의 최대 갯수 (기본 : -1, 즉 최대사용)
TaskScheduler = TaskScheduler.Default, //전역 스레드 풀 사용 (기본 TaskScheduler.Default)
};
Parallel.ForEach(Enumerable.Range(0, 100), po, Work);
아까 본 옵션으로 돌아와보자, ParallelOptions를 넘겨줌으로써
- CancellationToken : 작업 취소 가능 (기본은 CancellationToken.None)
- MaxDegreeOfParallelism : 동시 수행할 작업의 최대 갯수 (-1이면 가용 CPU의 개수)
- TaskScheduler : 태스크 스케줄러, 기본은 전역 스레드 풀을 사용하는 것이며, FromCurrentSynchronizationContext 등으로 Context를 유지하는 등의 기능을 수행할 수 있음.
localInit, localFinally
var po = new ParallelOptions
{
MaxDegreeOfParallelism = 2,
};
Parallel.ForEach(
source: Enumerable.Range(0, 100),
parallelOptions: po,
localInit : () => new List<int>(), // "태스크" 당 1개
body : (i, loopState, list) =>
{
list.Add(Thread.CurrentThread.ManagedThreadId);
if(i == 50) throw new Exception("custom error");
Thread.Sleep(100);
return list;
},
localFinally: list => //도중 exception이 나도 1번 실행된다.
{
StringBuilder sb = new StringBuilder();
sb.AppendJoin(", ", list);
Console.WriteLine(sb);
}
);
localInit은 태스크당 1개 생성되는 결과값이다. 제네릭으로 아무 값이나 가능하다. localFinally는 도중 exception이 발생해도 항상 실행되는 try-catch-finally의 finally같은 녀석이다.
localInit은 태스크당 1개이지만 다른 스레드가 이어받지는 않는다.
그럼 이게 스레드당 1개라는 말 아니냐? 할 수 있는데 스레드당 여러번 만들 수 있다. 가령 위의 코드는 MaxDegreeOffParallelism이 2이다. 즉 최대 2개의 태스크만 수행한다.

코드의 결과는 위와같은데 6번 스레드가 총 5개의 컨테이너를 생성한 것을 확인할 수 있다.
즉, 스레드끼리 localInit에서 생성된 값을 공유하지는 않지만, 한 스레드가 여러개는 만들 수 있다. 라는 뜻.
ParallelLoopState

말 그대로 루프의 상태이다. 여기서는 Stop, Break메서드와 LowestBreakIteration이 주요하다.
우선 Stop은 말 그대로 Stop을 의미한다. CancellationToken과 다른 점은 토큰은 익셉션으로 처리되지만 Stop은 정상 종료라는 점이다.
그럼 Break는 뭘까? Break역시 루프를 중단시키지만, Break이전에 호출한 인덱스들의 실행을 보장한다. 즉 20번째에서 Break를 호출했다면 반드시 20번까지는 실행이 된다는 뜻이다.
그럼 LowestBreakIteration은 무엇인가? 동시에 여러 쓰레드에서 Break()를 호출할 수 있다. 이 때 Break를 호출한 가장 낮은 인덱스가 여기에 기록된다.
가령 어떤 리스트가 있는데 가장 앞쪽에 위치한 10의 배수를 찾고싶다고 가정하자.
var po = new ParallelOptions
{
MaxDegreeOfParallelism = 10,
};
var ret = Parallel.ForEach(
source: Enumerable.Range(1, 100),
parallelOptions: po,
localInit : () => new List<int>(),
body : (i, loopState, list) =>
{
if (i % 10 == 0)
{
Console.WriteLine($"Break Iteration {i}");
loopState.Break();
}
list.Add(i);
Thread.Sleep(100);
return list;
},
localFinally: list => //도중 exception이 나도 1번 실행된다.
{
StringBuilder sb = new StringBuilder();
sb.AppendJoin(", ", list);
Console.WriteLine(sb);
}
);
Console.WriteLine($"LowestBreakIteration {ret.LowestBreakIteration}");

숫자가 10, 20일때 각각 Break가 호출되었다. LowestBreakIteration은 9이다. (리스트가 1부터 시작이라 9번째는 10이다)
Parallel.Foreach의 리턴값은 ParallelLoopResult인데 여기서 LowestBreakIteration값을 확인할 수 있다.
참고 : 제프리 리처, CLR via C# 4판 , 비제이퍼블릭, 27장. 계산 중심의 비동기 작업 [829~834p]
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel?view=net-9.0
'언어 > C#' 카테고리의 다른 글
| C#의 메모리 정렬 (StructLayout) (0) | 2025.04.17 |
|---|---|
| 성능 측정 - DotnetBenchmark 퀵세팅 (0) | 2025.03.11 |
| ThreadLocal<T>, AsyncLocal<T> (1) | 2024.12.20 |
| ValueTask / ValueTask<T> (0) | 2024.05.02 |
| Task.Delay (vs Thread.Sleep) (2) | 2024.04.10 |