はじめに
はじめまして、スリーシェイクインターン生の有馬祐二と関根弘晃です。私たちは2024年3月18日~3月29日に開催された短期インターンシップに参加しました。私たちのグループではインターンの期間でテレメトリデータの処理等の標準化ツール、OpenTelemetryの概要とその中身について調査、整理を行いました。
自己紹介
有馬 祐二
法政大学 理工学部 応用情報工学科 学部4年の有馬祐二です。大学では、機械学習を使った植物の病害診断の研究を行っています。インフラ系にも興味があって参加させていただきました。
関根 弘晃
千葉工業大学 情報科学部 情報ネットワーク学科 学部2年の関根弘晃です。ネットワークインフラの構築や運用に興味があり、本インターンで少しでも色々なことを吸収して、自分の学びに生かしていきたいと思い、参加させていただきました。
TL;DR
近年、オブザーバビリティが注目されており、その計装の標準化ツールとしてOpenTelemetryがあります。Signalやコンテキスト情報を管理するProviderやPropagatorを作成し、setすることで計装を行うことができます。
OpenTelemetryには、OpenTelemetry Collectorと呼ばれるOpenTelemetryが提供するデータ管理システムがあります。OpenTelemetry Collectorでは、言語ごとのライブラリを実装する手間や、属性ごとにデータを管理する方法を提供しています。OpenTelemetry Collectorは複数のPipelineで構成されています。Pipelineの構成要素としては、Receivers、Processors、Exportersが存在します。Processorsはデータの処理を行う部分で、データをこしとったりリソースを節約することに有用です。今回はいくつかの例を挙げて一部を実際に動作させてみました。Filter Processorという特定のProcessorに注目し、どのような記述の仕方をするのか、どのような条件のもとデータを取り扱うことができるのかに着目し、調査しました。
実装では、HTTP通信するサービスとgRPCで通信するサービスのそれぞれで計装を行いました。3つ目の実装として、計装結果の表示の前にテレメトリーデータの管理を行うOpenTelemetry Collectorを繋げてみました。そして4つ目の実装として、3つ目の実装において、テレメトリーデータに対し、Filter Processorを設定しました。
オブザーバビリティ
オブザーバビリティとは
オブザーバビリティとは、観測 (Observe)と可能性(ability)が組み合わさった語で可観測性と訳されることがあります。システムを管理する場合、システムを監視して内部状態を把握する必要があります。しかし、分散システムのように複雑なシステムを運用する場合に、単一のシグナルでは、システム全体の内部状態を把握することが難しくなります。
例えば、あるリクエストをした時に多くのサービスを横断する処理を行うアプリケーションを考えてみます。このときに正常に処理されなかった場合、その時にどこで問題が起きたのか、原因は何であるかの特定をする必要があります。オブザーバビリティ導入以前の監視では、エラーや利用率の通知を行っていましたが、システムの全体や一部を抜きだしたものであるため、内部状態を知ることが難しいといえます。
そこで、システムの出力を観測し、内部状態を理解できる状態にする仕組みであるオブザーバビリティが求められます。オブザーバビリティを利用することでシステム全体の内部状態を理解することができます。オブザーバビリティには、3本柱と呼ばれるTrace, Metrics, Logという3種類のデータがあります。
Traceとは
Traceはアプリケーションの経路とその内容を表したもので、いくつかのSpanを所持しています。「どこで問題がおきているか」を表しています。
Traceは、分散システム内の実行パスを参照し、Spanを1つのまとまりとして保存することで複数Spanの組み合わせによるエラーに対処します。Spanは、リクエスト内の各処理の情報、実行パス内の単一の要求を表し、個々のコンポーネントの処理を適宜記録することで、エラーが発生した箇所を特定しやすくします。
下図はOpenTelemetryの公式から引用してきた図で、ウォーターフォール図としてTraceを視覚化した図です。クライアントから/api、/payment Gatewayを通り、/DBにデータが流れることが分かります。1つのraceの中でそれぞれの場所でSpanを作成することで、システムの内部状態の理解につなげています。例えば、注目したいサービスを使うTraceを見るといったことや、あるTraceでエラーが発生した時にどこで不具合が起きたのかを見ることができます。
Metricsとは
Metricsは、実行時に取得されるサービスの測定値で、測定値を取得した瞬間はMetrics Eventとして知られ、これは測定値自体だけでなく、測定値が取得された時間と関連するメタデータでも構成されます。可用性とパフォーマンスの重要な指標であり、「何が起きているか?」を表します。
例:アクセス数、エラー率、CPU使用率
つまり、時系列データや割合などがMetricsに分類されます。
Logとは
サービスまたは他のコンポーネントによって発行されるタイムスタンプ付きのメッセージです。特定のユーザ要求やトランザクションに関連付けられおらず、どこから呼び出されたか等のコンテキスト情報に欠けます。「どんな問題が発生したか?」を表します。
例:アクセスLog、例外TraceのLog
テキストベースであるため、自由度が高い一方で、未知のエラーや複雑なシステムにおける難解なエラーに遭遇した場合、エラーの修復箇所が分かりづらいという欠点があります。
OpenTelemetry
OpenTelemetryとは
システムの内部状態を理解するために出力させるデータをテレメトリーデータといいます。テレメトリーデータを受け取り、可視化などを行い、理解を可能にするオブザーバビリティバックエンドは多く存在します。そこで、計装する際に、システムの言語やインフラストラクチャ、ランタイム環境に依存しないテレメトリーデータの生成、収集、管理、エクスポートを行うフレームワークやツールキットが求められます。それがOpenTelemetryです。
以下の図はOpenTelemetryの公式にあるOpenTelemetryを使ったシステムの構成例です。作成したマイクロサービスや一般的に使われているインフラ、クライアントの計装をOpenTelemetry Collectorに接続し、データベースなどに接続します。
OpenTelemetryのうれしさ
OpenTelemetryを使う利点は、標準化の幅広さにあると考えています。
demoはこちらから実行することができます。
次の図はdemoのアーキテクチャです。様々な言語で構成されたシステムですが、OpenTelemetryのみで計装を行っています。こうした対応範囲の広さがOpenTelemetryの利点ということができます。また、AWSやK8sなど使用するインフラストラクチャにとらわれていないことも構成例の図からわかると思います。
対応するプログラミング言語
公式のドキュメントにある言語は以下になります。しかし、他の言語についても、実装可能であるように設計されています。
- C++
- .Net
- Erlang/Elixir
- Go
- Java
- JavaScript
- PHP
- Python
- Ruby
- Rust
- Swift
オブジェクト指向の言語から、フロントエンドの言語など様々な言語がドキュメントでサポートされていることが分かります。
OpenTelmetryのコンポーネントについて
OpenTelemetryでテレメトリデータを利用する際には、
- 受信
- 処理
- 送信
という段階がそれぞれ存在します。これらは各言語のSDK、APIでそれぞれ実装を行います。
Trace、Metrics、Logなどのsignalに関して管理を行う場所がProviderです。また、BaggageやTrace Contextというようにspan間で受け渡されるコンテキスト情報はPropagatorで管理されます。
PropagatorやProviderはテレメトリデータそのもの実態を表すものではないということに注意が必要です。Traceの場合、TracerをStartさせることでSpanの作成などが行えますが、TracerProviderで行うわけではありません。また、Propagatorについても、Carrierとして存在している媒体にPropagatorが注入や抽出などの処理を行います。
これら各種ProviderとPropagatorをSetすることで利用することができます。
OpenTelemetry Collector
OpenTelemetry Collectorとは
OpenTelemetryが提供するベンダーに依存しないテレメトリーデータの管理方法です。OpenTelemetryのライブラリは様々な言語で提供されている一方、データの送信や提供されているライブラリの組み込みに関しては一元化されておらず、開発者は送信するバックエンド×プログラミング言語の分だけ必要とされます。また、OpenTelemetry自体の方も取得データに共通する加工や複数のバックエンドサービスそれぞれに対応させる必要があります。
それらのコストを削減し、データを一元化して取り扱えるようにしたのが、OpenTelemetry Collectorです。
OpenTelemetry Collectorの構成
概要
OpenTelmetry Collectorは主に3つの要素から構成されています。これらの設定はyamlファイルにて記述されます。検証では、今回の確認には公式から配布されているOpenTelemetryのdemoを利用しました。また、こちらがgithubのリンクになります。
Receivers
1つ以上のソースからテレメトリーデータを受け取るための機能です。それ以外にも、サードパーティーから送られてきたデータを内部的にTraceとSpanに変換する機能もあります。
以下はdemoのReceiverの部分です。
receivers:
otlp:
protocols:
grpc:
http:
cors:
allowed_origins:
- "http://*"
- "https://*"
httpcheck/frontendproxy:
targets:
- endpoint: http://frontendproxy:${env:ENVOY_PORT}
Otlp
でgRPCとHTTPの二つのプロトコルをサポートしています。cors
でオリジン間リソース共有ポリシーを定義しており、httpcheck
でfrontendproxy
エンドポイントへのHTTPリクエストを監視しています${env:ENVOY_PORT}で
環境変数の値を取得しています。
Processors
Pipeline処理の次の段階で、Receiverから流れてきたデータを受け取り、Batch処理やFilter Processorを設定し、次の段階に送るデータを選択したりする機能です。
以下はコード例です。
processors:
probabilistic_sampler: #Processor名を指定
sampling_percentage: 15 #Processorの処理を設定(データの15%をサンプリングする設定)
tail_sampling: #Processor名を指定
decision_wait: 10s #Spanを蓄積する時間を指定
num_traces: 10 #メモリに保持するTraceを指定
policies: #Processorの処理を設定
[
{
name: longer-than-600s, #Processorで用いる処理の設定を記述する
type: latency, #条件を定める
latency: { threshold_ms: 600 }, #定めた条件の閾値を設定する
},
]
batch:
Processorsの中に、まずprobabilistic_sampler:
やtail_sampling:
などのProcessor名、その下に条件などを書いていくという形になっています。
今回の例ではprobabilistic_sampler:
にデータ全体のランダム15%を取得するという書き方をしています。tail_sampling:
ではまずdecision_wait:
で最初のSpanからサンプリングの決定を行うまでの時間を規定し、num_traces:
メモリに保持されるTraceの数を規定します。policies:
でどのような要件を規定するかを記述します。name:
は自分でわかりやすい名前を書き、その下のtypeでどの要件を決めるか書きます。今回はlatencyを指定していますが、probabilistic_sampler:
一定の割合のサンプリングを行うものや、status_code
のようにステータスコードのみを流すなど、このProcessor一つとっても様々な機能があります。書き方は以下の通りです。
processors
プロセッサ名
プロセッサにかける条件
プロセッサ名
プロセッサにかける条件
プロセッサにかける条件:
[
{
プロセッサにかける条件を記述する
},
]
Exporters
Processorsから送られてきたデータを、送信先のバックエンドが対応しているデータ形式に変更し、送信します。以下はdemoのExportersの部分になります。
exporters:
debug:
otlp:
endpoint: "jaeger:4317"
tls:
insecure: true
otlphttp/prometheus:
endpoint: "http://prometheus:9090/api/v1/otlp"
tls:
insecure: true
opensearch:
logs_index: otel
http:
endpoint: "http://opensearch:9200"
tls:
insecure: true
endpoint
で送信先を決めてデータを送信しています。
設定方法について
以下のコードは、先ほど一部を乗せたdemoの設定を書くyamlファイルの初期状態になります。
receivers:
otlp:
protocols:
grpc:
http:
cors:
allowed_origins:
- "http://*"
- "https://*"
httpcheck/frontendproxy:
targets:
- endpoint: http://frontendproxy:${env:ENVOY_PORT}
exporters:
debug:
otlp:
endpoint: "jaeger:4317"
tls:
insecure: true
otlphttp/prometheus:
endpoint: "http://prometheus:9090/api/v1/otlp"
tls:
insecure: true
opensearch:
logs_index: otel
http:
endpoint: "http://opensearch:9200"
tls:
insecure: true
processors:
batch:
connectors:
spanmetrics:
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp, debug, spanmetrics]
metrics:
receivers: [httpcheck/frontendproxy, otlp, spanmetrics]
processors: [batch]
exporters: [otlphttp/prometheus, debug]
logs:
receivers: [otlp]
processors: [batch]
exporters: [opensearch, debug]
上に書いたような設定を行った後に、service
の枠にそのコンポーネントを書き込んで使います。
例えば、traces
にprobabilistic_sampler
を設定したい場合はservice.pipelines.traces.processorsに書き込みます。
traces:
receivers: [otlp] #TraceのReceiverに設定するコンポーネントを設定
processors: [batch,probabilistic_sampler] #TraceのProcessorに設定するコンポーネントを設定
exporters: [otlp, debug, spanmetrics] #TraceのExporterに設定するコンポーネントを設定
また、これら3つの要素を組み合わせたものをPipeLineと呼びます。Trace、Metrics、Logすべてに設定ができ、PipeLineの数を調節することもできます。例えば以下のコードでは、traces
に関して、Pipelineを二つ設定しています。
service:
extensions: [health_check, pprof, zpages]
pipelines:
traces: # tracesタイプの一つ目のPipeline
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [jaeger]
traces/2: # tracesタイプの二つ目のPipeline
receivers: [otlp]
processors: [batch]
exporters: [opencensus]
Processors:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
図に表すとこのようになります。
OpenTelemetry Collector のまとめ
OpenTelemetry Collectorの利点として
- よくあるプロトコルのサポートなどがあり、そのまま使用可能
- 様々な負荷や構成の中でも安定して動作する
- Collector自身が観測できる
- コアコードに手を入れることなくカスタマイズ可能
- Processorによってデータの取り扱い方を自由に変更できる
- 複数のバックエンドに送信可能(Jaeger, Prometheus, Zipkinなど)
- テレメトリーデータの管理が容易になる
- アプリケーション計装のExporterが1種類で済む
- 3つの要素をインターフェースとして定義しているため、データを標準化できる
Processor
Processorとは何か
先ほどのOpenTelemetry Collectorの説明でも触れましたが、Processorとは、OpenTelemetryのTrace、Metrics、Logそれぞれに処理をすることによって、特定のデータのみを表示させたり、メモリなどのリソースを節約することに役立ちます。
OpenTelemetryとしてはBatchとMemory Limiterのみをサポートしていますが、GithubのcontribのProcessorフォルダには23個Processorが存在します。(自分たちで作ることもできます)
データの所有権について
概要
Pipelineの中のデータの所有権には、独占的所有権と共有所有権の2つのモードが存在しています。2つのモードに関しては共有所有権にする場合Capabilities 内の MutatesData=false に設定します。これはyamlファイルの中にコネクターを実装する旨を書き、その内容をconfig.go内にプログラムとして書き込む形だそうです。
独占的所有権
独占的所有権では、データを一つのProcessorで占有します。そのProcessorはデータを自由に処理することができます。データは各Pipelineに渡される前にクローンされます。これによって安全にデータを変換できます。専有所有権モードにより、データを自由に変更する必要があるProcessorを簡単に実装できます。
(例:attributesProcessor)。
共有所有権
共有所有権では、特定のProcessorにデータを占有させません。この場合はすべてのProcessorが一つのデータを参照します。ConsumeTraces/ConsumeMetrics/ConsumeLogs 関数を介して受信した元のデータを変更することは禁止されています。Processorはデータを読み取ることはできますが、データを変更することはできません。データのクローニングを行わない分、リソースを軽くすることができます。データの監視のみが必要な場合などに使用することが推奨されています。
おすすめProcessorの一覧
こちらでおすすめのProcessorが記載されています。コレクターのREADMEには推奨プロセッサが書いてありますが、そこまで強い感じの言葉ではなく、「入れるといいよ」ぐらいのニュアンスと理解しました。
この機能もOpenTelemetryの公式から配布されているdemoに実際に設定して確認してみました。
Memory_limiter
その名の通り、 OpenTelemetry Collectorで使用できるメモリに制限をかけるProcessorです。ソフト制限とハード制限が存在し、ソフト制限を超えるとデータのドロップ、ハード制限を超えるとガベージコレクションが実行されます。
Batch
Span、Metrics、Logをバッチ処理することによって、データの送信タイミングを規定し、データの圧縮や帯域の確保を行ってくれます。
BatchはDemoに最初から搭載されていました。
以下はBatch処理のコード例となります。
processors:
batch:
batch/2:#二つ目の設定
send_batch_size: 10000
timeout: 10s
タイムアウトに関係なくバッチが送信されるsend_batch_size
とバッチをサイズにかかわらずに送信するtimeout
を導入しています。
Any sampling or initial filtering processors
データをすべてExportさせているとバックエンドや帯域の制約によっては動作が重くなったりしてしまう上に、どうでもいいデータを大量に見ることになってしまいます。そこで、サンプリングをかけるために何かしらのProcessorを導入することが推奨されています。サンプリングには二種類あり、全部のデータの中からランダムでサンプリングを行うものと、特定の条件や属性を満たしたものだけサンプリングを行うといったやり方があります。
Any processor relying on sending source from Context
今回例となっているKubernetes Attributes Processor
について紹介します。これはk8sのメタデータを管理して、その属性に応じて自動的にタグを設定します。Processorは自動的にk8sのリソースを検出して、そこからメタデータを抽出して関連するSpan、Metrics、Logにリソース属性として追加します。初期状態では
- k8s.namespace.name
- k8s.pod.name
- k8s.pod.uid
- k8s.pod.start_time
- k8s.deployment.name
- k8s.node.name
以上の属性がデフォルトで追加されます。
例えば、podのデータがどこから出ているのかを確認するためのFilterしたいとき、こちらのProcessorを用いて絞りたい特定のタグを自動で設定してくれるそうです。
このProcessorを用いることによって、データの可視化を容易にすることが可能です。
DemoにおすすめProcessorを入れて試してみた。
Any sampling or initial filtering processorsについて以下の例で試してみました。
例1: 全データの15%をExporterに送信するように設定されたProcessor
以下のコードは、全データの15%をランダムにサンプリングしExporterに送信するように設定されたProcessorのコードです。
processors:
probabilistic_sampler:
sampling_percentage: 15
下のグラフは、全データの15%をランダムにサンプリングしExporterに送信するように設定されたProcessorの設定前と設定後のReceiverの状態を表した結果のグラフです。
また、AcceptされたSpanとLogについてまとめると、それぞれ以下の表で示すことができます。
Spans Rate Processor設定前 | Spans Rate Processor設定後 | Log Records Rate Processor設定前 | Log Records Rate Processor設定後 | |
---|---|---|---|---|
Min | 306 | 305 | 14.0 | 14.3 |
Max | 345 | 338 | 16.7 | 17 |
Mean | 320 | 321 | 15.2 | 15.6 |
Span、Log両方とも、Processorの範囲外なので、ほとんど差は見受けられませんでした。
次に、全データの15%をランダムにサンプリングしExporterに送信するように設定されたProcessorの設定前と設定後のExporterの状態を表したグラフです。
また、debugに送られたSpanとLogについてまとめると、それぞれ以下の表で示すことができます。
Spans Rate Processor設定前 | Spans Rate Processor設定後 | Log Records Rate Processor設定前 | Log Records Rate Processor設定後 | |
---|---|---|---|---|
Min | 306 | 42.5 | 14.1 | 1.69 |
Max | 339 | 57.8 | 16.5 | 3.34 |
Mean | 318 | 48.9 | 15.3 | 2.24 |
processorの設定によってデータがサンプリングされた後なので、データの送信量が大幅に減っていることが確認できました。
以上の結果から、Receiverで受け取るテレメトリーデータの量はほとんど変化がありませんが、Exporterで送信されるデータ量が減少していることが分かります。
例2: 特定の条件のみをサンプリングするProcessor
以下のコードはエラーのみをサンプリングするProcessorのコードです。
processors:
tail_sampling:
decision_wait: 10s
num_traces: 100
expected_new_traces_per_sec: 10
policies:
[
{
name: test-policy-5,
type: status_code,
status_code: {status_codes: [ERROR, UNSET]}
},
]
今回はエラーをチェックするためにdemoに実装されているflagsの機能を用いてエラーを意図的に発生させて確認していきます。
以下はdemoにあるflagdのコードです
{
"$schema": "https://flagd.dev/schema/v0/flags.json",
"flags": {
"productCatalogFailure": {
"description": "Fail product catalog service on a specific product",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
},
"recommendationServiceCacheFailure": {
"description": "Fail recommendation service cache",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
},
"adServiceManualGc": {
"description": "Triggers full manual garbage collections in the ad service",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
},
"adServiceFailure": {
"description": "Fail ad service",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off",
"targeting": {
"fractional": [
{
"var": "session"
},
[
"on",
10
],
[
"off",
90
]
]
}
},
"cartServiceFailure": {
"description": "Fail cart service",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
},
"paymentServiceFailure": {
"description": "Fail payment service charge requests",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
},
"paymentServiceUnreachable": {
"description": "Payment service is unavailable",
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "off"
}
}
}
adServiceFailure | 広告サービス | GetAds1/10の確率でエラーを生成します |
adServiceManualGc | 広告サービス | 広告サービスで完全な手動ガベージ コレクションをトリガーする |
cartServiceFailure | カートサービス | EmptyCart1/10の確率でエラーを生成します |
productCatalogFailure | 製品カタログ | GetProduct製品 ID を含むリクエスト: OLJCESPC7Zの時にエラーを生成 |
recommendationServiceCacheFailure | おすすめ | キャッシュが指数関数的に増大するため、メモリ リークが発生する |
paymentServiceFailure | 決済サービス | chargeを呼び出すとエラーが発生します |
paymentServiceUnreachable | チェックアウトサービス | PaymentService を呼び出すときに不正なアドレスを使用すると、PaymentService が利用できないかのように見えます。 |
defalutvariant
をonに変更すればその機能を使うことができます。
今回はpaymentServiceFailure
をonにし、エラーを発生させることで、先へ進めなくなるという機能を用いて検証します。
以下の画像で示している「place order」と表示されているボタンを押すと、エラーを発生させることができます。
以下のグラフは、エラーのみをサンプリングするProcessorの設定前と設定後のReceiverの状態を表したグラフです。
また、AcceptされたSpanとLogについてまとめると、それぞれ以下の表で示すことができます。
Spans Rate Processor設定前 | Spans Rate Processor設定後 | Log Records Rate Processor設定前 | Log Records Rate Processor設定後 | |
---|---|---|---|---|
Min | 306 | 305 | 14.0 | 13.7 |
Max | 345 | 338 | 16.7 | 16.5 |
Mean | 320 | 321 | 15.2 | 15.1 |
例1と同様にReceiverはProcessorの影響外なので変化はほとんどありません。
また、以下のグラフは、エラーのみをサンプリングするProcessorの設定前と設定後のExporterの状態を表したグラフです。
また、debugに送られたSpanとLogについてまとめると、それぞれ以下の表で示すことができます。
Spans Rate Processor設定前 | Spans Rate Processor設定後 | Log Records Rate Processor設定前 | Log Records Rate Processor設定後 | |
---|---|---|---|---|
Min | 306 | 0 | 14.1 | 13.7 |
Max | 339 | 0.543 | 16.5 | 16.5 |
Mean | 318 | 0.115 | 15.3 | 15.1 |
例1と同様にExporterで受け取るデータはProcessorの影響を受けます。ProcessorはSpanのエラー以外は送信しない設定のため、数字が極端に減少しており、グラフの波も周期性がなくなっています。しかしLogについてはProcessorが処理を行っていないため、ほとんど変化はありません。
以上の結果により、エラーのみを抽出できていると考えられます。
基本的なProcessorの書き方
processors:
プロセッサ名:
プロセッサの具体的な処理
プロセッサの具体的な処理
[
{
プロセッサの処理の設定等
}
]
推奨Processorの中はこのような書き方をしているものがほとんどでした。
Filter Processor
そもそもFilterとは
受け取ったデータに対して、何らかの条件や属性などを付与し、それらに基づいてデータの処理や加工を行って出力することです。それによって出力させるデータの量を削減して、CPUやメモリの使用リソースを削減したり、多くのデータの中から必要なデータを取り出す際の労力を減らすことができます。
Filter Processorとは
このプロセッサを使用すると、Span、Metrics、Logからなるテレメトリーデータをドロップさせるタイミングを決定する条件式を追加することができます。
ターゲットには5種類あります。
Spanは開始と終了のある動作のことを示し、Span Eventはスパンの中で何か起こった時のそれ自身を指します。
SpanとSpan Eventなどの同じ信号に対して適応される条件の場合、上位の条件に一致してドロップされた際は、下位の条件はチェックされません。また、後述するOTTLを用いて記述された条件の処理中に発生したエラーに対してProcessorの反応を決めるオプションをつけることもできます。
error_mode | 説明 |
---|---|
ignore | 条件によってかえされたエラーを無視し、ログに記録して先に進みます。推奨モードです。 |
silent | 条件によって返されたエラーを無視し、ログに記載せずに先に進みます。 |
propagate | 起きたエラーをパイプラインに返却します。結果的にそのペイロードはコレクタからドロップします。 |
特に指定がない場合、propageteが使用されます。
OpenTelemetry Transformation Language (OTTL) とはなにか
Processor に内容を記述する際に使われる記法として、OpenTelemetry Transformation Language(OTTL)が存在します。SQLに似た宣言型言語で書かれています。
以下はAuthorizationヘッダーがAttributeに含まれている場合、そのAttributeを各Signalから削除するという例です。
traces:
delete(attributes["http.request.header.authorization"])
metrics:
delete(attributes["http.request.header.authorization"])
logs:
delete(attributes["http.request.header.authorization"])
基本的な書き方は以下の通りです。
関数(コンテクスト[割り当てる対象])
コンテクストの以下に一覧が記載されています。
また、関数についても公式のGithubに一覧が記載されています。
実際に動かしてみる
この章ではFilter Processorの動作確認を行っていきます。
例として、Traceデータのうち、HTTPでのアクセスが行われているSpanをドロップする、という内容について検証してみます。
HTTPスパンの削除
filter:
error_mode: ignore
traces:
span:
- attributes["http.request.method"] != nil
このように設定を変更し、Jaeger で確認したところ、下図のようにServiceの数が17→8になっており取得できるSpanの数が減っていることが確認できました。
Filter Processorの基本的な書き方としては以下の通りです。
filter
eroor_mode:ignore,silent,propagateから選択する
traces:
span:
条件を書き込む
条件を書き込む
spanevent:
条件を書き込む
条件を書き込む
metrics:
...
実装方法
概要
コードはこちらにあります
実装を行った言語としては、比較的に新しいかつ、利用されているGo言語を選びました。Go言語はまだOpenTelemetryにおいて成熟しきっていませんが、今後Go言語によるOpenTelemetryを使った計装の需要が高まっていくと考えています。
OpenTelemetryでは、
- 実際にアプリケーションの処理を行う各種サービス
- OpenTelemetry Collector
を用意することによって実装することができます。
OpenTelemetry Collectorの詳細は後述の章に譲るとしますが、テレメトリーデータを取りまとめて受信、処理、送信するものです。そのため、OpenTelemetry Collectorを介さずに直接Jaeger等のオブザーバビリティバックエンドに接続することもできます。
簡単な例としてこちらの公式にあるrolldiceを改良したものを例として挙げます。
改良を加えた点として、stdoutで出力していたものをJaegerにつなげるという点です。
例1: サーバとJaegerを直つなぎしてHTTP通信でリクエストする
アーキテクチャ
アーキテクチャ図は以下の図になります。
RollDiceサーバを立てて、WebブラウザであるRoll Diceクライアントから接続します。そしてそのTraceをJaegerに直接送ります。
- Roll Dice サーバ
Roll Diceクライアントからリクエストが来た時にランダムな1~6の数字を出します。 その際にTraceを作成します。 - Roll Dice クライアント
httpでブラウザからhttp://localhost:8080/rolldiceで接続します。更新するごとにリクエストが出されて数字が変わります。 - Jaeger
Roll Dice サーバからOpenTelemetryProtocol (OTLP) でTraceを受け取り確認することができます。
ディレクトリ構造
ディレクトリ構造は以下のようになっております。
.
├── docker-compose.yml
└── server
├── Dockerfile
├── Makefile
├── go.mod
├── go.sum
├── main.go
├── otel.go
└── rolldice.go
実装のポイントとなるファイルを抜粋して載せておきます。
Dockerfile
FROM golang:1.20.1-alpine
# goのversionは1.20以上でなければOpenTelemetryが動きません。
WORKDIR /src
docker-compose.yml
version: '3'
services:
server:
build:
context: .
dockerfile: ./Dockerfile
volumes:
- ./:/src/
tty: true
ports:
- "8080:8080"
environment:
# jaegerはhttp経由でOpenTelemetryProtocol (OTLP) を受け入れる際は4318番ポート
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318
networks:
- jaeger-example
depends_on:
- jaeger
jaeger:
environment:
- COLLECTOR_OTLP_ENABLED=true
image: jaegertracing/all-in-one:latest
ports:
# フロントエンド
- "16686:16686"
# HTTP経由のOTLP
- "4318:4318"
networks:
- jaeger-example
networks:
jaeger-example:
main.go
package main
import (
"context"
"errors"
"log"
"net"
"net/http"
"os"
"os/signal"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
func main() {
if err := run(); err != nil {
log.Fatalln(err)
}
}
func run() (err error) {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
// Otelのセットアップ
otelShutdown, err := setupOTelSDK(ctx)
if err != nil {
return
}
defer func() {
err = errors.Join(err, otelShutdown(context.Background()))
}()
// サーバの設定
// 8080番ポートで受け入れ、newHTTPHandlerを使う
srv := &http.Server{
Addr: ":8080",
BaseContext: func(_ net.Listener) context.Context { return ctx },
ReadTimeout: time.Second,
WriteTimeout: 10 * time.Second,
Handler: newHTTPHandler(),
}
srvErr := make(chan error, 1)
go func() {
srvErr <- srv.ListenAndServe()
}()
select {
case err = <-srvErr:
return
case <-ctx.Done():
stop()
}
err = srv.Shutdown(context.Background())
return
}
func newHTTPHandler() http.Handler {
mux := http.NewServeMux()
handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
handler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc))
mux.Handle(pattern, handler)
}
// /rolldiceのpathの場合、rolldice関数が動く
handleFunc("/rolldice", rolldice)
handler := otelhttp.NewHandler(mux, "/")
return handler
}
rolldice.go
package main
import (
"io"
"log"
"math/rand"
"net/http"
"strconv"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
// traceの作成
var tracer = otel.Tracer("rolldice")
func rolldice(w http.ResponseWriter, r *http.Request) {
//spanの作成
_, span := tracer.Start(r.Context(), "roll")
defer span.End()
roll := 1 + rand.Intn(6)
// roll.valueのkeyにrollのvalueをもつattributeの作成とset
rollValueAttr := attribute.Int("roll.value", roll)
span.SetAttributes(rollValueAttr)
resp := strconv.Itoa(roll) + "\n"
if _, err := io.WriteString(w, resp); err != nil {
log.Printf("Write failed: %v\n", err)
}
}
otel.go
package main
import (
"context"
"errors"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/trace"
)
func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Contet) error
shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}
// Propagatorの作成とset
prop := newPropagator()
otel.SetTextMapPropagator(prop)
// tracerProviderの作成とset
tracerProvider, err := newTraceProvider(ctx)
if err != nil {
handleErr(err)
return
}
otel.SetTracerProvider(tracerProvider)
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
return
}
func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
func newTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
traceExporter, err := otlptracehttp.New(
ctx,
)
if err != nil {
return nil, err
}
traceProvider := trace.NewTracerProvider(
trace.WithBatcher(traceExporter,
trace.WithBatchTimeout(time.Second)),
)
return traceProvider, nil
}
結果
結果は以下の図のようになりました。
WebブラウザからRoll Diceサーバにリクエストを送った結果、1が表示されました。また、Jaegerのフロントエンドからは、roll.valueに1が入っているSpan (Span名 roll) をもつTraceが確認できました。
例2: サーバとJaegerを直つなぎしてgRPC通信でリクエストする
アーキテクチャ
gRPCを使う例も見てみます。
httpサーバを使う場合とgRPCを使う場合で大きくコードが変わります。
アーキテクチャ図は以下の図になります。
ディレクトリ構造
ディレクトリ構造は以下のようになっております。
.
├── client
│ ├── Dockerfile
│ ├── Makefile
│ ├── dice.proto
│ ├── go.mod
│ ├── go.sum
│ ├── main.go
│ └── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
├── docker-compose.yml
└── server
├── Dockerfile
├── Makefile
├── dice.proto
├── go.mod
├── go.sum
├── main.go
├── otel.go
├── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
└── rolldice.go
実装のポイントとなるファイルを抜粋して載せておきます。
docker-compose.yml
version: '3'
services:
server:
build:
context: .
dockerfile: ./server/Dockerfile
volumes:
- ./server:/src/
tty: true
ports:
- "8080:8080"
environment:
# jaegerはgRPC経由でOpenTelemetryProtocol (OTLP) を受け入れる際は4317番ポート
- OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
networks:
- dice
- jaeger-example
depends_on:
- jaeger
client:
build:
context: .
dockerfile: ./client/Dockerfile
volumes:
- ./client:/src/
tty: true
networks:
- dice
jaeger:
image: jaegertracing/all-in-one:latest
ports:
# フロントエンド
- "16686:16686"
# gRPC経由でOTLP
- "4317:4317"
networks:
- jaeger-example
networks:
dice:
jaeger-example:
server/main.go
import (
"log"
"dice/pb"
"net"
"context"
"os/signal"
"os"
"google.golang.org/grpc"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
// OpenTelemetryのセットアップ
statsHandler,err:=setupOTelSDK(ctx)
if err != nil {
return
}
// 8080番ポートのサーバの作成
//その際、先ほど設定したOpenTelmetryのHandlerを利用
lis, err := net.Listen("tcp", "server:8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s:=grpc.NewServer(grpc.StatsHandler(statsHandler))
pb.RegisterRollDicerServer(s, &server{})
go func(){
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("stopping gRPC server...")
s.GracefulStop()
}
server/otel.go
package main
import (
"context"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/stats"
)
func setupOTelSDK(ctx context.Context) (statsHandler stats.Handler,err error) {
// Provider の作成とset
tp, err := newTraceProvider(ctx)
if err != nil {
return otelgrpc.NewServerHandler(),err
}
otel.SetTracerProvider(tp)
// Propagatorの作成とset
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
// Providerを指定してHandlerを作成
statsHandler=otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(tp),
)
return statsHandler,nil
}
func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
func newTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
traceExporter, err := otlptracegrpc.New(
ctx,
)
if err != nil {
return nil, err
}
traceProvider := trace.NewTracerProvider(
trace.WithBatcher(traceExporter,
trace.WithBatchTimeout(time.Second)),
)
return traceProvider, nil
}
client/main.go
import (
"log"
"dice/pb"
"net"
"context"
"os/signal"
"os"
"google.golang.org/grpc"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
// OpenTelemetryのセットアップ
statsHandler,err:=setupOTelSDK(ctx)
if err != nil {
return
}
// 8080番ポートのサーバの作成
//その際、先ほど設定したOpenTelmetryのHandlerを利用
lis, err := net.Listen("tcp", "server:8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s:=grpc.NewServer(grpc.StatsHandler(statsHandler))
pb.RegisterRollDicerServer(s, &server{})
go func(){
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("stopping gRPC server...")
s.GracefulStop()
}
結果
結果は以下の図のようになりました。
クライアント(サーバ)からRoll Diceサーバにリクエストを送った結果、5が表示されました。また、Jaegerのフロントエンドからは、roll.valueに5が入っているSpan (Span名 roll) をもつTraceが確認できました。
例3: サーバとJaegerをOpenTelemetry Collectorを介してgRPC通信でリクエストする
アーキテクチャ
OpenTelemetry Collector経由で行いたい場合は、docker-compose.ymlファイルの変更とotel-collector/otel-collector-config.ymlを追加するのみでできます。
アーキテクチャ図は以下の図になります。
ディレクトリ構造
ディレクトリ構造は以下のようになっております。
.
├── client
│ ├── Dockerfile
│ ├── Makefile
│ ├── dice.proto
│ ├── go.mod
│ ├── go.sum
│ ├── main.go
│ └── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
├── docker-compose.yml
├── otel-collector
│ └── otel-collector-config.yml
└── server
├── Dockerfile
├── Makefile
├── dice.proto
├── go.mod
├── go.sum
├── main.go
├── otel.go
├── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
└── rolldice.go
実装のポイントとなるファイルを抜粋して載せておきます。
docker-compose.yml
version: '3'
services:
server:
build:
context: .
dockerfile: ./server/Dockerfile
volumes:
- ./server:/src/
tty: true
ports:
- "8080:8080"
environment:
# otel-collectorに渡す
# 今回の場合は4316番ポート
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4316
networks:
- dice
- otel-collector
depends_on:
- jaeger
- otel-collector
client:
build:
context: .
dockerfile: ./client/Dockerfile
volumes:
- ./client:/src/
tty: true
networks:
- dice
otel-collector:
image:
otel/opentelemetry-collector-contrib
volumes:
# configファイルをローカルで共有する
- ./otel-collector/otel-collector-config.yml:/etc/otelcol-contrib/config.yaml
ports:
# serverから受け取る
- "4316:4316"
networks:
- otel-collector
- jaeger-example
jaeger:
environment:
- COLLECTOR_OTLP_ENABLED=true
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
# otel-collectorから受け取る
- "4317:4317"
networks:
- jaeger-example
networks:
dice:
jaeger-example:
otel-collector:
otel-collector/otel-collector-config.yml
receivers:
otlp:
protocols:
grpc:
# 受け入れは4316番ポート
endpoint: 0.0.0.0:4316
processors:
batch:
exporters:
otlp/jaeger:
# jaegerには4317番ポート
endpoint: jaeger:4317
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp/jaeger]
直つなぎの場合とOpenTelemetry Collectorを挟んだ場合とでは、労力はあまり変わらず、OpenTelemetry Collectorの導入の敷居は低いといえる。
結果
例4: サーバとJaegerをOpenTelemetry Collectorを介してgRPC通信でリクエストするときにfilter Processorを設定する
アーキテクチャ
Batch Processorを使用している部分をFilter Processorに変更して実験しました。roll.valueの値が2の時はSpanを通さないというFilter Processorを設定する例を考えます。
アーキテクチャ図は以下の図になります。
ディレクトリ構造
ディレクトリ構造は以下のようになっております。
.
├── client
│ ├── Dockerfile
│ ├── Makefile
│ ├── dice.proto
│ ├── go.mod
│ ├── go.sum
│ ├── main.go
│ └── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
├── docker-compose.yml
├── otel-collector
│ └── otel-collector-config.yml
└── server
├── Dockerfile
├── Makefile
├── dice.proto
├── go.mod
├── go.sum
├── main.go
├── otel.go
├── pb
│ ├── dice.pb.go
│ └── dice_grpc.pb.go
└── rolldice.go
実装のポイントとなるファイルを抜粋して載せておきます。
otel-collector/otel-collector-config.yml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4316
processors:
# batch:
filter:
traces:
span:
- attributes["roll.value"]==2
exporters:
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
# processors: [batch]
processors: [filter]
exporters: [otlp/jaeger]
結果は以下のようになりました。
結果
roll.valueが2以外の場合 (4の場合)では、roll.valueが4のSpanが確認できます。
roll.valueが2の場合では、roll.valueを持つSpanがなくなっています。
実際に、FilterProcessorを用いることによって、特定のattributeをドロップさせる設定を行うことができ、Traceを確認したところ設定どおりにドロップできることを確認することができました。
感想
有馬
僕は、本インターンに参加する前にインフラ系に関して全く知識のない状態でしたが、興味があり、何から始めればよいかわからないという状態でした。参加した結果として、オブザーバビリティから始まり、OpenTelemetryの目的、内容などを調べていくうちにインフラエンジニアがどのような問題意識でいるのかということについて解像度があがったと感じ、大変意義のあるものであったと考えています。僕は主に基礎部分を調べた後に実装部分を行っていました。実際のサービスでどのようにテレメトリーデータを受け取ったり、加工したり、送ったりしているのかが想像できました。
関根
このインターンに参加する前、Opentelemetryの知識どころかその前の前提知識すらほぼない感じで臨みました。私は実装の仕方よりもどのような構成になっているのか、支える機能面はどのようになっているのかなどに重点を置いて調べました。調べていくうちに環境を安定的に動かしたりどこにコードがあるのかなど色々な事が経験できました。また、公式のドキュメントをしっかり読み込むことの大切さに気付くこともできました。とても良い経験でした。今回、私の環境でOpenTelemetryの公式配布のdemoをかなり使用させていただきましたが、まあまあサービスが重いうえにかなりのデータ量が流れていたので、「これもし観測するためのサービスなかったらエラー見つからないわ」と思いながら動かしていました。
参考資料
- 信頼性とは?可用性や保守性との違い、向上させる方法をわかりやすく解説|ソフトウェアテストのSHIFT
- オブザーバビリティ 運用のシステムと組織への定着を促す「OBServe」 – 株式会社X-Tech5
- Observability(オブザーバビリティ)とは何か?知っておきたい新しいIT運用の考え方 – セキュリティ事業 – マクニカ
- オブザーバビリティとは?監視との違い、必要性について解説
- 分散トレースのスパンとは?
- Documentation
- OpenTelemetryとは?
- OpenTelemetry超入門 – Qiita
- OpenTelemetryをざっくりと知る – Qiita
- 入門 OpenTelemetry Collector
- 入門 OpenTelemetry Collector
- The OpenTelemetry Collector | A Complete 1 Hour Workshop
- OpenTelemetry Collector – architecture and configuration guide | SigNoz
- OpenTelemetry Collector導入のPoCと今後に向けて – Gaudiy Tech Blog
- opentelemetry-collector-contrib/processor/filterprocessor/README.md at main · open-telemetry/opentelemetry-collector-contrib