PyramidStore is a framework for building Python video-site spiders. With a small amount of configuration and code, a developer can scrape video resources from all kinds of sites and expose them in a standard format. The framework uses a plugin architecture, so spiders for new sites can be implemented quickly.
- Plugin mechanism: each spider is an independent plugin placed in the `plugin` directory
- Standardized output: a unified data structure that upstream applications can process directly
- Proxy support: a built-in local proxy for handling video streams in special formats
- Basic Python programming knowledge
- Familiarity with HTTP requests and HTML/JSON parsing
- Understanding of basic web-crawling concepts
- Ability to analyze web page structure
- A Python 3.6+ environment
- A development tool (e.g. VS Code, PyCharm)
- A page-analysis tool (e.g. Chrome DevTools)
- An HTTP debugging tool (e.g. Postman, Fiddler)
pip install requests
pip install beautifulsoup4
pip install lxml
pip install pycryptodome  # for sites that require encryption/decryption
The project layout, based on the fish2018/py_spider project, looks like this:
py_spider/
├── PyramidStore/              # PyramidStore core library
│   ├── base/                  # base utilities
│   │   ├── spider.py          # spider base class
│   │   ├── local.py           # base class for local debugging
│   │   └── localproxy.py      # local proxy utility
│   ├── plugin/                # spider plugin directory
│   │   ├── custom/            # custom spider directory
│   │   │   └── py_*.py        # custom spider plugins
│   │   └── ...
│   └── example.json           # video JSON configuration
├── local_spider_debug.py      # local debugging tool for quickly testing every spider method
└── README.md                  # project documentation
`local_spider_debug.py` is an essential debugging tool: it lets you test every spider method locally, with no need to deploy to the full environment. It covers the home page, categories, search, details, and play links; its usage is explained in detail in a later section.
The PyramidStore directory holds the framework's core code. Every spider class must inherit from the base class defined in `spider.py`. Custom spiders belong in the `plugin/custom` directory, and their file names must start with `py_`.
Within the PyramidStore framework, every spider inherits from the base class in `spider.py` and follows a fixed naming and implementation convention:
- The spider file name must start with `py_`
- The file name should be descriptive, usually derived from the target site's name (for example, a spider for example.com might be named `py_example.py`)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from bs4 import BeautifulSoup
import urllib.parse
import requests
import json
import time
import re
import sys
sys.path.append('../../')

try:
    from base.spider import Spider
except ImportError:
    # Minimal stand-in base class so the file can be tested locally
    class Spider:
        def __init__(self, extend=""):
            pass

class Spider(Spider):
    def __init__(self):
        # Basic spider attributes
        self.siteUrl = 'https://example.com'  # target site address
        self.userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        # any other initialization goes here
According to the PyramidStore framework conventions, every spider plugin must implement the following core methods:
def getName(self):
    """Return the spider's display name"""
    return "我的影视站"

def homeContent(self, filter):
    """Return home-page categories and, optionally, the filter config"""
    result = {}
    # Manually defined categories
    cateManual = {
        "电影": "1",
        "电视剧": "2",
        "动漫": "3",
        "综艺": "4",
        "纪录片": "5"
    }
    classes = []
    for k in cateManual:
        classes.append({
            'type_id': cateManual[k],
            'type_name': k
        })
    result['class'] = classes
    # Return the filter config if requested
    # (self.config must then be defined, e.g. in __init__)
    if filter:
        result['filters'] = self.config['filter']
    return result
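For reference, the dict returned by `homeContent` serializes to something like the following (a sketch; the filter entries mirror the filter examples used later in this guide):

{
  "class": [
    {"type_id": "1", "type_name": "电影"},
    {"type_id": "2", "type_name": "电视剧"}
  ],
  "filters": {
    "1": [
      {"key": "year", "name": "年份", "value": [{"n": "全部", "v": "0"}, {"n": "2023", "v": "2023"}]}
    ]
  }
}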
def homeVideoContent(self):
    """Return recommended videos for the home page"""
    result = {}
    videos = []
    # Request the home page
    url = self.siteUrl
    response = self.fetch(url)
    # Parse the HTML and extract the recommended videos
    # (site-specific; items, item_id, title, img_url and remarks below are placeholders)
    for item in items:
        videos.append({
            "vod_id": item_id,
            "vod_name": title,
            "vod_pic": img_url,
            "vod_remarks": remarks  # status note, e.g. "更新至x集"
        })
    result['list'] = videos
    return result
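The elided extraction step is entirely site-specific. As a minimal sketch, assuming the recommendations are rendered as `.video-item` cards (the selectors here are hypothetical):

soup = BeautifulSoup(response.text, 'html.parser')
for item in soup.select('.video-list .video-item'):  # hypothetical selector
    link = item.select_one('a[href]')
    title_el = item.select_one('.title')
    if not (link and title_el):
        continue
    id_match = re.search(r'/detail/([^.]+)', link['href'])
    if not id_match:
        continue
    img_el = item.select_one('.pic img')
    remarks_el = item.select_one('.remarks')
    videos.append({
        "vod_id": id_match.group(1),
        "vod_name": title_el.text.strip(),
        "vod_pic": img_el.get('src', '') if img_el else '',
        "vod_remarks": remarks_el.text.strip() if remarks_el else ''
    })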
def categoryContent(self, tid, pg, filter, extend):
    """Return the video list of one category page"""
    result = {}
    # Build the category page URL
    url = f'{self.siteUrl}/category/{tid}/page/{pg}'
    # Apply filter parameters, if any
    if extend:
        if 'year' in extend and extend['year'] != '0':
            url += f'?year={extend["year"]}'
        # handle further filter parameters here...
    # Request and parse the page
    response = self.fetch(url)
    # Extract the video list
    videos = []
    # ...site-specific parsing; total_pages and total_count below are placeholders...
    result['list'] = videos
    result['page'] = pg
    result['pagecount'] = total_pages  # total number of pages
    result['limit'] = 20               # items per page
    result['total'] = total_count      # total number of items
    return result
def detailContent(self, ids):
    """Return the detail-page content"""
    # Get the video ID
    video_id = ids[0]
    # Build the detail page URL
    url = f'{self.siteUrl}/detail/{video_id}'
    # Request the detail page
    response = self.fetch(url)
    # Parse the detail page
    # (site-specific; year/area/remarks/actor/director are extracted the same way)
    title = ...  # title
    img = ...    # cover image
    desc = ...   # synopsis
    # Extract play sources and episodes
    play_from = []  # play source list
    play_url = []   # play address list
    # ...site-specific extraction of play information...
    # Build the standardized result
    vod = {
        "vod_id": video_id,
        "vod_name": title,
        "vod_pic": img,
        "vod_year": year,
        "vod_area": area,
        "vod_remarks": remarks,
        "vod_actor": actor,
        "vod_director": director,
        "vod_content": desc,
        "vod_play_from": "$$$".join(play_from),
        "vod_play_url": "$$$".join(play_url)
    }
    result = {
        'list': [vod]
    }
    return result
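`vod_play_from` and `vod_play_url` follow a fixed separator convention: play sources are joined with `$$$`; within one source, episodes are joined with `#`; each episode is written as `name$play_id`. A small illustration (the IDs are placeholders):

play_from = ["线路1", "线路2"]
play_url = [
    "第1集$id_1#第2集$id_2",  # episodes of 线路1
    "第1集$id_a#第2集$id_b"   # episodes of 线路2
]
# "$$$".join(play_from) -> "线路1$$$线路2"
# "$$$".join(play_url)  -> "第1集$id_1#第2集$id_2$$$第1集$id_a#第2集$id_b"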
def searchContent(self, key, quick, pg=1):
    """Search"""
    # Build the search URL (quote_plus comes from urllib.parse)
    search_url = f'{self.siteUrl}/search?keyword={urllib.parse.quote_plus(key)}'
    # Request the search page
    response = self.fetch(search_url)
    # Parse the search results
    # (site-specific; items, item_id, title, img and remarks are placeholders)
    videos = []
    for item in items:
        videos.append({
            "vod_id": item_id,
            "vod_name": title,
            "vod_pic": img,
            "vod_remarks": remarks
        })
    return videos

def searchContentPage(self, key, quick, pg=1):
    return self.searchContent(key, quick, pg)
def playerContent(self, flag, id, vipFlags):
    """Return the play information"""
    result = {}
    try:
        # Handle the play link
        if self.isVideoFormat(id):
            # The ID is already a direct link
            result["parse"] = 0
            result["url"] = id
        else:
            # Needs further resolution
            play_page_url = f"{self.siteUrl}/play/{id}"
            response = self.fetch(play_page_url)
            # Extract the video link from the play page
            html_content = response.text
            video_url = self.extractVideoUrl(html_content)
            if video_url:
                result["parse"] = 0
                result["url"] = video_url
            else:
                # No direct link found; fall back to the play page for sniffing
                result["parse"] = 1
                result["url"] = play_page_url
        # Attach request headers
        result["header"] = {
            "User-Agent": self.userAgent,
            "Referer": self.siteUrl
        }
        return result
    except Exception as e:
        print(f"Failed to resolve play content: {e}")
        return {}
def isVideoFormat(self, url):
    """Check whether a URL points directly to a video file"""
    video_formats = ['.mp4', '.m3u8', '.ts', '.mkv', '.avi', '.webm']
    if url.startswith('http'):
        for fmt in video_formats:
            if url.lower().find(fmt) > -1:
                return True
    return False

def manualVideoCheck(self):
    """Whether videos need a manual check"""
    return False  # not needed by default
def fetch(self, url, headers=None):
    """Unified request helper"""
    if headers is None:
        headers = {
            'User-Agent': self.userAgent,
            'Referer': self.siteUrl
        }
    try:
        response = requests.get(url, headers=headers, timeout=5)
        response.raise_for_status()  # raise on HTTP error status codes
        return response
    except Exception as e:
        print(f"Request failed: {url}, error: {e}")
        return None
def extractVideoUrl(self, html_content):
    """Extract the video link from an HTML page"""
    # Approach 1: regular expression match
    pattern = r'player\.src\s*=\s*[\'"]([^\'"]+\.mp4)[\'"]'
    match = re.search(pattern, html_content)
    if match:
        return match.group(1)
    # Approach 2: extract from embedded JSON data
    json_pattern = r'var\s+player_data\s*=\s*({[^}]+})'
    json_match = re.search(json_pattern, html_content)
    if json_match:
        try:
            data = json.loads(json_match.group(1))
            return data.get('url', '')
        except json.JSONDecodeError:
            pass
    # Approach 3: extract from an iframe
    soup = BeautifulSoup(html_content, 'html.parser')
    iframe = soup.select_one('iframe')
    if iframe and 'src' in iframe.attrs:
        return iframe['src']
    return ''
Some sites' video content only plays correctly through a proxy. For these cases, the PyramidStore framework provides local proxy support:
def localProxy(self, param):
    """Local proxy handler"""
    action = param.get('action')
    if action == 'proxy':
        url = param.get('url')
        headers = {
            'User-Agent': self.userAgent,
            'Referer': self.siteUrl
        }
        # Handle the different content types
        if param.get('type') == 'mpd':
            # DASH stream
            content = requests.get(url, headers=headers).text
            return [200, "application/dash+xml", content]
        elif param.get('type') == 'm3u8':
            # HLS stream
            content = requests.get(url, headers=headers).text
            return [200, "application/vnd.apple.mpegurl", content]
        elif param.get('type') == 'media':
            # raw media stream
            r = requests.get(url, headers=headers, stream=True)
            return [206, "application/octet-stream", r.content]
    return None
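A common reason to proxy an HLS stream is to make relative segment URIs playable. A minimal sketch of that rewrite, assuming the upstream playlist uses relative paths (this helper is illustrative, not part of the framework):

from urllib.parse import urljoin

def rewrite_m3u8(self, playlist_url, headers):
    """Fetch an HLS playlist and rewrite relative segment URIs to absolute ones"""
    text = requests.get(playlist_url, headers=headers, timeout=10).text
    lines = []
    for line in text.splitlines():
        # tag/comment lines start with '#'; every other non-empty line is a URI
        if line and not line.startswith('#'):
            line = urljoin(playlist_url, line)
        lines.append(line)
    return '\n'.join(lines)

The rewritten playlist text can then be returned from the `m3u8` branch above in place of the raw content.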
The following walks through a complete development cycle, using a plugin for a hypothetical site called "DemoSite" as the example, from analysis to implementation.
First, analyze the target site's structure to determine how each page and each piece of data can be obtained:
- Home page analysis:
  - Home page URL: `https://example.com`
  - Category menu location: top navigation bar
  - Categories: 电影, 电视剧, 动漫, 综艺
- Category page analysis:
  - Category page URL format: `https://example.com/list/{category_id}_{page}.html`
  - Video list item selector: `.video-list .video-item`
  - Video title selector: `.video-item .title`
  - Video cover selector: `.video-item .pic img[src]`
  - Video ID extraction: taken from the item link, in the form `/detail/{video_id}.html`
- Detail page analysis:
  - Detail page URL format: `https://example.com/detail/{video_id}.html`
  - Title selector: `.video-info .title`
  - Cover selector: `.video-cover img[src]`
  - Year selector: `.video-info .year`
  - Area selector: `.video-info .area`
  - Actor selector: `.video-info .actor`
  - Director selector: `.video-info .director`
  - Synopsis selector: `.video-info .desc`
  - Episode list selector: `.play-list`
  - Play source selector: `.source-list .source-item`
- Play page analysis:
  - Play page URL format: `https://example.com/play/{video_id}/{episode_id}.html`
  - Player element selector: `#player`
  - Video link extraction: from JavaScript variables, or by analyzing network requests
Based on this analysis, we can lay down the spider's basic structure:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import re
import time
import requests
from urllib.parse import quote_plus, urljoin
from bs4 import BeautifulSoup
import sys
sys.path.append('../../')

try:
    from base.spider import Spider
except ImportError:
    # Minimal stand-in base class so the file can be tested locally
    class Spider:
        def __init__(self, extend=""):
            pass

class Spider(Spider):
    def __init__(self):
        self.siteUrl = 'https://example.com'
        self.userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    def getName(self):
        return "示例影视站"
Next, implement the basic request and parsing helpers that later features build on:
def fetch(self, url, headers=None):
    """Unified page request helper"""
    if headers is None:
        headers = {
            'User-Agent': self.userAgent,
            'Referer': self.siteUrl
        }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        response.encoding = 'utf-8'  # make sure the encoding is correct
        return response
    except Exception as e:
        print(f"Request to {url} failed: {e}")
        return None

def parseHtml(self, html):
    """Parse HTML into a BeautifulSoup object"""
    return BeautifulSoup(html, 'html.parser')

def extractVideoItem(self, item_element):
    """Extract the video info from one list-item element"""
    video = {}
    try:
        # Title
        title_element = item_element.select_one('.title')
        if title_element:
            video['vod_name'] = title_element.text.strip()
        # Cover image
        img_element = item_element.select_one('.pic img')
        if img_element and 'src' in img_element.attrs:
            video['vod_pic'] = urljoin(self.siteUrl, img_element['src'])
        # Remarks (status note)
        remarks_element = item_element.select_one('.remarks')
        if remarks_element:
            video['vod_remarks'] = remarks_element.text.strip()
        # Video ID
        link_element = item_element.select_one('a[href]')
        if link_element and 'href' in link_element.attrs:
            href = link_element['href']
            id_match = re.search(r'/detail/([^.]+)', href)
            if id_match:
                video['vod_id'] = id_match.group(1)
    except Exception as e:
        print(f"Failed to extract video item: {e}")
    return video
Implement the `homeContent` method to return the site's category information:
def homeContent(self, filter):
    """Return home-page categories and filter config"""
    result = {}
    # Manually defined categories
    cateManual = {
        "电影": "1",
        "电视剧": "2",
        "动漫": "3",
        "综艺": "4"
    }
    # Build the category list
    classes = []
    for k in cateManual:
        classes.append({
            'type_id': cateManual[k],
            'type_name': k
        })
    result['class'] = classes
    # Return the filter config if requested
    if filter:
        result['filters'] = {
            "1": [  # filters for the 电影 category
                {"key": "year", "name": "年份", "value": [{"n": "全部", "v": "0"}, {"n": "2023", "v": "2023"}, {"n": "2022", "v": "2022"}]},
                {"key": "area", "name": "地区", "value": [{"n": "全部", "v": "0"}, {"n": "中国大陆", "v": "中国大陆"}, {"n": "美国", "v": "美国"}]}
            ],
            "2": [  # filters for the 电视剧 category
                {"key": "year", "name": "年份", "value": [{"n": "全部", "v": "0"}, {"n": "2023", "v": "2023"}, {"n": "2022", "v": "2022"}]},
                {"key": "area", "name": "地区", "value": [{"n": "全部", "v": "0"}, {"n": "中国大陆", "v": "中国大陆"}, {"n": "韩国", "v": "韩国"}]}
            ]
        }
    return result
Implement the `categoryContent` method to scrape the video list from category pages:
def categoryContent(self, tid, pg, filter, extend):
    """Return the video list of one category page"""
    result = {}
    # Build the category page URL
    url = f'{self.siteUrl}/list/{tid}_{pg}.html'
    # Apply filter parameters
    if extend:
        params = []
        if 'year' in extend and extend['year'] != '0':
            params.append(f'year={extend["year"]}')
        if 'area' in extend and extend['area'] != '0':
            params.append(f'area={extend["area"]}')
        if params:
            url += '?' + '&'.join(params)
    # Request the category page
    response = self.fetch(url)
    if not response:
        return result
    # Parse the HTML
    soup = self.parseHtml(response.text)
    # Extract the video list
    videos = []
    items = soup.select('.video-list .video-item')
    for item in items:
        video = self.extractVideoItem(item)
        if 'vod_id' in video:  # only keep items that yielded a video ID
            videos.append(video)
    # Extract pagination info
    page_info = soup.select_one('.pagination')
    total_pages = 1
    if page_info:
        # Try to read the total page count from the paginator
        page_text = page_info.text
        total_match = re.search(r'共(\d+)页', page_text)
        if total_match:
            total_pages = int(total_match.group(1))
    # Build the result
    result['list'] = videos
    result['page'] = pg
    result['pagecount'] = total_pages
    result['limit'] = len(videos)
    result['total'] = total_pages * len(videos)  # rough estimate of the total count
    return result
Implement the `detailContent` method to parse the detail page and extract the episode information:
def detailContent(self, ids):
    """Return the detail-page content"""
    video_id = ids[0]
    # Build the detail page URL
    url = f'{self.siteUrl}/detail/{video_id}.html'
    # Request the detail page
    response = self.fetch(url)
    if not response:
        return {}
    # Parse the HTML
    soup = self.parseHtml(response.text)
    # Basic information
    title = ''
    title_element = soup.select_one('.video-info .title')
    if title_element:
        title = title_element.text.strip()
    img = ''
    img_element = soup.select_one('.video-cover img')
    if img_element and 'src' in img_element.attrs:
        img = urljoin(self.siteUrl, img_element['src'])
    # Other labeled fields (note the fullwidth colon used on Chinese pages)
    infos = {}
    info_items = soup.select('.video-info .info-item')
    for item in info_items:
        item_text = item.text.strip()
        if ':' in item_text:
            key, value = item_text.split(':', 1)
            infos[key.strip()] = value.strip()
    # Synopsis
    desc = ''
    desc_element = soup.select_one('.video-info .desc')
    if desc_element:
        desc = desc_element.text.strip()
    # Play sources and episodes
    play_from = []
    play_url = []
    source_items = soup.select('.source-list .source-item')
    for source in source_items:
        source_name = source.text.strip()
        play_from.append(source_name)
        # Fetch the episode list belonging to this source
        source_id = source.get('data-id', '')
        episode_list = soup.select(f'.play-list[data-source="{source_id}"] a')
        episodes = []
        for ep in episode_list:
            ep_name = ep.text.strip()
            ep_url = ep.get('href', '')
            if ep_url:
                ep_id = re.search(r'/play/[^/]+/([^.]+)', ep_url)
                if ep_id:
                    episodes.append(f"{ep_name}${video_id}_{ep_id.group(1)}")
        play_url.append('#'.join(episodes))
    # Build the result
    vod = {
        "vod_id": video_id,
        "vod_name": title,
        "vod_pic": img,
        "type_name": infos.get('类型', ''),
        "vod_year": infos.get('年份', ''),
        "vod_area": infos.get('地区', ''),
        "vod_remarks": infos.get('状态', ''),
        "vod_actor": infos.get('演员', ''),
        "vod_director": infos.get('导演', ''),
        "vod_content": desc,
        "vod_play_from": "$$$".join(play_from),
        "vod_play_url": "$$$".join(play_url)
    }
    result = {
        'list': [vod]
    }
    return result
Implement the `playerContent` method to extract the video play link:
def playerContent(self, flag, id, vipFlags):
    """Return the play information"""
    result = {}
    # Split the combined ID into video ID and episode ID
    parts = id.split('_')
    if len(parts) != 2:
        return result
    video_id, episode_id = parts
    # Build the play page URL
    url = f'{self.siteUrl}/play/{video_id}/{episode_id}.html'
    # Request the play page
    response = self.fetch(url)
    if not response:
        return result
    html_content = response.text
    # Approach 1: extract from a JavaScript variable
    player_data_pattern = r'var\s+player_data\s*=\s*({[^}]+})'
    player_match = re.search(player_data_pattern, html_content)
    if player_match:
        try:
            player_data = json.loads(player_match.group(1))
            if 'url' in player_data:
                play_url = player_data['url']
                result["parse"] = 0  # direct link
                result["playUrl"] = ""
                result["url"] = play_url
                result["header"] = {
                    "User-Agent": self.userAgent,
                    "Referer": url
                }
                return result
        except Exception as e:
            print(f"Failed to parse player data: {e}")
    # Approach 2: match a video URL directly
    video_url_pattern = r'(https?://[^"\']+\.(mp4|m3u8))'
    video_match = re.search(video_url_pattern, html_content)
    if video_match:
        play_url = video_match.group(0)
        result["parse"] = 0  # direct link
        result["playUrl"] = ""
        result["url"] = play_url
        result["header"] = {
            "User-Agent": self.userAgent,
            "Referer": url
        }
        return result
    # Approach 3: if everything else fails, return the play page for sniffing
    result["parse"] = 1  # sniffing required
    result["playUrl"] = ""
    result["url"] = url
    result["header"] = {
        "User-Agent": self.userAgent,
        "Referer": self.siteUrl
    }
    return result
Implement the `searchContent` method to scrape the search results page:
def searchContent(self, key, quick):
    """Search"""
    # Build the search URL
    search_url = f'{self.siteUrl}/search.html?keyword={quote_plus(key)}'
    # Request the search page
    response = self.fetch(search_url)
    if not response:
        return []
    # Parse the HTML
    soup = self.parseHtml(response.text)
    # Extract the video list
    videos = []
    items = soup.select('.search-list .video-item')
    for item in items:
        video = self.extractVideoItem(item)
        if 'vod_id' in video:  # only keep items that yielded a video ID
            videos.append(video)
    return videos
Finally, implement the `isVideoFormat` and `manualVideoCheck` methods:
def isVideoFormat(self, url):
    """Check whether a URL points directly to a video file"""
    video_formats = ['.mp4', '.m3u8', '.ts', '.mkv', '.avi', '.webm']
    if url.startswith('http'):
        for fmt in video_formats:
            if url.lower().find(fmt) > -1:
                return True
    return False

def manualVideoCheck(self):
    """Whether videos need a manual check"""
    return False  # not needed by default
PyramidStore ships with the local debugging tool `local_spider_debug.py`. It is indispensable during development: it exercises each spider method quickly, with no need to deploy the spider into the full environment.
python local_spider_debug.py [command] [arg1] [arg2] ...
# Test home-page data
python local_spider_debug.py home

# Test category data
python local_spider_debug.py category <type_id> [page] [filter_enabled]
# e.g.: python local_spider_debug.py category 1 1 1

# Test search
python local_spider_debug.py search <keyword> [page] [quick_search]
# e.g.: python local_spider_debug.py search 战狼 1 1

# Test detail-page parsing
python local_spider_debug.py detail <video_id>
# e.g.: python local_spider_debug.py detail movie/12345

# Test play-link extraction
python local_spider_debug.py player <video_id> [source_index] [episode_index]
# e.g.: python local_spider_debug.py player movie/12345 0 0
# Note: the source and episode indices are zero-based integers
A complete debugging session might look like this:
# 1. Start by testing the spider's home page
python local_spider_debug.py home

# 2. Test the 电影 category listing
python local_spider_debug.py category 1 1 1

# 3. Pick a movie ID from the category output and test the detail page
python local_spider_debug.py detail movie/12345

# 4. Test that movie's play link
python local_spider_debug.py player movie/12345 0 0

# 5. Test search
python local_spider_debug.py search 流浪地球
The debug tool prints a detailed trace of each run, including:
- Request info: the requested URL, request headers, and response status
- Parsing steps: the key stages of data extraction
- Result data: a formatted dump of the structure each spider method returns
- Errors: on failure, the detailed error message and stack trace
The tool also saves each result as a JSON file for later analysis and debugging:
Saved data to debug_results/home_content_20250328_113845.json
Saved data to debug_results/detail_12345_20250328_113850.json
Saved data to debug_results/player_12345_第一集_20250328_113855.json
Taking a detail-page result file as an example, its JSON typically looks like this:
{
  "list": [{
    "vod_id": "movie/12345",
    "vod_name": "示例电影",
    "vod_pic": "https://example.com/images/12345.jpg",
    "vod_year": "2023",
    "vod_area": "中国大陆",
    "vod_remarks": "HD高清",
    "vod_actor": "演员A,演员B,演员C",
    "vod_director": "导演A",
    "vod_content": "电影简介内容...",
    "vod_play_from": "线路1$$$线路2",
    "vod_play_url": "HD$movie/12345/1#TC$movie/12345/2$$$HD$movie/12345/3#TC$movie/12345/4"
  }]
}
Play-link debugging is the most critical step of spider development. The tool prints information like the following:
[Play source] 线路1
[Episode] 第1集
[Parse mode] parse=0 (direct link)
[Play URL] https://example.com/videos/12345.mp4
[Request headers]
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36
Referer: https://example.com
Play-link extraction took 0.328s
Symptom: the debug output shows a data structure that does not match the spec, or required fields are missing.
Solutions:
- Check the method's return value against the PyramidStore data-structure conventions
- Make sure every required field is present and correctly typed
- Use the IDE's debugger, or add print statements, to inspect intermediate data
# Add debug output at the key points
print(f"Parsed title: {title}")
print(f"Parsed play list: {play_list}")
Symptom: the expected information cannot be extracted from the page; empty data comes back.
Solutions:
- Inspect the fetched HTML via the `local_spider_debug.py` output
- Re-analyze the page structure with the browser's developer tools
- Check whether the content is loaded dynamically by JavaScript
- Adjust the selectors or regular expressions to match the site's structure
# A more robust extraction helper
def get_safe_text(soup_element, selector, default=""):
    element = soup_element.select_one(selector)
    return element.text.strip() if element else default
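With this helper, the field extraction in `detailContent` collapses to one line per field:

title = get_safe_text(soup, '.video-info .title')
desc = get_safe_text(soup, '.video-info .desc')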
Symptom: requests to the target site time out or return error status codes.
Solutions:
- Check your network connection and the target site's availability
- Increase the request timeout
- Add retry logic
- Check whether the site's anti-crawling measures are blocking you
# A request helper with retry logic
def fetch_with_retry(self, url, max_retries=3, timeout=10):
    for i in range(max_retries):
        try:
            headers = {
                "User-Agent": self.userAgent,
                "Referer": self.siteUrl
            }
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            return response
        except Exception as e:
            print(f"Attempt {i + 1} failed: {e}")
            if i < max_retries - 1:
                time.sleep(2)  # wait 2 seconds before retrying
    return None
Symptom: a play link is extracted successfully, but the video will not play.
Solutions:
- Open the extracted link directly in a browser to verify it plays
- Check whether the link requires specific request headers (such as Referer)
- Check for hotlink protection or region restrictions
- Check whether the link is temporary and needs periodic refreshing
# Verify a play link
def verify_play_url(self, url, headers=None):
    try:
        if headers is None:
            headers = {
                "User-Agent": self.userAgent,
                "Referer": self.siteUrl
            }
        # Request only the headers; don't download the whole video
        response = requests.head(url, headers=headers, timeout=5)
        return response.status_code == 200
    except Exception as e:
        print(f"Link verification failed: {e}")
        return False
To make page-structure analysis easier, you can save the HTML content to a local file:
def save_html_for_debug(self, html_content, filename="debug_page.html"):
    """Save HTML content to a local file for offline analysis"""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(html_content)
        print(f"Saved HTML content to {filename}")
    except Exception as e:
        print(f"Failed to save HTML content: {e}")
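A typical use is to dump a page right after fetching it:

response = self.fetch(url)
if response:
    self.save_html_for_debug(response.text, 'category_page.html')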
For complex data-extraction problems, a dedicated analysis script can help:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import requests
import re
import json
from bs4 import BeautifulSoup

def analyze_player_page(url):
    """Analyze a play page and list the candidate video links"""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Referer": url
    }
    try:
        response = requests.get(url, headers=headers)
        html_content = response.text
        # Find candidate MP4 links
        mp4_pattern = r'(https?://[^"\']+\.mp4)'
        mp4_links = re.findall(mp4_pattern, html_content)
        # Find candidate M3U8 links
        m3u8_pattern = r'(https?://[^"\']+\.m3u8)'
        m3u8_links = re.findall(m3u8_pattern, html_content)
        # Collect JavaScript string variables
        js_vars = {}
        js_var_pattern = r'var\s+(\w+)\s*=\s*[\'"]([^\'"]+)[\'"]'
        js_var_matches = re.findall(js_var_pattern, html_content)
        for var_name, var_value in js_var_matches:
            js_vars[var_name] = var_value
        # Extract candidate JSON blocks
        json_pattern = r'var\s+\w+\s*=\s*({[^;]+});'
        json_matches = re.findall(json_pattern, html_content)
        json_data = []
        for json_str in json_matches:
            try:
                data = json.loads(json_str)
                json_data.append(data)
            except json.JSONDecodeError:
                pass
        # Aggregate the findings
        results = {
            "url": url,
            "mp4_links": mp4_links,
            "m3u8_links": m3u8_links,
            "js_vars": js_vars,
            "json_data": json_data
        }
        # Print the analysis
        print(f"Analyzed URL: {url}")
        print(f"Found {len(mp4_links)} MP4 link(s):")
        for i, link in enumerate(results["mp4_links"], 1):
            print(f"  {i}. {link}")
        print(f"Found {len(m3u8_links)} M3U8 link(s):")
        for i, link in enumerate(results["m3u8_links"], 1):
            print(f"  {i}. {link}")
        print(f"Found {len(js_vars)} JavaScript variable(s)")
        print(f"Found {len(json_data)} JSON block(s)")
        # Save the analysis results
        with open("player_analysis.json", "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print("\nAnalysis saved to player_analysis.json")
        # Save the HTML content
        with open("player_page.html", "w", encoding="utf-8") as f:
            f.write(html_content)
        print("HTML content saved to player_page.html")
        return results
    except Exception as e:
        print(f"Analysis failed: {e}")
        return None

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python analyze_player.py <play page URL>")
        sys.exit(1)
    url = sys.argv[1]
    analyze_player_page(url)
For sites with region restrictions, or sites that require access from specific IPs, debug through a proxy:
def fetch_with_proxy(self, url, proxy=None):
    """Request helper that goes through a proxy"""
    if proxy is None:
        # A default proxy can be set here
        proxy = "http://127.0.0.1:7890"  # e.g. a local Clash proxy
    proxies = {
        "http": proxy,
        "https": proxy
    }
    headers = {
        "User-Agent": self.userAgent,
        "Referer": self.siteUrl
    }
    try:
        response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
        response.raise_for_status()
        return response
    except Exception as e:
        print(f"Proxied request failed: {e}")
        return None
- Analyze the page structure: inspect the page elements with the browser's developer tools
- Trace the network requests: analyze API calls in the Network panel
- Read the JavaScript: locate the key variables and the data-loading logic
- Understand the data flow: follow the complete path from home page to play page
# Extract links from JavaScript variables
js_var_pattern = r'var\s+(\w+)\s*=\s*[\'"]([^\'"]+\.mp4)[\'"]'
js_var_matches = re.findall(js_var_pattern, html_content)

# Decrypt links that use a site-specific encryption scheme
def decrypt_video_url(self, encrypted_url, key):
    # implement the site-specific decryption algorithm here
    # ...
    return decrypted_url
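The decryption logic is always site-specific. As one common case, a Base64-encoded AES-CBC ciphertext can be handled with pycryptodome (installed earlier); this is only a sketch, and the assumption that the key and IV are plain UTF-8 strings is hypothetical:

import base64
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

def decrypt_aes_cbc(self, encrypted_b64, key, iv):
    # assumes AES-CBC with PKCS#7 padding and UTF-8 key/IV strings
    cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
    plaintext = unpad(cipher.decrypt(base64.b64decode(encrypted_b64)), AES.block_size)
    return plaintext.decode('utf-8')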
# Build a link from a URL template
def build_video_url(self, template_url, chapter_id):
    # substitute the placeholder in the template
    return template_url.replace("{chapter_id}", chapter_id)
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": self.siteUrl,
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Cookie": "cookie string, if the site requires one"
}
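When a site requires a login state, a `requests.Session` keeps cookies across requests instead of hard-coding a Cookie header. A sketch:

session = requests.Session()
session.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": "https://example.com"
})
# cookies set by earlier responses are sent automatically on later requests
response = session.get("https://example.com/list/1_1.html", timeout=10)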
def fetch_with_retry(self, url, max_retries=3, delay=1):
    """Request with retries and a delay between attempts"""
    for i in range(max_retries):
        try:
            if i > 0:
                time.sleep(delay)
            response = self.fetch(url)
            if response:
                return response
        except Exception as e:
            print(f"Attempt {i + 1} failed: {e}")
    return None
def get_proxy(self):
    """Obtain a proxy address"""
    # implement the proxy-pool lookup here
    # ...
    return proxy

def fetch_with_proxy(self, url):
    """Request through a proxy (requests is called directly, since the fetch helper above takes no proxies argument)"""
    proxy = self.get_proxy()
    headers = {"User-Agent": self.userAgent, "Referer": self.siteUrl}
    return requests.get(url, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=10)
# Cache results to avoid repeated requests (requires: import hashlib, os, time, json)
def fetch_with_cache(self, url, cache_time=3600):
    cache_key = hashlib.md5(url.encode()).hexdigest()
    cache_file = f'cache/{cache_key}.json'
    # Use the cache if it exists and has not expired
    if os.path.exists(cache_file):
        file_time = os.path.getmtime(cache_file)
        if time.time() - file_time < cache_time:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
    # No cache, or expired: request again (assumes the endpoint returns JSON)
    response = self.fetch(url)
    if response:
        os.makedirs('cache', exist_ok=True)
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(response.json(), f, ensure_ascii=False)
        return response.json()
    return None
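Note that this helper caches and returns parsed JSON, so it only suits API-style endpoints. Usage might look like this (the API path is hypothetical):

data = self.fetch_with_cache(f'{self.siteUrl}/api/vod?type=1&page=1')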
import concurrent.futures

def fetch_multiple(self, urls, max_workers=5):
    """Fetch several URLs concurrently"""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(self.fetch, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
    return results
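This can, for instance, prefetch several detail pages discovered on a category page (the IDs here are placeholders):

urls = [f'{self.siteUrl}/detail/{vid}.html' for vid in ('12345', '12346')]
pages = self.fetch_multiple(urls)  # dict mapping each URL to its response (or None)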
A PyramidStore spider plugin also needs a correct entry in the configuration file (the `//` comment below is illustrative only; a real config file must be valid JSON):
{
  "key": "py_mytvsite",
  "name": "我的影视站",
  "type": 3,
  "api": "http://127.0.0.1:9978/proxy?do=py&type=py_mytvsite",
  "searchable": 1,
  "quickSearch": 1,
  "filterable": 1,
  "ext": {
    "cookie": "cookie string, if the site requires one",
    "filter": {
      "1": [
        {"key": "year", "name": "年份", "value": [{"n": "全部", "v": "0"}, {"n": "2023", "v": "2023"}, {"n": "2022", "v": "2022"}]},
        {"key": "genre", "name": "类型", "value": [{"n": "全部", "v": "0"}, {"n": "动作", "v": "action"}, {"n": "喜剧", "v": "comedy"}]}
      ],
      "2": [
        // filter config for the 电视剧 category (illustrative)
      ]
    }
  }
}
Finally, here is a simplified but complete spider implementation:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import requests
from urllib.parse import quote_plus
import re
from bs4 import BeautifulSoup
import sys
sys.path.append('../../')

try:
    from base.spider import Spider
except ImportError:
    # Minimal stand-in base class so the file can be tested locally
    class Spider:
        def __init__(self, extend=""):
            pass

class Spider(Spider):
    def __init__(self):
        self.siteUrl = 'https://example.com'
        self.userAgent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        self.config = {'filter': {}}  # filter config consumed by homeContent

    def getName(self):
        return "示例影视"

    def homeContent(self, filter):
        result = {}
        cateManual = {
            "电影": "1",
            "电视剧": "2",
            "动漫": "3"
        }
        classes = []
        for k in cateManual:
            classes.append({
                'type_id': cateManual[k],
                'type_name': k
            })
        result['class'] = classes
        if filter:
            result['filters'] = self.config['filter']
        return result

    def homeVideoContent(self):
        result = {}
        videos = []
        # extraction logic goes here...
        result['list'] = videos
        return result

    def categoryContent(self, tid, pg, filter, extend):
        result = {}
        videos = []
        # extraction logic goes here...
        result['list'] = videos
        result['page'] = pg
        result['pagecount'] = 9999
        result['limit'] = 90
        result['total'] = 999999
        return result

    def detailContent(self, ids):
        video_id = ids[0]
        # extraction logic goes here...
        vod = {
            "vod_id": video_id,
            "vod_name": "示例视频",
            "vod_pic": "https://example.com/image.jpg",
            "vod_content": "视频简介",
            "vod_play_from": "默认线路",
            "vod_play_url": "第1集$123#第2集$456"
        }
        result = {'list': [vod]}
        return result

    def searchContent(self, key, quick):
        videos = []
        # extraction logic goes here...
        return videos

    def playerContent(self, flag, id, vipFlags):
        result = {}
        result["parse"] = 0
        result["url"] = f"https://example.com/play/{id}.mp4"
        result["header"] = {
            "User-Agent": self.userAgent,
            "Referer": self.siteUrl
        }
        return result

    def isVideoFormat(self, url):
        video_formats = ['.mp4', '.m3u8', '.ts']
        return any(url.lower().find(fmt) > -1 for fmt in video_formats)

    def manualVideoCheck(self):
        return False
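For a quick smoke test without the debug tool, the plugin file can also be run directly:

if __name__ == '__main__':
    spider = Spider()
    print(spider.getName())
    print(json.dumps(spider.homeContent(False), ensure_ascii=False, indent=2))
    print(json.dumps(spider.playerContent('默认线路', '123', []), ensure_ascii=False, indent=2))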
- Problem: `homeContent` returns an empty category list. Solutions:
  - Check whether the site structure has changed
  - Fall back to manually defined categories
- Problem: `detailContent` fails to extract the video information. Solutions:
  - Analyze the page structure with the browser's developer tools
  - Update the selectors or regular expressions
  - Check whether the data is loaded through an API instead
- Problem: `playerContent` cannot obtain a playable video URL. Solutions:
  - Check whether the video URL is loaded in encrypted form
  - Analyze the network requests to find the real play address
  - Implement the decryption algorithm, if needed
PyramidStore spider development combines network analysis, data processing, and optimization. With this guide, you should be able to build a fully functional video-site spider plugin, covering the whole flow from home-page data retrieval to play-link extraction.
Remember that a good spider does more than fetch data: it needs solid error tolerance, good performance, and the ability to adapt as the target site changes. In practice, continuous testing and tuning are what keep a spider running reliably.
Happy spider development!