docker-ComWechat/test.py

335 lines
14 KiB
Python
Raw Normal View History

# by @0honus0
import copy
import json
import socketserver
import threading
from ensurepip import version
from typing import Optional

import requests
# from signal import signal, SIGPIPE, SIG_DFL
# signal(SIGPIPE,SIG_DFL)
class WECHAT_HTTP_APIS:
    """Numeric identifiers of the WeChat HTTP API endpoints.

    Each constant is the value placed in the ``type`` query parameter of a
    request.
    """

    # --- login check ---
    WECHAT_IS_LOGIN = 0                       # check login state

    # --- self info ---
    WECHAT_GET_SELF_INFO = 1                  # get own profile information

    # --- send message ---
    WECHAT_MSG_SEND_TEXT = 2                  # send text
    WECHAT_MSG_SEND_AT = 3                    # send a group @-mention
    WECHAT_MSG_SEND_CARD = 4                  # share a contact card
    WECHAT_MSG_SEND_IMAGE = 5                 # send an image
    WECHAT_MSG_SEND_FILE = 6                  # send a file
    WECHAT_MSG_SEND_ARTICLE = 7               # send an XML article
    WECHAT_MSG_SEND_APP = 8                   # send a mini program

    # --- receive message ---
    WECHAT_MSG_START_HOOK = 9                 # start the receive-message hook (socket listening only)
    WECHAT_MSG_STOP_HOOK = 10                 # stop the receive-message hook
    WECHAT_MSG_START_IMAGE_HOOK = 11          # start the image-message hook
    WECHAT_MSG_STOP_IMAGE_HOOK = 12           # stop the image-message hook
    WECHAT_MSG_START_VOICE_HOOK = 13          # start the voice-message hook
    WECHAT_MSG_STOP_VOICE_HOOK = 14           # stop the voice-message hook

    # --- contact ---
    WECHAT_CONTACT_GET_LIST = 15              # get the contact list
    WECHAT_CONTACT_CHECK_STATUS = 16          # check whether a friend has deleted you
    WECHAT_CONTACT_DEL = 17                   # delete a friend
    WECHAT_CONTACT_SEARCH_BY_CACHE = 18       # get friend info from memory
    WECHAT_CONTACT_SEARCH_BY_NET = 19         # search user info over the network
    WECHAT_CONTACT_ADD_BY_WXID = 20           # add a friend by wxid
    WECHAT_CONTACT_ADD_BY_V3 = 21             # add a friend using v3 data
    WECHAT_CONTACT_ADD_BY_PUBLIC_ID = 22      # follow an official account
    WECHAT_CONTACT_VERIFY_APPLY = 23          # accept a friend request
    WECHAT_CONTACT_EDIT_REMARK = 24           # edit a contact remark

    # --- chatroom ---
    WECHAT_CHATROOM_GET_MEMBER_LIST = 25      # get the group member list
    WECHAT_CHATROOM_GET_MEMBER_NICKNAME = 26  # get a given group member's nickname
    WECHAT_CHATROOM_DEL_MEMBER = 27           # remove group members
    WECHAT_CHATROOM_ADD_MEMBER = 28           # add group members
    WECHAT_CHATROOM_SET_ANNOUNCEMENT = 29     # set the group announcement
    WECHAT_CHATROOM_SET_CHATROOM_NAME = 30    # set the group chat name
    WECHAT_CHATROOM_SET_SELF_NICKNAME = 31    # set your own nickname inside the group

    # --- database ---
    WECHAT_DATABASE_GET_HANDLES = 32          # get database handles
    WECHAT_DATABASE_BACKUP = 33               # back up a database
    WECHAT_DATABASE_QUERY = 34                # run a database query

    # --- version ---
    WECHAT_SET_VERSION = 35                   # change the WeChat version number

    # --- log ---
    WECHAT_LOG_START_HOOK = 36                # start the log-message hook
    WECHAT_LOG_STOP_HOOK = 37                 # stop the log-message hook

    # --- browser ---
    WECHAT_BROWSER_OPEN_WITH_URL = 38         # open WeChat's built-in browser
    WECHAT_GET_PUBLIC_MSG = 39                # get an official account's message history

    WECHAT_MSG_FORWARD_MESSAGE = 40           # forward a message
    WECHAT_GET_QRCODE_IMAGE = 41              # get the QR code
