発明のための再発明

Webプログラマーが、プログラムの内部動作を通してプログラムを作る時の参考になるような情報を書くブログ(サーバーサイドやDevOpsメイン)

「F1 Query: Declarative Querying at Scale」 - Google内部のクエリ実行プラットフォームF1の動き

はじめに

以前Dynamoの論文を読んだので、ついでにGoogleのF1についての論文読んだ。
Dynamoは古かったが、こちらは2018年発表と新しい。
以下のリンクから手に入る。
https://ai.google/research/pubs/pub47224

F1はSpannerと一緒の文脈に居ることが多くSpannerをいじるためのものだと考えていたが、GoogleSpreadSheetなど複数ソースをまたいで動作するらしい。
論文の内容は全体を通して興味深い内容だった。
この論文では、

  • どのようにソースをまたいだ動きを実現したか
  • OLTP・OLAP・ETLという、別特性の動きをどのように一体化したか

を書いている。

以下、特に興味を引いた1章から4章の内容である。


F1の目的

F1は、社内で必要な機能である以下の3種全てを提供することを目的としている。

  1. 少ないレコードを対象としたOLTP
  2. low-latencyなOLAP
  3. データの変換やロードを行うETL

個々の機能について見れば既に実現されているものであるが、F1の新規性はDBMSやDremel(Google製のanalytical queryに特化したクエリエンジン)の機能を如何に(現代的なアーキテクチャ上で)統合したかにある。
また、初期段階ではSpannerとMesaのみをサポートしていたが、現在ではGoogleSpreadsheetやBigtableといった別のフォーマットにも対応している。

つまり、F1に対する要求は以下である

  1. Data Fragmentation
    Googleにはデータ管理のための方法が(SpannerやBigtableなど)数多く有るので、それらを横断して処理したい
  2. Datacenter Architecture
    F1は個別のマシンやクラスタではなく、データセンターに対して実行するものとして作られている。
  3. Scalability
    サイズ、速度、信頼性、コストなどの要求水準がユースケースによりバラバラであるが、各種に対応する
  4. Extensibility
    クエリとして書きづらい処理が必要になることがあるので、ユーザー定義関数のような拡張性を備える

F1の動き

具体的なF1の動きは、以下の図で表される。
「F1 Server」、「F1 Master」、「F1 Worker」が主な登場人物である。

f:id:mrasu:20181028215037j:plain

つまり、F1 Serverがクライアントからリクエストを受け付けたのち、

  • クエリが小さいなら、その場で実行
  • クエリが大きければ、workerに実行させるようスケジューリングする
  • 更に大きければ、MapReduceを使う

と、処理方法が3種類有る。
F1 Masterは、各データセンター内のF1のサーバー達を監視する。
ちなみに、F1 ServerとF1 Workerはデータを持たないので、台数を増やせばスケールする。

具体的なクエリ実行の流れは以下の様になる

  1. ClientからServerへリクエス
    F1 ClientがF1 ServerにSQLを送る。
    リクエストを受け取ったデータセンターよりも(対象データを保管している場所が近くて)ふさわしいデータセンターが有る場合は、それをクライアントに通知する。
    別のデータセンターを指定された場合は、クライアントはそのデータセンターに向けて再度SQLを送る。
  2. 実行プランの作成
    受け取ったSQLを物理的、論理的最適化を考慮した実行プランを生成する。(クエリ最適化の詳細はブログに書かないので本文を参照)
    クライアントはSQLを送るときにInteractiveかBatchのいずれかのMODEを指定し、オプティマイザはMODEによってどのように実行するか変化させる
  3. クエリ実行
    • Interactive 実行
      クライアントのリクエストに対して同期的に返答するモード
      クエリオプティマイザはヒューリスティックに、その場で実行する(centralized)かworkerを使用してクエリを分散させる(distributed)かを選択する
    • Batch 実行
      大量のデータ処理を行い長時間の実行が必要になる場合に使用するモードで、実行計画はF1 Server以外に保存される。
      また、MapReduceフレームワークを使用して非同期に分散される
  4. データアクセス
    F1 Serverはデータセンター毎に割り当てられているが、各F1 Serverは別データセンターのデータにもアクセス可能。
    前述のように、対象データはSpannerでも、CSVや圧縮データ(Capacitorとか)でも良い。
    対象データがconsistent readやrepeatable readをサポートしていれば、それをサポートする。
    データソースが多岐に渡るため、joinなどを行うために必要なメタデータをglobal catalog serviceという場所に置いている。
    また、クエリ発行時にDEFINE TABLEを通してメタデータを定義することも可能。
  5. Data Sinks への格納
    クライアントが要求した場合、実行結果をData Sinkへ格納することができる。
    Data Sinkには、データソースに応じて色々な形式でデータが保管される。
    必要であれば、出力先を変えたり、session-local な一時データとして保存することも可能
  6. クライアントへ返答

