発明のための再発明

Webプログラマーが、プログラムの内部動作を通してプログラムを作る時の参考になるような情報を書くブログ(サーバーサイドやDevOpsメイン)

分散ストレージ Ceph - "Ceph: A Scalable, High-Performance Distributed File System"

はじめに

分散ストレージであるCephについての論文を読んだので紹介します。

最近CybozuのNecoプロジェクトが始まり、面白そうなことをやっているな、と思っていたたところ、Cephについてのブログが出ました。
これを読んで、「Ceph読んでみたいな」と思ったので、理解の足がかりになるかなと思い論文("Ceph: A Scalable, High-Performance Distributed File System")を読みました。

ただし、この論文は2006年に発表されたもので、当時のCephの概要について説明されたものです。
現在の実装とは大きく違うと思います。
たとえば、この論文では「CephはEBOFSという独自ファイルシステムを採用している」とあるのですが、GitHubにはそのようなものは存在せず、2009年ごろに "osd: do not use ebofs"というコミットとともに削除された形跡が伺えます。
それでも、分散ストレージは「自分で作ってみたいシステムランキング」の上位にあるものなので興味深く読みました。
また、論文に出てくる「CRUSH」と「RADOS」はデータの配置と冗長化という、分散ストレージに欠かせない仕組みで、Cybozuのブログを読む限り今も現役なようです。

Cephについて

Ceph自体の説明については、
Cybozuの記事にまとまっているので、そちらが参考になります。: https://blog.cybozu.io/entry/2018/12/13/103039
公式: https://ceph.com

ちなみに、CNCFのIncubating ProjectになっているRookもCeph連携をしています。

論文の概要

"Ceph: A Scalable, High-Performance Distributed File System"では、Cephにおける

  • メタデータの管理
  • データの管理
  • クライアントとサーバーの関係

という、おおよそCephの全体像を書いています。
簡単にまとめると、
Cephは

で構成されていて、クライアントはこの2つのクラスタに問い合わせることでファイル情報や内容をやり取りします。
MDSクラスタの大きな特徴は、「ファイルの位置情報」を保管せずに、計算によって対象ファイルを保持するOSDがわかるということです。
また、OSDクラスタは冗長、耐障害を考慮された設計です。

OSD(object storage devices)とは、ディスク(またはRAID)と、それに付随するCPU・ネットワーク・キャッシュを含むデバイスの呼び方です。長いですが、「ファイルコンテンツの一部を持ったデバイスOSDというだな」という理解で十分だと思います。

以下、詳細です。


System Overview

この章は、Ceph全体についての説明です。次章以降、各機能が説明されます。

メタデータの分離

Cephでは、open, renameなどのメタデータに対する操作はメタデータサーバー(MDS)で一括管理されていますが、read,writeのようなIO操作はクライアントとOSDが直接通信するようになっています。
また、データがどのOSDに格納されているかという情報はメタデータサーバーでは管理していません。その代わりに、CRUSHを使うことで場所が計算できるようになっています。

メタデータの動的管理・分散管理

メタデータに対する操作はCeph全体の負荷の半分にもなる可能性があるので、メタデータを効率的に管理したいところです。そのために、Dynamic Subtree Partitioningという手法を使用しています。

RADOS (Relaiable Automatic Distributed Object Storage)

Cephは数千を超えるデバイスを持つシステムなので、以下のことが想定されます

  • バイスが追加、削除される
  • 故障が頻繁に起こる
  • 大きなデータが追加・移動・削除される

なので、Cephはデータのmigration,replication,failure detection, failure recoveryの機能を持っています。(詳細は後述)

クライアント操作から見る Ceph

capability

クライアント操作時には、"capability"という操作許可が各クライアントに発行されます。
例えば次のようなcapabilityの移動があります。

read時:
クライアントがread用にopen操作をMDSに要求すると、MDSがreadのcapabilityを与える。
capbilityを与えられたら、クライアントはメタデータ情報を使って、OSDクラスタへアクセスして、データを取得する。

