Feedforce Developer Blog

フィードフォース開発者ブログ

Webhook イベント を debounce させて処理量を 30% 減らしてみた

Omni Hub チームの shunten31 です. 最近はライスペーパーを購入して自宅で生春巻きを巻くようになりました.

背景

Omni Hub は ECサイト(Shopify*1 ) と 店舗レジの連携を通じて一貫した顧客体験 (OMO*2 ) を実現するサービスです. 機能の一つとして、 Shopify 上の顧客情報を POSレジシステム (スマレジ*3 ) に連携する、会員項目連携という機能があります.

これまで、会員項目連携は以下のように動作していました.

  1. Shopify のカスタマー情報の更新を、 EventBridge から受信する
  2. EventBridge の Rule を通じてジョブ形式に変換して SQS に投入する
  3. アプリケーション (ワーカー) がジョブを処理し、 POSレジに会員情報を連携する

会員項目連携の従来の構成

課題

会員項目連携の動作のトリガーとなる Shopify 上のカスタマー更新 Webhook は、マーチャントの施策や他アプリの動作によって短時間に大量に発生することがありました. こうして大量にジョブが発行されると、 他の処理に影響を及ぼすだけでなく、 コンピューティングやログなどのクラウド費用がかさんでしまうという問題がありました.

また、 カスタマーの情報をスマレジに反映するために利用する スマレジの提供する API は、 比較的厳しいレートリミットを敷いており、なるべく最小限にしたいというモチベーションがあります.

当初の解決策

当初、 解決策としてレートリミットを敷くことを考えました. 多くのカスタマーアカウントを抱えているマーチャントが不利になることのないように、 カスタマー単位でレートリミットを管理して、会員項目連携を制限するというものでした.

インフラ構成としては、 AWS Lambda と ElastiCache Serverless for Valkey を利用しました. EventBridge から到達した Shopify の更新イベントを Lambda で処理し、 ジョブワーカーの SQS キューに投入する形です. Lambda の処理において、 Valkey キャッシュに TTL つきのエントリを追加します. この TTL に基づいて Sliding Window 方式のレートリミットを実現しました.

レートリミット導入後の構成

残った問題点

Shopify でカスタマーが新規作成される際、 Shopify は内部処理の一環としてカスタマー更新イベントを連続で複数発行します. これによって、 カスタマー作成時の Webhook イベントでレートリミットに到達してしまい、以降しばらく会員項目連携が動作しない状態となってしまうことがわかりました.

解決方法

デバウンスする

複数の連続イベントに対処しつつ、レートリミットを適用するために、 連続した更新イベントを1つにまとめることはできないか考えました.

こうした「連続したイベントを1つにまとめる」処理を、 デバウンス(debounce) と呼びます. フロントエンドの文脈でよく使われる用語で、 MDN Web Docs にも記載があります.

developer.mozilla.org

から引用します.

プログラミングの文脈におけるデバウンスとは、特定の時間間隔でリクエストされたすべての演算を単一の呼び出しに「バッチ化」するということを意味しています。

デバウンスの実現方法

今回は、 1分単位で発生した複数のイベントをデバウンスさせて、1つにまとめることにしました. フロントエンドの文脈におけるデバウンスは、通常最後のイベントから一定時間経過した後に処理を実行する形で実装します.

例えば、 ユーザーが検索窓にクエリを入力しているとします. 最後のキー入力から 500ms 経過したら API を呼び出して検索を実行する、といったユースケースです. クエリ文字列が変わるたびに検索を行うのは、 パフォーマンスや負荷の観点から望ましくありません. デバウンス機能はこれを解決します.

しかし、Lambda においては、 各イベントが独立な Lambda ランタイムとして処理されるので、 最後のイベントから一定時間経過したら処理を実行、 といったデバウンスは実現が困難です.

代わりに、 SQS にジョブ(メッセージ) を送信するときには、 最大15分の遅延*4 を設けて送信できることに着目しました. この遅延を利用すれば、 指定した時間だけジョブの処理を遅延させられます.

すなわち、 以下のように動作します.

初回イベント

  1. Valkey キャッシュに、 該当カスタマーがデバウンス中であることを示すエントリーを 1分の TTL で作成
  2. 1分の遅延を指定して、会員項目連携を行うジョブを発行

初回イベントから1分以内、2度目以降

  1. Valkey キャッシュを確認して、 該当カスタマーのデバウンス中エントリーが存在することを検出
  2. ジョブを発行せずに終了する

