Ahogrammer

Deep Dive Into NLP, ML and Cloud

Tweetを定期的に取得してBigQueryに格納する

本記事ではTwitter Search APIを使ってTweetを取得し、結果をBigQueryに格納する方法について紹介します。データ分析のフローである「取得」「保存」「分析」「可視化」のうち、「取得」と「保存」までを行うことになります。

使うもの

本記事では以下のサービスを使います。

TwitterAPIを使うためにはAPIキーを取得する必要があります。以下から取得しましょう。

また、BigQueryとCloud StorageはGCPのサービスなので、使うためにはGCPに登録する必要があります。

Tweetを検索する

まずは、Twitterを検索してTweetを取得しましょう。今回はPythonのパッケージであるtweepyを使ってTweetを取得します。以下のようにしてインストールします。

$ pip install tweepy

まずはじめに、TwitterAPIキーを読み込みます。今回は、APIキーは環境変数に設定されていることを想定しています。以下のコードでTwitterAPIキーを読み込むことができます。

import os

# Load api keys.
consumer_key = os.environ.get('CONSUMER_KEY')
consumer_secret = os.environ.get('CONSUMER_SECRET')
access_token = os.environ.get('ACCESS_TOKEN')
access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET')

次に、APIを叩いてTweetを取得します。ここでは、まず先ほど読み込んだAPIキーを使ってAPIの認証を行います。その後、キーワードを指定してTweetを取得します。今回はキーワードとして「地震」を指定しています。

import tweepy

# Search tweets.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)
tweets = tweepy.Cursor(api.search, q='地震', lang='ja').items(100)

APIのレスポンスについては以下のドキュメントで確認することができます。

最後に取得したTweetをファイルに保存します。保存する形式はjsonとします。ファイル名はプログラムを実行した時刻を付けました。

import json
from datetime import datetime

# Write tweets to json file.
filename = 'tweets_{}.json'.format(datetime.now().strftime('%Y%m%d%H%M%S'))
with open(filename, 'w') as f:
    for tweet in tweets:
        dic = {'texts': tweet.text,
               'dates': str(tweet.created_at),
               'id': tweet.id,
               'user_id':tweet.user.id}
        json.dump(dic, f)
        f.write('\n')

以上のコードを実行することで指定したキーワードが含まれるTweetを取得することができます。全体のコードは以下のgistで公開しています。

次は、取得したTweetをBigQueryに格納してみましょう。

TweetをBigQueryに格納する

まずは、Tweetを格納するためのテーブルをBigQueryに作成する必要があります。以下のようなテーブルを作りましょう。

f:id:Hironsan:20180523153014p:plain

Tweetを格納するためのテーブルをBigQueryに作成したら、格納するためのbashスクリプトを書いていきます。スクリプトの中では、定期的にTweetを取得するPythonプログラムを実行します。取得したJSONファイルをCloud Storageにコピーし、Cloud StorageからBigQueryに格納します。Tweetを取得する間隔はsleepコマンドで調整しています。

#!/bin/bash
while true; do 
 today=$(date +"%Y-%m-%d")
 python fetch_tweets.py
 gsutil cp *.json gs://your-bucket-name/tweets/$today/
 
 bq load --source_format=NEWLINE_DELIMITED_JSON   tweet.earthquake gs://your-bucket-name/tweets/$today/`ls *.json`  text:STRING,date:STRING,id:INTEGER,user_id:INTEGER
 
 rm *.json
 sleep 5m
done

シェルスクリプトの実行

では書いたシェルスクリプトを実行しましょう。スクリプトGoogle Compute Engineで実行することにします。接続が切断されても実行し続けるようにnohupを付けてシェルスクリプトを実行します。

$ nohup bash -c "source store_tweets.sh" &

結果を確認する

きちんとTweetをCloud StorageとBigQueryに格納できているか確認してみましょう。

まずは、Cloud Storageです。以下のように格納されていました。

f:id:Hironsan:20180523153610p:plain

次に、BigQueryを確認してみましょう。以下のようにTweetが格納されています。

f:id:Hironsan:20180523153729p:plain

これで分析のための準備が整いました。ただ、dateはDate型で格納したほうが良かったですね。

おわりに

本記事ではTwitter Search APIを使ってTweetを取得し、結果をBigQueryとCloud Storageに格納する方法について説明しました。データ分析のフローである「取得」「保存」「分析」「可視化」のうち、「取得」と「保存」までを行ったことになります。ここから先は、Cloud ML EngineやCloud Datalabを使って分析や可視化を行うことができます。

私のTwitterアカウントでも機械学習自然言語処理に関する情報をつぶやいています。

この分野にご興味のある方のフォローをお待ちしています。

参考資料