当前位置: 代码迷 >> python >> 在Python中高效检查数百万个图像URL
  详细解决方案

在Python中高效检查数百万个图像URL

热度:134   发布时间:2023-06-13 13:39:33.0

我有一个超过300万个项目行的tsv文件。 每个项目都有一个id,组和一个url,并且组列已排序。

x1    gr1    {some url}/x1.jpg
x2    gr1    {some url}/x2.jpg
x3    gr2    {some url}/x1.jpg  

我将其加载到python脚本中,并且需要在将组中所有项目的URL加载到数据库中之前检查状态200 OK。 我想到了使用进程并对每个进程进行URL检查,(我对此没有太多经验,因此不确定它是否是个好主意)

我的逻辑atm:用gr1的项目填充数组a1->将a1中的每个项目传递给新进程->该进程检查200->如果可以,则将其放入数组a2中->检查a1中的所有项目后,将a2推入DB(以及其他东西)->重复

大约需要30分钟才能处理100,000个项目。 瓶颈是URL检查。 如果不检查URL,则与之相比,该脚本的运行速度很快。 至今:

import csv
import re
import requests
import multiprocessing
from pymongo import MongoClient
import sys

#Load in Data
f = open('../tsvsorttest.tsv', 'rb')
reader = csv.reader(f, delimiter='\n')

#Get the first group name
currGroup = re.split(r'\t', next(reader)[0].decode('utf8'))[1]
currGroupNum = 0 
items = []
checkedItems = []

#Method that checks the URL, if its 200, add to newItems
def check_url(newitem):
    if requests.get(newitem['image_url']).status_code is 200:
        print('got an ok!')
        checkedItems.append(newitem)
    global num_left
    num_left -= 1


def clear_img(checkitems):
    for thisItem in checkitems:
        p = multiprocessing.Process(target=check_url(thisItem))
        p.start()

#Start the loop, use i to keep track of the iteration count
for i, utf8_row in enumerate(reader):
    unicode_row = utf8_row[0].decode('utf8')

    x = re.split(r'\t', unicode_row)

    item = {"id": x[0],
            "group": x[1],
            "item_url": x[2]
            }
    if currGroup != x[1]:
        y = len(items)
        print('items length is ' + str(y))

        #Dont want single item groups
        if y > 1:
            print 'beginning url checks'
            num_left = len(items)


            clear_img(items)
            while num_left is not 0:
                print 'Waiting'

            num_left = 0
            batch = {"vran": currGroup,
                     "bnum": currGroupNum,
                     "items": newItems,
                     }
            if len(checkedItems) > 0:
                batches.insert_one(batch)
                currGroupNum += 1

        currGroup = x[1]
        items = []
        checkedItems = []

    items.append(item)

    if i % 100 == 0:
        print "Milestone: " + str(i)

print "done"

其他注意事项:将原始Tsv拆分成类似30个单独的tsv文件,并并行运行批处理脚本30次。 这会有所作为吗?

  1. 由于您不需要使用HEAD请求的实际图像,因此可以提高速度。 如果响应既不是200也不是404,则可能不允许HEAD(405),而您只需使用GET请求重试即可。
  2. 当前,您在开始任何新任务之前等待当前组完成。 通常,最好始终保持相同数量的运行请求大致相同。 另外,您可能想大幅增加工作人员池-由于任务主要是I / O限制,但是我建议您按照3的方式进行操作(即异步I / O)。
  3. 如果您愿意使用Python 3,则可以通过使用来使用对异步I / O的出色支持( ) :
import asyncio
from aiohttp import ClientSession, Timeout
import csv
import re
from threading import Thread
from queue import Queue
from time import sleep

async def check(url, session):
    try:
        with Timeout(10):
            async with session.head(url) as response:
                if response.status == 200:
                    return True
                elif response.status == 404:
                    return False
                else:
                    async with session.get(url) as response:
                        return (response.status == 200)
    except:
        return False



def worker(q):
    while True:
        f = q.get()
        try:
            f()
        except Exception as e:
            print(e)
        q.task_done()

q = Queue()
for i in range(4):
     t = Thread(target=worker,args=(q,))
     t.daemon = True
     t.start()

def item_ok(url):
    #Do something
    sleep(0.5)
    pass

def item_failed(url):
    #Do something
    sleep(0.5)
    pass

def group_done(name,g):
    print("group %s with %d items done (%d failed)\n" %
          (name,g['total'],g['fail']))

async def bound_check(sem, item, session, groups):
    async with sem:
        g = groups[item["group"]]
        if (await check(item["item_url"], session)):
            g["success"] += 1
            q.put(lambda: item_ok(item["item_url"]))
        else:
            g["fail"] += 1
            q.put(lambda: item_failed(item["item_url"]))
        if g["success"] + g["fail"] == g['total']:
            q.put(lambda: group_done(item['group'],g))
        bound_check.processed += 1
        if bound_check.processed % 100 == 0:
            print ("Milestone: %d\n" % bound_check.processed)

bound_check.processed = 0

groups = {}

async def run(max_pending=1000):
    #Choose such that you do not run out of FDs
    sem = asyncio.Semaphore(max_pending)

    f = open('./test.tsv', 'r',encoding = 'utf8')
    reader = csv.reader(f, delimiter='\n')

    tasks = []

    async with ClientSession() as session:
        for _, utf8_row in enumerate(reader):
            unicode_row = utf8_row[0]
            x = re.split(r'\t', unicode_row)
            item = {"id": x[0],"group": x[1],"item_url": x[2]}
            if not item["group"] in groups:
                groups[item["group"]] = {'total'    : 1,
                                         'success'  : 0,
                                         'fail'     : 0,
                                         'items'    : [item]}
            else:
                groups[item["group"]]['total'] += 1
                groups[item["group"]]['items'].append(item)
            task = asyncio.ensure_future(bound_check(sem, item, session, groups))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
q.join()

print("Done")

已经提到您应该尝试使用HEAD而不是GET 这样可以避免下载图像。 而且,您似乎在为每个请求生成单独的进程 ,这也效率很低。

我不认为在性能方面确实不需要使用asyncio。 使用普通线程池(甚至不是进程池)的解决方案有点麻烦,恕我直言:)另外,它在Python 2.7中可用。

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
from collections import defaultdict

def read_rows(file):
    with open(file) as f_in:
        return [row for row in csv.reader(f_in, delimiter='\t')]

def check_url(inp):
    """Gets called by workers in thread pool. Checks for existence of URL."""
    id, grp, url = inp
    def chk():
        try:
            return requests.head(url).status_code == 200
        except IOError as e:
            return False
    return (id, grp, url, chk())

if __name__ == '__main__':
    d = defaultdict(lambda: [])
    with ThreadPoolExecutor(max_workers=20) as executor:
        future_to_input = {executor.submit(check_url, inp): inp for inp in read_rows('urls.txt')}
        for future in as_completed(future_to_input):
            id, grp, url, res = future.result()
            d[grp].append((id, url, res))
    # do something with your d (e.g. sort appropriately, filter those with len(d[grp]) <= 1, ...)
    for g, bs in d.items():
        print(g)
        for id, url, res in bs:
            print("  %s %5s %s" % (id, res, url))

如您所见,我分别处理CSV输入的每一行,然后对结果进行分组(使用d ),而不是对输入进行分组。 我想主要是口味问题。 您可能想玩max_workers=20并可能增加它。