docker-ComWechat/test.py

334 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# by @0honus0
from ensurepip import version
import json
import copy
import threading
import requests
import socketserver
# from signal import signal, SIGPIPE, SIG_DFL
# signal(SIGPIPE,SIG_DFL)
class WECHAT_HTTP_APIS:
# login check
WECHAT_IS_LOGIN = 0 # 登录检查
# self info
WECHAT_GET_SELF_INFO = 1 # 获取个人信息
# send message
WECHAT_MSG_SEND_TEXT = 2 # 发送文本
WECHAT_MSG_SEND_AT = 3 # 发送群艾特
WECHAT_MSG_SEND_CARD = 4 # 分享好友名片
WECHAT_MSG_SEND_IMAGE = 5 # 发送图片
WECHAT_MSG_SEND_FILE = 6 # 发送文件
WECHAT_MSG_SEND_ARTICLE = 7 # 发送xml文章
WECHAT_MSG_SEND_APP = 8 # 发送小程序
# receive message
WECHAT_MSG_START_HOOK = 9 # 开启接收消息HOOK只支持socket监听
WECHAT_MSG_STOP_HOOK = 10 # 关闭接收消息HOOK
WECHAT_MSG_START_IMAGE_HOOK = 11 # 开启图片消息HOOK
WECHAT_MSG_STOP_IMAGE_HOOK = 12 # 关闭图片消息HOOK
WECHAT_MSG_START_VOICE_HOOK = 13 # 开启语音消息HOOK
WECHAT_MSG_STOP_VOICE_HOOK = 14 # 关闭语音消息HOOK
# contact
WECHAT_CONTACT_GET_LIST = 15 # 获取联系人列表
WECHAT_CONTACT_CHECK_STATUS = 16 # 检查是否被好友删除
WECHAT_CONTACT_DEL = 17 # 删除好友
WECHAT_CONTACT_SEARCH_BY_CACHE = 18 # 从内存中获取好友信息
WECHAT_CONTACT_SEARCH_BY_NET = 19 # 网络搜索用户信息
WECHAT_CONTACT_ADD_BY_WXID = 20 # wxid加好友
WECHAT_CONTACT_ADD_BY_V3 = 21 # v3数据加好友
WECHAT_CONTACT_ADD_BY_PUBLIC_ID = 22 # 关注公众号
WECHAT_CONTACT_VERIFY_APPLY = 23 # 通过好友请求
WECHAT_CONTACT_EDIT_REMARK = 24 # 修改备注
# chatroom
WECHAT_CHATROOM_GET_MEMBER_LIST = 25 # 获取群成员列表
WECHAT_CHATROOM_GET_MEMBER_NICKNAME = 26 # 获取指定群成员昵称
WECHAT_CHATROOM_DEL_MEMBER = 27 # 删除群成员
WECHAT_CHATROOM_ADD_MEMBER = 28 # 添加群成员
WECHAT_CHATROOM_SET_ANNOUNCEMENT = 29 # 设置群公告
WECHAT_CHATROOM_SET_CHATROOM_NAME = 30 # 设置群聊名称
WECHAT_CHATROOM_SET_SELF_NICKNAME = 31 # 设置群内个人昵称
# database
WECHAT_DATABASE_GET_HANDLES = 32 # 获取数据库句柄
WECHAT_DATABASE_BACKUP = 33 # 备份数据库
WECHAT_DATABASE_QUERY = 34 # 数据库查询
# version
WECHAT_SET_VERSION = 35 # 修改微信版本号
# log
WECHAT_LOG_START_HOOK = 36 # 开启日志信息HOOK
WECHAT_LOG_STOP_HOOK = 37 # 关闭日志信息HOOK
# browser
WECHAT_BROWSER_OPEN_WITH_URL = 38 # 打开微信内置浏览器
WECHAT_GET_PUBLIC_MSG = 39 # 获取公众号历史消息
WECHAT_MSG_FORWARD_MESSAGE = 40 # 转发消息
WECHAT_GET_QRCODE_IMAGE = 41 # 获取二维码
APIS = WECHAT_HTTP_APIS
# http api 参数模板
class WECHAT_HTTP_API_PARAM_TEMPLATES:
__HTTP_API_PARAM_TEMPLATE = {
# login check
APIS.WECHAT_IS_LOGIN: {},
# self info
APIS.WECHAT_GET_SELF_INFO: {},
# send message
APIS.WECHAT_MSG_SEND_TEXT: {"wxid": "",
"msg": ""},
# wxids需要以`,`分隔,例如`wxid1,wxid2,wxid3`
APIS.WECHAT_MSG_SEND_AT: {"chatroom_id":"",
"wxids": "",
"msg": "",
"auto_nickname": 1},
APIS.WECHAT_MSG_SEND_CARD: {"receiver":"",
"shared_wxid":"",
"nickname":""},
APIS.WECHAT_MSG_SEND_IMAGE: {"receiver":"",
"img_path":""},
APIS.WECHAT_MSG_SEND_FILE: {"receiver":"",
"file_path":""},
APIS.WECHAT_MSG_SEND_ARTICLE: {"wxid":"",
"title":"",
"abstract":"",
"url":"",
"img_path":""},
APIS.WECHAT_MSG_SEND_APP: {"wxid":"",
"appid":""},
# receive message
APIS.WECHAT_MSG_START_HOOK: {"port": 10808},
APIS.WECHAT_MSG_STOP_HOOK: {},
APIS.WECHAT_MSG_START_IMAGE_HOOK: {"save_path":""},
APIS.WECHAT_MSG_STOP_IMAGE_HOOK: {},
APIS.WECHAT_MSG_START_VOICE_HOOK: {"save_path":""},
APIS.WECHAT_MSG_STOP_VOICE_HOOK: {},
# contact
APIS.WECHAT_CONTACT_GET_LIST: {},
APIS.WECHAT_CONTACT_CHECK_STATUS: {"wxid":""},
APIS.WECHAT_CONTACT_DEL: {"wxid":""},
APIS.WECHAT_CONTACT_SEARCH_BY_CACHE: {"wxid":""},
APIS.WECHAT_CONTACT_SEARCH_BY_NET: {"keyword":""},
APIS.WECHAT_CONTACT_ADD_BY_WXID: {"wxid":"",
"msg":""},
APIS.WECHAT_CONTACT_ADD_BY_V3: {"v3":"",
"msg":"",
"add_type": 0x6},
APIS.WECHAT_CONTACT_ADD_BY_PUBLIC_ID: {"public_id":""},
APIS.WECHAT_CONTACT_VERIFY_APPLY: {"v3":"",
"v4":""},
APIS.WECHAT_CONTACT_EDIT_REMARK: {"wxid":"",
"remark":""},
# chatroom
APIS.WECHAT_CHATROOM_GET_MEMBER_LIST: {"chatroom_id":""},
APIS.WECHAT_CHATROOM_GET_MEMBER_NICKNAME: {"chatroom_id":"",
"wxid":""},
# wxids需要以`,`分隔,例如`wxid1,wxid2,wxid3`
APIS.WECHAT_CHATROOM_DEL_MEMBER: {"chatroom_id":"",
"wxids":""},
# wxids需要以`,`分隔,例如`wxid1,wxid2,wxid3`
APIS.WECHAT_CHATROOM_ADD_MEMBER: {"chatroom_id":"",
"wxids":""},
APIS.WECHAT_CHATROOM_SET_ANNOUNCEMENT: {"chatroom_id":"",
"announcement":""},
APIS.WECHAT_CHATROOM_SET_CHATROOM_NAME: {"chatroom_id":"",
"chatroom_name":""},
APIS.WECHAT_CHATROOM_SET_SELF_NICKNAME: {"chatroom_id":"",
"nickname":""},
# database
APIS.WECHAT_DATABASE_GET_HANDLES: {},
APIS.WECHAT_DATABASE_BACKUP: {"db_handle":0,
"save_path":""},
APIS.WECHAT_DATABASE_QUERY: {"db_handle":0,
"sql":""},
# version
APIS.WECHAT_SET_VERSION: {"version": "3.7.0.30"},
# log
APIS.WECHAT_LOG_START_HOOK: {},
APIS.WECHAT_LOG_STOP_HOOK: {},
# browser
APIS.WECHAT_BROWSER_OPEN_WITH_URL: {"url": "https://www.baidu.com/"},
APIS.WECHAT_GET_PUBLIC_MSG: {"public_id": "","offset": ""},
APIS.WECHAT_MSG_FORWARD_MESSAGE: {"wxid": "filehelper","msgid": 2 ** 64 - 1},
APIS.WECHAT_GET_QRCODE_IMAGE: {}
}
def get_http_template(self, api_number):
try:
return copy.deepcopy(self.__HTTP_API_PARAM_TEMPLATE[api_number])
except KeyError:
raise ValueError("There is no interface numbered %s." % api_number)
get_http_template = WECHAT_HTTP_API_PARAM_TEMPLATES().get_http_template
class ReceiveMsgSocketServer(socketserver.BaseRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def handle(self):
conn = self.request
while True:
try:
ptr_data = b""
while True:
data = conn.recv(1024)
ptr_data += data
if len(data) == 0 or data[-1] == 0xA:
break
msg = json.loads(ptr_data.decode('utf-8'))
ReceiveMsgSocketServer.msg_callback(msg)
conn.sendall("200 OK".encode())
except OSError:
break
except json.JSONDecodeError:
pass
conn.close()
@staticmethod
def msg_callback(msg):
print(msg)
# TODO: 在这里写额外的消息处理逻辑
def post_wechat_http_api(api,port,data = {}):
url = "http://127.0.0.1:{}/api/?type={}".format(port,api)
resp = requests.post(url = url,data = json.dumps(data))
return resp.json()
def get_wechat_http_api(api,port,data = {}):
url = "http://127.0.0.1:{}/api/?type={}".format(port,api)
resp = requests.get(url = url,params = data)
return resp.json()
def get_wechat_pid_list() -> list:
import psutil
pid_list = []
process_list = psutil.pids()
for pid in process_list:
try:
if psutil.Process(pid).name() == 'WeChat.exe':
pid_list.append(pid)
except psutil.NoSuchProcess:
pass
return pid_list
def start_socket_server(port: int = 10808,
request_handler = ReceiveMsgSocketServer,
main_thread: bool = True) -> int or None:
ip_port = ("127.0.0.1", port)
try:
s = socketserver.ThreadingTCPServer(ip_port, request_handler)
if main_thread:
s.serve_forever()
else:
socket_server = threading.Thread(target=s.serve_forever)
socket_server.setDaemon(True)
socket_server.start()
return socket_server.ident
except KeyboardInterrupt:
pass
except Exception as e:
print(e)
return None
def test_send_msg(test_port):
post_wechat_http_api(APIS.WECHAT_LOG_START_HOOK,port = test_port)
if post_wechat_http_api(APIS.WECHAT_IS_LOGIN,port = test_port)["is_login"] == 1:
print(post_wechat_http_api(APIS.WECHAT_GET_SELF_INFO,port = test_port))
data = {"wxid":"filehelper","msg":"hello http"}
post_wechat_http_api(APIS.WECHAT_MSG_SEND_TEXT,data = data,port = test_port)
data = {"receiver":'filehelper',"shared_wxid":"filehelper","nickname":"文件传输助手"}
post_wechat_http_api(APIS.WECHAT_MSG_SEND_CARD,data = data,port = test_port)
data = {"receiver":'filehelper',"img_path":r"D:\VS2019C++\MyWeChatRobot\test\测试图片.png"}
post_wechat_http_api(APIS.WECHAT_MSG_SEND_IMAGE,data = data,port = test_port)
data = {"receiver":'filehelper',"file_path":r"D:\VS2019C++\MyWeChatRobot\test\测试文件"}
post_wechat_http_api(APIS.WECHAT_MSG_SEND_FILE,data = data,port = test_port)
data = {"wxid":'filehelper',
"title":"百度",
"abstract":"百度一下,你就知道",
"url":"https://www.baidu.com/",
"img_path":""}
post_wechat_http_api(APIS.WECHAT_MSG_SEND_ARTICLE,data = data,port = test_port)
print(post_wechat_http_api(APIS.WECHAT_CONTACT_GET_LIST,port = test_port))
data = {"wxid":"filehelper"}
print(post_wechat_http_api(APIS.WECHAT_CONTACT_CHECK_STATUS,data = data,port = test_port))
post_wechat_http_api(APIS.WECHAT_LOG_STOP_HOOK,port = test_port)
def test_get_public_msg(test_port,public_id):
import time
param = {"public_id": public_id,"offset": ""}
data = post_wechat_http_api(APIS.WECHAT_GET_PUBLIC_MSG,test_port,param)
msg_list = json.loads(data['msg'])['MsgList']
next_offset = msg_list['PagingInfo']['Offset']
for msg in msg_list['Msg']:
detail_info = msg['AppMsg']['DetailInfo']
for info in detail_info:
Title = info['Title']
Digest = info['Digest']
ContentUrl = info['ContentUrl']
post_wechat_http_api(APIS.WECHAT_BROWSER_OPEN_WITH_URL,
test_port,
{"url":ContentUrl}
)
time.sleep(3)
break
break
def test_get_chatroom_list_from_db(test_port):
dbs = post_wechat_http_api(APIS.WECHAT_DATABASE_GET_HANDLES,port = test_port)
db_handle = [i for i in dbs['data'] if i['db_name'] == 'MicroMsg.db'][0]['handle']
sql = "select UserName,Alias,EncryptUserName,Type,VerifyFlag,Remark,NickName,ChatRoomType,ExtraBuf from Contact where Type=2;"
data = {"db_handle":db_handle,"sql":sql}
res = post_wechat_http_api(APIS.WECHAT_DATABASE_QUERY,data = data,port = test_port)
return res['data']
if __name__ == '__main__':
port = 18888
# pids = get_wechat_pid_list()
# if len(pids) == 0:
# pids.append(new_wechat())
# start_listen(pids[0],port)
#post_wechat_http_api(APIS.WECHAT_LOG_START_HOOK,port)
print(post_wechat_http_api(APIS.WECHAT_GET_SELF_INFO, port))
# 获取最新版本微信版本号 @tom-snow
resp = requests.get(url = "https://api.github.com/repos/tom-snow/wechat-windows-versions/releases?per_page=1",params = {"per_page": 1})
latest_version = resp.json()[0]["tag_name"].replace("v","") # fixed by: @xixixi2000
print(post_wechat_http_api(APIS.WECHAT_SET_VERSION, port, {"version": latest_version})) # 修改成最新版本,防止更新
print(post_wechat_http_api(APIS.WECHAT_MSG_START_HOOK, port, {"port":10808}))
start_socket_server()
# stop_listen(pids[0])