「F1 Query: Declarative Querying at Scale」 - Google内部のクエリ実行プラットフォームF1の動き
はじめに
以前Dynamoの論文を読んだので、ついでにGoogleのF1についての論文読んだ。
Dynamoは古かったが、こちらは2018年発表と新しい。
以下のリンクから手に入る。
https://ai.google/research/pubs/pub47224
F1はSpannerと一緒の文脈に居ることが多くSpannerをいじるためのものだと考えていたが、GoogleSpreadSheetなど複数ソースをまたいで動作するらしい。
論文の内容は全体を通して興味深い内容だった。
この論文では、
- どのようにソースをまたいだ動きを実現したか
- OLTP・OLAP・ETLという、別特性の動きをどのように一体化したか
を書いている。
以下、特に興味を引いた1章から4章の内容である。
F1の目的
F1は、社内で必要な機能である以下の3種全てを提供することを目的としている。
- 少ないレコードを対象としたOLTP
- low-latencyなOLAP
- データの変換やロードを行うETL
個々の機能について見れば既に実現されているものであるが、F1の新規性はDBMSやDremel(Google製のanalytical queryに特化したクエリエンジン)の機能を如何に(現代的なアーキテクチャ上で)統合したかにある。
また、初期段階ではSpannerとMesaのみをサポートしていたが、現在ではGoogleSpreadsheetやBigtableといった別のフォーマットにも対応している。
つまり、F1に対する要求は以下である
- Data Fragmentation
Googleにはデータ管理のための方法が(SpannerやBigtableなど)数多く有るので、それらを横断して処理したい - Datacenter Architecture
F1は個別のマシンやクラスタではなく、データセンターに対して実行するものとして作られている。 - Scalability
サイズ、速度、信頼性、コストなどの要求水準がユースケースによりバラバラであるが、各種に対応する - Extensibility
クエリとして書きづらい処理が必要になることがあるので、ユーザー定義関数のような拡張性を備える
F1の動き
具体的なF1の動きは、以下の図で表される。
「F1 Server」、「F1 Master」、「F1 Worker」が主な登場人物である。
つまり、F1 Serverがクライアントからリクエストを受け付けたのち、
- クエリが小さいなら、その場で実行
- クエリが大きければ、workerに実行させるようスケジューリングする
- 更に大きければ、MapReduceを使う
と、処理方法が3種類有る。
F1 Masterは、各データセンター内のF1のサーバー達を監視する。
ちなみに、F1 ServerとF1 Workerはデータを持たないので、台数を増やせばスケールする。
具体的なクエリ実行の流れは以下の様になる
- ClientからServerへリクエスト
F1 ClientがF1 ServerにSQLを送る。
リクエストを受け取ったデータセンターよりも(対象データを保管している場所が近くて)ふさわしいデータセンターが有る場合は、それをクライアントに通知する。
別のデータセンターを指定された場合は、クライアントはそのデータセンターに向けて再度SQLを送る。 - 実行プランの作成
受け取ったSQLを物理的、論理的最適化を考慮した実行プランを生成する。(クエリ最適化の詳細はブログに書かないので本文を参照)
クライアントはSQLを送るときにInteractiveかBatchのいずれかのMODEを指定し、オプティマイザはMODEによってどのように実行するか変化させる - クエリ実行
- データアクセス
F1 Serverはデータセンター毎に割り当てられているが、各F1 Serverは別データセンターのデータにもアクセス可能。
前述のように、対象データはSpannerでも、CSVや圧縮データ(Capacitorとか)でも良い。
対象データがconsistent readやrepeatable readをサポートしていれば、それをサポートする。
データソースが多岐に渡るため、joinなどを行うために必要なメタデータをglobal catalog serviceという場所に置いている。
また、クエリ発行時にDEFINE TABLE
を通してメタデータを定義することも可能。 - Data Sinks への格納
クライアントが要求した場合、実行結果をData Sinkへ格納することができる。
Data Sinkには、データソースに応じて色々な形式でデータが保管される。
必要であれば、出力先を変えたり、session-local な一時データとして保存することも可能 - クライアントへ返答
ちなみに、クエリはSQL2011標準に従い、拡張としてArrayなどをサポートしている。(つまり、joinや集計、window関数が使える)
閑話休題
長くなってきたので小休止。
ここからが、論文のテーマである「どのようにクエリを実行するか」の詳細が始まるので、一旦整理する。
F1に送られてきたSQLはF1 Server内で実行計画へと変形される。
実行計画の作成時において、複数の選択肢の中から実行方法が選択される。
実行方法が変わるとF1の動作は大きく変化し、この変化こそがOLAP,OLTP,ETLの全種サポートを可能にした根本である。
実行方法は下のように分類できる
- Interactive モード
同期実行するモードで、少量のデータをさばく場合に使用される。
クライアントがInteractiveモードを選択した場合、オプティマイザはクエリを基に2つの実行方法をのいずれかを選択する- Centralized execution
1スレッドで完結し、クライアントからのリクエストを受けたF1 Serverがその場で実行する - Distributed execution
Workerを通して、複数のサーバーにクエリを実行させる。
F1 Serverは各Workerのコーディネーターとして機能する
- Centralized execution
- Batchモード
非同期実行するモード
大量のデータを扱う場合に使用される。
長時間実行されることを想定したフォールトトレラントなアーキテクチャを持っている。
以下は、各動作内容を詳細化しているので、この分類を頭に置いた上で、各論を見ていく。
Interactive + Centralizedの動き
1スレッドで完結するモード。
SQLを基にした実行計画の内容を、単一スレッド上で各演算が実行される
論文では、ScanとJoin、Sortの動きを紹介している。
- Scan
データソースに応じて動きが異なる。
フルスキャンしか出来ないものもあれば、key-based index lookupをサポートしているものもある。
他にも、Array型を利用する挙動やProtocolBuffersを利用した挙動もある。 - Join
nested loop join, hash join, merge joinなどが使える。array joinという、array型用のjoinも有る
また、Spannerが持つストリーム用のmerge joinも使える - その他
Projection, Aggregation, Sort, Union, Window Function, Limit, Offsetなどがある。
Interactive + Distributed の動き
Workerを使用して、並列実行するモード。
実行計画は処理の一部を表したFragmentに分けられ、FragmentはDAGになる。
特徴は
- Fragmentは各ノードに割り当てられ、マルチスレッドで動き、場合によっては複数Workerを使って一つのクエリを処理することも有る。
- 各演算には(joinや集計のために必要な列のような)必要条件があり、必要条件に入力互換性がある場合は両演算は1つのFragmentにまとめられ、違う場合はオプティマイザが変換用の演算を間に挿入する。
- 各Fragmentに割り当てられるノード数は、子Fragmentが必要としたノード数の大きい方の値を取る。
- Fragment内の各ノードでは、各行の計算が終わったら、結果をどのノードに送るかを計算するpartitioningという作業が存在する。
- partitioning中には転送データを削減する処理も入っていて、例えばCOUNTを取る場合には各ノードの中でCOUNTを取り、その結果を親Fragmentに送ることで転送量を減らす。
Batch モード
Googleでは大規模データのために、MapReduceやFlumeJavaを使ってきたが、メンテナンス性や再利用性から、宣言的なSQLを使えるようにした。
クエリの最適化や実行プランの生成時はInteractiveと同じ処理を実行している。
Batchモードには以下の特徴がある。
- F1 Serverがクエリを管理するのではなく、中央集権で動いている。
- 途中処理を保存することでF1 ServerやWorkerが止まった場合も継続可能
- Fragmentの実行タイミングはバラバラである (InteractiveではRPCにより同期している)
フォールトトレラントなMapReduceを使用していて、実行結果はColossusに保存され共有される。
パイプライン処理ではないので、MapReduce処理における依存関係がなければ複数のFragmentに対するMapReduceが同時に動くことが出来る - BatchモードではMapReduce間で多大なデータ移動が発生するので、Hash joinではなく、Index nested-loop join(lookup join)にしてIOを下げることがある
- クライアントとの接続が切れても続行可能
Batchの流れは下の図のようになっている
- F1 Serverがクライアントからリクエストを受け付ける
- 実行計画をSpannerで作成された、QueryRegistryに登録する。
QueryRegistryは実行計画だけでなく、Batch関連のメタデータを保存している。 - 登録されたクエリはQueryDistributerが負荷やデータソースを考慮して、各データセンターに分配する。
- QuerySchedulerがクエリを発見したら、実行すべきタスクの依存グラフを作成する
各データセンターでは、QuerySchedulerが定期的にクエリを監視している - データセンターに余裕ができ次第、QueryExecuterにタスクを実行させる
- QueryExecuterがMapReduceを使用してタスクを行う
当然、これらの機能は耐障害性を持っていて、たとえデータセンターが死んでも別のデータセンターにクエリが再配布される。
終わりに
以上が、「F1 Query: Declarative Querying at Scale」の内容である。
この論文には、他にも最適化やUDFなどによるクエリ拡張、パフォーマンスの話が載っているが、長くなるのでここまでにする。
この論文は詳細に書かれていて興味深い工夫もあり面白かった。
BigQueryがGoogleSpreadSheetに対応したというのも、同じような仕組みなんだろうなと想像が膨らむ内容でもあった。