write時:
writeのためのopenの場合には、writeのcapabilityを与える。
その後にクライアントはデータを変更して、closeする。
close操作の時に、MDSはファイルサイズを更新して、capabilityを破棄する。

同期

POSIXでは、読み込み時には書き込み済みのデータを読み込むこと、書き込みはatomicであることを要求しています。つまり、操作は発生順(order of occurrence)に結果を持つことを要求しています。
しかしCephでは、「複数書き込む場合」や、「書き込みと読み込みが同時に発生した場合」には、キャッシュ読み込みとバッファ書き込みのcapabilityが取り消されて、各操作の同期が強制されます。
この方法は、同期IOになるので遅くなりますが、通常のユースケースでは読みと書きが同時に起きることは少ないので、許容できると判断しています。
ただし、許容出来ない場合に備えて一貫性を犠牲にする選択肢も有るようです。

名前空間に関する操作

名前空間に関する「読み(readdir, statなど)」、「書き(unlink,chmodなど)」の操作はMDSに対して行われますが、ロックはありません。これは、シンプルさと最適化を求めた結果だそうです。
例えば、ls -lのようなreaddir+statの操作はよく実行される操作ですが、巨大なディレクトリに対してはパフォーマンスキラーです。
なので、デフォルトではキャッシュが使われます。
そのせいで、一貫性が損なわれますが、パフォーマンスのためには歓迎される犠牲だとして採用しています。

動的に分散されるメタデータ

Cephのメタデータは分散されつつも動的に場所が変わり、以下の特性があります

  1. ディレクトリのコンテンツは同一のOSDクラスタにある
  2. MDSに配置されるデータはアクセス量に従って動的に変化する(どこかのMDSにアクセスが偏る場合は、一部のディレクトリを別のMDSに移動します)
  3. メタデータの一貫性のポリシーは、security(ownerやmode),file(size,mtime),immutable(inode number, ctime, layout)の3種類存在し、目的に適したものを使っている。
  4. 大量のアクセスが同一のディレクトリやファイルに来た場合には、レプリケーションを拡大して、クライアントにレプリケーションへアクセスするように指示する

Distributed Object Storage

クライアントやMDSにとって、OSDクラスタは論理的に一つのストレージであるとみなして扱います。
それを実現するために、以下の工夫をしています。

CRUSH (Controlled Replication Under Scalable Hashing)

ファイルはobjectに分解され、objectはPG(placement group)というグループに割り当てられます。PGはCRUSHを使用して、各OSDに割り当てられます。
CRUSHでは、PGとPGに紐づくOSDのリストがあればデータを持つOSDがわかるようになっています。
この仕組みのおかげで、クライアントやMDSは独立して保存場所を計算することができ、メタデータが持つ情報の更新が少なく済むようになっています。
つまり、CRUSHでは、distribution(どこにデータを置くべきか)とlocation(どこにデータが有るか)の問題を同時に解決しています。

レプリケーションとデータ保全

データは複数PGにレプリケーションされます。
クライアントがプライマリPGのOSDにデータを送ると、対象のOSDは受け取ったデータをレプリケーションにも流して、待機します。
そして、レプリケーションの書き込みが終わってから、クライアントに完了を知らせます。
こうすることで、複数デバイスへの書き込みが保証されます。

Failure detection

ディスク障害などはODSから障害を通知しますが、ネットワーク障害の場合には各OSDのピアが生存確認できなくなった時に、中央に報告が上げられます。
そして、中央がシステム障害なのか一時的なものなのかを判断します。
Cephにはdownとoutの2つの障害状態を用意していて、OSDが通信できなくなるとdownとなりprimaryから外されます。そのままdownの状態が続くようだと、outの状態に遷移してPGには別のOSDが割り当てられます。
このように状態をもつことによって、(停電でOSDの半分がダウンするなど)大規模な障害が起きた時に状態を"down"に留めることによって、大規模なデータの再配置を避けることができるようになっています。

