0x00 Preface
The API makes it very easy to get the data you want, but since applying for API access is difficult from inside China, scraping directly and bypassing the API is a problem that urgently needs solving. The popular, highly-starred unrestricted crawlers on GitHub have all been banned by Twitter at some point.
What I'm sharing here is an early version I wrote as practice when I first started writing crawlers, so the implementation is fairly rough.
0x01 Analysis
Given a user ID, the script automatically crawls that user's tweets every day, effectively putting the account under watch. The code is quite easy to read, and you can adapt it to your own needs; it is shared below.
- The code crawls tweets by screen name. Under the hood it first resolves the screen name to the numeric ID and then crawls with that ID; since the ID is hard to come by in everyday use, the script accepts a screen name directly. You can also change it to crawl by ID yourself. (A sketch of the lookup request follows below.)
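For reference, the screen-name lookup is nothing more than a GET to the `UserByScreenName` GraphQL endpoint with a percent-encoded `variables` JSON. Here is a minimal sketch that rebuilds the `connectURL` hard-coded in `getuserID()` below (the query ID `jMaTS-_Ea8vh9rpKggJbCQ` is the one captured when this was written and may rotate over time):

```python
import json
import urllib.parse

def build_lookup_url(screen_name):
    # Percent-encode the GraphQL "variables" JSON; this reproduces the
    # connectURL string hard-coded in getuserID() below.
    variables = json.dumps(
        {"screen_name": screen_name, "withHighlightedLabel": True},
        separators=(",", ":"))
    return ("https://twitter.com/i/api/graphql/jMaTS-_Ea8vh9rpKggJbCQ/"
            "UserByScreenName?variables=" + urllib.parse.quote(variables, safe=""))

print(build_lookup_url("jack"))
# .../UserByScreenName?variables=%7B%22screen_name%22%3A%22jack%22%2C%22withHighlightedLabel%22%3Atrue%7D
```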
How to find the header values: open a Twitter user page, press F12 to open the browser's developer tools, then press F5 to refresh and inspect the captured requests; the sketch below shows where the copied values end up.
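All four values in this sketch are placeholders to be filled from your own session. As far as I can tell, `x-csrf-token` mirrors the `ct0` cookie and `authorization` is the web client's Bearer token, but you don't need to know that; simply copy all four from any captured request to twitter.com.

```python
# Placeholders only -- paste your own values from the DevTools request headers.
headers = {
    "cookie": "<your full cookie string>",
    "user-agent": "<your browser's user-agent string>",
    "x-csrf-token": "<csrf token, typically the ct0 cookie value>",
    "authorization": "Bearer <the web client's bearer token>",
}
```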
0x02 The Code
```python
# -*- coding: utf-8 -*-
# @Time : 2020/10/21 16:43
# @Author : yuchuan
# @File : crawl_tweets_by_id.py
# @Software : PyCharm
import requests
import json
import urllib.parse
import pandas as pd
from datetime import datetime, timedelta
import time


def main():
    # Headers that masquerade as a browser; every ******************* must be
    # replaced with your own values (see 0x01 above).
    headers = {
        "cookie": '*******************',
        "user-agent": "*******************",
        "x-csrf-token": "*******************",
        "authorization": "*******************",
    }
    # getuserID resolves each screen name to its numeric ID, so no manual
    # lookup is needed; you could also modify this to crawl by ID directly.
    username = pd.read_csv('*******************.csv')
    for i in range(0, len(username['username'])):
        csv_username = str(username['username'][i])
        print(csv_username)
        userID = getuserID(csv_username, headers)
        twitterURL = "https://twitter.com/i/api/2/timeline/profile/" + userID + ".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&userId=" + userID + "&ext=mediaStats%2ChighlightedLabel"
        full_content = []
        # flag says whether the current page still contains tweets from today;
        # once it turns False we stop paging.
        flag = True
        while flag:
            # fetch one page of the timeline
            response = connect(headers, twitterURL)
            # formatRes turns the raw bytes into a dict
            responseJson = formatRes(response.content)
            # pull every tweet on this page: text, time, retweets, likes, ...
            content = parsetweets(responseJson)
            # accumulate each page's tweets into one list
            full_content.extend(content)
            # build the URL of the next page from this page's cursor
            twitterURL = getNewURL(responseJson, userID)
            flag = CtrlFlag(content)
        print("-" * 96)
        # keep only today's tweets
        everydaytweet = todaytweet(full_content)
        # save them to a CSV, one file per user
        saveData(everydaytweet, csv_username)
        time.sleep(30)


# Returns False once none of the tweets in `content` were posted today (UTC).
def CtrlFlag(content):
    flag = True
    today = (todaytime() + timedelta(hours=-8)).strftime("%Y-%m-%d")
    count = 0
    for i in range(0, len(content)):
        if content[i][0][0:10] not in str(today):
            count = count + 1
    if count == len(content):
        flag = False
    return flag


# Resolve a screen name to the numeric user ID via the UserByScreenName
# GraphQL endpoint.
def getuserID(username, headers):
    connectURL = "https://twitter.com/i/api/graphql/jMaTS-_Ea8vh9rpKggJbCQ/UserByScreenName?variables=%7B%22screen_name%22%3A%22" + username + "%22%2C%22withHighlightedLabel%22%3Atrue%7D"
    print(connectURL)
    response = connect(headers, connectURL)
    responseJson = formatRes(response.content)
    data = responseJson['data']['user']
    userID = find('rest_id', data)
    return userID


# Keep only today's tweets. Original plan: sort inside the CSV and slice out
# today's rows -- sorting by time never worked, but filtering the list before
# saving to CSV does.
def todaytweet(full_content):
    content = []
    # todaytime() is local (Hong Kong) time; subtracting 8 hours gives UTC,
    # matching the crawled created_at timestamps.
    today = (todaytime() + timedelta(hours=-8)).strftime("%Y-%m-%d")
    for i in range(0, len(full_content)):
        if full_content[i][0][0:10] in str(today):
            content.append(full_content[i])
    return content


# Extract tweets plus their stats, formatting created_at as YYYY-MM-DD HH:MM:SS.
def parsetweets(responseJson):
    tweets = responseJson['globalObjects']['tweets']
    full_text = findAll('full_text', tweets)
    created_at = findAll('created_at', tweets)  # UTC, e.g. "Wed Oct 21 16:43:00 +0000 2020"
    favorite_count = findAll('favorite_count', tweets)
    quote_count = findAll('quote_count', tweets)
    reply_count = findAll('reply_count', tweets)
    retweet_count = findAll('retweet_count', tweets)
    time2 = []
    for i in range(0, len(created_at)):
        t = datetime.strptime(created_at[i], "%a %b %d %H:%M:%S +0000 %Y")
        time2.append(datetime.strftime(t, '%Y-%m-%d %H:%M:%S'))  # datetime -> str
    tweetData = []
    for i in range(0, len(full_text)):
        tweetData.append([time2[i], full_text[i], favorite_count[i],
                          quote_count[i], reply_count[i], retweet_count[i]])
    return tweetData


# Current date as a datetime; convert to str where needed.
def todaytime():
    return datetime.today()


# Save to CSV, one file per user, named "<YYYYMMDD> <screen name>.csv".
def saveData(content, filename):
    filetime = todaytime().strftime('%Y%m%d')
    filename = filetime + " " + filename
    filepath = 'D:/twitterdata/' + filename + '.csv'
    name = ['Time', 'Tweet', 'Favorite', 'Quote', 'Reply', 'Retweet']
    Data = pd.DataFrame(columns=name, data=content)
    Data.to_csv(filepath, encoding='utf-8-sig')


# Search a nested dict for the first occurrence of a key.
def find(target, dictData, notFound='not found'):
    queue = [dictData]
    while len(queue) > 0:
        data = queue.pop()
        for key, value in data.items():
            if key == target:
                return value
            elif type(value) == dict:
                queue.append(value)
    return notFound


# Collect the value of `target` from every tweet object in the dict.
def findAll(target, dictData):
    result = []
    for key, values in dictData.items():
        result.append(values[target])
    return result


# Read the cursor out of the current page and build the next page's URL.
def getNewURL(responseJson, userID):
    # the last element of the entries list is a dict holding the cursor
    responseJsonCursor1 = responseJson['timeline']['instructions'][0]['addEntries']['entries'][-1]
    cursorASCII = find('cursor', responseJsonCursor1)
    cursorASCII2 = find('value', cursorASCII)
    cursor = urllib.parse.quote(cursorASCII2)
    newURL = "https://twitter.com/i/api/2/timeline/profile/" + userID + ".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&cursor=" + cursor + "&userId=" + userID + "&ext=mediaStats%2ChighlightedLabel"
    return newURL


# Format the fetched JSON: bytes -> str, then json.loads() reads it into a dict.
def formatRes(res):
    strRes = str(res, 'utf-8')
    dictRes = json.loads(strRes)
    return dictRes


# Fetch a URL through a local proxy; adjust the proxy settings to your own setup.
def connect(headers, twitterURL):
    proxies = {
        "http": "http://127.0.0.1:7890",
        "https": "http://127.0.0.1:7890",
    }
    response = requests.get(twitterURL, headers=headers, proxies=proxies)
    return response


if __name__ == "__main__":
    main()
```
0x03 Some Closing Words
I've started a WeChat official account where I share the small problems and new discoveries I run into along the research road. Feel free to follow it and leave me a message!
Let's strive upward and crack hard problems together~