本記事ではTwitter Search APIを使ってTweetを取得し、結果をBigQueryに格納する方法について紹介します。データ分析のフローである「取得」「保存」「分析」「可視化」のうち、「取得」と「保存」までを行うことになります。
使うもの
本記事では以下のサービスを使います。
TwitterのAPIを使うためにはAPIキーを取得する必要があります。以下から取得しましょう。
また、BigQueryとCloud StorageはGCPのサービスなので、使うためにはGCPに登録する必要があります。
Tweetを検索する
まずは、Twitterを検索してTweetを取得しましょう。今回はPythonのパッケージであるtweepyを使ってTweetを取得します。以下のようにしてインストールします。
$ pip install tweepy
まずはじめに、TwitterのAPIキーを読み込みます。今回は、APIキーは環境変数に設定されていることを想定しています。以下のコードでTwitterのAPIキーを読み込むことができます。
import os # Load api keys. consumer_key = os.environ.get('CONSUMER_KEY') consumer_secret = os.environ.get('CONSUMER_SECRET') access_token = os.environ.get('ACCESS_TOKEN') access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET')
次に、APIを叩いてTweetを取得します。ここでは、まず先ほど読み込んだAPIキーを使ってAPIの認証を行います。その後、キーワードを指定してTweetを取得します。今回はキーワードとして「地震」を指定しています。
import tweepy # Search tweets. auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) api = tweepy.API(auth) tweets = tweepy.Cursor(api.search, q='地震', lang='ja').items(100)
APIのレスポンスについては以下のドキュメントで確認することができます。
最後に取得したTweetをファイルに保存します。保存する形式はjsonとします。ファイル名はプログラムを実行した時刻を付けました。
import json from datetime import datetime # Write tweets to json file. filename = 'tweets_{}.json'.format(datetime.now().strftime('%Y%m%d%H%M%S')) with open(filename, 'w') as f: for tweet in tweets: dic = {'texts': tweet.text, 'dates': str(tweet.created_at), 'id': tweet.id, 'user_id':tweet.user.id} json.dump(dic, f) f.write('\n')
以上のコードを実行することで指定したキーワードが含まれるTweetを取得することができます。全体のコードは以下のgistで公開しています。
次は、取得したTweetをBigQueryに格納してみましょう。
TweetをBigQueryに格納する
まずは、Tweetを格納するためのテーブルをBigQueryに作成する必要があります。以下のようなテーブルを作りましょう。
Tweetを格納するためのテーブルをBigQueryに作成したら、格納するためのbashスクリプトを書いていきます。スクリプトの中では、定期的にTweetを取得するPythonプログラムを実行します。取得したJSONファイルをCloud Storageにコピーし、Cloud StorageからBigQueryに格納します。Tweetを取得する間隔はsleepコマンドで調整しています。
#!/bin/bash while true; do today=$(date +"%Y-%m-%d") python fetch_tweets.py gsutil cp *.json gs://your-bucket-name/tweets/$today/ bq load --source_format=NEWLINE_DELIMITED_JSON tweet.earthquake gs://your-bucket-name/tweets/$today/`ls *.json` text:STRING,date:STRING,id:INTEGER,user_id:INTEGER rm *.json sleep 5m done
シェルスクリプトの実行
では書いたシェルスクリプトを実行しましょう。スクリプトはGoogle Compute Engineで実行することにします。接続が切断されても実行し続けるようにnohupを付けてシェルスクリプトを実行します。
$ nohup bash -c "source store_tweets.sh" &
結果を確認する
きちんとTweetをCloud StorageとBigQueryに格納できているか確認してみましょう。
まずは、Cloud Storageです。以下のように格納されていました。
次に、BigQueryを確認してみましょう。以下のようにTweetが格納されています。
これで分析のための準備が整いました。ただ、dateはDate型で格納したほうが良かったですね。
おわりに
本記事ではTwitter Search APIを使ってTweetを取得し、結果をBigQueryとCloud Storageに格納する方法について説明しました。データ分析のフローである「取得」「保存」「分析」「可視化」のうち、「取得」と「保存」までを行ったことになります。ここから先は、Cloud ML EngineやCloud Datalabを使って分析や可視化を行うことができます。
私のTwitterアカウントでも機械学習や自然言語処理に関する情報をつぶやいています。
この分野にご興味のある方のフォローをお待ちしています。