発明のための再発明

Webプログラマーが、プログラムの内部動作を通してプログラムを作る時の参考になるような情報を書くブログ(サーバーサイドやDevOpsメイン)

非同期で大量データをさばく

はじめに

最近、マイクロサービスの拡大と共にキューやストリームを使用した非同期処理が増えたと思います。
この記事では、NetflixPinterestAirbnbが公開した、各社の非同期処理の具体例を紹介します。
(各事例は簡単な概要の紹介しか書かないので、より詳細な内容は元記事を覗いてみてください。)

Netflix: 別サービスへのデータ更新の伝播

元記事: Delta: A Data Synchronization and Enrichment Platform

最初はNetflixが社内で使用しているデータ同期ツールDeltaの紹介です。
Webサービスでは、RDBにデータ(Source of truth)を置きつつ、検索やキャッシュ用に別ツールにも同じデータをコピーするという構成を取るということが有りますが、Netflixもこれをやっています。
Netflixでは、どのように同期するかという問題に対して、「データ変更を検知した時に、変更を別サービスに通知する」という仕組みを作って解決しています。

f:id:mrasu:20191231175911p:plain

上図のように、データが更新されると、Delta-connectorを介してKafkaに情報が送られた後に各種伝播が発生するようになっています。

参考

また、NetflixはDeltaだけでなく、関連ツールについてもブログにしています。

  • DBLog (DeltaにRDBの変更を通知しているツール。UPDATEを使って開始時のログの場所を確認することで、RDBからのダンプと変更検知を可能にしている)
  • Keystone (Deltaで使うキュー。社内のリアルタイム処理プラットフォーム)

Pinterest: ユーザーイベントの保管・解析

元記事: Real-time User Signal Serving for Feature Engineering

次は、Pinterestによるユーザーイベントの解析についてです。
Pinterestではユーザー行動(クリックや検索)を解析するために、ユーザーイベントをキューが受けた後に集計とその結果の保存を簡単に行えるようにしています。

f:id:mrasu:20191231175955p:plain

上図のように、最初にイベント情報が「User Events」キューに入った後、順次計算を行い結果を保存します。その後、クライアントがViewのデータを取り出せるようになっています。

Airbnb: 2段SQSによるキューのスケジューリング

元記事: Dynein: Building an Open-source Distributed Delayed Job Queueing System

最後は、Airbnbによるスケジューリングについてです。
AirbnbResqueを使っていましたが、性能やat-most-onceな特性などが問題となっていました。そのため、SQSを多段にしたスケジューリングツール(Dynein)を作ったそうです。

f:id:mrasu:20191231180027p:plain

上図がDyneinの動きです(赤いアイコンがAWS SQS, 青がDynamodb)。
SQSを多段にし、間にスケジューリング用のコンポーネントを作ることで、使用者にとってはSQSでありながらスケジューリングも出来るようになっています。

終わりに

以上、3社の非同期処理例を紹介しました。
また、直接的な例ではないですが、先日リリースされたThe Amazon Builders' Libraryの乗り越えられないキューバックログの回避という記事も面白い内容をもっています。

動的にLBの重み付けを変化する - Taiji by Facebook

はじめに

AWSのALBでも重み付けルーティングが採用されたように、LBでは重み付けの機能がよくありますが、Facebookでは時間やイベントなどによるトラフィック変化に対応するため、エッジサーバー上のLBの重み付けを動的に変化するようにしています。
この、動的な設定を行うために、Taijiというシステムが作られています。

Taijiは、世界中にあるFacebookのエッジサーバーがどのデータセンターへリクエストを回すかをユーザーのソーシャルデータから決めています
また、Taijiが落ちないように事前のテストや障害に対するフォールバックを用意しています。

今回の記事では、facebook researchが公開した「Taiji: Managing Global User Traffic for Large-Scale Internet Services at the Edge」から、Taijiの仕組みとテストに注目して紹介します。

ルーティング

Taijiは、エッジからデータセンターへのルーティングテーブルを動的に作成するシステムです。Taijiが作成する結果を基に、エッジ上にあるLBがルーティングをしています。

Facebookも他の会社と同じように、エッジノードとデータセンター内にある多数のサーバーで構成されています。エッジノードは静的コンテンツの配信や、データセンターへのリバースプロキシを担っています。かつて、リバースプロキシ先のデータセンターへのルーティングは静的に行っていましたが、サービスの規模がグローバルになり運用が辛くなったため、動的なルーティングをするためにTaijiが作成されました。

Taijiは「Connection-aware routing」という独自のルーティング方法を採用しつつ、下図のようにPipelineとRuntimeという2つのコンポーネントから出来ています。

