コンテンツにスキップ

GraphQLデータをCSVにエクスポートする

このチュートリアルでは、Network AnalyticsデータのGraphQL APIをクエリし、レスポンスをカンマ区切り値(CSV)に変換するPythonスクリプトの作成方法を示します。生成されたCSVは、Splunkのようなツールで簡単に取り込むことができ、さらなる視覚化や利用が可能です。

したがって、この例では、Network Analytics攻撃活動の分単位の集計を含むipFlows1mAttacksGroups データセットをクエリします。

前提条件

このチュートリアルでは、Account Analytics:read権限を持つ有効なCloudflare APIトークンが必要です。また、関心のあるアカウントがNetwork Analyticsにアクセスできる権限を持っていることが前提です。

このチュートリアルのスクリプトは、Pythonバージョン3.6以上を必要とします。

特定のアカウント用にCloudflare APIトークンを設定する方法については、Analytics APIトークンの設定を参照してください。アカウントにアクセスできることを確認してください。

認証付きスクリプトのセットアップ

最初のステップは、スクリプトをセットアップし、Cloudflare APIトークンを使用してGraphQL APIとのさらなる認証のための変数を定義することです。このスクリプトは、エクスポートするデータの範囲を設定するための変数も提供します。

この例では、昨日で終了した7日間の期間をクエリします。

スクリプトと認証のセットアップ
#!/usr/bin/env python3
import pandas as pd
from datetime import datetime, timedelta
import requests
# GraphQL APIのエンドポイント
url = 'https://api.cloudflare.com/client/v4/graphql/'
# これらの変数をカスタマイズします。
file_dir = '' # スラッシュを含める必要があります。空白のままにすると、
# csvは現在のディレクトリに作成されます。
api_token = '[ここにAPIトークンを入力]'
api_account = '[ここにアカウントIDを入力]'
# デフォルトで最も最近の日を昨日に設定します。
offset_days = 1
# どれくらいのデータを取得したいですか?デフォルトは7日間です。
historical_days = 7

n日前の日付を計算する

get_past_date()関数は、日数(num_days)を受け取り、その値を今日の日付から引いて、num_days日前の日付を返します。

n日前の日時を計算し、ISO形式で返す
def get_past_date(num_days):
today = datetime.utcnow().date()
return today - timedelta(days=num_days)

スクリプトは、GraphQL APIをクエリする際に、offset_daysおよびhistorical_days変数を使用して適切な日付範囲(min_dateおよびmax_date)を計算します。

GraphQL APIをクエリする

get_cf_graphql()関数は、GraphQL APIへのリクエストを組み立てて送信します。ヘッダーには認証用のデータが含まれます。

ペイロードにはGraphQLクエリが含まれています。このクエリでは、特定のアカウントと時間範囲に対して次のフィールドのリストを取得したいと考えています:

  • 攻撃ID
  • 攻撃タイプ
  • 開始時間
  • 終了時間
  • 緩和タイプ
  • 秒あたりのパケットの平均、最大レート

GraphQLクエリの基本については、クエリの基本を参照してください。

GraphQLクエリで使用される波括弧は、Pythonのf-stringでエスケープするために二重にされています。

GraphQLでは、クエリは1行のテキストである必要があるため、送信する前にすべての改行記号を削除する必要があります。

Network Analytics GraphQL APIをクエリする
def get_cf_graphql(start_date, end_date):
assert(start_date <= end_date)
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_token}'
}
# 使用したいGQLクエリ:
payload = f'''{{"query":
"query ipFlowEventLog($accountTag: string) {{
viewer {{
accounts(
filter: {{ accountTag: $accountTag }}
) {{
ipFlows1mAttacksGroups(
filter: $filter
limit: 10000
orderBy: [min_datetimeMinute_ASC]
) {{
dimensions {{
attackId
attackDestinationIP
attackMitigationType
attackType
}}
avg {{
packetsPerSecond
}}
min {{
datetimeMinute
}}
max {{
datetimeMinute
packetsPerSecond
}}
}}
}}
}}
}}",
"variables": {{
"accountTag": "{api_account}",
"filter": {{
"AND":[
{{
"date_geq": "{start_date}"
}},
{{
"date_leq": "{end_date}"
}}
]
}}
}}
}}'''
r = requests.post(url, data=payload.replace('\n', ''), headers=headers)
return r

データをCSVに変換する

オープンソースのパンダスライブラリ(pd)などのツールを使用して、GraphQL APIからのレスポンス(JSON)をCSVに変換します。

この例では、convert_to_csv()関数が変換前にいくつかのJSON処理を行います — データの正規化、必要なデータの選択、ユーザーフレンドリーな列名への変更を行います。この関数は、APIが正常に応答したか、エラーが発生したかを確認します。

結果は、file_dirで指定されたディレクトリにファイルとして出力されます。