OSDクラスタ情報の更新

Cephの持つクラスタ情報がOSDの追加や削除によって変更されると、各OSDは自身の情報との差異に気づき次第、OSDの「あるべき姿」へと変化します。(「あるべき姿」になるために、プライマリの変更やデータの移行などが発生します)
このように、各OSDは独立して変化するので、あるOSDが落ちた場合には、影響を受けた各PGは並行して復帰します。

EBOFS (Extent and B-tree based Object File System)

Cephでは、メタデータとデータのatomicな操作が出来なかったので、ext3(古い!)のような既存のファイルシステムは使わずにEBOFSというものを作ったそうです。
EBOFSは以下の特徴を持ったファイルシステムです。

  1. atomic transactionをサポートすると同時に、ディスクへの書き込みは非同期に行われる。
  2. (既存のファイルシステムが時間を開けるのに対して)ディスクへのflushは積極的にスケジューリングされ、IO操作が不要になったときにはキャンセルすることもできる。
  3. オブジェクトのディスクへの配置、block allocation, index collectionには、B-treeが使われている。

終わりに

以上、"Ceph: A Scalable, High-Performance Distributed File System"の内容でした。
今回の記事のように、このブログではプログラムを作る時の参考になることを書き続けるつもりです。
もし興味があれば、twitterやブログのフォローしていただけると嬉しいです。

巨大企業のサーバー構成や内部ツールを覗く

はじめに

この記事は設計・アーキテクチャ Advent Calendar 2018の1日目の記事です。

大きなサービスを支えるのは一筋縄では行かず、考えることは多くあります。しかし、ありがたいことに巨大な企業の中にも自社のサーバー構成やそれを支えるツールを公開している企業があります。
この記事では、彼らの叡智に触れるため、有名企業の事例を取り上げ要約をします。
各事例には元記事へのリンクを書いているので、興味があればリンク先も覗いてみてください。

※新しいものばかりではないので、古くなっていたり既に別の方法に移行している可能性があることに注意してください。


LINE: 25k/secのスパイクをさばくアーキテクチャ

元記事: 25K request/secをさばいた「LINEのお年玉」のアーキテクチャの裏側

最初に紹介するのは、LINEが2018年に実施した、「LINEのお年玉」というイベントをさばいたアーキテクチャです。
この記事では、アーキテクチャの他にイベントで発生した問題についても書かれています。

Kafkaを軸とした多段構成

f:id:mrasu:20181130233406p:plain

「LINEのお年玉」では、即時に反映する必要が有る処理を前段で受けつつ、非同期でも問題ない処理をKafka(Streams)に流す構成を取ったそうです。
「LINEのお年玉」はスパイクが見込まれるイベントであることから、キューを挟むことによって、前段の負荷を減らして遅延処理できるように注意した構成ですね。
また、キューとは別にバッチ処理も行ったようです。
仕様技術はKafka, Prometheus, Fluentdです。

WeChat: アクションに応じた負荷制御システム DAGOR

元記事: Overload Control for Scaling WeChat Microservices

次は、中国でLINEのようなサービスを展開するWeChatです。
WeChat内で実装されている負荷制御に関する論文の内容が興味深かったので紹介します。
※ 元記事を読んだのではなく、The morning paperにあったまとめを読んだだけなので、原文とは異なった解釈をしているかもしれません。

負荷が高くなった場合は、優先度の低い行動に対する応答を止める

LINEと同じく、WeChatでも正月(春節旧正月)のアクセスは平常時を大きく超えるらしく、DAGORという負荷制御システムを実装して負荷に対応しているそうです。
DAGORは事前に各サービスの優先度を設定し、負荷の高まりを検知した時に

  • 処理可能な基準を変更することで、特定の優先度以上のサービスへの処理は止める
  • 大きなボトルネックが特定の優先度上にある場合、ユーザー単位に優先度を割り当てることによって処理を止めるユーザーを選択する

