structured concurrency recap
비동기 코드를 이해하기 쉬운 형태로 구조화하기 좋다.
모든 작업이 structured인 건 아니다.
가능하면 structured Task를 사용하자.
// bad
func makeSoup(order: Order) async throws -> Soup {
let boilingPot = Task { try await stove.boilBroth() }
let choppedIngredients = Task { try await chopIngredients(order.ingredients) }
let meat = Task { await marinate(meat: .chicken) }
let soup = await Soup(meat: meat.value, ingredients: choppedIngredients.value)
return await stove.cook(pot: boilingPot.value, soup: soup, duration: .minutes(10))
}
// good
func makeSoup(order: Order) async throws -> Soup {
async let pot = stove.boilBroth()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
동시 실행 작업수를 특정할 수 있으면 async let을, 특정할 수 없으면 taskgroup을 사용한다.
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
for ingredient in ingredients {
group.addTask { await chop(ingredient) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for await choppedIngredient in group {
if choppedIngredient != nil {
choppedIngredients.append(choppedIngredient!)
}
}
return choppedIngredients
}
}
Task hierarchy
Task Cancellation
Unstructured는 명시적으로 취소를 해야만 취소가 된다.
task.cancel()
structured 부모가 취소되면 자식도 알아서 취소가 된다.
다만 취소는 cooperative 형태기 때문에 플래그를 설정해놓기만 하고, 자식이 이를 확인하고 대응해야 한다.
func makeSoup(order: Order) async throws -> Soup {
async let pot = stove.boilBroth()
guard !Task.isCancelled else { // bool 값 확인 후 흐름 끊기
throw SoupCancellationError()
}
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
func chopIngredients(_ ingredients: [any Ingredient]) async throws -> [any ChoppedIngredient] {
return try await withThrowingTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
try Task.checkCancellation() // 취소시 에러 던지기
// Concurrently chop ingredients
for ingredient in ingredients {
group.addTask { await chop(ingredient) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
for try await choppedIngredient in group {
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
}
이벤트 기반 취소 검사
withTaskCancellationHandler(operation: onCancel:)
특히 asyncsequence를 구현할 때 유용하다.
actor Cook {
func handleShift<Orders>(orders: Orders) async throws
where Orders: AsyncSequence,
Orders.Element == Order {
for try await order in orders {
let soup = try await makeSoup(order)
// ...
}
}
}
그래서 cancellationHandler를 사용해서 취소를 감지하고, for await-loop를 강제로 탈줄하게 해줘야 한다.
public func next() async -> Order? {
return await withTaskCancellationHandler {
let result = await kitchen.generateOrder()
guard state.isRunning else {
return nil
}
return result
} onCancel: {
state.cancel()
}
}
근데 onCancel은 동기적으로 돌고, 실제 작업은 비동기라서 여기서 변경 가능한 state를 공유하게 되면 문제가 될 수있다.
Swift-Atomic을 예시에서는 썼지만, lock이나 dispatchQueue도 쓸 수 있다.
private final class OrderState: Sendable {
let protectedIsRunning = ManagedAtomic<Bool>(true)
var isRunning: Bool {
get { protectedIsRunning.load(ordering: .acquiring) }
set { protectedIsRunning.store(newValue, ordering: .relaxed) }
}
func cancel() { isRunning = false }
}
Task Priority
자식 작업은 부모 작업의 priority를 이어받는다.
priority가 높은 작업이 await을 하게 되면, 그 자식 작업들은 모두 부모의 높은 priority로 승격된다.
동시성 런타임은 우선순위 큐기 때문에 priority가 높은 작업을 우선적으로 수행한다.
Task group patterns
taskgroup으로 너무 많은 작업을 실행하면 다른 작업을 실행하기가 어렵게 된다.
그래서 최대 작업 갯수를 제한해주는 것도 좋다.(예시는 최대 3개)
func chopIngredients(_ ingredients: [any Ingredient]) async -> [any ChoppedIngredient] {
return await withTaskGroup(of: (ChoppedIngredient?).self,
returning: [any ChoppedIngredient].self) { group in
// Concurrently chop ingredients
let maxChopTasks = min(3, ingredients.count)
for ingredientIndex in 0..<maxChopTasks {
group.addTask { await chop(ingredients[ingredientIndex]) }
}
// Collect chopped vegetables
var choppedIngredients: [any ChoppedIngredient] = []
var nextIngredientIndex = maxChopTasks
for await choppedIngredient in group {
if nextIngredientIndex < ingredients.count {
group.addTask { await chop(ingredients[nextIngredientIndex]) }
nextIngredientIndex += 1
}
if let choppedIngredient {
choppedIngredients.append(choppedIngredient)
}
}
return choppedIngredients
}
}
이를 패턴화 할 수 있다.
withTaskGroup(of: Something.self) { group in
for _ in 0..<maxConcurrentTasks {
group.addTask { }
}
while let <partial result> = await group.next() {
if !shouldStop {
group.addTask { }
}
}
}
withDiscardingTaskGroup
taskGroup으로 여러 작업을 돌리지만 작업의 결과는 필요하지 않을 수 있다.
func run() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
for cook in staff.keys {
group.addTask { try await cook.handleShift() }
}
group.addTask {
// keep the restaurant going until closing time
try await Task.sleep(for: shiftDuration)
}
try await group.next()
// cancel all ongoing shifts
group.cancelAll()
}
}
Swift 5.9에 추가된 withDiscardingTaskGroup을 쓰면, 각 작업이 쓰는 리소스가 작업이 끝나는 즉시 해제된다.
func run() async throws {
try await withThrowingDiscardingTaskGroup { group in
for cook in staff.keys {
group.addTask { try await cook.handleShift() }
}
group.addTask { // keep the restaurant going until closing time
try await Task.sleep(for: shiftDuration)
throw TimeToCloseError()
}
}
}
Task-local values
특정 작업(정확히는 작업 계층)과 연관된 데이터 조각
전역적이지만, 여기에 바인딩 된 값은 특정 작업 계층에서만 보인다.
TaskLocal은 static 프로퍼티에 @TaskLocal 프로퍼티 래퍼를 붙여서 선언한다.
tasklocal은 직접 값을 할당할 수는 없고, 반드시 특정 스코프 안에서만 쓸 수 있다.
actor Kitchen {
@TaskLocal static var orderID: Int?
@TaskLocal static var cook: String?
func logStatus() {
print("Current cook: \\(Kitchen.cook ?? "none")")
}
}
let kitchen = Kitchen()
await kitchen.logStatus()
await Kitchen.$cook.withValue("Sakura") {
await kitchen.logStatus() // 이 안에서만 cook 값이 Sakura
}
await kitchen.logStatus()
각 작업은 taskLocal을 저장해놓을 수 있는 연관 저장소인 TaskLocalStorage를 가진다.
자식은 해당 값에 접근할 때 자신에게 값이 없으면 계층을 타고 올라가면서 해당 값이 있는지를 찾는다.
Swift 런타임은 이를 최적화해서 계층을 올라가지 않고 해당 값이 있는 곳의 포인터를 가지고 있어 바로 거기로 간다.
ex. 서버에서의 로깅
단계를 추적하기 위해서, 로그에 여러 정보를 넣는다.
근데 이 부분은 장황하고 반복적이여서 실수하기도 쉽다.
func makeSoup(order: Order) async throws -> Soup {
log.debug("Preparing dinner", [
"cook": "\\(self.name)",
"order-id": "\\(order.id)",
"vegetable": "\\(vegetable)",
])
// ...
}
func chopVegetables(order: Order) async throws -> [Vegetable] {
log.debug("Chopping ingredients", [
"cook": "\\(self.name)",
"order-id": "\\(order.id)",
"vegetable": "\\(vegetable)",
])
async let choppedCarrot = try chop(.carrot)
async let choppedPotato = try chop(.potato)
return try await [choppedCarrot, choppedPotato]
}
func chop(_ vegetable: Vegetable, order: Order) async throws -> Vegetable {
log.debug("Chopping vegetable", [
"cook": "\\(self.name)",
"order-id": "\\(order)", // order.id 여야 한다!
"vegetable": "\\(vegetable)",
])
// ...
}
Apple 디바이스면 OSLog를 직접 쓰겠지만, 이미 돌아가는 앱을 클라우드에서 실행하는 경우에는 다른 솔루션이 필요하다.
MetadataProvider API가 Swift-Log 1.5 버전에 추가되었다.
관련된 이벤트를 로깅할 때 필요한 데이터를 일관적으로 실어보낼 수 있게 해준다.
dictonary 같은 구조를 가진다.
let orderMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let orderID = Kitchen.orderID {
metadata["orderID"] = "\\(orderID)"
}
return metadata
}
라이브러리마다 각자 Metadata를 가질 수 있기 때문에 이를 합쳐주고 이를 기반으로 로깅 시스템을 초기화한다.
let orderMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let orderID = Kitchen.orderID { // orderID가 TaskLocal
metadata["orderID"] = "\\(orderID)"
}
return metadata
}
let chefMetadataProvider = Logger.MetadataProvider {
var metadata: Logger.Metadata = [:]
if let chef = Kitchen.chef { // chef가 TaskLocal
metadata["chef"] = "\\(chef)"
}
return metadata
}
let metadataProvider = Logger.MetadataProvider.multiplex([orderMetadataProvider,
chefMetadataProvider])
LoggingSystem.bootstrap(StreamLogHandler.standardOutput, metadataProvider: metadataProvider)
let logger = Logger(label: "KitchenService")
로그를 남길 때마다 메타데이터를 자동으로 함께 넣어준다.
func makeSoup(order: Order) async throws -> Soup {
logger.info("Preparing soup order")
async let pot = stove.boilBroth()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
자식 작업 뿐 아니라 Task{ } 로 만들어도 taskLocal 값을 이어받는다. Task.detached {} 만 이어받지 않는다.
Task traces
서버용으로 쓰이는 성능 측정 도구
단일 작업 계층에서 작업 트리의 장점을 다중 시스템에서도 활용해서 성능 특정과 작업간의 관계를 파악할 수 있게 해준다.
OpenTelemetry 프로토콜 구현체를 가지고 있기 때문에 Zipkin이나 Jaeger 등의 기존 솔루션과도 호환이 된다.
목표는 Xcode instrument에서의 회색지대인 “응답 대기중” 영역을 없애고 서버에서 어떤 일이 일어나는지를 명확하게 알 수 있게 해주는 것이다.
로컬에서의 tracing은 함수 단위로 했지만 여기서는 span이라는 단위로 묶어줘야 한다.
func makeSoup(order: Order) async throws -> Soup {
try await withSpan("makeSoup(\\(order.id)") { span in
async let pot = stove.boilWater()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
}
추가 attribute 넣어주기
func makeSoup(order: Order) async throws -> Soup {
try await withSpan(#function) { span in
span.attributes["kitchen.order.id"] = order.id
async let pot = stove.boilWater()
async let choppedIngredients = chopIngredients(order.ingredients)
async let meat = marinate(meat: .chicken)
let soup = try await Soup(meat: meat, ingredients: choppedIngredients)
return try await stove.cook(pot: pot, soup: soup, duration: .minutes(10))
}
}
실패하면 실패 정보도 남는다.
분산 시스템에서도 코드 수정 없이 돌아간다.