1、Crawling 明朝那些事 with etree and coroutines
import requests
from lxml import etree
import asyncio
import aiohttp
import aiofiles
import os
# 1. Fetch the source of the main page (no async needed)
# 2. After getting the page source, parse out <volume name> and <chapter, href>
headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
}

def get_chaptor_info(url):
    resp = requests.get(url, headers=headers)
    resp.encoding = "UTF-8"
    page_source = resp.text
    # start parsing
    tree = etree.HTML(page_source)
    # Homework: explain what each of the loops below is doing.
    result = []
    divs = tree.xpath("//div[@class='mulu']")  # each div is one volume
    for div in divs:
        trs = div.xpath(".//table/tr")  # a pile of tr elements
        juan_name = trs[0].xpath(".//a/text()")
        juan_name = "".join(juan_name).strip().replace(":", "_")
        for tr in trs[1:]:
            tds = tr.xpath("./td")
            for td in tds:
                txt = td.xpath(".//text()")
                href = td.xpath(".//@href")
                txt = "".join(txt).replace(" ", "").strip()
                href = "".join(href)
                dic = {
                    "chapter_name": txt,
                    "chapter_url": href,
                    "juan_name": juan_name
                }
                result.append(dic)
    return result

async def download_one(url, file_path):
    print("Downloading an article...")
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            page_source = await resp.text(encoding="utf-8")
            # extract the article body
            tree = etree.HTML(page_source)
            content = tree.xpath("//div[@class='content']//p//text()")
            content = "".join(content).replace("\n", "").replace("\r", "").replace(" ", "").strip()
            # write it to a file
            async with aiofiles.open(file_path, mode="w", encoding="utf-8") as f:
                await f.write(content)
    print("Done, downloaded one article!", file_path)

async def download_chapter(chaptor_list):
    tasks = []
    for chaptor in chaptor_list:  # {"juan_name": ..., "chapter_name": ..., "chapter_url": ...}
        juan = chaptor['juan_name']      # folder name
        name = chaptor['chapter_name']   # file name, e.g. 前言.txt
        url = chaptor['chapter_url']     # used for the download -> one async task
        if not os.path.exists(juan):     # does the folder exist?
            os.makedirs(juan)            # create it if not
        # build the real save path for this file
        file_path = f"{juan}/{name}.txt"
        f = download_one(url, file_path)
        t = asyncio.create_task(f)
        tasks.append(t)
        break  # only one task while testing; remove this for the full download
    await asyncio.wait(tasks)

def main():
    url = "https://www.mingchaonaxieshier.com/"
    chaptor_list = get_chaptor_info(url)
    # print(chaptor_list)
    # fire up the coroutines and download asynchronously
    asyncio.run(download_chapter(chaptor_list))


if __name__ == '__main__':
    main()
2、Crawling 网吧电影 with coroutines and decryption
First-layer m3u8 (not encrypted), which only points to the real playlist:
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1263000,RESOLUTION=1280x528
/20211030/89ZfL7VX/hls/index.m3u8

Second-layer m3u8 (encrypted; note the EXT-X-KEY line), which lists the ts segments:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:4
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="https://vo1.123188kk.com/20211030/89ZfL7VX/hls/key.key"
#EXTINF:2.44,
https://vo1.123188kk.com/20211030/89ZfL7VX/hls/3YKZ9LsK.ts
# Overall workflow => 网吧电影
1. Find the M3U8 file
2. Decide (manually) whether a second-layer M3U8 needs to be downloaded
3. Extract the download paths of the ts files
4. Download them
5. Decide whether decryption is needed
6. If it is, fetch the key
7. Decrypt
8. Merge all ts files into an MP4, in the order given by the M3U8
import requests
from lxml import etree
import re
from urllib.parse import urljoin
import os  # for running cmd/console commands
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES  # pip install pycryptodome

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
}

def get_iframe_src(url):  # grab the src of the iframe
    resp = requests.get(url, headers=headers)
    tree = etree.HTML(resp.text)
    src = tree.xpath("//iframe/@src")[0]
    return src

def get_m3u8_url(url):
    resp = requests.get(url, headers=headers)
    obj = re.compile(r'url: "(?P<m3u8>.*?)"', re.S)
    m3u8 = obj.search(resp.text).group("m3u8")
    return m3u8

def download_m3u8(url):  # e.g. https://a.ak-kk.com/20211030/89ZfL7VX/index.m3u8
    resp = requests.get(url, headers=headers)
    with open("first.m3u8", mode="w", encoding="utf-8") as f:
        f.write(resp.text)
    # the mistake at this spot cost about 5 minutes of debugging
    with open("first.m3u8", mode='r', encoding="utf-8") as f2:
        for line in f2:  # read line by line
            if line.startswith("#"):  # skip tag/comment lines
                continue
            # at this point line is the address of the second-layer M3U8
            line = line.strip()  # remember to strip(), or you get an unpleasant surprise
            line = urljoin(url, line)  # join it with the base url
            # download the second-layer M3U8
            resp = requests.get(line, headers=headers)
            with open("second.m3u8", mode="w", encoding="utf-8") as f3:
                f3.write(resp.text)
            break  # optional

async def download_one(url, sem):
    async with sem:  # use the semaphore to limit the request rate
        file_name = url.split("/")[-1]
        file_path = "./解密前/" + file_name
        print(file_name, "started!")
        for i in range(10):  # up to 10 retries
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, headers=headers) as resp:
                        content = await resp.content.read()
                        # write to file
                        async with aiofiles.open(file_path, mode="wb") as f:
                            await f.write(content)
                print(file_name, "download finished!")
                break
            except Exception as e:
                print(file_name, "failed, retrying", e)  # print the error so it is visible

async def download_all_videos():
    # semaphore used to limit the number of concurrent coroutines
    sem = asyncio.Semaphore(100)  # a few movies on this site need this as low as ~5
    os.makedirs("./解密前", exist_ok=True)  # make sure the output folder exists
    # 1. read the playlist file
    tasks = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()  # skip this and you get an unpleasant surprise
            # now line is a download url
            # 2. create a task
            t = asyncio.create_task(download_one(line, sem))
            tasks.append(t)
    # 3. wait for all of them together
    await asyncio.wait(tasks)

def get_key():
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        file_content = f.read()  # read the whole file
    obj = re.compile(r'URI="(?P<key_url>.*?)"')
    key_url = obj.search(file_content).group("key_url")
    resp = requests.get(key_url, headers=headers)  # request the key
    return resp.content  # return raw bytes so they can go straight into the decryptor

async def desc_one(file_path, key):
    file_name = file_path.split("/")[-1]
    new_file_path = "./解密后/" + file_name
    # decrypt
    async with aiofiles.open(file_path, mode="rb") as f1, \
            aiofiles.open(new_file_path, mode="wb") as f2:
        content = await f1.read()
        # decryption: the fixed recipe is to create a cipher object first
        aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b"0000000000000000")
        new_content = aes.decrypt(content)
        await f2.write(new_content)  # write the decrypted bytes to the new file
    print(new_file_path, "decrypted successfully")


# decryption coroutine logic:
#   read the M3U8 file to get the file names and paths
#   one task per ts file
#   each task simply decrypts its own file

async def desc_all(key):
    os.makedirs("./解密后", exist_ok=True)  # make sure the output folder exists
    tasks = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            file_name = line.split("/")[-1]
            file_path = "./解密前/" + file_name
            # create a task to decrypt this file
            t = asyncio.create_task(desc_one(file_path, key))
            tasks.append(t)
    await asyncio.wait(tasks)

def merge():
    # merging the video segments
    # (Bilibili videos do not use this scheme)
    # we need a shell command:
    #   windows:   copy /b a.ts+b.ts+c.ts xxx.mp4
    #   linux/mac: cat a.ts b.ts c.ts > xxx.mp4
    # common pitfalls:
    #   1. the command line has a length limit, so merge in batches
    #   2. os.system() output is easily garbled; run the command with os.popen() instead
    #   3. all you really care about is whether the merge succeeded
    # os.system("dir")   # garbled output
    # r = os.popen("dir")
    # print(r.read())    # avoids the garbling for now

    # collect all file names, in the correct merge order
    file_list = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            file_name = line.split("/")[-1]
            file_list.append(file_name)

    # move into the folder with the decrypted segments
    os.chdir("./解密后")  # change the working directory

    # file_list now holds all file names; merge them in batches
    n = 1
    temp = []  # [a.ts, b.ts, c.ts] => "a.ts+b.ts+c.ts"
    for i in range(len(file_list)):
        # merge every 20 files
        file_name = file_list[i]
        temp.append(file_name)
        if i != 0 and i % 20 == 0:  # every 20 (the first batch ends up with 21)
            # time to merge one batch
            cmd = f"copy /b {'+'.join(temp)} {n}.ts"
            r = os.popen(cmd)
            print(r.read())
            temp = []  # start a fresh batch
            n = n + 1
    # merge whatever ts files are left over
    cmd = f"copy /b {'+'.join(temp)} {n}.ts"
    r = os.popen(cmd)
    print(r.read())
    n = n + 1

    # second, final merge: 1.ts + 2.ts + ... => mp4
    last_temp = []
    for i in range(1, n):
        last_temp.append(f"{i}.ts")
    cmd = f"copy /b {'+'.join(last_temp)} 春夏秋冬又一春.mp4"
    r = os.popen(cmd)
    print(r.read())

    # go back up
    os.chdir("../")  # ../ is the parent folder

def main():
    # url = "http://www.wbdy.tv/play/63690_1_1.html"
    # # 1. get the src attribute of the iframe
    # src = get_iframe_src(url)
    # print(src)
    # # 2. request the iframe's src page and extract the M3U8 address
    # src = urljoin(url, src)
    # m3u8_url = get_m3u8_url(src)
    # print(m3u8_url)
    # # 3. download the m3u8 files
    # download_m3u8(m3u8_url)
    # # 4. download the videos with coroutines
    # event_loop = asyncio.get_event_loop()
    # event_loop.run_until_complete(download_all_videos())
    # # 5. fetch the key
    # key = get_key()
    # # 6. decrypt
    # event_loop = asyncio.get_event_loop()
    # event_loop.run_until_complete(desc_all(key))
    # print("all done")
    # merge the segments
    merge()


if __name__ == '__main__':
    main()
3、Crawling 4399 games with scrapy
Create the crawler project: scrapy startproject mySpider_2
The generated directory structure is shown below.
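For reference, a freshly created project has the standard layout produced by scrapy startproject (only the files used in these notes are annotated):

mySpider_2/               # outer project folder
├── scrapy.cfg            # project configuration entry point
└── mySpider_2/           # the actual Python package
    ├── items.py          # data container (Item) definitions
    ├── middlewares.py    # spider/downloader middlewares
    ├── pipelines.py      # data persistence (pipelines)
    ├── settings.py       # project settings
    └── spiders/          # spiders live here (youxi.py will be generated in this folder)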
Enter the project folder and generate the spider:
cd mySpider_2
scrapy genspider youxi 4399.com
Fill in the spider logic in youxi.py.
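A minimal sketch of what youxi.py could look like, assuming we scrape a new-game list page; the start URL and every XPath below are illustrative guesses and must be checked against the page's real HTML:

import scrapy


class YouxiSpider(scrapy.Spider):
    name = "youxi"                    # the name used by `scrapy crawl youxi`
    allowed_domains = ["4399.com"]
    start_urls = ["https://www.4399.com/flash/new.htm"]  # assumed list page

    def parse(self, response):
        # assumption: each <li> in the game list holds one game entry
        li_list = response.xpath("//ul[@class='n-game cf']/li")
        for li in li_list:
            name = li.xpath("./a/b/text()").extract_first()
            category = li.xpath("./em/a/text()").extract_first()
            date = li.xpath("./em/text()").extract_first()
            # yielding a dict (or an Item) hands the data to the pipeline
            yield {"name": name, "category": category, "date": date}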
Run the spider: cd D:\pachong_test\mySpider_2\mySpider_2\spiders
scrapy crawl youxi
Enable the pipeline in the ITEM_PIPELINES setting of settings.py: the key is the class path of the pipeline and the value is its priority; the lower the number, the earlier it runs.
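That entry looks roughly like this; the class name must match whatever is defined in pipelines.py (MySpider2Pipeline is used here only for illustration), and 300 is just an example priority:

ITEM_PIPELINES = {
    # "class path of the pipeline": priority -- lower numbers run first
    "mySpider_2.pipelines.MySpider2Pipeline": 300,
}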
Write the pipeline in pipelines.py to do a simple save of the data.
Do not change the declaration of process_item!!! Data yielded by the spider is automatically fed into process_item here; change the signature and the pipeline breaks.
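A minimal sketch of such a pipeline, assuming the spider yields items with name/category/date keys and that we simply append them to a csv file (the class name and file name are illustrative):

class MySpider2Pipeline:
    # process_item is called once for every item the spider yields;
    # the (self, item, spider) signature must stay exactly like this
    def process_item(self, item, spider):
        with open("youxi.csv", mode="a", encoding="utf-8") as f:
            f.write(f"{item['name']},{item['category']},{item['date']}\n")
        return item  # pass the item on to the next pipeline, if any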
Define the custom data transfer structure in the items.py file.
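A sketch of items.py matching the fields used above (GameItem is an illustrative name). The spider can then yield GameItem(name=..., category=..., date=...) instead of a plain dict, and the pipeline receives it in the same way:

import scrapy


class GameItem(scrapy.Item):
    # declare one Field per piece of data the spider collects
    name = scrapy.Field()
    category = scrapy.Field()
    date = scrapy.Field()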