Microsoft Sentinel(旧Azure Sentinel) でログ収集・分析基盤を構築してみた③ ~ Kusto Query を活用した調査~

はじめに

日商エレクトロニクスのAzure テクニカルマーケティング担当の髙橋です。

今回は、前回の分析ルール設定編に続き調査編になります。前回の記事は以下をご覧ください。

Microsoft Sentinel でログ収集・分析基盤を構築してみた② ~クエリによる脅威検知~

 

今回のゴール

  • Kusto Query Language(以降、KQL) の基本的な構文を理解する
  • クエリで、時間、フィールドの中身等の条件を指定しフィルター処理することができる
  • フィルターした結果に対して、合計、平均などの処理ができる
  • データをグラフとして可視化することができる

過去のMicrosoft Sentinel でログ収集・分析基盤を構築してみた① Microsoft Sentinel でログ収集・分析基盤を構築してみた②を経て、Microsoft Sentinel 上にログがため込まれており、分析ルールでアラート通知が飛ぶようになっていると思います。

実際の運用時には、アラートが出た際にKQLを使ってログの詳細分析が必要になってきます。
今回、このKQLの基本的な使い方を解説していきたいと思います。

 



1. KQLとは?


Kusto Query Languageと呼ばれ、Microsoftのサービス内で動作する読み取り専用のリクエストとなり、データを処理して結果を返すものです。

Azure Data Explorer、Azure Resourcce Graph、Application Insightでも利用されているため、覚えておくと色々なサービスで活用できます。

 

公式リファレンス:Kusto 照会言語 (KQL) の概要 – Azure Data Explorer | Microsoft Learn

 

基本構文


ソースとなるテーブルに対して 演算子を| (パイプ) で並べていきます。

データは、演算子から次の演算子へと流れていき、その中で処理(フィルター処理、並べ替え、集計)が行われます。

この処理の順番はパフォーマンスにも影響してくるため、より効率的な演算を考える必要があります。

2. よく使うオペレーター


今回は以下のようなサンプルレコードを想定して、よく使うオペレーターを見ていきます。

サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

 

文字列の検索(Search)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

特定のキーワードを含むレコードを検索します。

SigninLogs
| search “Tokyo, JP”

実行結果

“Tokyo, JP” というキーワードが含まれるレコードが結果として返ってきましたね。

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success

 

 

テーブルのフィルタ(Where)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

DeviceがiPhone かつ Location が Singapore, SGの条件でフィルター処理します。

SigninLogs
| where Device == “iPhone” and Location ==”Singapore, SG”

実行結果

アンド条件で一致するレコードが結果として返ってきましたね。

Username Device Location Status
Tomoko iPhone Singapore, SG Fail

 

 

指定した行数のデータを取得(Take)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

対象のレコードから最新の2レコードを取得します。

SigninLogs
| take 2

実行結果

上から2つのレコードが返ってきました。

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail

 

 

入力したテーブルのレコード数を取得(Count)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

対象テーブルのレコード数をカウントします。

SigninLogs
| count

実行結果

レコード数として5が返ってきました。正しいですね。

Count
5

 

入力テーブルの内容を集計(Summarize)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

Status の値ごとに該当するレコードをカウントします。

SigninLogs
| summarize Total = count() by Status

実行結果

Status が「Success」は3レコード、「Fail」は2レコードという結果が返ってきました。

Status Total
Success 3
Fail 2

 

新しいフィールドを追加(Extend)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

ソーステーブルのLocation を split でカンマで区切りにし、配列に代入します。

Extend で Country カラムを追加し、配列の1つ目を入力しています。

SigninLogs

| take 2
| extend Country = split(Location, “,”)[1]

実行結果

Country
JP
JP

 

出力するフィールドを指定(Project)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

Location カラムを指定し出力します。

SigninLogs
| take 2
| project Location

実行結果

Location
Tokyo, JP
Yamanashi, JP

 

 

指定された列のテーブルを作成(Distinct)


サンプルテーブル

– SigninLogs –

Username Device Location Status
Kazuki Windows 11 Tokyo, JP Success
Riku Windows 7 Yamanashi, JP Fail
Shingo MAX OSX Chiba, JP Success
Kazuyuki iPhone Melbourne, AU Success
Tomoko iPhone Singapore, SG Fail

クエリサンプル

UserName と Location を指定しテーブルを作成

SigninLogs
| take 2
| distinct UserName , Location

実行結果

Username Location
Kazuki Tokyo, JP
Riku Yamanashi, JP

 

 

変数の定義(Let)


変数を定義し、複雑な式、数字、文字を代入します。

サンプルテーブル

Table

Timestamp Number Location
2007-12-28T12:10:00Z 123 Japan
2007-12-28T04:30:00Z 872 Japan
2007-12-28T04:16:00Z 196 Singapore

クエリサンプル

Numberが123 かつ Location がJapanのレコードを抽出します。

let n = 123; // number
let location = “Japan”; // string
TableTable
| where Number == n and Location == location

実行結果

Timestamp Number Location
2007-12-28T12:10:00Z 123 Japan

 

各テーブルの指定の列で値を照合し、2つのテーブルの行を結合(Join)


Table1

Key Value
a 1
b 2
b 3
c 4

Table2

Key Value2
b 10
c 20
c 30
d 40

クエリサンプル

デフォルトでは、左側の重複を除去する内部結合となります。

結合キーの左側(Table1)が重複除去されてから内部結合が行われます。

※重複がある場合、最初のレコードが維持されます。

Table1

| join (Table2) on Key

 

重複削除後のTable1は以下のようになります。

Table1