データをCSVに変換する
def convert_to_csv(raw_data, start_date, end_date):
data = pd.read_json(raw_data, dtype=False)['data']
errors = pd.read_json(raw_data, dtype=False)['errors']
# エラーが発生したかどうかを確認
if errors.notna().any() or not 'viewer' in data or not 'accounts' in data['viewer']:
print('データの取得に失敗しました: GraphQL APIがエラーで応答しました:')
print(raw_data)
return
# ネストされたJSONデータを最初にフラット化
network_analytics_normalized = pd.json_normalize(data['viewer']['accounts'], 'ipFlows1mAttacksGroups')
if len(network_analytics_normalized) == 0:
print('空のレスポンスを受け取りました')
return
network_analytics_abridged = network_analytics_normalized[[
'dimensions.attackId',
'min.datetimeMinute',
'max.datetimeMinute',
'dimensions.attackMitigationType',
'dimensions.attackType',
'dimensions.attackDestinationIP',
'max.packetsPerSecond',
'avg.packetsPerSecond']]
# 列名をユーザーフレンドリーな名前に変更
network_analytics_abridged.columns = [
'攻撃ID',
'開始時刻',
'終了時刻',
'実施されたアクション',
'攻撃タイプ',
'宛先IP',
'最大パケット/秒',
'平均パケット/秒']
file = "{}network-analytics-{}-{}.csv".format(file_dir, start_date, end_date)
network_analytics_abridged.to_csv(file)
print("正常にエクスポートされました: {}".format(file))

最終スクリプト

Cloudflare Network AnalyticsをGraphQL API経由でCSV形式で取得する
#!/usr/bin/env python3
import pandas as pd
from datetime import datetime, timedelta
import requests
# GraphQL APIのエンドポイント
url = 'https://api.cloudflare.com/client/v4/graphql/'
# これらの変数をカスタマイズします。
file_dir = '' # スラッシュを含める必要があります。空白のままにすると、
# csvは現在のディレクトリに作成されます。
api_token = '[ここにAPIトークンを入力]'
api_account = '[ここにアカウントIDを入力]'
# デフォルトで最も最近の日を昨日に設定します。
offset_days = 1
# どれくらいのデータを取得したいですか?デフォルトは7日間です。
historical_days = 7
def get_past_date(num_days):
today = datetime.utcnow().date()
return today - timedelta(days=num_days)
def get_cf_graphql(start_date, end_date):
assert(start_date <= end_date)
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_token}'
}
# 使用したいGQLクエリ:
payload = f'''{{"query":
"query ipFlowEventLog($accountTag: string) {{
viewer {{
accounts(
filter: {{ accountTag: $accountTag }}
) {{
ipFlows1mAttacksGroups(
filter: $filter
limit: 10000
orderBy: [min_datetimeMinute_ASC]
) {{
dimensions {{
attackId
attackDestinationIP
attackMitigationType
attackType
}}
avg {{
packetsPerSecond
}}
min {{
datetimeMinute
}}
max {{
datetimeMinute
packetsPerSecond
}}
}}
}}
}}
}}",
"variables": {{
"accountTag": "{api_account}",
"filter": {{
"AND":[
{{
"date_geq": "{start_date}"
}},
{{
"date_leq": "{end_date}"
}}
]
}}
}}
}}'''
r = requests.post(url, data=payload.replace('\n', ''), headers=headers)
return r
def convert_to_csv(raw_data, start_date, end_date):
data = pd.read_json(raw_data, dtype=False)['data']
errors = pd.read_json(raw_data, dtype=False)['errors']
# エラーが発生したかどうかを確認
if errors.notna().any() or not 'viewer' in data or not 'accounts' in data['viewer']:
print('データの取得に失敗しました: GraphQL APIがエラーで応答しました:')
print(raw_data)
return
# ネストされたJSONデータを最初にフラット化
network_analytics_normalized = pd.json_normalize(data['viewer']['accounts'], 'ipFlows1mAttacksGroups')
if len(network_analytics_normalized) == 0:
print('空のレスポンスを受け取りました')
return
network_analytics_abridged = network_analytics_normalized[[
'dimensions.attackId',
'min.datetimeMinute',
'max.datetimeMinute',
'dimensions.attackMitigationType',
'dimensions.attackType',
'dimensions.attackDestinationIP',
'max.packetsPerSecond',
'avg.packetsPerSecond']]
# 列名をユーザーフレンドリーな名前に変更
network_analytics_abridged.columns = [
'攻撃ID',
'開始時刻',
'終了時刻',
'実施されたアクション',
'攻撃タイプ',
'宛先IP',
'最大パケット/秒',
'平均パケット/秒']
file = "{}network-analytics-{}-{}.csv".format(file_dir, start_date, end_date)
network_analytics_abridged.to_csv(file)
print("正常にエクスポートされました: {}".format(file))
start_date = get_past_date(offset_days + historical_days)
end_date = get_past_date(offset_days)
req = get_cf_graphql(start_date, end_date)
if req.status_code == 200:
convert_to_csv(req.text, start_date, end_date)
else:
print("データの取得に失敗しました: GraphQL APIが{}ステータスコードで応答しました".format(req.status_code))