前言
考虑到各种不同的因素,容易丢失一些重要的东西,所以我一直寻求一种好一点的方法去备份数据库 和 服务器里面的文件,但是最后发现都过于麻烦,主要是我懒
这里我的方案是 用python每天凌晨12点自动备份到腾讯COS,并且发送通知到飞书,提醒备份成功!
基础的飞书接入我就不多说了,具体请看文末参考内容
备份数据库
import os from qcloud_cos import CosConfig from qcloud_cos import CosS3Client from qcloud_cos.cos_threadpool import SimpleThreadPool import sys import logging import time import base64 import hashlib import schedule from datetime import datetime import hmac import requests import subprocess
def gen_sign(timestamp, secret): string_to_sign = '{}\n{}'.format(timestamp, secret) hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8') return sign
def send_message(message): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") timestamp = int(time.time()) sign = gen_sign(timestamp, "密钥") data = { "timestamp": timestamp, "sign": sign, "msg_type": "post", "content": { "post": { "zh_cn": { "title": "数据库备份通知📢", "content": [ [ { "tag": "text", "text": "备份结果:" + message + "\n" }, { "tag": "text", "text": "备份时间:" + formatted_time } ] ] } } } } headers = { 'Content-Type': 'application/json' } res = requests.post("webhook", json=data, headers=headers)
def backup(): backup_command = "mysqldump -u root --password='root' umami > /test/data/umami.sql" ans = subprocess.call(backup_command, shell=True) if ans == 0: logging.basicConfig(level=logging.INFO, stream=sys.stdout)
secret_id = '腾讯密钥' secret_key = '腾讯密钥' region = 'cos地区'
token = None config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) client = CosS3Client(config)
uploadDir = '上传的文件目录' bucket = 'cos桶昵称' g = os.walk(uploadDir) pool = SimpleThreadPool() for path, dir_list, file_list in g: for file_name in file_list: srcKey = os.path.join(path, file_name) cosObjectKey = "test/" + file_name pool.add_task(client.upload_file, bucket, cosObjectKey, srcKey)
pool.wait_completion() result = pool.get_result() send_message("备份成功!") if not result['success_all']: send_message("备份失败!") else: send_message("备份失败!")
def job(): print("Job started at", datetime.now()) backup()
schedule.every().day.at("00:00").do(job)
if __name__ == '__main__': while True: schedule.run_pending() time.sleep(1)
|
修改代码
- sign = gen_sign(timestamp, “密钥”) 这里的密钥就是飞书机器人的密钥 具体操作查看参考中的
飞书高效监控CDN状态 - res = requests.post(“webhook”, json=data, headers=headers) 这里的webhook也是飞书机器人里面获取 具体操作查看参考中的
飞书高效监控CDN状态 - backup_command = “mysqldump -u root --password=‘root’ umami > /test/data/umami.sql” 这里需要修改的是你需要生成数据库文件的命令:mysqldump -u 用户名 --password=‘密码’ 数据库名 > 存放的路径
- secret_id 和 secret_key 为腾讯密钥 获取查看参考
- region:COS存储地区 获取查看参考
- uploadDir:本地文件需要上传的路径
- bucket:COS存储桶的昵称
- cosObjectKey:上传存储桶存放的位置
备份评论文件
import os from qcloud_cos import CosConfig from qcloud_cos import CosS3Client from qcloud_cos.cos_threadpool import SimpleThreadPool import sys import logging import time import base64 import hashlib import schedule from datetime import datetime import hmac import requests
def gen_sign(timestamp, secret): string_to_sign = '{}\n{}'.format(timestamp, secret) hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8') return sign
def send_message(message): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") timestamp = int(time.time()) sign = gen_sign(timestamp, "飞书密钥") data = { "timestamp": timestamp, "sign": sign, "msg_type": "post", "content": { "post": { "zh_cn": { "title": "评论备份通知📢", "content": [ [ { "tag": "text", "text": "备份结果:" + message + "\n" }, { "tag": "text", "text": "备份时间:" + formatted_time } ] ] } } } } headers = { 'Content-Type': 'application/json' } res = requests.post("webhook", json=data, headers=headers)
def backup(): logging.basicConfig(level=logging.INFO, stream=sys.stdout)
secret_id = '腾讯密钥' secret_key = '腾讯密钥' region = 'cos地区'
token = None config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) client = CosS3Client(config)
uploadDir = '本地上传的路径' bucket = 'COS桶昵称' g = os.walk(uploadDir) pool = SimpleThreadPool() for path, dir_list, file_list in g: for file_name in file_list: srcKey = os.path.join(path, file_name) cosObjectKey = "test/" + file_name pool.add_task(client.upload_file, bucket, cosObjectKey, srcKey)
pool.wait_completion() result = pool.get_result() send_message("备份成功!") if not result['success_all']: send_message("备份失败!")
def job(): print("Job started at", datetime.now()) backup()
schedule.every().day.at("00:00").do(job)
if __name__ == '__main__': while True: schedule.run_pending() time.sleep(1)
|
修改代码
- sign = gen_sign(timestamp, “密钥”) 这里的密钥就是飞书机器人的密钥 具体操作查看参考中的
飞书高效监控CDN状态 - res = requests.post(“webhook”, json=data, headers=headers) 这里的webhook也是飞书机器人里面获取 具体操作查看参考中的
飞书高效监控CDN状态 - secret_id 和 secret_key 为腾讯密钥 获取查看参考
- region:COS存储地区 获取查看参考
- uploadDir:本地文件需要上传的路径
- bucket:COS存储桶的昵称
- cosObjectKey:上传存储桶存放的位置
效果


建议
这里我是使用了两个机器人通知,可以改成使用一个机器人通知,然后把两个代码进行合并,因为代码都差不多,没必要去分开来部署啥的,我是因为两个项目写的时间相差太远,所以直接用两个了。
参考
腾讯密钥获取
COS地区简称
腾讯云COS开发文档
飞书高效监控CDN状态