前言

默认的朋友圈项目没有通知功能,如果部署在服务器上,很难查看到执行日志。这里的解决方案是:给它加一个钉钉机器人通知,用来获取每次执行朋友圈时的日志。

394375ac-43dd-441a-9b1b-5d0df937ed2a

获取钉钉机器人

在 PC 端钉钉中找到需要添加机器人的群,然后依次点击 设置 -> 机器人 -> 机器人管理 -> 添加机器人
c241b752-731f-4dc4-af7e-cb555fe20453
选择自定义
1c1949dd-1350-4903-9bbb-3d80da545098
填写相关的信息,如下图所示

要选择“加签”,并复制保存加签的密钥

29b2bc8e-e8c0-4ce3-99d6-af908fe351e1

点击完成后会得到 Webhook 地址,同样需要把 Webhook 保存下来

2a96a1dc-adae-4818-9b37-286295ff2c9d

修改项目

通过上面的步骤获取到 Webhook 和密钥之后,下载朋友圈部署的项目,然后修改代码

https://github.com/Rock-Candy-Tea/hexo-circle-of-friends

https://fcircle-doc.yyyzyyyz.cn/#/

e9097d54-ba22-42cf-a55b-5f5616917c82

找到项目下 hexo_circle_of_friends 文件夹中 pipelines 文件夹里的 sql_pipe.py 文件,只需要修改这一个文件即可

保存错误列表

添加一个变量 self.err_list = []

方法完整代码

def __init__(self):
    """Create the empty per-run collections used by the pipeline."""
    # Links of friends whose request failed; sent along with the DingTalk report.
    self.err_list = []
    # People whose articles could be fetched through their friend link.
    self.nonerror_data = set()
    # Friend entries; friendlist_push reads each one as (name, link, avatar).
    self.userdata = []

friendlist_push方法里面添加错误友链

方法完整代码

def friendlist_push(self, settings):
    """Store every collected friend and remember the links that failed.

    A friend is healthy when its name is in ``self.nonerror_data`` or when
    its link matches one of the ``settings["BLOCK_SITE"]`` patterns.
    Every other friend is flagged ``error=True``, logged, and its link is
    appended to ``self.err_list`` so the notification lists exactly the
    friends marked as failed.

    :param settings: mapping that provides the ``BLOCK_SITE`` pattern list.
    """
    for user in self.userdata:
        friend = models.Friend(
            name=user[0],
            link=user[1],
            avatar=user[2]
        )
        if user[0] in self.nonerror_data:
            # At least one article was fetched for this friend.
            friend.error = False
        elif settings["BLOCK_SITE"] and any(
                re.match(url, friend.link) for url in settings["BLOCK_SITE"]):
            # Link matches a BLOCK_SITE pattern: not treated as a failure.
            friend.error = False
        else:
            logger.error("请求失败,请检查链接: %s" % friend.link)
            friend.error = True
            # Record every failed link, whichever path detected the failure.
            self.err_list.append(friend.link)
        self.session.add(friend)
        self.session.commit()

添加一个方法

需要修改 secret 和 webhook_url。
secret 换成前面保存的加签密钥;webhook_url 只需要把其中的 xxx 占位部分换成前面几个步骤获取的 Webhook 地址就行


