この記事のポイント

  • Pythonの非同期処理に潜む設計ミスをレビューアー視点で整理
  • イベントループの責任境界、例外伝搬、キャンセル設計を具体例で理解する
  • 実務レビューで頻出する落とし穴を事前に把握し防止策を身につける

そもそもasync/awaitとは

Pythonのasync/await構文は、非同期処理(Asynchronous I/O)を簡潔に書ける機能です。例えば、API呼び出しやファイルI/Oのように待ち時間が発生する処理を、スレッドを増やさず効率的に並行実行できます。

処理イメージ例
import asyncio

async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

awaitは「ここで待つ」命令であり、背後でイベントループが次の処理に切り替えてくれます。これによりシンプルに並列性を実現できますが、設計責任の取り扱いを誤ると深刻なバグになります。

なぜこれをレビューするのか

非同期コードのレビューでは「正しく動くか」以前に、以下の設計論点を確認する必要があります。

  • 並行処理の安全性は保たれているか
  • イベントループの責任が適切に分離されているか
  • 例外伝搬と復旧設計が正しく考慮されているか
  • キャンセル可能性が設計に組み込まれているか

レビューアー視点

レビューアーは以下を重点的に読み取ります。

  • async関数の責務が適切に分離されているか
  • 呼び出し側と内部処理側の例外ハンドリングの設計責任はどちらにあるか
  • 並列起動されるタスク間で状態共有が安全か
  • 明示的にキャンセル設計がなされているか

開発者視点

開発者は「とりあえず動く非同期処理」から、「障害耐性がある非同期設計」へステップアップする必要があります。そのためにレビューでの設計整理は重要なフィードバックになります。


良い実装例

正常系非同期処理の設計例
import asyncio
import aiohttp
from typing import List

class ApiClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def fetch(self, endpoint: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as response:
                response.raise_for_status()
                return await response.json()

class ApiRequestLog:
    def __init__(self, request_id: str, endpoint: str, client_ip: str, response_code: int, requested_at: str):
        self.request_id = request_id
        self.endpoint = endpoint
        self.client_ip = client_ip
        self.response_code = response_code
        self.requested_at = requested_at

async def fetch_and_log(api_client: ApiClient, endpoint: str, logger) -> ApiRequestLog:
    try:
        result = await api_client.fetch(endpoint)
        log = ApiRequestLog(
            request_id=result["id"],
            endpoint=endpoint,
            client_ip=result["client_ip"],
            response_code=200,
            requested_at=result["timestamp"]
        )
        logger.save(log)
        return log
    except aiohttp.ClientError as e:
        logger.error(f"API呼び出し失敗: {e}")
        raise

async def main():
    client = ApiClient("https://example.com/api")
    logger = SomeLogger()
    tasks = [
        fetch_and_log(client, "user/123", logger),
        fetch_and_log(client, "user/456", logger),
    ]
    results: List[ApiRequestLog] = await asyncio.gather(*tasks, return_exceptions=False)
    print(results)

# 実行
# asyncio.run(main())

良い理由は以下の通りです。

  • API通信の責務とログ記録の責務を分離している
  • 例外伝搬方針を明示している(外側の呼び出し側に委譲)
  • タスク生成と並列起動が明示的で可読性が高い
  • 実行単位がタスク化されスケーラビリティを担保

レビュー観点

レビュー対象で特に確認したいのは次のポイントです。

  • イベントループ管理責任が曖昧化していないか
  • asyncio.gather 等の集合待機で例外を黙殺していないか
  • タイムアウト・キャンセル制御が考慮されているか
  • 非同期処理内でスレッド非安全なリソース(例:DBコネクション)を扱っていないか

良くない実装例: ケース1

問題例: 集合例外処理の不備
import asyncio
import aiohttp

class ApiClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def fetch(self, endpoint: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as response:
                return await response.json()

async def main():
    client = ApiClient("https://example.com/api")
    tasks = [
        client.fetch("user/123"),
        client.fetch("user/456"),
        client.fetch("user/789")
    ]
    results = await asyncio.gather(*tasks)
@Reviewer
例外発生時にgatherがまとめて失敗し、どのAPI呼び出しで失敗したのか追えなくなります。return_exceptions=Trueで例外情報も回収できる設計に変更してください。
for res in results: print(res)

問題点:

  • 複数APIを並列で呼び出しているが、どのAPIで失敗したのか追えない
  • gatherの失敗が全体の失敗に直結してしまい復旧困難
  • エラーハンドリング方針が設計段階で未定義

改善例

改善後: gather例外回収設計
import asyncio
import aiohttp

class ApiClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

    async def fetch(self, endpoint: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as response:
                response.raise_for_status()
                return await response.json()

async def main():
    client = ApiClient("https://example.com/api")
    tasks = [
        client.fetch("user/123"),
        client.fetch("user/456"),
        client.fetch("user/789")
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for idx, res in enumerate(results):
        if isinstance(res, Exception):
            print(f"Task{idx} 失敗: {res}")
        else:
            print(res)
設計補足
  • return_exceptions=Trueによってタスクごとの失敗を区別可能にしています
  • 失敗箇所が明確化され障害分析や部分復旧の余地が生まれます
  • これにより、全タスクが必ず待機終了できる保証も担保されます

良くない実装例: ケース2

問題例: タイムアウト管理の欠如
import asyncio
import aiohttp

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/api/user") as response:
            return await response.json()

async def main():
    data = await fetch_data()
    print(data)
@Reviewer
長期ハング時の回避策がありません。aiohttpのtimeoutオプションやasyncio.wait_forで呼び出し側にタイムアウト制御責任を持たせましょう。

問題点:

  • サービス障害時に無限待機に陥るリスク
  • 上位レイヤの障害監視で異常検知困難になる

改善例

改善後: タイムアウト設計導入
import asyncio
import aiohttp

async def fetch_data():
    timeout = aiohttp.ClientTimeout(total=5.0)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://example.com/api/user") as response:
            return await response.json()

async def main():
    try:
        data = await asyncio.wait_for(fetch_data(), timeout=6.0)
        print(data)
    except asyncio.TimeoutError:
        print("タイムアウトしました")
設計補足
  • ネットワーク層(aiohttp)と呼び出し側(wait_for)の二重タイムアウトにより制御の冗長性を確保
  • 外部I/Oを含む場合、非同期コードでは常にタイムアウト設計が標準

観点チェックリスト


まとめ

async/awaitは表面上シンプルに見えますが、内部はイベント駆動・コルーチン・例外伝搬の複雑な設計要素が混在しています。レビューアーは動作確認だけでなく、各非同期構成要素が「責任分離」「回復設計」「障害検知」の3点で妥当性を持つかを常に確認していくことが重要です。