Key Value
a 1
b 2
c 4

その後、Table2と結合されます。

Table2

Key Value2
b 10
c 20
c 30
d 40

実行結果

Key Value Key1 Value1
b 2 b 10
c 4 c 20
c 4 c 30

kind=innerを指定することで、左側の重複排除をすることなく、左右で一致するレコードの結合が可能です。(SQLでいう標準の内部結合)

クエリサンプル

Table1

| join kind=inner (Table2) on Key

Table1

Key Value
a 1
b 2
b 3
c 4

Table2

Key Value2
b 10
c 20
c 30
d 40

実行結果

Key Value1 Key1 Value2
b 3 b 10
b 2 b 10
c 4 c 20
c 4 c 30

その他の演算子


現在のUTC時刻から指定された期間を減数(Ago)


サンプルテーブル T

クエリサンプル

Timestampの時刻が、現在の時刻から1時間以内のレコードを抽出します。

T | where Timestamp > ago(1h)

 

並び替え(Sort)


入力テーブルの行の順序を 1 つ以上の列で並べ替えます。

サンプルテーブル T

Timestamp Number
2007-12-28T12:10:00Z 123
2007-12-28T04:30:00Z 872
2007-12-28T04:16:00Z 196

クエリサンプル

Number列を昇順にし行を並べ替えます。

T
| sort by Number asc

実行結果

Timestamp Number
2007-12-28T12:10:00Z 123
2007-12-28T04:16:00Z 196
2007-12-28T04:30:00Z 872

日付の開始日を取得(startofday, week, month, year)


クエリサンプル

変数 date の日付に対して、day, week , month, year の開始日を取得します。

let date = datetime(2023-01-27 15:44:17)
.
(省略)
.
| project dayStart = startofday(date)
| project weekStart = startofweek(date)
| project monthStart = startofmonth(date)
| project yearStart = startofyear(date)

実行結果

dayStart weekStart monthStart yearStart
2023-1-27 00:00:00.0000000 2023-1-22 00:00:00.0000000 2023-1-1 00:00:00.0000000 2023-1-1 00:00:00.0000000

 

データ解析(parse)


以下のサンプルデータを元に解説します。

Event

EventText
Event: NotifySliceRelease (resourceName=PipelineScheduler, totalSlices=27, sliceNumber=23, lockTime=02/17/2016 08:40:01, releaseTime=02/17/2016 08:40:01, previousLockTime=02/17/2016 08:39:01)
Event: NotifySliceRelease (resourceName=PipelineScheduler, totalSlices=25, sliceNumber=15, lockTime=02/17/2016 08:40:00, releaseTime=02/17/2016 08:40:00, previousLockTime=02/17/2016 08:39:00)
Event: NotifySliceRelease (resourceName=PipelineScheduler, totalSlices=15, sliceNumber=20, lockTime=02/17/2016 08:40:01, releaseTime=02/17/2016 08:40:01, previousLockTime=02/17/2016 08:39:01)

クエリサンプル

Event
| parse EventText with * “resourceName=” resourceName “, totalSlices=” totalSlices:long * “, sliceNumber=” Garbage
| project resourceName, totalSlices, sliceNumber, lockTime, releaseTime, previousLockTime

まずはクエリサンプルの2行目「| parse EventText with * “resourceName=” resourceName “, totalSlices=” totalSlices:long * “, sliceNumber=” Garbage 」について解説します。

「EventText with * “resourceName=” resourceName」

⇒元のEventTextの”resourceName=”の後ろにある文字列を、resourceName列に格納するといったクエリになっています。

「”, totalSlices=” totalSlices:long *」

⇒同様に、”, totalSlices=” の後ろにある文字列を、 totalSlices 列に long値として格納しています。

「”, sliceNumber=” Garbage」

⇒最後に、”, sliceNumber=” 以降の文字列を、Garbage列に格納しています。(使わない文字列はGarbage等の任意の名前を付けて分かりやすくしています。

実行結果

指定した文字列の後ろにあるデータが格納されていますね。

resourceName totalSlices Garbage
PipelineScheduler 27 23, lockTime=02/17/2016 08:40:01, releaseTime=02/17/2016 08:40:01, previousLockTime=02/17/2016 08:39:01)
PipelineScheduler 25 15, lockTime=02/17/2016 08:40:00, releaseTime=02/17/2016 08:40:00, previousLockTime=02/17/2016 08:39:00)
PipelineScheduler 15 20, lockTime=02/17/2016 08:40:01, releaseTime=02/17/2016 08:40:01, previousLockTime=02/17/2016 08:39:01)

 

 

さいごに


今回は、KQLの基本的な構文と、よく使うオペレーターを見てきました。

今回ご紹介したもの以外にもたくさんの演算子がありますので公式リファレンスもご参照ください。

公式リファレンス:Kusto 照会言語 (KQL) の概要 – Azure Data Explorer | Microsoft Learn

 

これらを使って、アラートが発生した際にログの調査が可能となりますので是非ご活用ください。

今後も技術ブログをアップデートしていきますので、なにかご相談が御座いましたら下記フォームからお問い合わせください。

お問い合わせはこちら

この記事を書いた人

髙橋 和輝
髙橋 和輝
テクニカルマーケターとして、新技術の検証、ブログ執筆、セミナー講師を行っております!
学生時代はアプリ開発に興味がありましたが、インフラ、セキュリティ事業を経て、現在はクラウド屋さんになっております。
コロナ禍前は、月1で海外旅行にいくなどアクティブに活動していましたが、最近は家に引きこもってゲームが趣味になっています。

宜しくお願い致します!