-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlinebot.py
More file actions
104 lines (81 loc) · 3.01 KB
/
linebot.py
File metadata and controls
104 lines (81 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from bottle import Bottle, request, HTTPResponse
from functools import wraps
import base64
import hashlib
import hmac
import json
import requests
import os
import configparser
channel_secret = os.environ.get('channel_secret')
access_token = os.environ.get('access_token')
PORT = os.environ.get('PORT')
HEADERS = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {{{access_token}}}'
}
REPLY_URL = 'https://api.line.me/v2/bot/message/reply'
class simple_bot(Bottle):
def __init__(self, reply_cfg='reply.cfg'):
Bottle.__init__(self)
self.reply_cfg = reply_cfg
self.init_reply()
def init_reply(self):
reply_list = configparser.ConfigParser()
reply_list.optionxform = str
reply_list.read(self.reply_cfg)
for key in reply_list.keys():
if key in globals():
raise ValueError(f'Name \'{key}\' has been used.')
r = {**reply_list[key]}
rep = r.get('regex')
kwd = r.get('keywd')
if rep:
globals()[key] = self.regex(rep)(lambda: r)
elif kwd:
kwds_list = kwd.split('|')
globals()[key] = self.keywds(kwds_list)(lambda: r)
def regex(self, rep):
def msg_func(func):
reply_func = reply(func())
return self.post(f'/regex/<:re:{rep}>')(reply_func)
return msg_func
def keywds(self, kwds_list):
rep = f'.*({"|".join(kwds_list)}).*'
return self.regex(rep)
def webhook(self, func):
def webhook_func(gunc):
@wraps(gunc)
@validate_signature
def wrapper(*args, **kwds):
ret = gunc(*args, **kwds)
body = json.loads(request.body.read().decode('utf8')).get('events')
if body and body[0].get('message', {}).get('type') == 'text':
token = body[0].get('replyToken') if body else None
text = body[0]['message']['text']
data = {'token': token}
requests.post(f'http://localhost:{PORT}/regex/{text}', data=data)
return ret
return wrapper
return self.post('/webhook')(webhook_func(func))
def validate_signature(func):
@wraps(func)
def wrapper(*args, **kwds):
body = request.body.read()
hash_value = hmac.new(channel_secret.encode('utf-8'), body, hashlib.sha256).digest()
signature = base64.b64encode(hash_value)
X_Line_Signature = request.headers.get('X-Line-Signature', '').encode('utf-8')
if not (X_Line_Signature and signature == X_Line_Signature):
return HTTPResponse(status=403)
return func(*args, **kwds)
return wrapper
def reply(msg):
def reply_func():
token = request.forms.get('token')
if token:
data = {
'replyToken': token,
'messages': [msg]
}
requests.post(REPLY_URL, headers=HEADERS, json=data)
return reply_func