そして、 1分の遅延を設けたジョブが処理される際には、その時点で最新のカスタマー情報をもとに連携が行われます.

なお、 この方式では更新イベントの受信から POSレジへの反映まで最大 1 分の遅延が生じます. 会員項目連携は数秒単位の即時性を要求される業務ではないため、 この遅延は許容可能と判断しました.

擬似コード

デバウンスとレートリミットを合わせた判定ロジックは、 以下のように動作します. カスタマー単位で 2 つの Valkey キーを使い分けます.

  • pending_key: デバウンス進行中を示すフラグ. TTL は debounce_ms (今回は 60 秒)
  • rate_key: 直近に受理したイベントのタイムスタンプを保持する sorted set. window_ms で Sliding Window を表現

実際は、 Lambda は Rust で実装を行い、 Valkey キャッシュ の操作は Lua スクリプトとしてアトミックに実行しています. 下記の擬似コードは Lambda の処理を Python 風に書き下したものです. (下記のコードはコードベースを参照の上、 Claude に生成してもらいました)

# Valkey 上で Lua スクリプトとしてアトミックに実行される判定ロジック
def debounce_and_check(shop, customer, limit, window_ms, debounce_ms):
    rate_key    = f"rate_limit:{{{shop}:{customer}}}"        # 直近イベントの sorted set
    pending_key = f"debounce:{{{shop}:{customer}}}:pending"  # デバウンス進行中フラグ
    now = current_time_millis()

    # 1. デバウンス判定: 進行中なら何もせず終了
    if exists(pending_key):
        return Noop

    # 2. レートリミット判定: ウィンドウ外の古いエントリを削除してから件数を確認
    zremrangebyscore(rate_key, 0, now - window_ms)
    if zcard(rate_key) >= limit:
        return RateLimited

    # 3. 受理: sorted set にタイムスタンプを追加し、 pending フラグを立てる
    zadd(rate_key, score=now, member=now)
    pexpire(rate_key, window_ms)
    set(pending_key, 1, px=debounce_ms)

    # SQS の DelaySeconds 上限は 900 秒 (15 分)
    delay_seconds = min(ceil(debounce_ms / 1000), 900)
    return Enqueue(delay_seconds)


# Lambda ハンドラー側
def handle_event(event):
    result = debounce_and_check(
        event.shop, event.customer,
        limit=5, window_ms=60_000, debounce_ms=60_000,
    )
    match result:
        case Enqueue(delay_seconds):
            # 会員項目連携ジョブを SQS に投入 (delay_seconds 後に処理される)
            sqs.send_message(job, DelaySeconds=delay_seconds)
        case Noop:
            # 既に同一カスタマー向けのジョブが遅延発行済み. 何もしない
            pass
        case RateLimited:
            # ウィンドウ内の上限に達したのでスキップ
            pass

この実装で工夫した点を少し補足します.

まず判定の順序です. デバウンス判定を先に行うことで、 連続イベントは rate_key のカウントを消費せずに早期 return できるようにしています. これによって、 「新規作成時の連続イベントでレートリミットを使い切ってしまう」という当初の問題を回避しています.

次にアトミック性です. Lambda は並行して起動されうるので、 pending_keyEXISTS チェックと SET を別々のリクエストで行ってしまうと、 同時に到着したイベントに対してジョブを二重に発行してしまうおそれがあります. そのため、 判定と書き込みを Lua スクリプトにまとめて、 1 回の RTT でアトミックに実行しています.

最後に Valkey Cluster 上でのスロット配置です. 2 つのキーを 1 つの Lua スクリプトから操作する必要があるため、 {shop:customer} を hash tag として共有して同一スロットに寄せています. hash tag 自体はカスタマーごとに異なるので、 負荷も自然に分散されます.

効果測定

レートリミットのみだった状況にデバウンス機能を追加した際のグラフが以下です.

  • 青線: カスタマー更新イベント数
  • 橙線: レートリミットによりブロックされたイベント数
  • 緑線: デバウンスされたイベント数

デバウンス導入前後のイベント数推移

右側のグラフは積層グラフになっており、 デバウンスの導入によってこれまでレートリミットにより弾かれるイベント(橙の厚み)は大きく減少し、代わりにデバウンスされたイベントで同等のジョブ抑制効果を得られていることがわかります.

これ以降も 1 か月程度継続的に運用していますが、 デバウンスによって全イベント数の 3 割〜半分程度を抑制できており、 負荷軽減や、 インフラコスト面が改善しています.