Problem description
I have a TSV file with more than 3 million item rows. Each item has an id, a group, and a URL, and the group column is sorted.
i.e.
x1 gr1 {some url}/x1.jpg
x2 gr1 {some url}/x2.jpg
x3 gr2 {some url}/x1.jpg
I load it into a Python script and need to check that the URL of every item in a group returns status 200 OK before loading that group into the database. I thought of using processes and doing one URL check per process (I don't have much experience with this, so I'm not sure whether it's even a good idea).
My logic at the moment: fill array a1 with the items of gr1 -> pass each item in a1 to a new process -> that process checks for 200 -> if OK, put the item into array a2 -> once every item in a1 has been checked, push a2 to the DB (along with other stuff) -> repeat.
It takes roughly 30 minutes to process 100,000 items. The bottleneck is the URL check; without it, the script runs fast by comparison. What I have so far:
import csv
import re
import requests
import multiprocessing
from pymongo import MongoClient
import sys
#Load in Data
f = open('../tsvsorttest.tsv', 'rb')
reader = csv.reader(f, delimiter='\n')
#Get the first group name
currGroup = re.split(r'\t', next(reader)[0].decode('utf8'))[1]
currGroupNum = 0
items = []
checkedItems = []
#Method that checks the URL; if it's 200, add the item to checkedItems
def check_url(newitem):
if requests.get(newitem['image_url']).status_code is 200:
print('got an ok!')
checkedItems.append(newitem)
global num_left
num_left -= 1
def clear_img(checkitems):
for thisItem in checkitems:
p = multiprocessing.Process(target=check_url(thisItem))
p.start()
#Start the loop, use i to keep track of the iteration count
for i, utf8_row in enumerate(reader):
unicode_row = utf8_row[0].decode('utf8')
x = re.split(r'\t', unicode_row)
item = {"id": x[0],
"group": x[1],
"item_url": x[2]
}
if currGroup != x[1]:
y = len(items)
print('items length is ' + str(y))
#Dont want single item groups
if y > 1:
print 'beginning url checks'
num_left = len(items)
clear_img(items)
while num_left is not 0:
print 'Waiting'
num_left = 0
batch = {"vran": currGroup,
"bnum": currGroupNum,
"items": newItems,
}
if len(checkedItems) > 0:
batches.insert_one(batch)
currGroupNum += 1
currGroup = x[1]
items = []
checkedItems = []
items.append(item)
if i % 100 == 0:
print "Milestone: " + str(i)
print "done"
Other considerations: splitting the original TSV into something like 30 separate TSV files and running the batch script 30 times in parallel. Would that make a difference?
Answer 1
- Since you don't need the actual image, you can speed things up by using HEAD requests. If the response is neither 200 nor 404, HEAD may not be allowed (405), and you simply retry with a GET request (see the standalone sketch at the end of this answer).
- Currently you wait for the current group to finish before starting any new tasks. In general it would be better to keep the number of running requests roughly constant. You probably also want to increase the worker pool a lot, since the tasks are mostly I/O bound, but I would suggest going the way of the third point (i.e. asynchronous I/O).
- If you are willing to use Python 3, you can use its excellent support for asynchronous I/O via asyncio together with aiohttp:
import asyncio
from aiohttp import ClientSession, Timeout
import csv
import re
from threading import Thread
from queue import Queue
from time import sleep
async def check(url, session):
try:
with Timeout(10):
async with session.head(url) as response:
if response.status == 200:
return True
elif response.status == 404:
return False
else:
async with session.get(url) as response:
return (response.status == 200)
except:
return False
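# Thread worker: pulls callbacks off the queue and runs them; the lambdas queued
# below wrap item_ok / item_failed / group_done, so any blocking follow-up work
# stays off the event loop.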
def worker(q):
while True:
f = q.get()
try:
f()
except Exception as e:
print(e)
q.task_done()
q = Queue()
for i in range(4):
t = Thread(target=worker,args=(q,))
t.daemon = True
t.start()
def item_ok(url):
#Do something
sleep(0.5)
pass
def item_failed(url):
#Do something
sleep(0.5)
pass
def group_done(name,g):
print("group %s with %d items done (%d failed)\n" %
(name,g['total'],g['fail']))
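# Caps the number of in-flight checks with the semaphore, updates the per-group
# counters, and hands the matching callback to the worker threads via the queue.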
async def bound_check(sem, item, session, groups):
async with sem:
g = groups[item["group"]]
if (await check(item["item_url"], session)):
g["success"] += 1
q.put(lambda: item_ok(item["item_url"]))
else:
g["fail"] += 1
q.put(lambda: item_failed(item["item_url"]))
if g["success"] + g["fail"] == g['total']:
q.put(lambda: group_done(item['group'],g))
bound_check.processed += 1
if bound_check.processed % 100 == 0:
print ("Milestone: %d\n" % bound_check.processed)
bound_check.processed = 0
groups = {}
async def run(max_pending=1000):
#Choose such that you do not run out of FDs
sem = asyncio.Semaphore(max_pending)
f = open('./test.tsv', 'r',encoding = 'utf8')
reader = csv.reader(f, delimiter='\n')
tasks = []
async with ClientSession() as session:
for _, utf8_row in enumerate(reader):
unicode_row = utf8_row[0]
x = re.split(r'\t', unicode_row)
item = {"id": x[0],"group": x[1],"item_url": x[2]}
if not item["group"] in groups:
groups[item["group"]] = {'total' : 1,
'success' : 0,
'fail' : 0,
'items' : [item]}
else:
groups[item["group"]]['total'] += 1
groups[item["group"]]['items'].append(item)
task = asyncio.ensure_future(bound_check(sem, item, session, groups))
tasks.append(task)
responses = asyncio.gather(*tasks)
await responses
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
q.join()
print("Done")
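As a side note (not part of the original answer), the HEAD-then-GET fallback from the first bullet can also be written as a small synchronous helper using nothing but requests; a minimal sketch, with url_ok being a hypothetical name:
import requests

def url_ok(url, timeout=10):
    """Return True if the URL answers 200; HEAD first, GET as fallback."""
    try:
        r = requests.head(url, timeout=timeout, allow_redirects=True)
        if r.status_code == 200:
            return True
        if r.status_code == 404:
            return False
        # Some servers reject HEAD (e.g. 405); retry with GET, streamed so the
        # image body is not actually downloaded.
        return requests.get(url, timeout=timeout, stream=True).status_code == 200
    except requests.RequestException:
        return False
This mirrors what the check() coroutine above does, just without the event loop, so the same logic also drops into a thread-pool approach.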
Answer 2
It was already mentioned that you should try using HEAD instead of GET. That will avoid having to download the images. Moreover, you seem to be spawning a separate process for each request, which is also inefficient.
I don't think asyncio is really needed here for performance. A solution using a plain thread pool (not even a process pool) is a bit less of a hassle, IMHO :) Plus, it's available in Python 2.7.
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
from collections import defaultdict
def read_rows(file):
with open(file) as f_in:
return [row for row in csv.reader(f_in, delimiter='\t')]
def check_url(inp):
"""Gets called by workers in thread pool. Checks for existence of URL."""
id, grp, url = inp
def chk():
try:
return requests.head(url).status_code == 200
except IOError as e:
return False
return (id, grp, url, chk())
if __name__ == '__main__':
d = defaultdict(lambda: [])
with ThreadPoolExecutor(max_workers=20) as executor:
future_to_input = {executor.submit(check_url, inp): inp for inp in read_rows('urls.txt')}
for future in as_completed(future_to_input):
id, grp, url, res = future.result()
d[grp].append((id, url, res))
# do something with your d (e.g. sort appropriately, filter those with len(d[grp]) <= 1, ...)
for g, bs in d.items():
print(g)
for id, url, res in bs:
print(" %s %5s %s" % (id, res, url))
As you can see, I process each row of the CSV input separately and group the results afterwards (using d), rather than grouping the input. Mostly a matter of taste, I guess. You may want to play with max_workers=20 and possibly increase it.
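To connect this back to the MongoDB step from the question, here is a minimal sketch of what the follow-up could look like once d is filled (the batches collection and the vran/bnum field names come from the question's code; the mydb database name and the 200-only filter are assumptions):
from pymongo import MongoClient

client = MongoClient()               # assumes a MongoDB instance on localhost
batches = client['mydb']['batches']  # 'mydb' is a placeholder database name

group_num = 0
for grp, rows in d.items():
    ok_items = [{"id": id, "item_url": url} for id, url, res in rows if res]
    # Skip single-item groups (the question filters these out as well)
    if len(ok_items) > 1:
        batches.insert_one({"vran": grp, "bnum": group_num, "items": ok_items})
        group_num += 1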