という動作をします。
下の図の矢印部分に制御が入るイメージです。

f:id:mrasu:20181130233835p:plain

これによって、「支払い処理は動作するが、動画のアップロードは出来ない」という制御を自動化で出来ると共に、サービス間連携の流れを把握せずにDAGORの運用することが可能になります。
ちなみに、ユーザー単位に優先度を設定することに関して、セッション単位に優先度を割り当てても良さそうな気がします。しかし、セッションを使用すると、ユーザーが「エラーが出たら再ログインし続ければ治る」ということに気がつき、再ログインを繰り返してしまうそうです。結果UXの低下につながるので、セッション単位での優先度割当は使わないそうです。

Salesforce: マルチテナントを支えるDB運用

元動画: Salesforce Multi Tenant Architecture: How We Do the Magic We Do
別スライド: SlidShare

次は、マルチテナントの運用で有名なSalesforceです。
SaaS企業ではほぼ必須になるマルチテナントですが、Salesforceは奇抜なデータ構造を持っています。
この発表では、彼らの「データとメタデータを分けたテーブル設計」とそれに伴う処理方法や、彼らの持つスケーラビリティ性を説明しています。

DataTables vs MetadataTables vs SpecializedPivotTables

f:id:mrasu:20181130234451p:plain

通常のテーブル設計では、テーブルは「事前に決めた対象分野のデータ」を入れるためにあると考えますが、Salesforceではあらゆるデータを放り込む「Data Table」と、そのテーブルの行と列の「意味」を保持する「Metadata Table」に分かれています。
さらに、インデックスや制約を保持する「SpecializedPivotalTable」なども存在します。

つまり、DataTableの各列に意味はなく、列名もテナントのidなどを除いて、Value1,Value2,Value3...となっています。Value1が何を表すかはMetadataTableを参照することで確認できる様になっています。
この構成により、どんなデータにも対応可能になり、柔軟性を発揮しているそうです。

Dropbox: 全文検索エンジン Nautilus

元記事: Architecture of Nautilus, the new Dropbox search engine

次は、Dropboxが内部で使用している全文検索エンジンNautilus」についてです。
Dropboxともなるとデータは地理的に分散されています。加えてユーザー毎にシャーディングするわけに行かないせいで、検索が大変そうです。

Indexing vs Serving

Nautilusは「Indexing」と「Serving」の2役に分かれて動作しています

f:id:mrasu:20181130234625p:plain

Indexingは、保存しているドキュメントからメタデータを生成して検索出来るようにすることが仕事です。
メタデータは、「ドキュメントから直接取り出したもの」と、「そこから更に加工されたもの(加工するものをannotatorという)」で構成されています。
さらにメタデータ作成とは別に、転置インデックス作成のためのoffline buildが別のタイミングで実行されます。
メタデータ作成とインデックス作成が分離しているので、新規annotatorに対するカナリアリリースや既存ドキュメントへの反映が、特別な処理をすることなく実現できています。

ServingではOctopusというシステムを中心に、

  • Nautilus上での検索
  • 外部サービス(Dropbox paperなど)への検索依頼
  • 結果のランク付け
  • アクセスコントロール

が実行されます。
このように、IndexingとServingの2役に分割されていることで、両者が独立して動けるようになっています。

Netflix: コンテナマネジメントシステム Titus

元記事: Titus: Introducing Containers to the Netflix Cloud

最後はNetflixが内部で使用しているコンテナマネジメントツール「Titus」です。
NetflixAWSを使用していることから、Kubernetesとは違い、AWSの各種サービスと連携する事を前提としています。

既存システムを考慮したコンテナ移行

Netflixがコンテナへ移行する際には、既存インフラとの連携や「小さく変える」というのが重視されたようで、Titusでは以下を考慮して作られたそうです。

  • 変更無しで、既存のアプリケーションがコンテナ上で動く
  • コンテナに乗せたアプリケーションが簡単に他のアプリケーションやAWSサービスに繋げられる
  • バッチやジョブが同じリソースプール上で動く
  • 効率的で信頼性がある