ちなみに、クエリはSQL2011標準に従い、拡張としてArrayなどをサポートしている。(つまり、joinや集計、window関数が使える)

閑話休題

長くなってきたので小休止。
ここからが、論文のテーマである「どのようにクエリを実行するか」の詳細が始まるので、一旦整理する。

F1に送られてきたSQLはF1 Server内で実行計画へと変形される。
実行計画の作成時において、複数の選択肢の中から実行方法が選択される。
実行方法が変わるとF1の動作は大きく変化し、この変化こそがOLAP,OLTP,ETLの全種サポートを可能にした根本である。

実行方法は下のように分類できる

  1. Interactive モード
    同期実行するモードで、少量のデータをさばく場合に使用される。
    クライアントがInteractiveモードを選択した場合、オプティマイザはクエリを基に2つの実行方法をのいずれかを選択する
    1. Centralized execution
      1スレッドで完結し、クライアントからのリクエストを受けたF1 Serverがその場で実行する
    2. Distributed execution
      Workerを通して、複数のサーバーにクエリを実行させる。
      F1 Serverは各Workerのコーディネーターとして機能する
  2. Batchモード
    非同期実行するモード
    大量のデータを扱う場合に使用される。
    長時間実行されることを想定したフォールトトレラントアーキテクチャを持っている。

以下は、各動作内容を詳細化しているので、この分類を頭に置いた上で、各論を見ていく。

Interactive + Centralizedの動き

1スレッドで完結するモード。
SQLを基にした実行計画の内容を、単一スレッド上で各演算が実行される
論文では、ScanとJoin、Sortの動きを紹介している。

f:id:mrasu:20181028215055j:plain

  1. Scan
    データソースに応じて動きが異なる。
    フルスキャンしか出来ないものもあれば、key-based index lookupをサポートしているものもある。
    他にも、Array型を利用する挙動やProtocolBuffersを利用した挙動もある。
  2. Join
    nested loop join, hash join, merge joinなどが使える。array joinという、array型用のjoinも有る
    また、Spannerが持つストリーム用のmerge joinも使える
  3. その他
    Projection, Aggregation, Sort, Union, Window Function, Limit, Offsetなどがある。

Interactive + Distributed の動き

Workerを使用して、並列実行するモード。
実行計画は処理の一部を表したFragmentに分けられ、FragmentはDAGになる。

f:id:mrasu:20181028215109j:plain

特徴は

  • Fragmentは各ノードに割り当てられ、マルチスレッドで動き、場合によっては複数Workerを使って一つのクエリを処理することも有る。
  • 各演算には(joinや集計のために必要な列のような)必要条件があり、必要条件に入力互換性がある場合は両演算は1つのFragmentにまとめられ、違う場合はオプティマイザが変換用の演算を間に挿入する。
  • 各Fragmentに割り当てられるノード数は、子Fragmentが必要としたノード数の大きい方の値を取る。
  • Fragment内の各ノードでは、各行の計算が終わったら、結果をどのノードに送るかを計算するpartitioningという作業が存在する。
  • partitioning中には転送データを削減する処理も入っていて、例えばCOUNTを取る場合には各ノードの中でCOUNTを取り、その結果を親Fragmentに送ることで転送量を減らす。

Batch モード

Googleでは大規模データのために、MapReduceやFlumeJavaを使ってきたが、メンテナンス性や再利用性から、宣言的なSQLを使えるようにした。
クエリの最適化や実行プランの生成時はInteractiveと同じ処理を実行している。