f:id:mrasu:20191124152728g:plain

この章ではそれぞれについて、紹介します。

Connection-aware routing

Facebookはユーザーを基にリクエストを流すデータセンターを決めています。これは、

  • ステートフルなリクエストを同じデータセンターに割り当て続ける
  • 似たコンテンツを使用するであろうユーザーを同じデータセンターに割り当てることでキャッシュ効率などを上げる

ということを目的としています。このルーティング方法が「Connection-aware routing」です。
Connection-aware routingでは、ソーシャル上のユーザーの近さを基にした完全二分木を使っていて、葉をbucket、節をsectionと呼んでいます。このツリーはオフラインで計算されていて、この情報を基に割り当てるデータセンターが決定されます。
下図で雰囲気が解ると思います。
f:id:mrasu:20191124152847j:plain

ちなみに、ユーザーの住所や属性を使わずに、ソーシャルな情報からツリーを作成しているのはアクセス元の正確な地域が取れないこと、プライバシー上の懸念からです。また、ソーシャル関係の情報でも、一方的な関係(一方的なフォローなど)や一時的な関係は配信コンテンツの同一性に関係がないため、要素を除外してツリーを作成しています。

またLBについて、エッジにあるLBはデータセンターが付与するCookieを使用してユーザーを識別します。そのため、最初の通信はTaijiの結果を使用せずに近くのデータセンターへ流されています。

Runtime

TaijiコンポーネントはRuntimeとTraffic Pipelineの2つで出来ています。
Runtimeはエッジがデータセンターに流す割合を計算して、結果をTraffic Pipelineに渡すことが仕事です。アウトプットは

// fraction=割合  
{edge: {datacenter: fraction}}  

という形をしています。Runtimeは

  • 各サービスが要求する内容を定義したpolicy
  • エッジやデータセンターの稼動(キャパシティや稼働率など)
  • トラフィック量やエッジ・データセンター間のレイテンシ

などの情報を基に、各エッジがどのデータセンターにどのくらい流す必要があるかを定期的に計算します。

Traffic Pipeline

Traffic Pipeline はRuntimeの出力を入力として、エッジがどのユーザーをどのデータセンターに流すか定義したルーティングテーブルを作成することが仕事です。LBはこの結果を基にリクエストを流します。アウトプットは

// bucket=ユーザーの集合  
{edge: {datacenter: {bucket}}}  

という形をしています。事前に計算したユーザーのツリー(Connection-aware routingの節参照)から、以下を考慮しつつデータセンターにbucketを割り当てています。

  • 同じバケットは前回と同じデータセンターが割り当てられるようにする (ステートフルな通信では、データセンターが変わると再確立が必要なため)
  • 同一segmentにあるbucketは同一データセンターに割り当てる(より良い効率を目指すため)
  • segmentが大きい場合には、適当に分割する

安定

TaijiFacebookが提供する多くのサービスに対して、トラフィックのルーティングをしているので、問題があるとあらゆるサービスに影響します。そのため、落ちないことと、間違った計算をしないこと、両方が強く重視されます。

テスト

Taijiにおいて、プログラムが落ちてしまうような問題は、大量のユニットテストやインテグレーションテストで発見が可能です。
しかし、セマンティックな問題(式のミスなど)は本番で実際に動かしてアラートがでないと問題に気づきません。そのため、他にもテストを用意しています。

  • トラフィックを模したリグレッションテスト
    ポストモーテムで、問題が起きた時の入力・設定・policyを記録し、テストで状況を再現することによって問題が再発しないかをチェックします
  • 週次のトラフィックを再現してチェック
    新しいサービスを載せたり、policyを変更したりする場合には、Facebookに来る1週間のトラフィックのパターンを再現し、Taijiがどう振る舞うかをチェックします
  • コンポーネントの入出力バリデーション
    コンポーネントの入出力に異常な変化がないかをチェックします
    例えば、設定の変更がRuntimeの出力に意図しない変化起こさないかどうかや、データセンターへのトラフィック閾値を超えないか、などをチェックします

間接的な問題に対処する