Titusは、EC2インスタンス内で動いている「Titus Agent」と、インスタンスにコンテナを配置する「Titus Master」、外からリクエストを受け付ける「Titus API」で構成されています。

f:id:mrasu:20181130234707p:plain

ミドルウェアでは、ZookeeperがMasterのリーダー選出を、Cassandraが永続化を担当しています。

AWS連携

TitusはAWSのサービス(S3やSQS)を使うため、各コンテナのIAMを管理したいところですが、IAMはインスタンス単位で制御されています。
そのため、TitusAgentがプロキシとなり、各コンテナに必要な情報のみをコンテナに渡しています。
またコンテナのIPについて、各コンテナは同一VPC内の固有IPを割り当てられます。それによって、ポート管理やゲートウェイ管理、セキュリティグループ管理が簡単になっています。

他にも、ワークロードの違うアプリケーションを両立させる管理方法や、既存のデプロイツール(Spinnaker)への連携、CloudWatchに連動した独自オートスケール管理といった興味深いトピックが以下の記事で公開されています


まとめ

以上、巨大企業のサーバー構成や内部ツールに関する記事を紹介しました。
今回の記事のように、このブログではプログラムを作る時の参考になることを書き続けるつもりです。
もし興味があれば、twitterやブログのフォローしていただけると嬉しいです。

「F1 Query: Declarative Querying at Scale」 - Google内部のクエリ実行プラットフォームF1の動き

はじめに

以前Dynamoの論文を読んだので、ついでにGoogleのF1についての論文読んだ。
Dynamoは古かったが、こちらは2018年発表と新しい。
以下のリンクから手に入る。
https://ai.google/research/pubs/pub47224

F1はSpannerと一緒の文脈に居ることが多くSpannerをいじるためのものだと考えていたが、GoogleSpreadSheetなど複数ソースをまたいで動作するらしい。
論文の内容は全体を通して興味深い内容だった。
この論文では、

  • どのようにソースをまたいだ動きを実現したか
  • OLTP・OLAP・ETLという、別特性の動きをどのように一体化したか

を書いている。

以下、特に興味を引いた1章から4章の内容である。


F1の目的

F1は、社内で必要な機能である以下の3種全てを提供することを目的としている。

  1. 少ないレコードを対象としたOLTP
  2. low-latencyなOLAP
  3. データの変換やロードを行うETL

個々の機能について見れば既に実現されているものであるが、F1の新規性はDBMSやDremel(Google製のanalytical queryに特化したクエリエンジン)の機能を如何に(現代的なアーキテクチャ上で)統合したかにある。
また、初期段階ではSpannerとMesaのみをサポートしていたが、現在ではGoogleSpreadsheetやBigtableといった別のフォーマットにも対応している。

つまり、F1に対する要求は以下である

  1. Data Fragmentation
    Googleにはデータ管理のための方法が(SpannerやBigtableなど)数多く有るので、それらを横断して処理したい
  2. Datacenter Architecture
    F1は個別のマシンやクラスタではなく、データセンターに対して実行するものとして作られている。
  3. Scalability
    サイズ、速度、信頼性、コストなどの要求水準がユースケースによりバラバラであるが、各種に対応する
  4. Extensibility
    クエリとして書きづらい処理が必要になることがあるので、ユーザー定義関数のような拡張性を備える

F1の動き

具体的なF1の動きは、以下の図で表される。
「F1 Server」、「F1 Master」、「F1 Worker」が主な登場人物である。

f:id:mrasu:20181028215037j:plain

つまり、F1 Serverがクライアントからリクエストを受け付けたのち、

  • クエリが小さいなら、その場で実行
  • クエリが大きければ、workerに実行させるようスケジューリングする
  • 更に大きければ、MapReduceを使う

