Feedforce Developer Blog

フィードフォース開発者ブログ

アプリケーション固有のロジックを Rust マクロで抽象化した話

はじめに

こんにちは、 Omni Hub チームのshunten31 です.

今回は、 アプリケーション固有のややこしいロジックの抽象化を行い、 Rust の proc-macro 機能を用いて、簡潔に記載できるようにした取り組みを紹介します.

Rust の魅力的な言語機能であるマクロを利用して、 プログラムの可読性とメンテナンス性の向上できたと考えています.

アプリケーション特有の「ややこしい制御ロジック」

ジョブ中心の設計

Omni Hub では、店舗(POSシステム) と ECシステムのデータをつなぐアプリケーションであることから、外部のシステムのAPI, webhook に依存した処理が多くあります. Omni Hub ではシステムの信頼性を重視しており、 外部のシステムや Omni Hub に障害やダウンタイムが発生した際にも、 いずれ正しくデータ連携が行われるよう、 再実行可能なジョブ形式でほとんどの処理を構成しています.

ジョブの種類はこのように Rust の enum 型を利用して定義しており、 それぞれ必要なペイロードを持ちます. ジョブの種類は現在70種類と、そこそこ多くの種類があります.

pub enum Job {
    IntegrateCustomer(CustomerId)  // 会員連携を行う
    IntegrateTransaction(TransactionId) // 取引連携を行う
    IntegratePoint(CustomerId, Point) // ポイント連携を行う
    ...
}

実行制御の必要性

ジョブは、 外部サービスの webhook や、 API 呼び出しを起点として発行され、 AWS SQS に投入されます. AWS SQS に入ったジョブは、ワーカーインスタンスが処理を行います.

この際、 外部サービスの webhook や SQS は "at-least-once delivery" であるため、 同じ内容の webhook やジョブが重複する可能性があります.

SQS の場合: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html

それ以外にも、 ジョブの処理の際に利用する外部サービスのAPI にはレートリミットが設けられています. レートリミットを無駄遣いすることを避けるため、 もともと別の webhook や SQS メッセージ経由であっても、結果的に同じ内容のジョブ実行は避けたいです.

実行制御の仕組み

ジョブは複数の worker インスタンスで同時処理を行いながら進められるので、 インスタンス間のロックとして redis を利用しています.

  1. ジョブの開始前に、ジョブの識別子で redis にロックを作成します. ロックはジョブの種類ごとに設定された ttl を持って作成されます.
    • ロックが取得と失敗時の挙動はジョブごとに設定されています
  2. ジョブを実行します.
  3. ジョブの実行完了後、ジョブのロックを開放するか、 開放せず ttl によって自動的に消滅するのを待つか ジョブごとの設定に従います.

現在のジョブの実行制御には、 上記3つの下線部にあたる設定が必要でした.

  • ロックの ttl
  • ロックが取得できなかった場合の挙動
  • ジョブの実行後ロックを開放するか

実行制御によるジョブの動作の例

いくつか、 実際に行っている実行制御の種類の例を示します.

同時実行の制限

同じジョブが非同期に同時実行されることによりにより、 状態が矛盾してしまったり、 単に冪等であるがゆえ同じジョブを実行する必要がないときに利用される実行制御です.

  • ジョブの開始時: ロックを取得する
    • ロックが取得できなければジョブを破棄する
  • ジョブの終了時: ロックを開放する

レートリミット

ジョブの処理内容が重く、 処理をおこなう頻度に上限を設けたい場合があります. このようなジョブでは、 下記のような設定を行うことで、レートリミットが実現できます.

下記のような処理を行うことで、 ttl 期間に 高々1回しかジョブを実行できないように制御が可能です.

  • ジョブ開始時: 特定の ttl でロックを取得する
    • ロックが取得できなければジョブを破棄する
  • ジョブ終了時: ロックを開放しない
    • ロックの ttl によって自動的に開放される

直列実行

ジョブの並列実行は避けたいが、 ジョブが破棄されることがないようにしたい場合に、ジョブを直列に実行します.

  • ジョブ開始時: ロックを取得する
    • ロックが取得できなければジョブを再びキューに戻す
  • ジョブ終了時: ロックを開放する

これまでの実行制御の設定の記載方法

これら3つの設定を、下記のように match 文を含んだ関数にそれぞれ記述していました.