Batchモードには以下の特徴がある。

  • F1 Serverがクエリを管理するのではなく、中央集権で動いている。
  • 途中処理を保存することでF1 ServerやWorkerが止まった場合も継続可能
  • Fragmentの実行タイミングはバラバラである (InteractiveではRPCにより同期している)
    フォールトトレラントMapReduceを使用していて、実行結果はColossusに保存され共有される。
    パイプライン処理ではないので、MapReduce処理における依存関係がなければ複数のFragmentに対するMapReduceが同時に動くことが出来る
  • BatchモードではMapReduce間で多大なデータ移動が発生するので、Hash joinではなく、Index nested-loop join(lookup join)にしてIOを下げることがある
  • クライアントとの接続が切れても続行可能

Batchの流れは下の図のようになっている

f:id:mrasu:20181028215119j:plain

  1. F1 Serverがクライアントからリクエストを受け付ける
  2. 実行計画をSpannerで作成された、QueryRegistryに登録する。
    QueryRegistryは実行計画だけでなく、Batch関連のメタデータを保存している。
  3. 登録されたクエリはQueryDistributerが負荷やデータソースを考慮して、各データセンターに分配する。
  4. QuerySchedulerがクエリを発見したら、実行すべきタスクの依存グラフを作成する
    各データセンターでは、QuerySchedulerが定期的にクエリを監視している
  5. データセンターに余裕ができ次第、QueryExecuterにタスクを実行させる
  6. QueryExecuterがMapReduceを使用してタスクを行う

当然、これらの機能は耐障害性を持っていて、たとえデータセンターが死んでも別のデータセンターにクエリが再配布される。


終わりに

以上が、「F1 Query: Declarative Querying at Scale」の内容である。
この論文には、他にも最適化やUDFなどによるクエリ拡張、パフォーマンスの話が載っているが、長くなるのでここまでにする。

この論文は詳細に書かれていて興味深い工夫もあり面白かった。
BigQueryがGoogleSpreadSheetに対応したというのも、同じような仕組みなんだろうなと想像が膨らむ内容でもあった。

yarnを使ってimportが解決されるまでを追う

yarn add から webpackがimportを解決するまでを追う

Javascriptでは、Yarnとwebpackを使うのが主流なので、それらがどのような動きをしているかを追った。

事前準備

動きを追うために以下を用意した

  1. コード
  2. Verdaccio

コード

コードの配置は以下

.  
├── build  
│   └── bundle.js  
├── node_modules  
│   ├── my-isarray  
│   ├── webpack -> ../../../.config/yarn/link/webpack  
│   ├── webpack-cli  
├── package.json  
├── src  
│   └── app.js  
├── webpack.config.js  
└── yarn.lock  

各ファイルの中身は、

  • my-isarray
console.log("test-isarray");  

export default function(obj) {  
    console.log("my-isarray is called");  
    return Array.isArray(obj);  
}  
  • src/app.js
import a from "my-isarray";  

(function() {  
    console.log("src************");  
})()  

a();  
  • webpack.config.js
module.exports = {  
  mode: "none",  
  entry: "./src/app.js",  
  output: {  
    filename: "bundle.js",  
    path: __dirname + "/build"  
  }  
}  

Verdaccio

プライベートnpm のレポジトリ用にVerdaccioを使った。
dockerが用意されているので、configを弄った後に自作のpackageを登録すれば完了する

Yarnとwebpackのコミットハッシュ

調査に使用したコードのハッシュ値は以下である

Yarn: c58bd58e029ab5a69d75742c6ae82aa96cef0140 (v1.11.0-0)
webpack: 5ade57451073ef0993379d2cc6b80d616e86ef5d (v4.19.0)

動きを追う

大まかな流れは画像の様になる

f:id:mrasu:20180923170303j:plain

yarn add

最初に、yarn addをしてnode_modulesに展開されるまでを追う

  1. npmからメタデータを取得する
    npmのAPIを使用して、hash値やバージョンなどの最新情報を取得する。
  2. npmから~/.cache/yarn/v2に保存
    メタデータと以前にダウンロードしたものが違うものであればtarballをダウンロードして、展開する。
    この時、直接 node_modules にダウンロードするのではなく、~/.cache/yarn/v2のような共通ディレクトリにダウンロードすることで、別プロジェクトが使うときにも使いまわせるようになっている。
  3. 対象のnode_modulesディレクトリへコピー
    ~/.cache/yarn/v2に保存したパッケージ内容を対象ディレクトリにコピーする。

