트위터 API에서 대량 트윗 수집 (77 tweet/s) (stream과 search의 병렬 처리에서 ~)

collect.py
import tweepy
import datetime
import re
import collections
from pytz import timezone
import time
import MeCab
#import threading
#from multiprocessing import Pool
import os
#import multiprocessing
import concurrent.futures
#↑インポート


consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
authapp = tweepy.AppAuthHandler(consumer_key,consumer_secret)
apiapp = tweepy.API(authapp)
#↑認証

m = MeCab.Tagger("-Ochasen")

N=0
#↑取得ツイート数

lang_dict="{'en': '英語', 'und': '不明', 'is': 'アイスランド語', 'ay': 'アイマラ語', 'ga': 'アイルランド語', 'az': 'アゼルバイジェン語', 'as': 'アッサム語', 'aa': 'アファル語', 'ab': 'アプハジア語', 'af': 'アフリカーンス語', 'am': 'アムハラ語', 'ar': 'アラビア語', 'sq': 'アルバニア語', 'hy': 'アルメニア語', 'it': 'イタリア語', 'yi': 'イディッシュ語', 'iu': 'イヌクティトット語', 'ik': 'イヌピア語', 'ia': 'インターリングア', 'ie': 'インターリング語', 'in': 'インドネシア語', 'ug': 'ウイグル語', 'cy': 'ウェールズ語', 'vo': 'ヴォラピュック語', 'wo': 'ウォロフ語', 'uk': 'ウクライナ語', 'uz': 'ウズベク語', 'ur': 'ウルドゥー語', 'et': 'エストニア語', 'eo': 'エスペラント語', 'or': 'オーリア語', 'oc': 'オキタン語', 'nl': 'オランダ語', 'om': 'オロモ語', 'kk': 'カザフ語', 'ks': 'カシミール語', 'ca': 'カタラン語', 'gl': 'ガリシア語', 'ko': '韓国語', 'kn': 'カンナダ語', 'km': 'カンボジア語', 'rw': 'キヤーワンダ語', 'el': 'ギリシャ語', 'ky': 'キルギス語', 'rn': 'キルンディ語', 'gn': 'グアラニー語', 'qu': 'クエチュア語', 'gu': 'グジャラト語', 'kl': 'グリーンランド語', 'ku': 'クルド語', 'ckb': '中央クルド語', 'hr': 'クロアチア語', 'gd': 'ゲーリック語', 'gv': 'ゲーリック語', 'xh': 'コーサ語', 'co': 'コルシカ語', 'sm': 'サモア語', 'sg': 'サングホ語', 'sa': 'サンスクリット語', 'ss': 'シスワティ語', 'jv': 'ジャワ語', 'ka': 'ジョージア語', 'sn': 'ショナ語', 'sd': 'シンド語', 'si': 'シンハラ語', 'sv': 'スウェーデン語', 'su': 'スーダン語', 'zu': 'ズールー語', 'es': 'スペイン語', 'sk': 'スロヴァキア語', 'sl': 'スロヴェニア語', 'sw': 'スワヒリ語', 'tn': 'セツワナ語', 'st': 'セト語', 'sr': 'セルビア語', 'sh': 'セルボクロアチア語', 'so': 'ソマリ語', 'th': 'タイ語', 'tl': 'タガログ語', 'tg': 'タジク語', 'tt': 'タタール語', 'ta': 'タミル語', 'cs': 'チェコ語', 'ti': 'チグリニャ語', 'bo': 'チベット語', 'zh': '中国語', 'ts': 'ヅォンガ語', 'te': 'テルグ語', 'da': 'デンマーク 語', 'de': 'ドイツ語', 'tw': 'トウィ語', 'tk': 'トルクメン語', 'tr': 'トルコ語', 'to': 'トンガ語', 'na': 'ナウル語', 'ja': '日本語', 'ne': 'ネパール語', 'no': 'ノルウェー語', 'ht': 'ハイチ語', 'ha': 'ハウサ語', 'be': '白ロシア語', 'ba': 'バシキール語', 'ps': 'パシト語', 'eu': 'バスク語', 'hu': 'ハンガリー語', 'pa': 'パンジャビ語', 'bi': 'ビスラマ語', 'bh': 'ビハール語', 'my': 'ビルマ語', 'hi': 'ヒンディー語', 'fj': 'フィジー語', 'fi': 'フィンランド語', 'dz': 'ブータン語', 'fo': 'フェロー語', 'fr': 'フランス語', 'fy': 'フリジア語', 'bg': 'ブルガリア語', 'br': 'ブルターニュ語', 'vi': 'ベトナム語', 'iw': 'ヘブライ語', 'fa': 'ペルシャ語', 'bn': 'ベンガル語', 'pl': 'ポーランド語', 'pt': 'ポルトガル語', 'mi': 'マオリ語', 'mk': 'マカドニア語', 'mg': 'マダガスカル語', 'mr': 'マラッタ語', 'ml': 'マラヤーラム語', 'mt': 'マルタ語', 'ms': 'マレー語', 'mo': 'モルダビア語', 'mn': 'モンゴル語', 'yo': 'ヨルバ語', 'lo': 'ラオタ語', 'la': 'ラテン語', 'lv': 'ラトビア語', 'lt': 'リトアニア語', 'ln': 'リンガラ語', 'li': 'リンブルク語', 'ro': 'ルーマニア語', 'rm': 'レートロマンス語', 'ru': 'ロシア語'}"
lang_dict=eval(lang_dict)
#↑辞書