def sendmessage(self, linktotal, errtotal, posttotal, todaytime, failed_links):
    """Send the run summary to a DingTalk robot as a signed markdown message.

    Sending is best-effort: a network error, an invalid webhook URL or a
    rejection reported by DingTalk is logged instead of raised, so a failed
    notification does not abort the caller.

    :param linktotal: total number of friend links (already a string).
    :param errtotal: number of failed friend links (already a string).
    :param posttotal: total number of articles (already a string).
    :param todaytime: run time as text.
    :param failed_links: iterable of failed link URLs; may be empty.
    """
    # Replace with the signing secret saved when the robot was created.
    secret = 'xxxx'
    # Signature: HMAC-SHA256 over "<timestamp>\n<secret>" keyed by the secret,
    # then base64-encoded and URL-quoted.
    timestamp = str(round(time.time() * 1000))
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'),
                         digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    # Replace xxxxxx with the robot's Webhook address.
    webhook_url = 'xxxxxx&timestamp=' + timestamp + '&sign=' + sign
    headers = {'Content-Type': 'application/json'}

    failed_section = ""
    if failed_links:
        failed_section = '### 请求失败的链接:\n' + '\n'.join(
            f'- [{link}]({link})' for link in failed_links)
    text_content = (
        "### 友链总数: {}\n"
        "### 失联友链: {}\n"
        "### 总文章: {}\n"
        "### 运行时间: {}\n\n"
        "{}"
    ).format(linktotal, errtotal, posttotal, todaytime, failed_section)

    data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "朋友圈监控",
            "text": text_content
        }
    }

    try:
        # The timeout keeps a hung connection from blocking the run forever.
        response = requests.post(webhook_url, headers=headers,
                                 data=json.dumps(data), timeout=10)
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError) as exc:
        logger.error("钉钉通知发送失败: %s" % exc)
        return
    # Per the DingTalk robot API, a non-zero errcode in the reply means the
    # message was rejected (e.g. bad signature) even though HTTP succeeded.
    if isinstance(result, dict) and result.get("errcode"):
        logger.error("钉钉通知发送失败: %s" % result)

close_spider方法里面调用发送钉钉机器人的方法


def close_spider(self, spider):
    """Finish the run: store friends, purge old posts, log and send totals.

    The three totals are queried once and reused for both the log lines and
    the DingTalk message, so both always show the same numbers.

    :param spider: the spider being closed; its ``settings`` are read.
    """
    settings = spider.settings
    self.friendlist_push(settings)
    self.outdate_clean(settings["OUTDATE_CLEAN"])

    friend_total = self.session.query(models.Friend).count()
    lost_total = self.session.query(models.Friend).filter_by(error=True).count()
    post_total = self.session.query(models.Post).count()

    logger.info("----------------------")
    logger.info("友链总数 : %d" % friend_total)
    logger.info("失联友链数 : %d" % lost_total)
    logger.info("共 %d 篇文章" % post_total)
    logger.info("最后运行于:%s" % today)
    self.sendmessage(str(friend_total), str(lost_total), str(post_total),
                     str(today), self.err_list)
    logger.info("done!")

sql_pipe.py完整代码

# -*- coding:utf-8 -*-
# Author:yyyz
import os
import re
import sys
from urllib import parse
from .. import models
from ..utils import baselogger, project
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from datetime import datetime, timedelta
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

