コンテンツにスキップ

PythonでLogpushを管理する

Pythonを使用してCloudflareのLogpushサービスを管理できます。以下のスクリプトには、ジョブを作成し、ジョブの詳細を取得し、ジョブの設定を更新し、Logpushジョブを削除するための例のリクエストが含まれています。

import json
import requests
url = "https://api.cloudflare.com/client/v4/"
x_auth_email = "<EMAIL>"
x_auth_key = "<API_KEY>"
zone_id = "<ZONE_ID>"
destination_conf = "s3://<BUCKET_NAME>/logs?region=us-west-1"
logpush_url = url + "/zones/%s/logpush" % zone_id
headers = {
'X-Auth-Email': <EMAIL>,
'X-Auth-Key': <API_KEY>,
'Content-Type': 'application/json'
}
# ジョブを作成
r = requests.post(logpush_url + "/jobs", headers=headers, data=json.dumps({"destination_conf":destination_conf}))
print(r.status_code, r.text)
assert r.status_code == 201
assert r.json()["result"]["enabled"] == False
# 新しいジョブのIDを保持
id = r.json()["result"]["id"]
# ジョブを取得
r = requests.get(logpush_url + "/jobs/%s" % id, headers=headers)
print(r.status_code, r.text)
assert r.status_code == 200
# ゾーンのすべてのジョブを取得
r = requests.get(logpush_url + "/jobs", headers=headers)
print(r.status_code, r.text)
assert r.status_code == 200
assert len(r.json()["result"]) > 0
# ジョブを更新
r = requests.put(logpush_url + "/jobs/%s" % id, headers=headers, data=json.dumps({"enabled":True}))
print(r.status_code, r.text)
assert r.status_code == 200
assert r.json()["result"]["enabled"] == True
# ジョブを削除
r = requests.delete(logpush_url + "/jobs/%s" % id, headers=headers)
print(r.status_code, r.text)
assert r.status_code == 200