# Short alias for the API-number class, used throughout the rest of the module.
APIS = WECHAT_HTTP_APIS
# HTTP API parameter templates
class WECHAT_HTTP_API_PARAM_TEMPLATES:
    """Default request payload for every API number in ``APIS``.

    The table is private to the class; callers obtain an independent copy of
    an entry through :meth:`get_http_template`.
    """

    __HTTP_API_PARAM_TEMPLATE = {
        # login check
        APIS.WECHAT_IS_LOGIN: {},

        # self info
        APIS.WECHAT_GET_SELF_INFO: {},

        # send message
        APIS.WECHAT_MSG_SEND_TEXT: {
            "wxid": "",
            "msg": "",
        },
        # wxids must be comma-separated, e.g. `wxid1,wxid2,wxid3`
        APIS.WECHAT_MSG_SEND_AT: {
            "chatroom_id": "",
            "wxids": "",
            "msg": "",
            "auto_nickname": 1,
        },
        APIS.WECHAT_MSG_SEND_CARD: {
            "receiver": "",
            "shared_wxid": "",
            "nickname": "",
        },
        APIS.WECHAT_MSG_SEND_IMAGE: {
            "receiver": "",
            "img_path": "",
        },
        APIS.WECHAT_MSG_SEND_FILE: {
            "receiver": "",
            "file_path": "",
        },
        APIS.WECHAT_MSG_SEND_ARTICLE: {
            "wxid": "",
            "title": "",
            "abstract": "",
            "url": "",
            "img_path": "",
        },
        APIS.WECHAT_MSG_SEND_APP: {
            "wxid": "",
            "appid": "",
        },

        # receive message
        APIS.WECHAT_MSG_START_HOOK: {"port": 10808},
        APIS.WECHAT_MSG_STOP_HOOK: {},
        APIS.WECHAT_MSG_START_IMAGE_HOOK: {"save_path": ""},
        APIS.WECHAT_MSG_STOP_IMAGE_HOOK: {},
        APIS.WECHAT_MSG_START_VOICE_HOOK: {"save_path": ""},
        APIS.WECHAT_MSG_STOP_VOICE_HOOK: {},

        # contact
        APIS.WECHAT_CONTACT_GET_LIST: {},
        APIS.WECHAT_CONTACT_CHECK_STATUS: {"wxid": ""},
        APIS.WECHAT_CONTACT_DEL: {"wxid": ""},
        APIS.WECHAT_CONTACT_SEARCH_BY_CACHE: {"wxid": ""},
        APIS.WECHAT_CONTACT_SEARCH_BY_NET: {"keyword": ""},
        APIS.WECHAT_CONTACT_ADD_BY_WXID: {
            "wxid": "",
            "msg": "",
        },
        APIS.WECHAT_CONTACT_ADD_BY_V3: {
            "v3": "",
            "msg": "",
            "add_type": 0x6,
        },
        APIS.WECHAT_CONTACT_ADD_BY_PUBLIC_ID: {"public_id": ""},
        APIS.WECHAT_CONTACT_VERIFY_APPLY: {
            "v3": "",
            "v4": "",
        },
        APIS.WECHAT_CONTACT_EDIT_REMARK: {
            "wxid": "",
            "remark": "",
        },

        # chatroom
        APIS.WECHAT_CHATROOM_GET_MEMBER_LIST: {"chatroom_id": ""},
        APIS.WECHAT_CHATROOM_GET_MEMBER_NICKNAME: {
            "chatroom_id": "",
            "wxid": "",
        },
        # wxids must be comma-separated, e.g. `wxid1,wxid2,wxid3`
        APIS.WECHAT_CHATROOM_DEL_MEMBER: {
            "chatroom_id": "",
            "wxids": "",
        },
        # wxids must be comma-separated, e.g. `wxid1,wxid2,wxid3`
        APIS.WECHAT_CHATROOM_ADD_MEMBER: {
            "chatroom_id": "",
            "wxids": "",
        },
        APIS.WECHAT_CHATROOM_SET_ANNOUNCEMENT: {
            "chatroom_id": "",
            "announcement": "",
        },
        APIS.WECHAT_CHATROOM_SET_CHATROOM_NAME: {
            "chatroom_id": "",
            "chatroom_name": "",
        },
        APIS.WECHAT_CHATROOM_SET_SELF_NICKNAME: {
            "chatroom_id": "",
            "nickname": "",
        },

        # database
        APIS.WECHAT_DATABASE_GET_HANDLES: {},
        APIS.WECHAT_DATABASE_BACKUP: {
            "db_handle": 0,
            "save_path": "",
        },
        APIS.WECHAT_DATABASE_QUERY: {
            "db_handle": 0,
            "sql": "",
        },

        # version
        APIS.WECHAT_SET_VERSION: {"version": "3.7.0.30"},

        # log
        APIS.WECHAT_LOG_START_HOOK: {},
        APIS.WECHAT_LOG_STOP_HOOK: {},

        # browser
        APIS.WECHAT_BROWSER_OPEN_WITH_URL: {"url": "https://www.baidu.com/"},
        APIS.WECHAT_GET_PUBLIC_MSG: {
            "public_id": "",
            "offset": "",
        },
        APIS.WECHAT_MSG_FORWARD_MESSAGE: {
            "wxid": "filehelper",
            "msgid": 2 ** 64 - 1,
        },
        APIS.WECHAT_GET_QRCODE_IMAGE: {},
    }

    def get_http_template(self, api_number):
        """Return a deep copy of the parameter template for ``api_number``.

        A copy is handed out so that callers can fill in values without
        altering the shared table.

        Raises:
            ValueError: if no template is registered under ``api_number``.
        """
        try:
            template = self.__HTTP_API_PARAM_TEMPLATE[api_number]
        except KeyError:
            raise ValueError("There is no interface numbered %s." % api_number)
        return copy.deepcopy(template)
