python异步协程instagram爬虫

前言

没啥目的,就觉得ins里 图多

因为图都是原图的原因,再又因为是GitHub,图片加载会很慢,我暂时懒得解决

正文

一、分析

1、分析目标网站

首先分析网站图片加载流程, taeri__taeri 应该有人认识这个网红。ins照片一次只加载了一定数量的照片,往下翻又会加载,毫无疑问看 xhr

在预览栏里可以看到json数据,display_url 就是照片的链接,只要获取到这个就行了

2、分析请求参数

回到 headers 看看请求用了哪些参数;就两个, quer_hashvariables

variables 是一个json,里面有 id、first、after 这三项;为了不麻烦。。我直接说这三个是啥玩意儿,有兴趣的可以自己分析

id:user id 即用户id

first:这次请求加载照片数量

after:end cursor 这个参数是为了判断上一页的,没有这个就一直加载的第一页,而本页会带有一个end cursor参数来进行下一页请求

还有一点,需要加上cookie,这个应该不用多说了

3、程序流程

请求啥的都分析好了,接下来就来分析程序怎么写,我现在的需求是这样的

给定一个用户名,尽快获取该用户所有照片并下载到instagram//这个目录下

要求中说了需要尽快,那么单线程就算了,太没效率的,就一个普通网红照片好歹也有几百张;这里用到asyncio,流程是这样的

首先下载和获取图片链接弄成两个任务,在等待获取的时候我先去执行下载里的任务,在等待下载的时候我可以获取下载链接,这就是一个生产者消费者模式,这里涉及到了通信,我用queue。

二、代码

用到的lib:

1
2
3
4
5
6
7
8
9
10
11
12
13
import json
import multiprocessing
import sys
from urllib.parse import urljoin

import aiohttp
import asyncio
import os
import re

from pathlib import Path

import requests

__init__:

1
2
3
4
5
6
7
8
def __init__(self, username, maxtasks=200):
self.username = username
self.maxtasks = maxtasks # 最大任务数
self.queue = asyncio.Queue(maxsize=maxtasks * 2)
# 配置代理,没有科学上网没法访问ins
os.environ['http_proxy'] = PROXY
os.environ['https_proxy'] = PROXY
self.session = aiohttp.ClientSession(trust_env=True, headers=HEADERS)

首先获取user id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
async def get_shared_data(self):
"""
获取 shared data
:return:
"""
try:
async with self.session.get(ROOT_URL + self.username) as resp:
html = await resp.text()
if html is not None and '_sharedData' in html:
shared_data = html.split("window._sharedData = ")[1].split(
";</script>")[0]
if not shared_data: # 没有shared data可以直接终止程序了
print('!!!!!!!')
exit(1)
return json.loads(shared_data)
except Exception:
pass

async def init(self):
"""
初始化必要参数
:return:
"""
user = (await self.get_shared_data())['entry_data']['ProfilePage'][0]['graphql']['user']
if not user:
print('user is none.')
exit(1)
self.user_id = user['id'] # user id
self.count = user['edge_owner_to_timeline_media']['count'] # 照片数量

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
async def produce_download_urls(self, max=50):
"""
获取每一页的所有照片链接
:param max: 一次要获取照片数量
:return:
"""
end_cursor = '' # 加载第一页
while True:
pic_params = {
'query_hash':
'f2405b236d85e8296cf30347c9f08c2a', # query_hash 可以固定一个值
'variables':
'{{"id":"{0}","first":{1},"after":"{2}"}}'.format(
self.user_id, max, end_cursor),
}
pic_url = ROOT_URL + 'graphql/query/'
async with self.session.get(pic_url, params=pic_params) as resp:
json = await resp.json()
edge_media = json['data']['user'][
'edge_owner_to_timeline_media']
edges = edge_media['edges']
if edges:
for edge in edges:
await self.queue.put(edge['node']['display_url']) # queue通信
has_next_page = edge_media['page_info']['has_next_page'] # json中有一个has next page项,其值是 true或false,用来判断是否有下一页
if has_next_page:
end_cursor = edge_media['page_info']['end_cursor'] # 获取 end cursor
else:
break

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def download(self):
"""
下载照片
:return:
"""
while not (self.producer.done() and self.queue.empty()): # 生产任务是否没有完成以及queue队列是否不为空
url = await self.queue.get() # 获取照片链接
filename = PATH / url.split('?')[0].split('/')[-1]
async with self.session.get(url) as resp:
with filename.open('wb') as f:
async for chunk in resp.content.iter_any():
f.write(chunk)
self.queue.task_done() # 表示刚刚排队的任务已完成(就是用get取出的照片url下载完成了)
print('.', end='', flush=True)

run:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def run(self):
"""
:return:
"""
print('Preparing...')
print('Initializing...')
await self.init()
print('User id: %r.' % self.user_id)
print('Total %r photos.' % self.count)
print('-'*50)
self.producer = asyncio.create_task(self.produce_download_urls())
print('Downloading...', end='', flush=True)
await asyncio.gather(*(self.download() for _ in range(self.maxtasks))) # asyncio.gather和asyncio.wait差不多,具体百度

check:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def check(_):
"""
检测照片数量。。。太菜了不知道怎么停止只能这样了(逃;
"""
print('Start check...')
with requests.get(urljoin(ROOT_URL, USERNAME), headers=HEADERS,
proxies={'http': 'http://localhost:80001', 'https': 'https://localhost:8001'}) as resp:
pattern = '"edge_owner_to_timeline_media":.?{"count":(.*?),"page_info"'
count = int(re.findall(pattern, resp.text)[0])
while True:
files = len(os.listdir(PATH))
print('Check files:%r' % files)
if files == count:
# print('Total %r photos download done.' % count)
print('\nProduce done, Total %r photos, plz wait save done :)' % count)
sys.exit(0)

main:

1
2
3
4
5
6
async def main():
ins = Instagram(USERNAME)
try:
await ins.run()
finally:
await ins.close()

if __name__ == ‘__main__‘:

1
2
3
4
5
6
7
8
9
10
if __name__ == '__main__':
try:
p = multiprocessing.Process(target=check, args=(0,))
p.start()
future = asyncio.ensure_future(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(future)
loop.close()
except KeyboardInterrupt:
pass

运行:

最后

项目地址
感谢观看 :)