논문을 서보하고 있다고 응답으로 꾸짖어주는 bot를 만들었다(AWS Lambda + CloudWatch Events)
17580 단어 GitHub람다CloudWatchTwitterAPIAWS
수행한 작업: 게시 논문의 진행 관리를 위한 TwitterBot 만들기
htps : // 라고 해서 r. 코 m/나오키_마에지마/s타츠 s/1061196481762222080
파일 구성
twitter_bot
├── config.yml
├── lambda_function.py
└── requirements.txt
이 밖에 나중에 파이썬 패키지가 설치된다.
requirements.txt
twitter_bot
├── config.yml
├── lambda_function.py
└── requirements.txt
requirements.txt
requests_oauthlib==1.0.0
GitPython==2.1.11
git_lambda==1.1.3
PyYAML==3.13
다양한 설정을위한 yaml 파일 (config.yml)
config.yml
keys:
twitter:
consumer_key: "XXXXXXXXXXXXXXXXXXX"
consumer_secret: "XXXXXXXXXXXXXXXXXXX"
access_token: "XXXXXXXXXXXXXXXXXXX"
access_secret: "XXXXXXXXXXXXXXXXXXX"
github:
repository: "https://username:[email protected]/username/repository.git"
config:
twitter:
user: "@twitter_user"
alert_text: #サボった時のお叱り
- "論文を書け、まだ間に合う。"
- "論文を書きなさ~い!!"
- "論文を進めれば、不安は消える。"
- "明日やろうはバカ野郎だ。"
praise_text: #サボってなかった時の褒め
- "論文を書いて、えらーい!!"
- "論文を書いたので救われた。"
- "いいね、この調子!!明日はもっと書ける!!"
- "がんばった!!投稿まであと少し!!"
스크립트 (lambda_function.py)
lambda_function.pyimport git_lambda
git_lambda.setup() #lambdaからgitを使えるように設定しておく
from git import Repo, RemoteProgress
import yaml
import os
import random
from datetime import datetime, timedelta, timezone
from requests_oauthlib import OAuth1Session
def lambda_handler(event, context):
#各種パラメータの設定
TEMP_DIR = '/tmp/git_test_' + str(datetime.now().timestamp()) #git init用の一時フォルダ。lambdaでは/tmp以下でしか書き込みができない
JST = timezone(timedelta(hours=+9),'JST') #日本時間に設定
TODAY_DATE = datetime.now(JST).date() #現在の月日を取得
UPDATE_URL = 'https://api.twitter.com/1.1/statuses/update.json' #Twitter投稿用
#設定ファイルを読み込む
with open('./config.yml','r') as f:
keys = yaml.load(f)
#Twitterの認証情報
consumer_key = keys['keys']['twitter']['consumer_key']
consumer_secret = keys['keys']['twitter']['consumer_secret']
access_token = keys['keys']['twitter']['access_token']
access_secret = keys['keys']['twitter']['access_secret']
#gitリモートリポジトリからfetchする
os.makedirs(TEMP_DIR, exist_ok=True)
repo = Repo.init(TEMP_DIR)
origin = repo.create_remote('origin',keys['keys']['github']['repository'])
origin.fetch()
commits = repo.iter_commits('origin/master')
#最新のコミットから順に読まれていくので、今日のコミットでなくなるまで詳細を取得していく
today_insertions = 0
today_deletions = 0
today_commit_count = 0
for commit in commits:
commit_date = datetime.fromtimestamp(commit.committed_date,JST).date()
if(commit_date == TODAY_DATE):
commit_stats = commit.stats.total
today_insertions += commit_stats['insertions']
today_deletions += commit_stats['deletions']
today_commit_count += 1
else:
break
#ツイートを部品ごとに生成していく
header_text = "\n".join([keys['config']['twitter']['user'], #リプライ先の指定
TODAY_DATE.strftime("%Y/%m/%d")]) #日付
result_text = "\n".join(["コミット数:" + str(today_commit_count),
"追加した行:" + str(today_insertions),
"削除した行:" + str(today_deletions)])
if(today_commit_count == 0):
#サボってた場合のテキスト
comment_text = random.choice(keys['config']['twitter']['alert_text'])
else:
#頑張って論文を書いた場合のテキスト
comment_text = random.choice(keys['config']['twitter']['praise_text'])
text = "\n".join([header_text,result_text,comment_text])
#TwitterAPI経由で実際にツイートを行う
params = {"status": text}
twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_secret)
req = twitter.post(UPDATE_URL, params = params)
return "Tweet OK"
패키지 당 zip으로 굳힌다
pip install -r requirements.txt -t .
zip -r twitter_bot.zip ./*
import git_lambda
git_lambda.setup() #lambdaからgitを使えるように設定しておく
from git import Repo, RemoteProgress
import yaml
import os
import random
from datetime import datetime, timedelta, timezone
from requests_oauthlib import OAuth1Session
def lambda_handler(event, context):
#各種パラメータの設定
TEMP_DIR = '/tmp/git_test_' + str(datetime.now().timestamp()) #git init用の一時フォルダ。lambdaでは/tmp以下でしか書き込みができない
JST = timezone(timedelta(hours=+9),'JST') #日本時間に設定
TODAY_DATE = datetime.now(JST).date() #現在の月日を取得
UPDATE_URL = 'https://api.twitter.com/1.1/statuses/update.json' #Twitter投稿用
#設定ファイルを読み込む
with open('./config.yml','r') as f:
keys = yaml.load(f)
#Twitterの認証情報
consumer_key = keys['keys']['twitter']['consumer_key']
consumer_secret = keys['keys']['twitter']['consumer_secret']
access_token = keys['keys']['twitter']['access_token']
access_secret = keys['keys']['twitter']['access_secret']
#gitリモートリポジトリからfetchする
os.makedirs(TEMP_DIR, exist_ok=True)
repo = Repo.init(TEMP_DIR)
origin = repo.create_remote('origin',keys['keys']['github']['repository'])
origin.fetch()
commits = repo.iter_commits('origin/master')
#最新のコミットから順に読まれていくので、今日のコミットでなくなるまで詳細を取得していく
today_insertions = 0
today_deletions = 0
today_commit_count = 0
for commit in commits:
commit_date = datetime.fromtimestamp(commit.committed_date,JST).date()
if(commit_date == TODAY_DATE):
commit_stats = commit.stats.total
today_insertions += commit_stats['insertions']
today_deletions += commit_stats['deletions']
today_commit_count += 1
else:
break
#ツイートを部品ごとに生成していく
header_text = "\n".join([keys['config']['twitter']['user'], #リプライ先の指定
TODAY_DATE.strftime("%Y/%m/%d")]) #日付
result_text = "\n".join(["コミット数:" + str(today_commit_count),
"追加した行:" + str(today_insertions),
"削除した行:" + str(today_deletions)])
if(today_commit_count == 0):
#サボってた場合のテキスト
comment_text = random.choice(keys['config']['twitter']['alert_text'])
else:
#頑張って論文を書いた場合のテキスト
comment_text = random.choice(keys['config']['twitter']['praise_text'])
text = "\n".join([header_text,result_text,comment_text])
#TwitterAPI経由で実際にツイートを行う
params = {"status": text}
twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_secret)
req = twitter.post(UPDATE_URL, params = params)
return "Tweet OK"
pip install -r requirements.txt -t .
zip -r twitter_bot.zip ./*
Lambda 콘솔에서 테스트
실제로 트윗되었습니다.
main.py
from lambda_function import lambda_handler
lambda_handler(None,None)
CloudWatch Events에서 정기 실행 설정
59 14 ? * MON,TUE,THU *
참고
Reference
이 문제에 관하여(논문을 서보하고 있다고 응답으로 꾸짖어주는 bot를 만들었다(AWS Lambda + CloudWatch Events)), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/malimo1024/items/566c954384cd2722a778텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)