# Module-level shortcut: the bound method of one template-class instance
# created here at import time.
get_http_template = WECHAT_HTTP_API_PARAM_TEMPLATES().get_http_template
class ReceiveMsgSocketServer(socketserver.BaseRequestHandler):
    """Request handler that receives newline-terminated JSON messages.

    Every complete message is decoded as UTF-8 JSON, handed to
    :meth:`msg_callback`, and acknowledged with ``200 OK``. The connection
    is served until the peer closes it or a socket error occurs.
    """

    def handle(self):
        """Read messages from the connection until the peer disconnects."""
        conn = self.request
        peer_closed = False
        while not peer_closed:
            try:
                chunks = []
                while True:
                    data = conn.recv(1024)
                    if not data:
                        # recv() returning b"" means the peer closed the
                        # connection. Without leaving the outer loop here,
                        # the handler would spin forever on empty reads.
                        peer_closed = True
                        break
                    chunks.append(data)
                    if data[-1] == 0xA:
                        # A trailing line feed marks the end of one message.
                        break
                ptr_data = b"".join(chunks)
                if not ptr_data:
                    # Nothing was buffered before the disconnect.
                    break
                msg = json.loads(ptr_data.decode('utf-8'))
                ReceiveMsgSocketServer.msg_callback(msg)
                conn.sendall("200 OK".encode())
            except OSError:
                break
            except json.JSONDecodeError:
                # Malformed payload: skip it and keep serving the connection.
                pass
        conn.close()

    @staticmethod
    def msg_callback(msg):
        """Process one decoded message; the default simply prints it."""
        print(msg)
        # TODO: add extra message-handling logic here
def post_wechat_http_api(api, port, data=None):
    """POST a request to the local WeChat HTTP service.

    Args:
        api: API number, sent as the ``type`` query parameter.
        port: TCP port of the service on 127.0.0.1.
        data: Request parameters, serialized to JSON for the request body.
            ``None`` (the default) sends an empty JSON object.

    Returns:
        The response body decoded from JSON.
    """
    # A fresh dict per call replaces the former shared mutable default.
    payload = {} if data is None else data
    url = "http://127.0.0.1:{}/api/?type={}".format(port, api)
    resp = requests.post(url=url, data=json.dumps(payload))
    return resp.json()
