GraphQLデータをCSVにエクスポートする
このチュートリアルでは、Network AnalyticsデータのGraphQL APIをクエリし、レスポンスをカンマ区切り値(CSV)に変換するPythonスクリプトの作成方法を示します。生成されたCSVは、Splunk ↗のようなツールで簡単に取り込むことができ、さらなる視覚化や利用が可能です。
したがって、この例では、Network Analytics攻撃活動の分単位の集計を含むipFlows1mAttacksGroups データセットをクエリします。
このチュートリアルでは、Account Analytics:read権限を持つ有効なCloudflare APIトークンが必要です。また、関心のあるアカウントがNetwork Analyticsにアクセスできる権限を持っていることが前提です。
このチュートリアルのスクリプトは、Pythonバージョン3.6以上を必要とします。
特定のアカウント用にCloudflare APIトークンを設定する方法については、Analytics APIトークンの設定を参照してください。アカウントにアクセスできることを確認してください。
最初のステップは、スクリプトをセットアップし、Cloudflare APIトークンを使用してGraphQL APIとのさらなる認証のための変数を定義することです。このスクリプトは、エクスポートするデータの範囲を設定するための変数も提供します。
この例では、昨日で終了した7日間の期間をクエリします。
#!/usr/bin/env python3
import pandas as pdfrom datetime import datetime, timedeltaimport requests
# GraphQL APIのエンドポイントurl = 'https://api.cloudflare.com/client/v4/graphql/'
# これらの変数をカスタマイズします。file_dir = '' # スラッシュを含める必要があります。空白のままにすると、# csvは現在のディレクトリに作成されます。api_token = '[ここにAPIトークンを入力]'api_account = '[ここにアカウントIDを入力]'# デフォルトで最も最近の日を昨日に設定します。offset_days = 1# どれくらいのデータを取得したいですか?デフォルトは7日間です。historical_days = 7get_past_date()関数は、日数(num_days)を受け取り、その値を今日の日付から引いて、num_days日前の日付を返します。
def get_past_date(num_days): today = datetime.utcnow().date() return today - timedelta(days=num_days)スクリプトは、GraphQL APIをクエリする際に、offset_daysおよびhistorical_days変数を使用して適切な日付範囲(min_dateおよびmax_date)を計算します。
get_cf_graphql()関数は、GraphQL APIへのリクエストを組み立てて送信します。ヘッダーには認証用のデータが含まれます。
ペイロードにはGraphQLクエリが含まれています。このクエリでは、特定のアカウントと時間範囲に対して次のフィールドのリストを取得したいと考えています:
- 攻撃ID
- 攻撃タイプ
- 開始時間
- 終了時間
- 緩和タイプ
- 秒あたりのパケットの平均、最大レート
GraphQLクエリの基本については、クエリの基本を参照してください。
GraphQLクエリで使用される波括弧は、Pythonのf-stringでエスケープするために二重にされています。
GraphQLでは、クエリは1行のテキストである必要があるため、送信する前にすべての改行記号を削除する必要があります。
def get_cf_graphql(start_date, end_date): assert(start_date <= end_date) headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_token}' } # 使用したいGQLクエリ: payload = f'''{{"query": "query ipFlowEventLog($accountTag: string) {{ viewer {{ accounts( filter: {{ accountTag: $accountTag }} ) {{ ipFlows1mAttacksGroups( filter: $filter limit: 10000 orderBy: [min_datetimeMinute_ASC] ) {{ dimensions {{ attackId attackDestinationIP attackMitigationType attackType }} avg {{ packetsPerSecond }} min {{ datetimeMinute }} max {{ datetimeMinute packetsPerSecond }} }} }} }} }}", "variables": {{ "accountTag": "{api_account}", "filter": {{ "AND":[ {{ "date_geq": "{start_date}" }}, {{ "date_leq": "{end_date}" }} ] }} }} }}'''
r = requests.post(url, data=payload.replace('\n', ''), headers=headers) return rオープンソースのパンダス ↗ライブラリ(pd)などのツールを使用して、GraphQL APIからのレスポンス(JSON)をCSVに変換します。
この例では、convert_to_csv()関数が変換前にいくつかのJSON処理を行います — データの正規化、必要なデータの選択、ユーザーフレンドリーな列名への変更を行います。この関数は、APIが正常に応答したか、エラーが発生したかを確認します。
結果は、file_dirで指定されたディレクトリにファイルとして出力されます。
def convert_to_csv(raw_data, start_date, end_date): data = pd.read_json(raw_data, dtype=False)['data'] errors = pd.read_json(raw_data, dtype=False)['errors']
# エラーが発生したかどうかを確認 if errors.notna().any() or not 'viewer' in data or not 'accounts' in data['viewer']: print('データの取得に失敗しました: GraphQL APIがエラーで応答しました:') print(raw_data) return
# ネストされたJSONデータを最初にフラット化 network_analytics_normalized = pd.json_normalize(data['viewer']['accounts'], 'ipFlows1mAttacksGroups')
if len(network_analytics_normalized) == 0: print('空のレスポンスを受け取りました') return
network_analytics_abridged = network_analytics_normalized[[ 'dimensions.attackId', 'min.datetimeMinute', 'max.datetimeMinute', 'dimensions.attackMitigationType', 'dimensions.attackType', 'dimensions.attackDestinationIP', 'max.packetsPerSecond', 'avg.packetsPerSecond']] # 列名をユーザーフレンドリーな名前に変更 network_analytics_abridged.columns = [ '攻撃ID', '開始時刻', '終了時刻', '実施されたアクション', '攻撃タイプ', '宛先IP', '最大パケット/秒', '平均パケット/秒'] file = "{}network-analytics-{}-{}.csv".format(file_dir, start_date, end_date) network_analytics_abridged.to_csv(file) print("正常にエクスポートされました: {}".format(file))#!/usr/bin/env python3
import pandas as pdfrom datetime import datetime, timedeltaimport requests
# GraphQL APIのエンドポイントurl = 'https://api.cloudflare.com/client/v4/graphql/'
# これらの変数をカスタマイズします。file_dir = '' # スラッシュを含める必要があります。空白のままにすると、# csvは現在のディレクトリに作成されます。api_token = '[ここにAPIトークンを入力]'api_account = '[ここにアカウントIDを入力]'# デフォルトで最も最近の日を昨日に設定します。offset_days = 1# どれくらいのデータを取得したいですか?デフォルトは7日間です。historical_days = 7
def get_past_date(num_days): today = datetime.utcnow().date() return today - timedelta(days=num_days)
def get_cf_graphql(start_date, end_date): assert(start_date <= end_date) headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_token}' } # 使用したいGQLクエリ: payload = f'''{{"query": "query ipFlowEventLog($accountTag: string) {{ viewer {{ accounts( filter: {{ accountTag: $accountTag }} ) {{ ipFlows1mAttacksGroups( filter: $filter limit: 10000 orderBy: [min_datetimeMinute_ASC] ) {{ dimensions {{ attackId attackDestinationIP attackMitigationType attackType }} avg {{ packetsPerSecond }} min {{ datetimeMinute }} max {{ datetimeMinute packetsPerSecond }} }} }} }} }}", "variables": {{ "accountTag": "{api_account}", "filter": {{ "AND":[ {{ "date_geq": "{start_date}" }}, {{ "date_leq": "{end_date}" }} ] }} }} }}'''
r = requests.post(url, data=payload.replace('\n', ''), headers=headers) return r
def convert_to_csv(raw_data, start_date, end_date): data = pd.read_json(raw_data, dtype=False)['data'] errors = pd.read_json(raw_data, dtype=False)['errors']
# エラーが発生したかどうかを確認 if errors.notna().any() or not 'viewer' in data or not 'accounts' in data['viewer']: print('データの取得に失敗しました: GraphQL APIがエラーで応答しました:') print(raw_data) return
# ネストされたJSONデータを最初にフラット化 network_analytics_normalized = pd.json_normalize(data['viewer']['accounts'], 'ipFlows1mAttacksGroups')
if len(network_analytics_normalized) == 0: print('空のレスポンスを受け取りました') return
network_analytics_abridged = network_analytics_normalized[[ 'dimensions.attackId', 'min.datetimeMinute', 'max.datetimeMinute', 'dimensions.attackMitigationType', 'dimensions.attackType', 'dimensions.attackDestinationIP', 'max.packetsPerSecond', 'avg.packetsPerSecond']] # 列名をユーザーフレンドリーな名前に変更 network_analytics_abridged.columns = [ '攻撃ID', '開始時刻', '終了時刻', '実施されたアクション', '攻撃タイプ', '宛先IP', '最大パケット/秒', '平均パケット/秒'] file = "{}network-analytics-{}-{}.csv".format(file_dir, start_date, end_date) network_analytics_abridged.to_csv(file) print("正常にエクスポートされました: {}".format(file))
start_date = get_past_date(offset_days + historical_days)end_date = get_past_date(offset_days)
req = get_cf_graphql(start_date, end_date)if req.status_code == 200: convert_to_csv(req.text, start_date, end_date)else: print("データの取得に失敗しました: GraphQL APIが{}ステータスコードで応答しました".format(req.status_code))