と、処理方法が3種類有る。
F1 Masterは、各データセンター内のF1のサーバー達を監視する。
ちなみに、F1 ServerとF1 Workerはデータを持たないので、台数を増やせばスケールする。

具体的なクエリ実行の流れは以下の様になる

  1. ClientからServerへリクエス
    F1 ClientがF1 ServerにSQLを送る。
    リクエストを受け取ったデータセンターよりも(対象データを保管している場所が近くて)ふさわしいデータセンターが有る場合は、それをクライアントに通知する。
    別のデータセンターを指定された場合は、クライアントはそのデータセンターに向けて再度SQLを送る。
  2. 実行プランの作成
    受け取ったSQLを物理的、論理的最適化を考慮した実行プランを生成する。(クエリ最適化の詳細はブログに書かないので本文を参照)
    クライアントはSQLを送るときにInteractiveかBatchのいずれかのMODEを指定し、オプティマイザはMODEによってどのように実行するか変化させる
  3. クエリ実行
    • Interactive 実行
      クライアントのリクエストに対して同期的に返答するモード
      クエリオプティマイザはヒューリスティックに、その場で実行する(centralized)かworkerを使用してクエリを分散させる(distributed)かを選択する
    • Batch 実行
      大量のデータ処理を行い長時間の実行が必要になる場合に使用するモードで、実行計画はF1 Server以外に保存される。
      また、MapReduceフレームワークを使用して非同期に分散される
  4. データアクセス
    F1 Serverはデータセンター毎に割り当てられているが、各F1 Serverは別データセンターのデータにもアクセス可能。
    前述のように、対象データはSpannerでも、CSVや圧縮データ(Capacitorとか)でも良い。
    対象データがconsistent readやrepeatable readをサポートしていれば、それをサポートする。
    データソースが多岐に渡るため、joinなどを行うために必要なメタデータをglobal catalog serviceという場所に置いている。
    また、クエリ発行時にDEFINE TABLEを通してメタデータを定義することも可能。
  5. Data Sinks への格納
    クライアントが要求した場合、実行結果をData Sinkへ格納することができる。
    Data Sinkには、データソースに応じて色々な形式でデータが保管される。
    必要であれば、出力先を変えたり、session-local な一時データとして保存することも可能
  6. クライアントへ返答

ちなみに、クエリはSQL2011標準に従い、拡張としてArrayなどをサポートしている。(つまり、joinや集計、window関数が使える)

閑話休題

長くなってきたので小休止。
ここからが、論文のテーマである「どのようにクエリを実行するか」の詳細が始まるので、一旦整理する。

F1に送られてきたSQLはF1 Server内で実行計画へと変形される。
実行計画の作成時において、複数の選択肢の中から実行方法が選択される。
実行方法が変わるとF1の動作は大きく変化し、この変化こそがOLAP,OLTP,ETLの全種サポートを可能にした根本である。

実行方法は下のように分類できる

  1. Interactive モード
    同期実行するモードで、少量のデータをさばく場合に使用される。
    クライアントがInteractiveモードを選択した場合、オプティマイザはクエリを基に2つの実行方法をのいずれかを選択する
    1. Centralized execution
      1スレッドで完結し、クライアントからのリクエストを受けたF1 Serverがその場で実行する
    2. Distributed execution
      Workerを通して、複数のサーバーにクエリを実行させる。
      F1 Serverは各Workerのコーディネーターとして機能する
  2. Batchモード
    非同期実行するモード
    大量のデータを扱う場合に使用される。
    長時間実行されることを想定したフォールトトレラントアーキテクチャを持っている。

以下は、各動作内容を詳細化しているので、この分類を頭に置いた上で、各論を見ていく。

Interactive + Centralizedの動き

1スレッドで完結するモード。
SQLを基にした実行計画の内容を、単一スレッド上で各演算が実行される
論文では、ScanとJoin、Sortの動きを紹介している。