Taiji以外の問題によって、Taijiが正常に動かなくなることも問題となります。
そのため、以下の施策も実施しています。

  • 新規インフラへの段階的な最適化
    新しいハードウェアを稼働するなど、データセンターの許容量が変わることがあります。その時に、1回のルーティング変更で新しい許容量に最適化するのではなく、複数回に分けて最適化することによって、過度にアクセスが増えることを抑制しつつウォームアップを可能にしています。
  • ソーシャルな変化を緩和する
    Connection-aware routingに使用するオフラインでのツリー更新に際して、5%以上のユーザー移動が起きないようにしています。
  • モニタリングシステムの障害への対応
    Taijiが入力として使用するデータにはモニタリングシステムを使用する物があるため、モニタリングシステムに障害があるとTaijiが十分な機能を提供できなくなります。その場合にはサービスオーナーに対してアラートが飛びます(Taijiは自動に調整しません)
  • データセンター障害への対応
    天災などによりデータセンターが落ちた場合
    • 即時の対応として、障害を検知したLBは別のデータセンターに流すようになります。
    • Taijiは、事前の負荷試験を通して得たデータセンターの許容量に従い、落ちたデータセンターのトラフィック分を生きているデータセンターに分配するようにルーティングを変更します。
  • 世界的なイベントへの対応
    ワールドカップや大晦日など世界的にトラフィックが変化するイベントでは、変化が大きすぎるためTaijiは対応出来ません。そのため、他の負荷対策が必要になります。

おわりに

以上、Taijiの紹介でした。
この記事では、Taijiの機能と障害対策に焦点を当てましたが、論文では、Taijiの性能やデータセンターへの割当ロジック・モデリングなども説明しているので、興味が湧いたらぜひ読んでみてください。

ちなみに、Taijiが使用するモニタリングシステムはGorillaというTime Series Databaseを使っていて、prometheusでも参考にされています。

Dgraphによる高速並行キャッシュ - Ristretto

はじめに

RistrettoはグラフデータベースのDgraph内のキャッシュが遅いという問題から生まれたキャッシュライブラリです。
GolangWeekly(@golangweekly)で紹介されていたのを見つけて気になったので、動きを追ってみました。

Ristrettoは、以下の特徴を持っています。

  • EventualConsistency(更新が遅延する)
  • LFU(TinyLFU)を使用
  • リングバッファ・ブルームフィルタによるmutexの削減

今回の記事では、Ristrettoの動きを通して、これらの特徴を解説します。

なお、この記事で使用したコミットは1982be43114b186bccc231adc44a72c474369470です。
また、DgraphでRistrettoを使用するプルリクエストはありますが、まだDgraphには入っていないようです。(#4025: Use ristretto in getNew)

参考資料:

全体像

Ristrettoを使って、SetGetした場合の動きは下図のようになります

f:id:mrasu:20190930014715p:plain

非同期

RistrettoはSetしてもすぐに参照できるわけではありません。なぜなら、

  1. goroutineを使うので、追加が非同期
  2. 追加するキーを評価しているので、追加に値しないと判断された場合は追加されない

からです。

Ristrettoは最初にインスタンスを作るときに、Cache#processItems()という関数をgoroutineで動かします。このprocessItemsがchannelを使ってキャッシュの更新をしています。そのため、Setは非同期になります。
また、キャッシュが満杯になっている場合には「重要なキーをキャッシュする」という信念のもと、既存のキーと新規に追加するキーの重要度を比較するので、新規のキーの重要度が低いと判断された場合にはキーは追加されません。(現在は、ランダムに選ばれた5個の既存のキーと重要度とを比較しています)

ちなみに、現時点では更新や削除の時にも同じchannelを使っているので非同期に更新されます。
そのため、下のように動きます。

cache.Set("key", "value", 1)  

time.Sleep(time.Second / 100)  
value, found := cache.Get("key")  
// value は "value"  
cache.Set("key", "value2", 1)  
cache.Del("key")  

value, found := cache.Get("key")  
// まだ、value は "value"  

time.Sleep(time.Second / 100)  
value, found = cache.Get("key")  
// value はnil (foundはfalse)  

LFU(TinyLFU)を使用

RistrettoはLFUの改変版であるTinyLFUを採用しています。
まだこの論文を読んでいないのでどこまでがTinyLFUで、どこからがRistrettoの工夫なのかわからないのですが、実装では

  • 参照時には、キーをリングバッファに追加するだけで、TinyLFU用の参照情報は非同期に更新する
  • リングバッファが貯まると、貯めていたキーをchannelへ追加
  • channelを通して、参照情報を更新する(ただし、ブルームフィルタを使用することで、対象のキーが初めて参照されたかを確認する。初めての場合には参照情報には追加せずブルームフィルタにのみ追加される)

おわりに

以上がRistrettoの動きでした。
これ以外にも解説記事には、GoがCSPを採用しているためにスレッドローカルなmapが作れなくて困った話やパフォーマンスなど面白いはなしが書いてあるので、もしRistrettoに興味が出たら一度読んでみてください。