この記事のポイント

  • gatherとcreate_taskの設計上の役割分離をレビュー観点で理解する
  • 並列処理の「起動責任」と「同期責任」を整理できる
  • 実務レビューでレビューアーが確認すべき典型パターンを網羅する

そもそもasyncio.gatherとcreate_taskとは

Pythonのasyncioモジュールでは、複数の非同期タスクを同時に走らせるために以下の2つの手法が提供されています。

  • asyncio.create_task()
    → 非同期タスクの「起動責任」を担う。イベントループ上で即時スケジュール登録する。

  • asyncio.gather()
    → 複数のコルーチンやタスクをまとめて「同期待機」する。結果収集と例外集約も担当する。

混同しやすいが役割が違います。レビューではこれらの責任分離が設計通り実装されているかを確認する必要があります。

なぜこれをレビューするのか

gatherとcreate_taskは、実装者が誤解したまま併用すると以下のような設計事故が頻発します。

  • gatherがスケジューラ代わりに乱用され即時スケジューリングの責任が曖昧化
  • タスク未回収(ゴーストタスク)発生
  • 例外のサイレント消失
  • 意図しないキャンセル伝播

レビューアー視点

レビューアーは以下のように切り分けて確認します。

  • タスク生成(create_task)と待機(gather)の役割が混在していないか
  • gather内部での例外集約方針(fail fast or partial collect)が設計されているか
  • create_task後のタスク管理責任が放置されていないか
  • タスクキャンセル伝播が適切に整理されているか

開発者視点

開発者は「とりあえずgatherしておけば並列になる」という短絡的理解から脱却し、「起動」と「同期」の責務分離を明確に設計する必要があります。


良い実装例

正常設計例:責務分離
import asyncio
import aiohttp

class ApiClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def fetch(self, endpoint: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as response:
                response.raise_for_status()
                return await response.json()

async def orchestrator():
    client = ApiClient("https://example.com/api")

    # タスク起動(スケジューラ責任)
    tasks = [
        asyncio.create_task(client.fetch("user/123")),
        asyncio.create_task(client.fetch("user/456")),
        asyncio.create_task(client.fetch("user/789"))
    ]

    # 同期待機(収集責任)
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for idx, res in enumerate(results):
            if isinstance(res, Exception):
                print(f"Task{idx}失敗: {res}")
            else:
                print(res)
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()

asyncio.run(orchestrator())

良い理由

  • タスク生成と待機責任が明確に分離
  • gatherで例外を個別回収できる設計
  • タスク未完了検査を持ちキャンセル漏れを防止

レビュー観点

  • create_taskはスケジューラとして機能しているか(副作用なし起動)
  • gatherはあくまで同期(収集)処理として機能しているか
  • タスクオブジェクトのキャンセル設計責任が回収側に残されているか
  • 例外回収方針(全部失敗 vs 部分成功)がレビュー対象と合致しているか

良くない実装例: ケース1

問題例: gatherによるスケジューリング混用
import asyncio
import aiohttp

class ApiClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def fetch(self, endpoint: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as response:
                return await response.json()

async def main():
    client = ApiClient("https://example.com/api")

    results = await asyncio.gather(
        client.fetch("user/123"),
        client.fetch("user/456"),
        client.fetch("user/789")
    )
@Reviewer
gatherがスケジューラを兼ねており、タスク管理が外部からできなくなっています。create_taskで起動責任を分離し、gatherで収集する構成に整理しましょう。
for res in results: print(res) asyncio.run(main())

問題点

  • gather呼び出し=即スケジューリングされ、タスク管理責任が欠如
  • 中断・キャンセル制御が困難
  • 並行実行の可視性が薄れる

改善例

改善後: create_task起動分離
import asyncio
import aiohttp

async def main():
    client = ApiClient("https://example.com/api")

    tasks = [
        asyncio.create_task(client.fetch("user/123")),
        asyncio.create_task(client.fetch("user/456")),
        asyncio.create_task(client.fetch("user/789"))
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for res in results:
        print(res)

    for task in tasks:
        if not task.done():
            task.cancel()
設計補足
  • gatherを同期責任専用に使用
  • タスク一覧を保有する設計により個別制御可能性を残す
  • 中断時復旧処理の柔軟性を確保

良くない実装例: ケース2

問題例: create_task放置によるゴーストタスク化
import asyncio
import aiohttp

async def fire_and_forget():
    client = ApiClient("https://example.com/api")
    asyncio.create_task(client.fetch("user/123"))
@Reviewer
create_task後のタスクオブジェクトを保持せず破棄しています。未完了・例外発生時に通知も回収もできなくなります。タスク参照は必ず保持しましょう。

問題点

  • 完了通知を拾えない(例外も未捕捉)
  • 復旧やキャンセルが不可能
  • テスト・監視不可能なゾンビタスク残留

改善例

改善後: タスク保持設計
import asyncio
import aiohttp

active_tasks = set()

async def fire_and_controlled():
    client = ApiClient("https://example.com/api")
    task = asyncio.create_task(client.fetch("user/123"))
    active_tasks.add(task)

    def done_callback(t):
        active_tasks.discard(t)
        if t.exception():
            print(f"例外発生: {t.exception()}")

    task.add_done_callback(done_callback)
設計補足
  • タスク保持管理にsetを利用(重複排除)
  • 完了後にcallbackで例外も収集
  • 非同期起動であっても管理責任を外部化しない

PlantUML設計イメージ

UML Diagram

観点チェックリスト


まとめ

gatherとcreate_taskは「同期責任」と「スケジュール責任」というまったく別の設計レイヤーです。レビューアーは常に責務の境界線を読み取り、スケジューリング管理責任の所在を明示させるレビューを心がける必要があります。責務境界が曖昧になることで、非同期コードの本質的な複雑さは指数的に増大していきます。