f:id:mrasu:20181028215055j:plain

  1. Scan
    データソースに応じて動きが異なる。
    フルスキャンしか出来ないものもあれば、key-based index lookupをサポートしているものもある。
    他にも、Array型を利用する挙動やProtocolBuffersを利用した挙動もある。
  2. Join
    nested loop join, hash join, merge joinなどが使える。array joinという、array型用のjoinも有る
    また、Spannerが持つストリーム用のmerge joinも使える
  3. その他
    Projection, Aggregation, Sort, Union, Window Function, Limit, Offsetなどがある。

Interactive + Distributed の動き

Workerを使用して、並列実行するモード。
実行計画は処理の一部を表したFragmentに分けられ、FragmentはDAGになる。

f:id:mrasu:20181028215109j:plain

特徴は

  • Fragmentは各ノードに割り当てられ、マルチスレッドで動き、場合によっては複数Workerを使って一つのクエリを処理することも有る。
  • 各演算には(joinや集計のために必要な列のような)必要条件があり、必要条件に入力互換性がある場合は両演算は1つのFragmentにまとめられ、違う場合はオプティマイザが変換用の演算を間に挿入する。
  • 各Fragmentに割り当てられるノード数は、子Fragmentが必要としたノード数の大きい方の値を取る。
  • Fragment内の各ノードでは、各行の計算が終わったら、結果をどのノードに送るかを計算するpartitioningという作業が存在する。
  • partitioning中には転送データを削減する処理も入っていて、例えばCOUNTを取る場合には各ノードの中でCOUNTを取り、その結果を親Fragmentに送ることで転送量を減らす。

Batch モード

Googleでは大規模データのために、MapReduceやFlumeJavaを使ってきたが、メンテナンス性や再利用性から、宣言的なSQLを使えるようにした。
クエリの最適化や実行プランの生成時はInteractiveと同じ処理を実行している。

Batchモードには以下の特徴がある。

  • F1 Serverがクエリを管理するのではなく、中央集権で動いている。
  • 途中処理を保存することでF1 ServerやWorkerが止まった場合も継続可能
  • Fragmentの実行タイミングはバラバラである (InteractiveではRPCにより同期している)
    フォールトトレラントMapReduceを使用していて、実行結果はColossusに保存され共有される。
    パイプライン処理ではないので、MapReduce処理における依存関係がなければ複数のFragmentに対するMapReduceが同時に動くことが出来る
  • BatchモードではMapReduce間で多大なデータ移動が発生するので、Hash joinではなく、Index nested-loop join(lookup join)にしてIOを下げることがある
  • クライアントとの接続が切れても続行可能

Batchの流れは下の図のようになっている

f:id:mrasu:20181028215119j:plain

  1. F1 Serverがクライアントからリクエストを受け付ける
  2. 実行計画をSpannerで作成された、QueryRegistryに登録する。
    QueryRegistryは実行計画だけでなく、Batch関連のメタデータを保存している。
  3. 登録されたクエリはQueryDistributerが負荷やデータソースを考慮して、各データセンターに分配する。
  4. QuerySchedulerがクエリを発見したら、実行すべきタスクの依存グラフを作成する
    各データセンターでは、QuerySchedulerが定期的にクエリを監視している
  5. データセンターに余裕ができ次第、QueryExecuterにタスクを実行させる
  6. QueryExecuterがMapReduceを使用してタスクを行う

当然、これらの機能は耐障害性を持っていて、たとえデータセンターが死んでも別のデータセンターにクエリが再配布される。


終わりに

以上が、「F1 Query: Declarative Querying at Scale」の内容である。
この論文には、他にも最適化やUDFなどによるクエリ拡張、パフォーマンスの話が載っているが、長くなるのでここまでにする。

この論文は詳細に書かれていて興味深い工夫もあり面白かった。
BigQueryがGoogleSpreadSheetに対応したというのも、同じような仕組みなんだろうなと想像が膨らむ内容でもあった。