트위터 API에서 대량 트윗 수집 (77 tweet/s) (stream과 search의 병렬 처리에서 ~)
20188 단어 트위터빅데이터병렬 처리TwitterAPIStreamAPI
import tweepy
import datetime
import re
import collections
from pytz import timezone
import time
import MeCab
#import threading
#from multiprocessing import Pool
import os
#import multiprocessing
import concurrent.futures
#↑インポート
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
authapp = tweepy.AppAuthHandler(consumer_key,consumer_secret)
apiapp = tweepy.API(authapp)
#↑認証
m = MeCab.Tagger("-Ochasen")
N=0
#↑取得ツイート数
lang_dict="{'en': '英語', 'und': '不明', 'is': 'アイスランド語', 'ay': 'アイマラ語', 'ga': 'アイルランド語', 'az': 'アゼルバイジェン語', 'as': 'アッサム語', 'aa': 'アファル語', 'ab': 'アプハジア語', 'af': 'アフリカーンス語', 'am': 'アムハラ語', 'ar': 'アラビア語', 'sq': 'アルバニア語', 'hy': 'アルメニア語', 'it': 'イタリア語', 'yi': 'イディッシュ語', 'iu': 'イヌクティトット語', 'ik': 'イヌピア語', 'ia': 'インターリングア', 'ie': 'インターリング語', 'in': 'インドネシア語', 'ug': 'ウイグル語', 'cy': 'ウェールズ語', 'vo': 'ヴォラピュック語', 'wo': 'ウォロフ語', 'uk': 'ウクライナ語', 'uz': 'ウズベク語', 'ur': 'ウルドゥー語', 'et': 'エストニア語', 'eo': 'エスペラント語', 'or': 'オーリア語', 'oc': 'オキタン語', 'nl': 'オランダ語', 'om': 'オロモ語', 'kk': 'カザフ語', 'ks': 'カシミール語', 'ca': 'カタラン語', 'gl': 'ガリシア語', 'ko': '韓国語', 'kn': 'カンナダ語', 'km': 'カンボジア語', 'rw': 'キヤーワンダ語', 'el': 'ギリシャ語', 'ky': 'キルギス語', 'rn': 'キルンディ語', 'gn': 'グアラニー語', 'qu': 'クエチュア語', 'gu': 'グジャラト語', 'kl': 'グリーンランド語', 'ku': 'クルド語', 'ckb': '中央クルド語', 'hr': 'クロアチア語', 'gd': 'ゲーリック語', 'gv': 'ゲーリック語', 'xh': 'コーサ語', 'co': 'コルシカ語', 'sm': 'サモア語', 'sg': 'サングホ語', 'sa': 'サンスクリット語', 'ss': 'シスワティ語', 'jv': 'ジャワ語', 'ka': 'ジョージア語', 'sn': 'ショナ語', 'sd': 'シンド語', 'si': 'シンハラ語', 'sv': 'スウェーデン語', 'su': 'スーダン語', 'zu': 'ズールー語', 'es': 'スペイン語', 'sk': 'スロヴァキア語', 'sl': 'スロヴェニア語', 'sw': 'スワヒリ語', 'tn': 'セツワナ語', 'st': 'セト語', 'sr': 'セルビア語', 'sh': 'セルボクロアチア語', 'so': 'ソマリ語', 'th': 'タイ語', 'tl': 'タガログ語', 'tg': 'タジク語', 'tt': 'タタール語', 'ta': 'タミル語', 'cs': 'チェコ語', 'ti': 'チグリニャ語', 'bo': 'チベット語', 'zh': '中国語', 'ts': 'ヅォンガ語', 'te': 'テルグ語', 'da': 'デンマーク 語', 'de': 'ドイツ語', 'tw': 'トウィ語', 'tk': 'トルクメン語', 'tr': 'トルコ語', 'to': 'トンガ語', 'na': 'ナウル語', 'ja': '日本語', 'ne': 'ネパール語', 'no': 'ノルウェー語', 'ht': 'ハイチ語', 'ha': 'ハウサ語', 'be': '白ロシア語', 'ba': 'バシキール語', 'ps': 'パシト語', 'eu': 'バスク語', 'hu': 'ハンガリー語', 'pa': 'パンジャビ語', 'bi': 'ビスラマ語', 'bh': 'ビハール語', 'my': 'ビルマ語', 'hi': 'ヒンディー語', 'fj': 'フィジー語', 'fi': 'フィンランド語', 'dz': 'ブータン語', 'fo': 'フェロー語', 'fr': 'フランス語', 'fy': 'フリジア語', 'bg': 'ブルガリア語', 'br': 'ブルターニュ語', 'vi': 'ベトナム語', 'iw': 'ヘブライ語', 'fa': 'ペルシャ語', 'bn': 'ベンガル語', 'pl': 'ポーランド語', 'pt': 'ポルトガル語', 'mi': 'マオリ語', 'mk': 'マカドニア語', 'mg': 'マダガスカル語', 'mr': 'マラッタ語', 'ml': 'マラヤーラム語', 'mt': 'マルタ語', 'ms': 'マレー語', 'mo': 'モルダビア語', 'mn': 'モンゴル語', 'yo': 'ヨルバ語', 'lo': 'ラオタ語', 'la': 'ラテン語', 'lv': 'ラトビア語', 'lt': 'リトアニア語', 'ln': 'リンガラ語', 'li': 'リンブルク語', 'ro': 'ルーマニア語', 'rm': 'レートロマンス語', 'ru': 'ロシア語'}"
lang_dict=eval(lang_dict)
#↑辞書
def process(ty,tw):
pass
#↑行いたい言語処理
def judge_tweet_type(tweet):
if tweet.in_reply_to_status_id_str:
return "reply"
else:
head= str(tweet.text).split(":")
if len(head) >= 2 and "RT" in head[0]:
return "retwe"
else:
return "tweet"
#↑リプ、リツイート、ツイートか判断
def get_time(id):
two_raw=format(int(id),'016b').zfill(64)
unixtime = int(two_raw[:-22],2) + 1288834974657
unixtime_th = datetime.datetime.fromtimestamp(unixtime/1000)
tim = str(unixtime_th).replace(" ","_")[:-3]
return tim,unixtime
#↑idからツイート時間
def gather(tweet,type):
global N
global all_time
tim,unix=get_time(tweet.id)
original_text=tweet.text.replace("\n","")
tweet_type=judge_tweet_type(tweet)
nowtime=time.time()
tweet_pertime=str(round(N/(nowtime-all_time),1))
lag=str(round(nowtime-unix/1000,1))
lang=lang_dict[tweet.lang]
process(tweet_type,original_text)
print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang)
#print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang,original_text)
N=N+1
#↑streamとsearchの全ツイートが集まる
def search(last_id):
time_search =time.time()
for status in apiapp.search(q="filter:safe OR -filter:safe ",count="100",result_type="recent",since_id=last_id):
gather(status,"search")
#↑search本体
interval = 2.1
#↑search呼び出し間隔
trysearch=0
#↑search呼び出し回数
class StreamingListener(tweepy.StreamListener):
def on_status(self, status):
global time_search
global trysearch
gather(status,"stream")
time_stream=time.time()
time_stream-time_search % interval
#print(time_stream-time_search)
#print(interval*trysearch)
if time_stream-time_search-interval>interval*trysearch:
last_id=status.id
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
#↑並列定義
executor.submit(search(last_id))
trysearch=trysearch+1
#↑streaming本体
def carry():
listener = StreamingListener()
streaming = tweepy.Stream(auth, listener)
streaming.sample()
#↑stream呼び出し関数
time_search =time.time()
#↑searchだがstream前に定義
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
#↑並列定義
all_time=time.time()
executor.submit(carry)
#↑stream呼び出し関数呼び出し
이렇게 하면

이런 느낌으로 6900번째 트윗을 초당 80트윗 정도 2초 지연으로 stream과 search의 구별도 붙여 립이나 RT 등의 종별도 착용해 언어까지 표시해 줍니다.
도중의 함수의 process로 텍스트를 어떻게, 자연언어 처리 하는지, 여기에 플러스 해 나가면(자), 크다고 생각합니다.
고마워요. 또한 추가합니다.
정형문
트위터 (@kenkensz9)에 항상 있기 때문에 뭔가 있으면 부디
잘하면 좋겠어요!
Reference
이 문제에 관하여(트위터 API에서 대량 트윗 수집 (77 tweet/s) (stream과 search의 병렬 처리에서 ~)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/kenkensz9/items/0c1507d232d24df74285텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)