def process(ty,tw):
 pass
#↑行いたい言語処理

def judge_tweet_type(tweet):
 if tweet.in_reply_to_status_id_str:
  return "reply"
 else:
  head= str(tweet.text).split(":")
 if len(head) >= 2 and "RT" in head[0]:
  return "retwe"
 else:
  return "tweet"
#↑リプ、リツイート、ツイートか判断

def get_time(id):
 two_raw=format(int(id),'016b').zfill(64)
 unixtime = int(two_raw[:-22],2) + 1288834974657
 unixtime_th = datetime.datetime.fromtimestamp(unixtime/1000)
 tim = str(unixtime_th).replace(" ","_")[:-3]
 return tim,unixtime
#↑idからツイート時間

def gather(tweet,type):
 global N
 global all_time
 tim,unix=get_time(tweet.id)
 original_text=tweet.text.replace("\n","")
 tweet_type=judge_tweet_type(tweet)
 nowtime=time.time()
 tweet_pertime=str(round(N/(nowtime-all_time),1))
 lag=str(round(nowtime-unix/1000,1))
 lang=lang_dict[tweet.lang]
 process(tweet_type,original_text)
 print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang)
 #print(N,tweet_pertime,"/s","+"+lag,tim,type,tweet_type,lang,original_text)
 N=N+1
#↑streamとsearchの全ツイートが集まる


def search(last_id):
 time_search =time.time()
 for status in apiapp.search(q="filter:safe OR -filter:safe ",count="100",result_type="recent",since_id=last_id):
   gather(status,"search")
#↑search本体


interval = 2.1
#↑search呼び出し間隔

trysearch=0
#↑search呼び出し回数

class StreamingListener(tweepy.StreamListener):
    def on_status(self, status):
        global time_search
        global trysearch
        gather(status,"stream")
        time_stream=time.time()
        time_stream-time_search % interval
        #print(time_stream-time_search)
        #print(interval*trysearch)
        if time_stream-time_search-interval>interval*trysearch:
           last_id=status.id
           executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
           #↑並列定義
           executor.submit(search(last_id))
           trysearch=trysearch+1
#↑streaming本体

def carry():
 listener = StreamingListener()
 streaming = tweepy.Stream(auth, listener)
 streaming.sample()
#↑stream呼び出し関数

time_search =time.time()
#↑searchだがstream前に定義

executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
#↑並列定義
all_time=time.time()
executor.submit(carry)
#↑stream呼び出し関数呼び出し

이렇게 하면

이런 느낌으로 6900번째 트윗을 초당 80트윗 정도 2초 지연으로 stream과 search의 구별도 붙여 립이나 RT 등의 종별도 착용해 언어까지 표시해 줍니다.
도중의 함수의 process로 텍스트를 어떻게, 자연언어 처리 하는지, 여기에 플러스 해 나가면(자), 크다고 생각합니다.

고마워요. 또한 추가합니다.

정형문
트위터 (@kenkensz9)에 항상 있기 때문에 뭔가 있으면 부디
잘하면 좋겠어요!

좋은 웹페이지 즐겨찾기