| import base64import hashlib
 import time
 import schedule
 from tencentcloud.common import credential
 from tencentcloud.common.profile.client_profile import ClientProfile
 from tencentcloud.common.profile.http_profile import HttpProfile
 from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
 from tencentcloud.teo.v20220901 import teo_client, models
 from datetime import datetime
 from hashlib import sha1
 import hmac
 import requests
 import json
 import urllib
 
 
 def get_Edgeone():
 try:
 cred = credential.Credential("腾讯API密钥", "腾讯API密钥")
 httpProfile = HttpProfile()
 httpProfile.endpoint = "teo.tencentcloudapi.com"
 clientProfile = ClientProfile()
 clientProfile.httpProfile = httpProfile
 client = teo_client.TeoClient(cred, "", clientProfile)
 current_datetime = datetime.utcnow()
 
 start_of_day = current_datetime.replace(hour=0, minute=0, second=0, microsecond=0)
 
 end_of_day = current_datetime.replace(hour=23, minute=59, second=59, microsecond=999999)
 
 start_of_day_iso = start_of_day.strftime('%Y-%m-%dT%H:%M:%SZ')
 end_of_day_iso = end_of_day.strftime('%Y-%m-%dT%H:%M:%SZ')
 req = models.DescribeTimingL7AnalysisDataRequest()
 params = {
 "StartTime": start_of_day_iso,
 "EndTime": end_of_day_iso,
 "MetricNames": ["l7Flow_outFlux", "l7Flow_inFlux", "l7Flow_request"],
 "ZoneIds": ["CDN ID"],
 "Interval": "day",
 "Area": "mainland"
 }
 req.from_json_string(json.dumps(params))
 
 
 resp = client.DescribeTimingL7AnalysisData(req)
 
 print(resp.to_json_string())
 data = json.loads(resp.to_json_string())
 l7Flow_request_value = None
 
 for item in data["Data"][0]["TypeValue"]:
 if item["MetricName"] == "l7Flow_request":
 l7Flow_request_value = item["Sum"]
 break
 total_influx = 0
 total_outflux = 0
 
 for item in data["Data"][0]["TypeValue"]:
 if item["MetricName"] == "l7Flow_inFlux":
 total_influx += item["Sum"]
 elif item["MetricName"] == "l7Flow_outFlux":
 total_outflux += item["Sum"]
 
 total_influx_megabytes = round(total_influx / (1024 * 1024), 2)
 total_outflux_megabytes = round(total_outflux / (1024 * 1024), 2)
 
 print("访问请求:", l7Flow_request_value)
 print("请求流量:", total_influx_megabytes)
 print("响应流量:", total_outflux_megabytes)
 
 return l7Flow_request_value, total_influx_megabytes, total_outflux_megabytes
 except TencentCloudSDKException as err:
 print(err)
 
 
 
 def dogecloud_api(api_path, data={}, json_mode=False):
 access_key = '多吉云密钥'
 secret_key = '多吉云密钥'
 
 body = ''
 mime = ''
 if json_mode:
 body = json.dumps(data)
 mime = 'application/json'
 else:
 body = urllib.parse.urlencode(data)
 mime = 'application/x-www-form-urlencoded'
 sign_str = api_path + "\n" + body
 signed_data = hmac.new(secret_key.encode('utf-8'), sign_str.encode('utf-8'), sha1)
 sign = signed_data.digest().hex()
 authorization = 'TOKEN ' + access_key + ':' + sign
 response = requests.post('https://api.dogecloud.com' + api_path, data=body, headers={
 'Authorization': authorization,
 'Content-Type': mime
 })
 return response.json()
 
 
 
 def Get_traffic():
 current_time = datetime.now()
 formatted_date = current_time.strftime("%Y-%m-%d")
 api = dogecloud_api('/cdn/stat/traffic.json', {
 'rtype': 'path',
 'start_date': formatted_date,
 'end_date': formatted_date,
 'granularity': 'day',
 'area': 'china',
 'domains': 'CDN部署的域名'
 })
 if api['code'] == 200:
 data_result = api['data']['result']
 
 if data_result:
 data_values = data_result[0]['data']
 data_values_megabytes = [value / (1024 * 1024) for value in data_values]
 print("Data values in Megabytes:", data_values_megabytes)
 traffres = [round(value, 2) for value in data_values_megabytes]
 return traffres
 else:
 print("No data available.")
 else:
 print("API failed: " + api['msg'])
 
 
 
 def Get_reuestcount():
 current_time = datetime.now()
 formatted_date = current_time.strftime("%Y-%m-%d")
 api = dogecloud_api('/cdn/stat/request.json', {
 'rtype': 'path',
 'start_date': formatted_date,
 'end_date': formatted_date,
 'granularity': 'day',
 'area': 'china',
 'domains': 'CDN部署的域名'
 })
 if api['code'] == 200:
 data_result = api['data']['result']
 return data_result[0]['data']
 else:
 print("API failed: " + api['msg'])
 
 
 
 def gen_sign(timestamp, secret):
 string_to_sign = '{}\n{}'.format(timestamp, secret)
 hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
 sign = base64.b64encode(hmac_code).decode('utf-8')
 return sign
 
 
 def send_message():
 l7Flow_request_value, total_influx_megabytes, total_outflux_megabytes = get_Edgeone()
 Dres = Get_traffic()
 Dcount = Get_reuestcount()
 current_time = datetime.now()
 formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
 timestamp = int(time.time())
 sign = gen_sign(timestamp, "飞书机器人签名")
 data = {
 "timestamp": timestamp,
 "sign": sign,
 "msg_type": "interactive",
 "card": {
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "default",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "div",
 "text": {
 "content": "**🔴 报警服务:**\n腾讯云",
 "tag": "lark_md"
 }
 }
 ]
 },
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "div",
 "text": {
 "content": "**🕐 时间:**\n" + formatted_time + "",
 "tag": "lark_md"
 }
 }
 ]
 }
 ]
 },
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "default",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "grey",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "markdown",
 "content": "**今日请求数\n<font color='red'>" + str(
 l7Flow_request_value) + "</font>**",
 "text_align": "center"
 }
 ]
 }
 ]
 }
 ]
 },
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "grey",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "markdown",
 "content": "**今日请求流量\n<font color='green'>" + str(
 total_influx_megabytes) + " M</font>**",
 "text_align": "center"
 }
 ]
 }
 ]
 }
 ]
 },
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "grey",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "markdown",
 "content": "**今日响应流量\n<font color='blue'>" + str(
 total_outflux_megabytes) + " M</font>**",
 "text_align": "center"
 }
 ]
 }
 ]
 }
 ]
 }
 ]
 },
 {
 "tag": "hr"
 },
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "default",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "div",
 "text": {
 "content": "**🔴 报警服务:**\n多吉云",
 "tag": "lark_md"
 }
 }
 ]
 },
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "div",
 "text": {
 "content": "**🕐 时间:**\n" + formatted_time + "",
 "tag": "lark_md"
 }
 }
 ]
 }
 ]
 },
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "default",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "grey",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "markdown",
 "content": "**今日请求数\n<font color='red'>" + str(
 Dcount[0]) + "</font>**",
 "text_align": "center"
 }
 ]
 }
 ]
 }
 ]
 },
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "column_set",
 "flex_mode": "none",
 "background_style": "grey",
 "columns": [
 {
 "tag": "column",
 "width": "weighted",
 "weight": 1,
 "vertical_align": "top",
 "elements": [
 {
 "tag": "markdown",
 "content": "**今日请求流量\n<font color='green'>" + str(
 Dres[0]) + " M</font>**",
 "text_align": "center"
 }
 ]
 }
 ]
 }
 ]
 }
 ]
 },
 {
 "tag": "hr"
 },
 {
 "tag": "action",
 "actions": [
 {
 "tag": "button",
 "text": {
 "content": "腾讯云 🌹",
 "tag": "lark_md"
 },
 "type": "default",
 "value": {},
 "multi_url": {
 "url": "https://cloud.tencent.com/"
 "/overview",
 "android_url": "",
 "ios_url": "",
 "pc_url": ""
 }
 },
 {
 "tag": "button",
 "text": {
 "content": "多吉云 🌹",
 "tag": "lark_md"
 },
 "type": "default",
 "value": {},
 "multi_url": {
 "url": "https://console.dogecloud.com/cdn/overview",
 "android_url": "",
 "ios_url": "",
 "pc_url": ""
 }
 }
 ]
 }
 ],
 "header": {
 "template": "red",
 "title": {
 "content": "CDN流量监控",
 "tag": "plain_text"
 }
 }
 }
 }
 headers = {
 'Content-Type': 'application/json'
 }
 res = requests.post("飞书机器人webhook", json=data,
 headers=headers)
 print(res.json())
 
 
 def job():
 print("Job started at", datetime.now())
 send_message()
 
 
 schedule.every(3).hours.do(job)
 
 if __name__ == '__main__':
 while True:
 schedule.run_pending()
 time.sleep(1)
 
 |