import base64 import hashlib import time import schedule from tencentcloud.common import credential from tencentcloud.common.profile.client_profile import ClientProfile from tencentcloud.common.profile.http_profile import HttpProfile from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.teo.v20220901 import teo_client, models from datetime import datetime from hashlib import sha1 import hmac import requests import json import urllib
def get_Edgeone(): try: cred = credential.Credential("腾讯API密钥", "腾讯API密钥") httpProfile = HttpProfile() httpProfile.endpoint = "teo.tencentcloudapi.com" clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = teo_client.TeoClient(cred, "", clientProfile) current_datetime = datetime.utcnow()
start_of_day = current_datetime.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = current_datetime.replace(hour=23, minute=59, second=59, microsecond=999999)
start_of_day_iso = start_of_day.strftime('%Y-%m-%dT%H:%M:%SZ') end_of_day_iso = end_of_day.strftime('%Y-%m-%dT%H:%M:%SZ') req = models.DescribeTimingL7AnalysisDataRequest() params = { "StartTime": start_of_day_iso, "EndTime": end_of_day_iso, "MetricNames": ["l7Flow_outFlux", "l7Flow_inFlux", "l7Flow_request"], "ZoneIds": ["CDN ID"], "Interval": "day", "Area": "mainland" } req.from_json_string(json.dumps(params))
resp = client.DescribeTimingL7AnalysisData(req) print(resp.to_json_string()) data = json.loads(resp.to_json_string()) l7Flow_request_value = None
for item in data["Data"][0]["TypeValue"]: if item["MetricName"] == "l7Flow_request": l7Flow_request_value = item["Sum"] break total_influx = 0 total_outflux = 0
for item in data["Data"][0]["TypeValue"]: if item["MetricName"] == "l7Flow_inFlux": total_influx += item["Sum"] elif item["MetricName"] == "l7Flow_outFlux": total_outflux += item["Sum"]
total_influx_megabytes = round(total_influx / (1024 * 1024), 2) total_outflux_megabytes = round(total_outflux / (1024 * 1024), 2)
print("访问请求:", l7Flow_request_value) print("请求流量:", total_influx_megabytes) print("响应流量:", total_outflux_megabytes)
return l7Flow_request_value, total_influx_megabytes, total_outflux_megabytes except TencentCloudSDKException as err: print(err)
def dogecloud_api(api_path, data={}, json_mode=False): access_key = '多吉云密钥' secret_key = '多吉云密钥'
body = '' mime = '' if json_mode: body = json.dumps(data) mime = 'application/json' else: body = urllib.parse.urlencode(data) mime = 'application/x-www-form-urlencoded' sign_str = api_path + "\n" + body signed_data = hmac.new(secret_key.encode('utf-8'), sign_str.encode('utf-8'), sha1) sign = signed_data.digest().hex() authorization = 'TOKEN ' + access_key + ':' + sign response = requests.post('https://api.dogecloud.com' + api_path, data=body, headers={ 'Authorization': authorization, 'Content-Type': mime }) return response.json()
def Get_traffic(): current_time = datetime.now() formatted_date = current_time.strftime("%Y-%m-%d") api = dogecloud_api('/cdn/stat/traffic.json', { 'rtype': 'path', 'start_date': formatted_date, 'end_date': formatted_date, 'granularity': 'day', 'area': 'china', 'domains': 'CDN部署的域名' }) if api['code'] == 200: data_result = api['data']['result']
if data_result: data_values = data_result[0]['data'] data_values_megabytes = [value / (1024 * 1024) for value in data_values] print("Data values in Megabytes:", data_values_megabytes) traffres = [round(value, 2) for value in data_values_megabytes] return traffres else: print("No data available.") else: print("API failed: " + api['msg'])
def Get_reuestcount(): current_time = datetime.now() formatted_date = current_time.strftime("%Y-%m-%d") api = dogecloud_api('/cdn/stat/request.json', { 'rtype': 'path', 'start_date': formatted_date, 'end_date': formatted_date, 'granularity': 'day', 'area': 'china', 'domains': 'CDN部署的域名' }) if api['code'] == 200: data_result = api['data']['result'] return data_result[0]['data'] else: print("API failed: " + api['msg'])
def gen_sign(timestamp, secret): string_to_sign = '{}\n{}'.format(timestamp, secret) hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8') return sign
def send_message(): l7Flow_request_value, total_influx_megabytes, total_outflux_megabytes = get_Edgeone() Dres = Get_traffic() Dcount = Get_reuestcount() current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") timestamp = int(time.time()) sign = gen_sign(timestamp, "飞书机器人签名") data = { "timestamp": timestamp, "sign": sign, "msg_type": "interactive", "card": { "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "div", "text": { "content": "**🔴 报警服务:**\n腾讯云", "tag": "lark_md" } } ] }, { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "div", "text": { "content": "**🕐 时间:**\n" + formatted_time + "", "tag": "lark_md" } } ] } ] }, { "tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "grey", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "markdown", "content": "**今日请求数\n<font color='red'>" + str( l7Flow_request_value) + "</font>**", "text_align": "center" } ] } ] } ] }, { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "grey", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "markdown", "content": "**今日请求流量\n<font color='green'>" + str( total_influx_megabytes) + " M</font>**", "text_align": "center" } ] } ] } ] }, { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "grey", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "markdown", "content": "**今日响应流量\n<font color='blue'>" + str( total_outflux_megabytes) + " M</font>**", "text_align": "center" } ] } ] } ] } ] }, { "tag": "hr" }, { "tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "div", "text": { "content": "**🔴 报警服务:**\n多吉云", "tag": "lark_md" } } ] }, { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "div", "text": { "content": "**🕐 时间:**\n" + formatted_time + "", "tag": "lark_md" } } ] } ] }, { "tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "grey", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "markdown", "content": "**今日请求数\n<font color='red'>" + str( Dcount[0]) + "</font>**", "text_align": "center" } ] } ] } ] }, { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "column_set", "flex_mode": "none", "background_style": "grey", "columns": [ { "tag": "column", "width": "weighted", "weight": 1, "vertical_align": "top", "elements": [ { "tag": "markdown", "content": "**今日请求流量\n<font color='green'>" + str( Dres[0]) + " M</font>**", "text_align": "center" } ] } ] } ] } ] }, { "tag": "hr" }, { "tag": "action", "actions": [ { "tag": "button", "text": { "content": "腾讯云 🌹", "tag": "lark_md" }, "type": "default", "value": {}, "multi_url": { "url": "https://cloud.tencent.com/" "/overview", "android_url": "", "ios_url": "", "pc_url": "" } }, { "tag": "button", "text": { "content": "多吉云 🌹", "tag": "lark_md" }, "type": "default", "value": {}, "multi_url": { "url": "https://console.dogecloud.com/cdn/overview", "android_url": "", "ios_url": "", "pc_url": "" } } ] } ], "header": { "template": "red", "title": { "content": "CDN流量监控", "tag": "plain_text" } } } } headers = { 'Content-Type': 'application/json' } res = requests.post("飞书机器人webhook", json=data, headers=headers) print(res.json())
def job(): print("Job started at", datetime.now()) send_message()
schedule.every(3).hours.do(job)
if __name__ == '__main__': while True: schedule.run_pending() time.sleep(1)
|