# Timestamp text for this run: current UTC time shifted by +8 hours, formatted
# as 'YYYY-MM-DD HH:MM:SS'. Evaluated once, when the module is imported.
today = (datetime.utcnow() + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
# Module-level logger obtained from the project's baselogger helper.
logger = baselogger.get_logger(__name__)


class SQLPipeline:
    """Pipeline that stores friends and posts through SQLAlchemy.

    When the spider closes, it also sends a run summary (totals plus the
    links that failed) to a DingTalk robot.
    """

    def __init__(self):
        # Friend entries collected during the run, each [name, link, avatar].
        self.userdata = []
        # People whose articles could be fetched through their friend link.
        self.nonerror_data = set()
        # Links of friends flagged as failed; included in the notification.
        self.err_list = []

    def open_spider(self, spider):
        """Connect to the configured database and prepare the tables.

        :param spider: the spider being opened; ``DATABASE`` and ``DEBUG``
            are read from its settings.
        :raises Exception: if the database type is unsupported or the
            engine cannot be created.
        """
        settings = spider.settings
        base_path = project.get_base_path()
        db = settings["DATABASE"]
        if settings["DEBUG"]:
            if db == "sqlite":
                if sys.platform == "win32":
                    conn = rf"sqlite:///{os.path.join(base_path, 'data.db')}?check_same_thread=False"
                else:
                    conn = f"sqlite:////{os.path.join(base_path, 'data.db')}?check_same_thread=False"
            elif db == "mysql":
                # NOTE(review): hard-coded credentials, used only in DEBUG mode.
                conn = "mysql+pymysql://%s:%s@%s:3306/%s?charset=utf8mb4" \
                       % ("root", "123456", "localhost", "test")
            else:
                raise Exception("SQL连接失败,不支持的数据库!")
        else:
            if db == "sqlite":
                conn = f"sqlite:////{os.path.join(base_path, 'data.db')}?check_same_thread=False"
            elif db == "mysql":
                conn = f"mysql+pymysql://{os.environ['MYSQL_USERNAME']}:{parse.quote_plus(os.environ['MYSQL_PASSWORD'])}" \
                       f"@{os.environ['MYSQL_IP']}:{os.environ['MYSQL_PORT']}/{os.environ['MYSQL_DB']}?charset=utf8mb4"
            else:
                raise Exception("SQL连接失败,不支持的数据库!")
        try:
            self.engine = create_engine(conn, pool_recycle=-1)
        except Exception as exc:
            # Keep the original message but preserve the underlying cause.
            raise Exception("SQL连接失败") from exc
        Session = sessionmaker(bind=self.engine)
        self.session = scoped_session(Session)

        # Create the tables.
        models.Model.metadata.create_all(self.engine)
        # Empty the friend table; it is refilled when the spider closes.
        self.session.query(models.Friend).delete()
        # Load the existing rows of the post table.
        self.query_post()
        logger.info("Initialization complete")

    def process_item(self, item, spider):
        """Collect friend entries and store article items.

        Items with a ``userdata`` key are queued for ``friendlist_push``.
        Items with a ``title`` key mark their author as reachable, replace
        any stored post with the same link (keeping the earlier ``created``
        value) and are written to the post table.

        :param item: mapping produced by the spider.
        :param spider: unused.
        :returns: the item that was passed in.
        """
        if "userdata" in item:
            self.userdata.append([item["name"], item["link"], item["img"]])
            return item

        if "title" in item:
            # This author is reachable; adding to a set is idempotent.
            self.nonerror_data.add(item["author"])

            for query_item in self.query_post_list:
                try:
                    if query_item.link == item["link"]:
                        item["created"] = min(item['created'], query_item.created)
                        self.session.query(models.Post).filter_by(link=query_item.link).delete()
                except Exception:
                    # Best effort: a stored row that cannot be compared or
                    # deleted must not stop the new post from being saved.
                    pass

            self.friendpoor_push(item)

        return item

    @staticmethod
    def _build_report_text(linktotal, errtotal, posttotal, todaytime, failed_links):
        """Return the markdown body of the DingTalk report."""
        failed_section = ""
        if failed_links:
            failed_section = '### 请求失败的链接:\n' + '\n'.join(
                f'- [{link}]({link})' for link in failed_links)
        return (
            "### 友链总数: {}\n"
            "### 失联友链: {}\n"
            "### 总文章: {}\n"
            "### 运行时间: {}\n\n"
            "{}"
        ).format(linktotal, errtotal, posttotal, todaytime, failed_section)

    def sendmessage(self, linktotal, errtotal, posttotal, todaytime, failed_links):
        """Send the run summary to a DingTalk robot as a signed markdown message.

        Sending is best-effort: a network error, an invalid webhook URL or a
        rejection reported by DingTalk is logged instead of raised.

        :param linktotal: total number of friend links (already a string).
        :param errtotal: number of failed friend links (already a string).
        :param posttotal: total number of articles (already a string).
        :param todaytime: run time as text.
        :param failed_links: iterable of failed link URLs; may be empty.
        """
        # Replace with the signing secret saved when the robot was created.
        secret = 'xxxx'
        # Signature: HMAC-SHA256 over "<timestamp>\n<secret>" keyed by the
        # secret, then base64-encoded and URL-quoted.
        timestamp = str(round(time.time() * 1000))
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'),
                             digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        # Replace xxxx with the robot's Webhook address.
        webhook_url = 'xxxx&timestamp=' + timestamp + '&sign=' + sign
        headers = {'Content-Type': 'application/json'}

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": "朋友圈监控",
                "text": self._build_report_text(
                    linktotal, errtotal, posttotal, todaytime, failed_links)
            }
        }

        try:
            # The timeout keeps a hung connection from blocking the run forever.
            response = requests.post(webhook_url, headers=headers,
                                     data=json.dumps(data), timeout=10)
            response.raise_for_status()
            result = response.json()
        except (requests.RequestException, ValueError) as exc:
            logger.error("钉钉通知发送失败: %s" % exc)
            return
        # Per the DingTalk robot API, a non-zero errcode in the reply means
        # the message was rejected even though the HTTP request succeeded.
        if isinstance(result, dict) and result.get("errcode"):
            logger.error("钉钉通知发送失败: %s" % result)

    def close_spider(self, spider):
        """Finish the run: store friends, purge old posts, log and send totals.

        :param spider: the spider being closed; its ``settings`` are read.
        """
        settings = spider.settings
        self.friendlist_push(settings)
        self.outdate_clean(settings["OUTDATE_CLEAN"])

        # Query each total once so the log and the notification agree.
        friend_total = self.session.query(models.Friend).count()
        lost_total = self.session.query(models.Friend).filter_by(error=True).count()
        post_total = self.session.query(models.Post).count()

        logger.info("----------------------")
        logger.info("友链总数 : %d" % friend_total)
        logger.info("失联友链数 : %d" % lost_total)
        logger.info("共 %d 篇文章" % post_total)
        logger.info("最后运行于:%s" % today)
        self.sendmessage(str(friend_total), str(lost_total), str(post_total),
                         str(today), self.err_list)
        logger.info("done!")

    def query_post(self):
        """Load all stored posts into ``self.query_post_list`` ([] on failure)."""
        try:
            self.query_post_list = self.session.query(models.Post).all()
        except Exception:
            self.query_post_list = []

    def outdate_clean(self, time_limit):
        """Delete posts older than ``time_limit`` days, then commit and close.

        A post whose ``updated`` value cannot be parsed as ``%Y-%m-%d`` (or
        cannot be compared with the limit) is deleted as well.

        :param time_limit: maximum age of a post, in days.
        """
        self.query_post()
        # Same UTC+8 reference time for every post in this pass.
        now = datetime.utcnow() + timedelta(hours=8)
        for query_item in self.query_post_list:
            updated = query_item.updated
            try:
                query_time = datetime.strptime(updated, "%Y-%m-%d")
                expired = (now - query_time).days > time_limit
            except Exception:
                # Unparseable date: treat the post as outdated.
                expired = True
            if expired:
                self.session.query(models.Post).filter_by(link=query_item.link).delete()
        self.session.commit()
        self.session.close()

    def friendlist_push(self, settings):
        """Store every collected friend and remember the links that failed.

        A friend is healthy when its name is in ``self.nonerror_data`` or
        when its link matches one of the ``settings["BLOCK_SITE"]`` patterns.
        Every other friend is flagged ``error=True``, logged, and its link is
        appended to ``self.err_list``.

        :param settings: mapping that provides the ``BLOCK_SITE`` pattern list.
        """
        for user in self.userdata:
            friend = models.Friend(
                name=user[0],
                link=user[1],
                avatar=user[2]
            )
            if user[0] in self.nonerror_data:
                # At least one article was fetched for this friend.
                friend.error = False
            elif settings["BLOCK_SITE"] and any(
                    re.match(url, friend.link) for url in settings["BLOCK_SITE"]):
                # Link matches a BLOCK_SITE pattern: not treated as a failure.
                friend.error = False
            else:
                logger.error("请求失败,请检查链接: %s" % friend.link)
                friend.error = True
                # Record every failed link, whichever path detected it.
                self.err_list.append(friend.link)
            self.session.add(friend)
            self.session.commit()

    def friendpoor_push(self, item):
        """Insert one article into the post table and log a summary of it.

        :param item: mapping with ``title``, ``created``, ``updated``,
            ``link``, ``author``, ``avatar`` and ``rule`` keys.
        """
        post = models.Post(
            title=item['title'],
            created=item['created'],
            updated=item['updated'],
            link=item['link'],
            author=item['author'],
            avatar=item['avatar'],
            rule=item['rule']
        )
        self.session.add(post)
        self.session.commit()

        info = f"""\033[1;34m\n——————————————————————————————————————————————————————————————————————————————
{item['author']}\n《{item['title']}》\n文章发布时间:{item['created']}\t\t采取的爬虫规则为:{item['rule']}
——————————————————————————————————————————————————————————————————————————————\033[0m"""
        logger.info(info)

最后

改完代码后,放到服务器部署就行了,执行后就会在钉钉收到执行日志

第一版本

b1cbb67b-74b7-4907-920f-643400fb2379

第二版本

394375ac-43dd-441a-9b1b-5d0df937ed2a