前言
默认的朋友圈项目是没有通知的,如果部署在服务器上,很难查看到执行的日志。这里我们的解决方案就是:给它加一个钉钉机器人通知,在每次执行朋友圈后把本次的执行日志推送到钉钉群。

获取钉钉机器人
pc 端钉钉找到需要添加机器人的群,然后点击设置->机器人->机器人管理->添加机器人

选择自定义

填写相关的信息 如下图示
要选择加签 复制保存加签的密钥

点击完成后获取到 Webhook 同样需要把 Webhook 保存下来

修改项目
上面的步骤获取到 webhook 和密钥之后 下载朋友圈部署的项目 修改代码
https://github.com/Rock-Candy-Tea/hexo-circle-of-friends
https://fcircle-doc.yyyzyyyz.cn/#/

找到项目下的 hexo_circle_of_friends文件夹下的pipelines文件夹下面的sql_pipe.py文件 只需要修改这个文件即可
保存错误列表
添加一个变量 self.err_list = []
方法完整代码
def __init__(self):
    """Initialise the per-run collections used by the pipeline."""
    self.userdata = []
    self.nonerror_data = set()
    # Added for the notification: links of friends whose request failed.
    # Filled in friendlist_push and handed to sendmessage in close_spider.
    self.err_list = []
|
在friendlist_push方法里面添加错误友链
方法完整代码
def friendlist_push(self, settings):
    """Persist every collected friend and flag the ones that could not be crawled.

    A friend is considered healthy when at least one post of theirs was
    received (``self.nonerror_data``) or when their link matches one of the
    ``settings["BLOCK_SITE"]`` regular expressions. Every other friend is
    marked as failed, logged, and its link is appended to ``self.err_list``
    so that the notification lists all failed links.

    Args:
        settings: mapping-like spider settings; ``BLOCK_SITE`` is read from it.
    """
    for user in self.userdata:
        friend = models.Friend(
            name=user[0],
            link=user[1],
            avatar=user[2]
        )
        if user[0] in self.nonerror_data:
            friend.error = False
        # An empty/None BLOCK_SITE yields no patterns, so any() is False.
        elif any(re.match(url, friend.link) for url in settings["BLOCK_SITE"] or ()):
            friend.error = False
        else:
            logger.error("请求失败,请检查链接: %s" % friend.link)
            friend.error = True
            # Record the link for both failure paths (with or without
            # BLOCK_SITE configured) so the report matches the error count.
            self.err_list.append(friend.link)
        self.session.add(friend)
        self.session.commit()
|
添加一个方法
需要修改 secret 和 webhook_url:
secret 换成前面保存的加签密钥;webhook_url 只需要把 xxx 换成前面几个步骤获取的 Webhook 地址就行
def sendmessage(self, linktotal, errtotal, posttotal, todaytime, failed_links):
    """Send a markdown summary of the run to a DingTalk custom robot.

    The request is signed the way the robot's "加签" security option
    requires: HMAC-SHA256 over ``"<timestamp>\\n<secret>"``, base64 encoded
    and URL-quoted, then passed as the ``timestamp`` / ``sign`` query
    parameters of the webhook.

    Args:
        linktotal: total number of friends, as text.
        errtotal: number of failed friends, as text.
        posttotal: total number of posts, as text.
        todaytime: run time, as text.
        failed_links: iterable of failed friend links; may be empty.
    """
    # Placeholder: replace with the signing secret saved when creating the robot.
    secret = 'xxxx'
    timestamp = str(round(time.time() * 1000))
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    hmac_code = hmac.new(
        secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

    # Placeholder: replace 'xxxxxx' with the robot's Webhook URL. The query
    # parameter must be spelled "&timestamp=" (it is easily corrupted to
    # "×tamp=" when the text passes through an HTML entity decoder).
    webhook_url = 'xxxxxx&timestamp=' + timestamp + '&sign=' + sign
    headers = {'Content-Type': 'application/json'}

    failed_section = ""
    if failed_links:
        failed_section = '### 请求失败的链接:\n' + '\n'.join(
            f'- [{link}]({link})' for link in failed_links)
    text_content = (
        "### 友链总数: {}\n"
        "### 失联友链: {}\n"
        "### 总文章: {}\n"
        "### 运行时间: {}\n\n"
        "{}"
    ).format(linktotal, errtotal, posttotal, todaytime, failed_section)

    data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "朋友圈监控",
            "text": text_content
        }
    }

    # The notification is best-effort: bound the wait and never let a
    # network problem abort the crawl shutdown.
    try:
        response = requests.post(webhook_url, headers=headers, data=json.dumps(data), timeout=10)
        response.raise_for_status()
    except requests.RequestException as err:
        logger.error("DingTalk notification failed: %s" % err)
