Ah, well.
Since it can already be packaged as is, just treat this as practice.
chapter.py (downloads every page of a single chapter):
# coding=utf-8
import requests
import time
import os
import re
from bs4 import BeautifulSoup
from functools import wraps
import asyncio
from concurrent.futures import ThreadPoolExecutor

class retry(object):
    """Retry a function call on exception, pausing briefly between attempts."""

    def __init__(self, retry=3, sleep=1):
        self.retry = retry
        self.sleep = sleep

    def __call__(self, func):
        @wraps(func)
        def run(*args, **kwargs):
            # keep the retry budget local to each call, so one flaky call
            # cannot use up the retries of every later call
            remaining = self.retry
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if remaining == 0:
                        raise
                    remaining -= 1
                    time.sleep(self.sleep)
        return run

class Chapter:
    def __init__(self, session, chpId, domain):
        self.session = session
        self.domain = domain
        self.chpId = chpId
        self.commonHeaders = {
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip",
            "Connection": "keep-alive",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0",
            "Referer": "https://%s/album/%s/" % (domain, chpId),
        }
        self.session.headers.update(self.commonHeaders)

    @retry(retry=3)  # retry up to 3 times; only raise after all 4 attempts fail
    def getHtml(self, chp_url):
        return self.session.get(chp_url, timeout=30).text

    def downloadAll(self, folder):
        if not os.path.exists(folder):
            os.makedirs(folder)
        # collect every image URL of the chapter
        chp_url = "https://%s/photo/%s?read_mode=read-by-page" % (self.domain, self.chpId)
        html = self.getHtml(chp_url)
        soup = BeautifulSoup(html, "lxml")
        img_template = soup.select(".img_template img#album_photo_")[0]
        img_prefix = img_template["data-src"]
        img_options = soup.select("#pageselect option")
        img_suffixs = [op["data-page"] for op in img_options]
        loop = asyncio.get_event_loop()
        threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="download_")
        tasks = []
        # download the images concurrently on the thread pool
        for img_suffix in img_suffixs:
            url = "%s%s?v=%d" % (img_prefix, img_suffix, int(time.time()))
            path = "%s/%s" % (folder, img_suffix)
            task = loop.run_in_executor(threadPool, self.th_download_pic, url, path)
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))
        print("")

    def th_download_pic(self, url, path):
        print("\rDownloading... %s" % (path), end="", flush=True)
        if not os.path.exists(path):
            tmp_path = path + ".tmp"
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            try:
                # stream into a .tmp file first and rename it only after the download succeeds
                with open(tmp_path, "wb") as file:
                    response = requests.get(url, stream=True, headers=self.commonHeaders, timeout=60)
                    for data in response.iter_content(chunk_size=1024 * 1024):
                        file.write(data)
                    response.close()
                os.rename(tmp_path, path)
            except Exception as e:
                print(e)
                print("%s download failed" % (path))
                time.sleep(1)

if __name__ == '__main__':
    session = requests.Session()
    chpt = Chapter(session, "105925", domain_of_comic)  # domain removed (censored)
    chpt.downloadAll("test")
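A quick standalone check of the retry decorator above (a minimal sketch, not part of chapter.py; flaky_fetch and its call counter are made up for illustration):

from chapter import retry

calls = {"n": 0}

@retry(retry=3, sleep=0)
def flaky_fetch():
    # hypothetical function that fails twice before succeeding
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

print(flaky_fetch())  # prints "ok"; the first two failures are retried silently

The decorated call only raises once the whole retry budget is exhausted, which matches the comment on getHtml.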
comic.py (fetches a comic's chapter list and downloads each chapter through Chapter):
# coding=utf-8
import requests
import time
import os
import re
from bs4 import BeautifulSoup
from chapter import Chapter
def getWebsites():
    # scrape the list of domains that can currently be reached directly
    html = requests.get(domain_of_comic_intro, timeout=30).text  # URL removed (censored)
    soup = BeautifulSoup(html, "lxml")
    p_all = soup.select(".has-luminous-vivid-orange-color")
    sites = [p.text for p in p_all]
    print(sites)
    return sites

class Comic:
    def __init__(self, session, comicId, domain):
        self.session = session
        self.domain = domain
        self.comicId = comicId
        self.commonHeaders = {
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip",
            "Connection": "keep-alive",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0",
        }
        self.session.headers.update(self.commonHeaders)

    def downloadAll(self, folder, start=0):
        # fetch the comic page and collect every chapter link
        comic_url = "https://%s/album/%s/" % (self.domain, self.comicId)
        html = self.session.get(comic_url, timeout=30).text
        soup = BeautifulSoup(html, "lxml")
        comicName = soup.select("div.pull-left[itemprop~=name]")[0].text.strip().replace("/", ".")
        print(comicName)
        folder = folder.replace("{comicName}", comicName)
        chpt_all = soup.select("ul.btn-toolbar")[0]
        t_title_all = chpt_all.select("li")
        t_url_all = chpt_all.select("a")
        # strip whitespace, the "最新" (latest) badge and trailing dates so titles are safe folder names
        titles = [re.sub(r" |\||\r|\t|\?|(最新)|(\d{4}-\d{2}-\d{2}$)", "", tag.text.strip()).strip().replace("\n", "_").replace("/", ".") for tag in t_title_all]
        chptIds = [tag["href"][tag["href"].rindex("/") + 1:] for tag in t_url_all]
        if not os.path.exists(folder):
            os.makedirs(folder)
        for i in range(len(titles)):
            if i < start:
                continue
            print("Downloading", titles[i])
            chpt = Chapter(self.session, chptIds[i], self.domain)
            path = "%s/%s" % (folder, titles[i])
            chpt.downloadAll(path)

if __name__ == '__main__':
    session = requests.Session()
    # pick a domain that can be reached directly
    # domain = getWebsites()[-1]
    domain = getWebsites()[1]
    # initialise with the comic id
    comic = Comic(session, "105924", domain)
    # arguments: target folder, index of the first chapter to download (index starts from 0)
    # comic.downloadAll("test", 0)
    comic.downloadAll("{comicName}", 5)