def get_wechat_http_api(api, port, data=None):
    """GET a request from the local WeChat HTTP service.

    Args:
        api: API number, sent as the ``type`` query parameter.
        port: TCP port of the service on 127.0.0.1.
        data: Extra query parameters. ``None`` (the default) sends none.

    Returns:
        The response body decoded from JSON.
    """
    # A fresh dict per call replaces the former shared mutable default.
    query = {} if data is None else data
    url = "http://127.0.0.1:{}/api/?type={}".format(port, api)
    resp = requests.get(url=url, params=query)
    return resp.json()
def get_wechat_pid_list() -> list:
    """Return the PIDs of all running processes named ``WeChat.exe``.

    Processes that disappear during the scan, or whose name the operating
    system refuses to reveal, are skipped.
    """
    import psutil

    pid_list = []
    for pid in psutil.pids():
        try:
            if psutil.Process(pid).name() == 'WeChat.exe':
                pid_list.append(pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # NoSuchProcess: the process exited after psutil.pids() ran.
            # AccessDenied: a protected process must not abort the scan.
            continue
    return pid_list
def start_socket_server(port: int = 10808,
                        request_handler=ReceiveMsgSocketServer,
                        main_thread: bool = True) -> Optional[int]:
    """Start a threaded TCP server on 127.0.0.1 that receives messages.

    Args:
        port: TCP port to listen on.
        request_handler: ``socketserver`` request handler class used for
            each incoming connection.
        main_thread: When true, serve in the calling thread and block until
            the server stops. Otherwise serve from a daemon thread and
            return immediately.

    Returns:
        The identifier of the background thread when ``main_thread`` is
        false. ``None`` when serving in the calling thread, or when startup
        failed (the error is printed).
    """
    ip_port = ("127.0.0.1", port)
    try:
        server = socketserver.ThreadingTCPServer(ip_port, request_handler)
        if main_thread:
            try:
                server.serve_forever()
            finally:
                # Release the listening socket once the blocking loop ends,
                # including on Ctrl-C.
                server.server_close()
        else:
            # daemon=True replaces the deprecated Thread.setDaemon().
            worker = threading.Thread(target=server.serve_forever, daemon=True)
            worker.start()
            return worker.ident
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
    return None
def test_send_msg(test_port):
    """Exercise the message-sending and contact APIs against a local service.

    The log hook is started first and stopped last. Between the two, and
    only when the service reports ``is_login == 1``, the self info is
    printed, a text, card, image, file and article are sent to
    ``filehelper``, and the contact list and a contact status are printed.
    """
    post_wechat_http_api(APIS.WECHAT_LOG_START_HOOK, port=test_port)

    login_state = post_wechat_http_api(APIS.WECHAT_IS_LOGIN, port=test_port)
    if login_state["is_login"] == 1:
        print(post_wechat_http_api(APIS.WECHAT_GET_SELF_INFO, port=test_port))

        # Outgoing messages, sent in this order; responses are ignored.
        outgoing = (
            (APIS.WECHAT_MSG_SEND_TEXT,
             {"wxid": "filehelper", "msg": "hello http"}),
            (APIS.WECHAT_MSG_SEND_CARD,
             {"receiver": "filehelper",
              "shared_wxid": "filehelper",
              "nickname": "文件传输助手"}),
            (APIS.WECHAT_MSG_SEND_IMAGE,
             {"receiver": "filehelper",
              "img_path": r"D:\VS2019C++\MyWeChatRobot\test\测试图片.png"}),
            (APIS.WECHAT_MSG_SEND_FILE,
             {"receiver": "filehelper",
              "file_path": r"D:\VS2019C++\MyWeChatRobot\test\测试文件"}),
            (APIS.WECHAT_MSG_SEND_ARTICLE,
             {"wxid": "filehelper",
              "title": "百度",
              "abstract": "百度一下,你就知道",
              "url": "https://www.baidu.com/",
              "img_path": ""}),
        )
        for api, payload in outgoing:
            post_wechat_http_api(api, data=payload, port=test_port)

        print(post_wechat_http_api(APIS.WECHAT_CONTACT_GET_LIST, port=test_port))

        status_query = {"wxid": "filehelper"}
        print(post_wechat_http_api(APIS.WECHAT_CONTACT_CHECK_STATUS,
                                   data=status_query,
                                   port=test_port))

    post_wechat_http_api(APIS.WECHAT_LOG_STOP_HOOK, port=test_port)
def test_get_public_msg(test_port, public_id):
    """Open the first article of an official account in WeChat's browser.

    Requests the message history of ``public_id``, takes the first detail
    entry of the first message, opens its ``ContentUrl`` through the
    browser API, and then waits three seconds. Does nothing further when
    the history or the first message's detail list is empty.
    """
    import time

    query = {"public_id": public_id, "offset": ""}
    reply = post_wechat_http_api(APIS.WECHAT_GET_PUBLIC_MSG, test_port, query)
    msg_list = json.loads(reply['msg'])['MsgList']
    # Looked up but not used further; a missing key still raises KeyError.
    _next_offset = msg_list['PagingInfo']['Offset']

    # Sentinel distinguishes "no element" from any real element value.
    nothing = object()

    first_msg = next(iter(msg_list['Msg']), nothing)
    if first_msg is nothing:
        return

    first_info = next(iter(first_msg['AppMsg']['DetailInfo']), nothing)
    if first_info is nothing:
        return

    # Title and Digest are read but unused, as in the fields listed above.
    _title = first_info['Title']
    _digest = first_info['Digest']
    content_url = first_info['ContentUrl']
    post_wechat_http_api(APIS.WECHAT_BROWSER_OPEN_WITH_URL,
                         test_port,
                         {"url": content_url})
    time.sleep(3)
def test_get_chatroom_list_from_db(test_port):
    """Query the ``Contact`` table of ``MicroMsg.db`` for rows with Type=2.

    Looks up the handle of ``MicroMsg.db`` among the database handles
    reported by the service, runs the query through the database API, and
    returns the ``data`` field of the response.

    Raises:
        IndexError: if no database named ``MicroMsg.db`` is reported.
    """
    handles = post_wechat_http_api(APIS.WECHAT_DATABASE_GET_HANDLES, port=test_port)
    micro_msg = list(filter(lambda entry: entry['db_name'] == 'MicroMsg.db',
                            handles['data']))
    query = {
        "db_handle": micro_msg[0]['handle'],
        "sql": "select UserName,Alias,EncryptUserName,Type,VerifyFlag,Remark,NickName,ChatRoomType,ExtraBuf from Contact where Type=2;",
    }
    result = post_wechat_http_api(APIS.WECHAT_DATABASE_QUERY, data=query, port=test_port)
    return result['data']
if __name__ == '__main__':
    # Port of the local WeChat HTTP service.
    port = 18888
    # pids = get_wechat_pid_list()
    # if len(pids) == 0:
    #     pids.append(new_wechat())
    # start_listen(pids[0],port)
    # post_wechat_http_api(APIS.WECHAT_LOG_START_HOOK,port)
    self_info = post_wechat_http_api(APIS.WECHAT_GET_SELF_INFO, port)
    print(self_info)

    # Fetch the latest WeChat version number (@tom-snow):
    # https://api.github.com/repos/tom-snow/wechat-windows-versions/releases?per_page=1
    # and read [0]["tag_name"] from the JSON response.
    resp = requests.get(
        url="https://api.github.com/repos/tom-snow/wechat-windows-versions/releases?per_page=1",
        params={"per_page": 1},
    )
    releases = resp.json()
    latest_version = releases[0]["tag_name"]

    # Switch to the latest version number to prevent updates.
    version_reply = post_wechat_http_api(APIS.WECHAT_SET_VERSION,
                                         port,
                                         {"version": latest_version})
    print(version_reply)

    # Ask the service to push received messages to port 10808, then serve
    # that port in this thread.
    hook_reply = post_wechat_http_api(APIS.WECHAT_MSG_START_HOOK, port, {"port": 10808})
    print(hook_reply)
    start_socket_server()
    # stop_listen(pids[0])