|
在close_spider方法里面调用发送钉钉机器人的方法
def close_spider(self, spider):
    """Finish the run: store friends, purge old posts, log totals and notify DingTalk.

    Args:
        spider: the spider being closed; its ``settings`` provide
            ``OUTDATE_CLEAN`` (and whatever ``friendlist_push`` reads).
    """
    settings = spider.settings
    self.friendlist_push(settings)
    self.outdate_clean(settings["OUTDATE_CLEAN"])

    logger.info("----------------------")
    # Run each count once so the log and the notification always show the
    # same numbers (previously every query was executed twice).
    friend_total = self.session.query(models.Friend).count()
    error_total = self.session.query(models.Friend).filter_by(error=True).count()
    post_total = self.session.query(models.Post).count()

    logger.info("友链总数 : %d" % friend_total)
    logger.info("失联友链数 : %d" % error_total)
    logger.info("共 %d 篇文章" % post_total)
    logger.info("最后运行于:%s" % today)

    self.sendmessage(str(friend_total), str(error_total), str(post_total),
                     str(today), self.err_list)
    logger.info("done!")
|
sql_pipe.py完整代码
import os import re import sys from urllib import parse from .. import models from ..utils import baselogger, project from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from datetime import datetime, timedelta import requests import json import time import hmac import hashlib import base64 import urllib.parse
# Run timestamp shifted to UTC+8, computed once when the module is imported;
# it is what close_spider logs and passes to the notification.
today = (datetime.utcnow() + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
# Module-level logger obtained from the project's baselogger helper.
logger = baselogger.get_logger(__name__)
class SQLPipeline:
    """Item pipeline that stores friends and posts through SQLAlchemy.

    At the end of a run it also pushes a summary (totals plus the list of
    failed friend links) to a DingTalk robot via ``sendmessage``.
    """

    def __init__(self):
        # Rows of [name, link, avatar] collected from "userdata" items.
        self.userdata = []
        # Authors for which at least one post item was received.
        self.nonerror_data = set()
        # Links of friends marked as failed; reported by sendmessage.
        self.err_list = []

    def open_spider(self, spider):
        """Create the engine/session, ensure tables exist and reset the friend table.

        Args:
            spider: the spider being opened; ``DATABASE`` and ``DEBUG`` are
                read from its settings.

        Raises:
            Exception: when ``DATABASE`` is neither "sqlite" nor "mysql", or
                when the engine cannot be created.
        """
        settings = spider.settings
        base_path = project.get_base_path()
        db = settings["DATABASE"]
        if settings["DEBUG"]:
            if db == "sqlite":
                if sys.platform == "win32":
                    conn = rf"sqlite:///{os.path.join(base_path, 'data.db')}?check_same_thread=False"
                else:
                    conn = f"sqlite:////{os.path.join(base_path, 'data.db')}?check_same_thread=False"
            elif db == "mysql":
                conn = "mysql+pymysql://%s:%s@%s:3306/%s?charset=utf8mb4" \
                       % ("root", "123456", "localhost", "test")
            else:
                raise Exception("SQL连接失败,不支持的数据库!")
        else:
            if db == "sqlite":
                conn = f"sqlite:////{os.path.join(base_path, 'data.db')}?check_same_thread=False"
            elif db == "mysql":
                conn = f"mysql+pymysql://{os.environ['MYSQL_USERNAME']}:{parse.quote_plus(os.environ['MYSQL_PASSWORD'])}" \
                       f"@{os.environ['MYSQL_IP']}:{os.environ['MYSQL_PORT']}/{os.environ['MYSQL_DB']}?charset=utf8mb4"
            else:
                raise Exception("SQL连接失败,不支持的数据库!")
        try:
            self.engine = create_engine(conn, pool_recycle=-1)
        except Exception as err:
            # Keep the original message but preserve the root cause.
            raise Exception("SQL连接失败") from err
        Session = sessionmaker(bind=self.engine)
        self.session = scoped_session(Session)

        # Create missing tables, start with an empty friend table and cache
        # the posts already stored.
        models.Model.metadata.create_all(self.engine)
        self.session.query(models.Friend).delete()
        self.query_post()
        logger.info("Initialization complete")

    def process_item(self, item, spider):
        """Collect friend rows and store post items.

        Items carrying a ``"userdata"`` key are buffered as
        ``[name, link, img]`` until the spider closes. Items carrying a
        ``"title"`` key mark their author as reachable, replace any stored
        post with the same link (keeping the earlier ``created`` value) and
        are inserted.

        Returns:
            The item that was passed in.
        """
        if "userdata" in item.keys():
            self.userdata.append([item["name"], item["link"], item["img"]])
            return item

        if "title" in item.keys():
            # The author produced at least one post, so they are not lost.
            self.nonerror_data.add(item["author"])

            for query_item in self.query_post_list:
                try:
                    if query_item.link == item["link"]:
                        item["created"] = min(item['created'], query_item.created)
                        self.session.query(models.Post).filter_by(link=query_item.link).delete()
                except Exception as err:
                    # Best-effort merge: keep crawling, but leave a trace.
                    logger.error("Failed to reconcile stored post: %s" % err)

            self.friendpoor_push(item)

        return item

    def sendmessage(self, linktotal, errtotal, posttotal, todaytime, failed_links):
        """Send a markdown summary of the run to a DingTalk custom robot.

        The request is signed the way the robot's "加签" security option
        requires: HMAC-SHA256 over ``"<timestamp>\\n<secret>"``, base64
        encoded and URL-quoted, then passed as the ``timestamp`` / ``sign``
        query parameters of the webhook.

        Args:
            linktotal: total number of friends, as text.
            errtotal: number of failed friends, as text.
            posttotal: total number of posts, as text.
            todaytime: run time, as text.
            failed_links: iterable of failed friend links; may be empty.
        """
        # Placeholder: replace with the robot's signing secret.
        secret = 'xxxx'
        timestamp = str(round(time.time() * 1000))
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        hmac_code = hmac.new(
            secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

        # Placeholder: replace 'xxxx' with the robot's Webhook URL. The query
        # parameter must be spelled "&timestamp=" (it is easily corrupted to
        # "×tamp=" when the text passes through an HTML entity decoder).
        webhook_url = 'xxxx&timestamp=' + timestamp + '&sign=' + sign
        headers = {'Content-Type': 'application/json'}

        failed_section = ""
        if failed_links:
            failed_section = '### 请求失败的链接:\n' + '\n'.join(
                f'- {link}' for link in failed_links)
        text_content = (
            "### 友链总数: {}\n"
            "### 失联友链: {}\n"
            "### 总文章: {}\n"
            "### 运行时间: {}\n\n"
            "{}"
        ).format(linktotal, errtotal, posttotal, todaytime, failed_section)

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": "朋友圈监控",
                "text": text_content
            }
        }

        # The notification is best-effort: bound the wait and never let a
        # network problem abort the crawl shutdown.
        try:
            response = requests.post(webhook_url, headers=headers, data=json.dumps(data), timeout=10)
            response.raise_for_status()
        except requests.RequestException as err:
            logger.error("DingTalk notification failed: %s" % err)

    def close_spider(self, spider):
        """Finish the run: store friends, purge old posts, log totals and notify DingTalk."""
        settings = spider.settings
        self.friendlist_push(settings)
        self.outdate_clean(settings["OUTDATE_CLEAN"])

        logger.info("----------------------")
        # Run each count once so the log and the notification always show
        # the same numbers.
        friend_total = self.session.query(models.Friend).count()
        error_total = self.session.query(models.Friend).filter_by(error=True).count()
        post_total = self.session.query(models.Post).count()

        logger.info("友链总数 : %d" % friend_total)
        logger.info("失联友链数 : %d" % error_total)
        logger.info("共 %d 篇文章" % post_total)
        logger.info("最后运行于:%s" % today)

        self.sendmessage(str(friend_total), str(error_total), str(post_total),
                         str(today), self.err_list)
        logger.info("done!")

    def query_post(self):
        """Cache all stored posts in ``self.query_post_list`` (empty list if the query fails)."""
        try:
            self.query_post_list = self.session.query(models.Post).all()
        except Exception:
            self.query_post_list = []

    def outdate_clean(self, time_limit):
        """Delete stored posts that are older than ``time_limit`` days.

        A post whose ``updated`` value cannot be parsed as ``%Y-%m-%d`` (or
        compared against ``time_limit``) is deleted as well. The session is
        committed and closed afterwards.

        Args:
            time_limit: maximum age in days, compared against the UTC+8 date.
        """
        self.query_post()
        for query_item in self.query_post_list:
            try:
                query_time = datetime.strptime(query_item.updated, "%Y-%m-%d")
                outdated = (datetime.utcnow() + timedelta(hours=8) - query_time).days > time_limit
            except (TypeError, ValueError):
                # Unusable date information: treat the post as outdated.
                outdated = True
            if outdated:
                self.session.query(models.Post).filter_by(link=query_item.link).delete()
        self.session.commit()
        self.session.close()

    def friendlist_push(self, settings):
        """Persist every collected friend and flag the ones that could not be crawled.

        A friend is healthy when a post of theirs was received or when their
        link matches one of the ``settings["BLOCK_SITE"]`` patterns. Every
        other friend is marked as failed, logged and added to
        ``self.err_list``.
        """
        for user in self.userdata:
            friend = models.Friend(
                name=user[0],
                link=user[1],
                avatar=user[2]
            )
            if user[0] in self.nonerror_data:
                friend.error = False
            # An empty/None BLOCK_SITE yields no patterns, so any() is False.
            elif any(re.match(url, friend.link) for url in settings["BLOCK_SITE"] or ()):
                friend.error = False
            else:
                logger.error("请求失败,请检查链接: %s" % friend.link)
                friend.error = True
                # Record the link for both failure paths so the report
                # matches the error count.
                self.err_list.append(friend.link)
            self.session.add(friend)
            self.session.commit()

    def friendpoor_push(self, item):
        """Insert one post item into the database and log a coloured summary of it."""
        post = models.Post(
            title=item['title'],
            created=item['created'],
            updated=item['updated'],
            link=item['link'],
            author=item['author'],
            avatar=item['avatar'],
            rule=item['rule']
        )
        self.session.add(post)
        self.session.commit()

        # ANSI escape codes colour the block blue in terminals that support them.
        info = f"""\033[1;34m\n—————————————————————————————————————————————————————————————————————————————— {item['author']}\n《{item['title']}》\n文章发布时间:{item['created']}\t\t采取的爬虫规则为:{item['rule']} ——————————————————————————————————————————————————————————————————————————————\033[0m"""
        logger.info(info)
|
最后
改完代码 放到服务器部署就行了,执行后就会收到钉钉执行日志
第一版本

第二版本