webpackを使って対象パッケージを使用する

次に、webpackが"ビルド"として行っている「ファイルの結合」の流れを追う

  1. Parserを通して各ファイルの依存関係を発見する
    Acornを使用してパースしている。
    Acornの結果を基に、webpack内にあるParser.jsがフックポイントを作って回る。
    このフックポイントの一つがimportを通ったときに発火するようになっているので、発火した場所を見ることで各ファイルが依存したパッケージを知ることが出来る。
  2. Resolverが名前解決する
    enhanced-resolveという、デフォルトのResolverがwebpackから切り出されている。
    これを利用することで、各ファイルが依存しているパッケージをnode_modulesから選び出している。
  3. webpackに登録されたLoader,Pluginsを実行し、各ファイル・全体を変更する
  4. 結果を基にファイルをつなぎ合わせる

以上

以上で、yarnがパッケージをダウンロードしてwebpackがファイルに結合するまでの流れが追えた。

読んでいて思ったこと

Yarn

yarnでは、jestとflowを使用していて、「やはり、Facebook製」といった感じを出している。
やることを絞っているためか、コードは結構シンプルで読みやすい。
上に書いた事以外にも、ネストや重複した依存関係の解決があるのでそこはまた読んでみたい所

webpack

webpackはプラグインの仕組みを使っていて、他の人が勝手にプラグインを書いて好きに公開できるようになっている。
この仕組みはwebpackを下支えしている良い仕組みだと思う。
ただし、コードを見ると、制御できている人が居るのか疑問なレベルの複雑さを持っている。
読み慣れていないせいかもしれないが、コールバック地獄がかなり読みづらかった。
また、hooksという各PluginやLoaderに対するフックポイントをまとめた変数があるのだが、フックポイントが多すぎてどこがいつ影響するのかわかりづらい。
あと、デフォルトのプラグインがまとめてlib直下に送られているのは流石にどうかと思う・・・

Dynamo: Amazon’s Highly Available Key-value Store

はじめに

AmazonがDynamodbを紹介した論文を読んだ。
2007年のもので、NoSQLブームの走りとなったものだったと思う。
コンピュータ・サイエンス関係の「おすすめの論文」のようなもので、よくおすすめされるので読んでみたが、意外と面白かったので、面白かった場所に焦点を当てて書く。
ただし、現在は、AWSのサービスとして提供されていて機能追加も多いのでこの論文とは大きく異なると思われるので注意が必要。

Dynamodbとは

Amazonが開発したNoSQLで、高可用性KVS。
論文上では99.9995%のリクエスト返答成功率と、データロス無しという、数字を出している。

https://aws.amazon.com/jp/dynamodb/
を読むと、概要がわかりやすいだろう。

Dynamo で使われている技術 (4章)

4章では、主要技術を5つ挙げている。

  1. Consistent hasing
    データ分割のアルゴリズムとして、Consistent hashingを採用している。
    また、キーが偏らないように Virtual node を使い単一ホストに対する複数キーの設定を設定している。
    ただ、単純には行かなかったようで、後半で試行錯誤を書いている。

  2. Versioning
    Vector clockを採用し、順序が確定する場合には最新の情報を返す。
    更新要求が成功したからと言って、dynamoが更新された情報を返すわけではなく、古いデータを取得することが有りえる。
    そのため、同一データに対して同時に更新する可能性があり、不整合が発生しうる。その場合は復元を行う(後半に詳細が有る)
    さらに、vectorが大きくなるのを抑えるために、古い情報は捨てている。

  3. Sloppy quorum and hinted handoff
    (W: Writeが成功する時の最低ノード数) + (R: Read時のデータ取得対象ノード数) > (N: レプリケーションノード数)
    になるようにして、Quorumのようになっている。
    書き込み対象が落ちている場合は、別ノードにデータが書き込まれる。復旧したら、データを戻した後にデータを消す(hinted hand off)。
    ちなみに、W, Rのノード数を変えることで、個々のサービスの要求を満たすことが出来るようになっている。
    例えば6章に、商品情報や宣伝のサービスではW=N,R=1にして高速で動作するようにしているとしている。(更新が圧倒的に少ないのでこういう設定が可能)

  4. Anti-entropy using merkle tree
    Merkle tree を使うことによって差分を高速に発見し、そこだけデータ同期を行う

  5. Gossip based membership protocol and failure detection
    GossipでノードやConsistent hashのリング情報などを共有している
    Seedsというリング上の全てのノードを知っているノードがいて、全ノードがそれらに合わせるので、分断は起きない。