fn ttl(job: &Job) -> Duration {
match job {
IntegrateCustomer(..)
| IntegrateTransaction(..) => Duration::minutes(15)
IntegratePoint(..) => Duration::minutes(1)
//  ... 合計70種類ものケースがある
}

fn release_lock(job: &Job) -> bool {
match job {
IntegrateCustomer(..)
//  ... 合計70種類ものケースがある
| IntegrateTransaction(..) => true
IntegratePoint(..) => false
}

...

課題

現状のジョブの実行制御の設定方法には、以下の3つの問題がありました.

設定が複数箇所に分かれていた

上記のような巨大な match 文を含んだ関数が3つ、別々のファイルに記述されていました. このため、 特定のジョブがどのように制御されるかを確認するためには、 3つの関数それぞれで、 70くらいのケースがある match 文から目的のジョブを探す必要がありました.、

内部実装との結びつきが強く、直感的に理解しづらい

設定値を見つけても、それらの組み合わせから最終的な挙動を把握するには頭の中でロジックを組み立てる必要がありました また、設定値を見つけても、それらの組み合わせから最終的な挙動を把握するには頭の中でロジックを組み立てる必要がありました. メトリクスのチェックの際や、 設計の見直しの際に実行制御の設定を確認することがあり、 そのたびに余計な負荷がかかっていました.

意図しない差分(merge conflictや無関係なdiff)

ジョブの追加や、 ジョブの設定値の変更を複数の PR で行うと、 match 文の特性からマージコンフリクトが起こりやすい状態でした. また、 設定変更による差分も大きくなりがちでした.

以上のような問題を解決するため、 Rust のマクロ機能を用いて簡潔に設定を記載することを考えました. (Rust のマクロ機能をすでにご存知の方は、 飛ばしてください!)

Rust のマクロ機能

Rust には proc-macro (Procedural Macro) という機能があります. proc-macro を用いて、コードの構文木をもとに、別のコードを生成する手続きを記述することができます. コード生成はコンパイル時に行われ、 Rust の静的型検査の対象にもなります.

derive マクロ

derive マクロでは、 コード中の enum や struct 等の構文に対して、 #[derive(...)] の形式で attribute を加えて、 該当の構文木からコード生成を行うことができます. マクロの入力として、 Rustコードのトークン列を表す TokenStream を受取り、 生成コードの TokenStream を返すように実装します.

#[proc_macro_derive(Foo)]
pub fn foo_derive(input: TokenStream) -> TokenStream {
    let ast = syn::parse(input).unwrap();
    impl_foo(&ast)
}

上記で定義したマクロは、 下記のように使用することができます.

use my_macro_crate::Foo;

#[derive(Foo)]
struct MyStruct;

解決方法

実行制御による挙動を抽象化する

実行制御による挙動は数種類に抑えられていたため、 それを enum で表すことにしました. これによって、redis を利用した内部実装を隠蔽して、実行制御を扱えるようになりました. また、どういった実行制御の種類を採用しているかが一目でわかるようになりました.

use std::time::Duration

pub enum JobExecutionPolicy{
    /// すでに実行中の場合は実行しない
    Singleton
    /// 直列実行する
    Serial
    /// Duration あたり1回まで実行する
    Throttled(Duration)
}

マクロで簡潔に記載する

各ジョブに対する実行制御の設定を、 derive マクロと、 属性を利用して Job enum の定義に記述できるようにしました. 記載例は以下になります.

#[derive(JobControl)]
pub enum Job {
  #[policy(Serial)]
  JobA(CustomerId),
  #[policy(Throttled(Duration::from_secs(60 * 15)))]
  JobB(OrderId, CustomerId),
  ...
}

JobControl マクロは、 下記のようなコードを生成するように実装しています.

impl Job {
    fn policy(&self) -> JobExecutionPolicy {
        JobA(..) => JobExecutionPolicy::Serial
        JobB(..) => JobExecutionPolicy::Throttled(Duration::from_secs(60 * 15))
        ...
    }
}

ジョブ開始時や終了時のロックの取得/開放の際に、 生成されたコードの policy() を参照し、適切な操作を行うよう分岐しています.

これによって、 以下のメリットが得られたと考えています.

  • あるジョブに対してどのような実行制御が行われるか、簡単に確認できるようになった
  • 複数のPRで同時にジョブの追加・変更が行われたときにコンフリクトが発生しずらくなった

まとめ

今回は、 アプリケーション特有のロジックを、 Rust のマクロ機能利用して簡潔に記載できるようにした事例を紹介しました. マクロは乱用すると複雑化の原因になりますが、頻繁に変更されないアプリケーションの基幹部分に適用することで、可読性と保守性の向上につながると考えています。

付録

気になる方向けに、 実際に利用しているマクロのコードを紹介させていただきます. 定義のない構造体が使われていたり、 self contained ではないですが、大まかな参考としていただければ幸いです.

extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Expr, ItemEnum, Meta, MetaList, MetaNameValue, Variant};

pub fn derive(input: TokenStream) -> TokenStream {
  let input = parse_macro_input!(input as ItemEnum);
  let enum_name = &input.ident;
  let match_arm = input.variants.iter().map(|variant| {
    let variant_name = &variant.ident;
    let policy_opt = get_meta_expr(variant, "policy");
    match (policy_opt, variant.fields.clone()) {
      (Some(policy), syn::Fields::Named(_)) => quote! {
      #enum_name::#variant_name { .. } => #policy,
      },
      (Some(policy), syn::Fields::Unnamed(_)) => quote! {
      #enum_name::#variant_name(..) => #policy,
      },
      (Some(policy), syn::Fields::Unit) => quote! {
      #enum_name::#variant_name => #policy,
      },
      (None, syn::Fields::Named(_)) => quote! {
      #enum_name::#variant_name { .. } => JobExecutionPolicy::Unrestricted,
      },
      (None, syn::Fields::Unnamed(_)) => quote! {
      #enum_name::#variant_name(..) => JobExecutionPolicy::Unrestricted,
      },
      (None, syn::Fields::Unit) => quote! {
      #enum_name::#variant_name => JobExecutionPolicy::Unrestricted,
      },
    }
  });

  let expanded = quote! {
  impl JobExecutionControllable for #enum_name {
  fn policy(&self) -> JobExecutionPolicy {
    match self {
    #( #match_arm )*
    }
  }
  }
  };
  TokenStream::from(expanded)
}

fn get_meta_expr(variant: &Variant, key: &str) -> Option<Expr> {
  variant.attrs.iter().find_map(|attr| match &attr.meta {
    Meta::NameValue(MetaNameValue { path, value, .. }) if path.is_ident(key) => Some(value.clone()),
    Meta::List(meta_list @ MetaList { path, .. }) if path.is_ident(key) => {
      let x: Expr = meta_list.parse_args().expect("succeed");
      Some(x)
    }
    _ => None,
  })
}