バージョン差異が見つかった場合の修復方法 (6章冒頭)

Versioningでバージョンの不整合があるので、それをどのように治すかという問題を解説している。
不整合に対する対応では、サービスの性質に応じて3種類の方法を用いている。

  1. クライアント(アプリ側)が独自のビジネスロジックで治す (例: カート情報)
  2. 最新の更新日付を持つバージョンを使用する (例: セッション情報)
  3. R=1,W=Nにして、そもそもバージョン差異に気づかないようにする。(例: 商品情報や宣伝のサービス)

ちなみに、全リクエスト中に複数バージョンが見つかった割合は

  • 0.00057%で2つのバージョン
  • 0.00047%で3つ
  • 0.00009%で4つ

と、少ない(6.3章)

Consistent hasingのキーの割当に用いた戦略について (6.2章)

Consistent hashは工夫が合ったようで、3種類の戦略を取ったらしい。
下に論文にある図を置く
f:id:mrasu:20180910002502j:plain

戦略1. ノードに対してランダムなtokenを割り当てて、token間のデータを各ノードが持つデータとする

Consistent hasingを素直に実装した初期の戦略。
ただ、この戦略には、以下の問題があった

  1. ノードが増えたときには必ずデータ移動が発生し、移動対象データの全スキャンが走るので重い
    それに対応するために、データ移動を優先度最低のバックグラウンドタスクとして、移動完了後にノードを追加するようにした。そのために、ノード追加の時間が長かった
  2. Merkle tree の再計算が必要
  3. データの存在するノードが変動する可能性が有るので、全体バックアップを取るのに苦労した

これら問題の本質は、

  1. データの分割箇所(partition)(上図の網掛け部分)の決定
  2. データを配置する配置ノード(上図のA,B,C)の決定

という選択を同時に変更してるところから発生している。
例えば、「リクエストが多いからノードを追加しよう」と思ってもデータの分割箇所の変更が発生していた。

戦略2. 事前にpartionを均等なQ個に分割し、partitionを配置するノードは後から割り当てる

バックアップやMerkle treeの問題は、ノードが保持するデータが変わるために発生する問題だったので、保持データが変わらないようにしている。
partition内のデータを保持するノードを決定する際には、リングの位置を表すtokenを各ノードに割り当てて、partitionに最も近いtokenを持つN個のノードへpartitionのデータが配置される。
これによって、

  • (戦略1で問題だった)partitionと配置ノードが同時に変わる問題を解決
  • partitionを動的に変えられる

という利点が有る。

戦略3. Q個の均等なpartionに分割し、各ノードに割り当てるpartitionも確定する

2番めの戦略を進めて、割り当てるpartitionのノードを固定している。
そのために、

  • ノードが無くなる場合には、消えたノードのpartitionを別のノードへ割り当てる
  • ノードを追加する場合には、既存のノードに割り当てられたpartitionを新規ノードに移動させる

となり、2で発生していた、ノードが保持するpartitionが変わる問題が解消された。
ちなみに、3つの戦略における負荷の偏りを測ると、3が最良で2が最悪となった。(2が遅い理由はみつからなかった)

3番目の戦略の良いところはデプロイが簡単になったところ。
簡単になった理由は、

  • partitionの範囲が固定されていてノードが保持するpartitionも固定されるおかげで、ノードのbootstrapや復元が早くなる (partitionの範囲が動的だとランダムアクセスになる)
  • アーカイブが簡単 (partitionの範囲が固定されるので)

ただし、ノード管理の手間は増えるという欠点が有る。

まとめ

以上、Dynamoの古い論文を読んだ。