version 0.0.1

This commit is contained in:
张明明 2024-05-04 22:19:15 +08:00
commit 44bf457a5f
14 changed files with 1179 additions and 0 deletions

93
.gitignore vendored Normal file
View File

@ -0,0 +1,93 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# IPython Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
*.npy
*.pkl
.idea

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

2
MANIFEST.in Normal file
View File

@ -0,0 +1,2 @@
include README.md LICENSE
include wxhook/tools/*

195
README.md Normal file
View File

@ -0,0 +1,195 @@
# WxHook
## 简介
WxHook是一个基于dll注入实现的python微信机器人框架支持多种接口、高扩展性、多线程事件高并发让你轻松应对海量消息为你的需求实现提供便捷灵活的支持。
支持的接口
1. hook同步消息
2. 取消hook同步消息
3. hook日志
4. 取消hook日志
5. 检查登录状态
6. 获取用户信息
7. 发送文本消息
8. 发送图片消息
9. 发送文件消息
10. 发送表情消息
11. 发送小程序消息
12. 发送群@消息
13. 发送拍一拍消息
14. 获取联系人列表
15. 获取联系人详情
16. 创建群聊
17. 退出群聊
18. 获取群详情
19. 获取群成员列表
20. 添加群成员
21. 删除群成员
22. 邀请群成员
23. 修改群成员昵称
24. 设置群置顶消息
25. 移除群置顶消息
26. 转发消息
27. 获取朋友圈首页
28. 获取朋友圈下一页
29. 收藏消息
30. 收藏图片
31. 下载附件
32. 转发公众号消息
33. 转发公众号消息通过消息ID
34. 解码图片
35. 获取语音通过消息ID
36. 图片文本识别
37. 获取数据库句柄
38. 执行SQL命令
39. 测试
## 微信版本下载
- [WeChatSetup3.9.5.81.exe](https://github.com/tom-snow/wechat-windows-versions/releases/download/v3.9.5.81/WeChatSetup-3.9.5.81.exe)
## 安装
```bash
pip install wxhook
```
## 使用示例
消息事件处理例子
```python
# import os
# os.environ["WXHOOK_LOG_LEVEL"] = "INFO" # 修改日志输出级别
import time
from wxhook import Bot
from wxhook import events
from wxhook.model import Event
def on_login(bot: Bot):
bot.send_text("filehelper", "登录成功之后会触发这个函数")
def on_start(bot: Bot):
print("微信客户端打开之后会触发这个函数")
def on_stop(bot: Bot):
bot.send_text("filehelper", "关闭微信客户端之前会触发这个函数")
time.sleep(1) # 防止客户端关闭太快导致消息发送失败
bot = Bot(
on_login=on_login,
on_start=on_start,
on_stop=on_stop
)
# 消息回调地址
# bot.set_webhook_url("http://127.0.0.1:8000")
@bot.handle(events.TEXT_MESSAGE, once=True)
def on_message(bot: Bot, event: Event):
bot.send_text("filehelper", "这条消息只会发送一次哦")
@bot.handle(events.TEXT_MESSAGE)
def on_message(bot: Bot, event: Event):
if event.fromUser != bot.info.wxid:
bot.send_text(event.fromUser, event.content)
@bot.handle([events.IMAGE_MESSAGE, events.EMOJI_MESSAGE, events.VIDEO_MESSAGE])
def on_message(bot: Bot, event: Event):
if event.fromUser != bot.info.wxid:
if event.type == events.IMAGE_MESSAGE:
bot.send_text(event.fromUser, "图片消息")
elif event.type == events.EMOJI_MESSAGE:
bot.send_text(event.fromUser, "表情消息")
elif event.type == events.VIDEO_MESSAGE:
bot.send_text(event.fromUser, "视频消息")
bot.run()
```
接口使用例子
```python
import os
import json
from wxhook import Bot
from wxhook import events
from wxhook.model import Event
# faked_version="3.9.10.19"解除微信低版本登录限制
bot = Bot()
msgid_list = []
@bot.handle(events.TEXT_MESSAGE)
def on_text_message(bot: Bot, event: Event):
self_id = bot.info.wxid
content = event.content
sender = event.fromUser
msg_id = event.msgId
if sender != self_id:
if content.find("发送文本") != -1:
bot.send_text(sender, "这是一条文本消息")
elif content.find("发送图片") != -1:
bot.send_image(sender, os.path.abspath("test.png"))
elif content.find("发送表情") != -1:
bot.send_emotion(sender, os.path.abspath("test.png"))
elif content.find("发送文件") != -1:
print(print(bot.send_file(sender, os.path.abspath("test.xlsx"))))
elif content.find("发送音频") != -1:
bot.send_file(sender, os.path.abspath("test.mp3"))
elif content.find("发送视频") != -1:
print(bot.send_file(sender, os.path.abspath("test.mp4")))
elif content.find("创建群聊") != -1:
bot.create_room(["wxid1", "wxid2"])
elif content.find("获取群成员列表") != -1:
print(bot.get_room_members("<chatroomid>"))
elif content.find("删除群成员") != -1:
bot.delete_room_member("<chatroomid>", ["<wxid>"])
elif content.find("添加群成员") != -1:
bot.add_room_member("<chatroomid>", ["<wxid>"])
elif content.find("邀请群成员") != -1:
bot.invite_room_member("<chatroomid>", ["<wxid>"])
elif content.find("修改在群聊中的昵称") != -1:
print(bot.modify_member_nickname(sender, "<self-wxid>", "测试机器人"))
elif content.find("退出群聊") != -1:
bot.quit_room("<chatroomid>")
elif content.find("at全体成员") != -1:
bot.send_room_at(sender, ["notify@all", "<wxid>"], "这是一条at全体成员的消息")
elif content.find("发送群at") != -1:
bot.send_room_at(sender, ["<wxid1>", "<wxid2>"], "这是一条at群成员的消息")
elif content.find("发送群聊拍一拍") != -1:
bot.send_pat(sender, "wxid_vqj81fdula0x22")
elif content.find("发送私聊拍一拍") != -1:
bot.send_pat(sender, sender)
elif content.find("置顶消息") != -1:
bot.top_msg(msg_id)
msgid_list.append(msg_id)
elif content.find("取消置顶的消息") != -1:
bot.remove_top_msg(sender, msgid_list.pop())
elif content.find("获取联系人列表") != -1:
bot.send_text(sender, json.dumps(bot.get_contacts()))
elif content.find("获取联系人详情") != -1:
bot.send_text(sender, json.dumps(bot.get_contact("<wxid>")))
elif content.find("获取群详情") != -1:
print(bot.get_room("<chatroomid>"))
elif content.find("收藏消息") != -1:
bot.collect_msg(msg_id)
elif content.find("收藏图片") != -1:
bot.collect_image(sender, os.path.abspath("test.png"))
elif content.find("ocr") != -1:
print(bot.ocr(os.path.abspath("test.png")))
bot.run()
```
QQ交流群:625920215

135
setup.py Normal file
View File

@ -0,0 +1,135 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note: To use the 'upload' functionality of this file, you must:
# $ pipenv install twine --dev
import io
import os
import sys
from shutil import rmtree
from setuptools import find_packages, setup, Command
# Package meta-data.
NAME = 'wxhook'
DESCRIPTION = 'wechat robot framework.'
URL = 'https://github.com/miloira/wxhook'
EMAIL = '690126048@qq.com'
AUTHOR = 'Msky'
REQUIRES_PYTHON = '>=3.8.0'
VERSION = '0.0.1'
# What packages are required for this module to be executed?
REQUIRED = [
'loguru',
'psutil',
'pyee',
'requests',
'xmltodict'
]
# What packages are optional?
EXTRAS = {
# 'fancy feature': ['django'],
}
# The rest you shouldn't have to touch too much :)
# ------------------------------------------------
# Except, perhaps the License and Trove Classifiers!
# If you do change the License, remember to change the Trove Classifier for that!
here = os.path.abspath(os.path.dirname(__file__))
# Import the README and use it as the long-description.
# Note: this will only work if 'README.md' is present in your MANIFEST.in file!
try:
with io.open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = '\n' + f.read()
except FileNotFoundError:
long_description = DESCRIPTION
# Load the package's __version__.py module as a dictionary.
about = {}
if not VERSION:
project_slug = NAME.lower().replace("-", "_").replace(" ", "_")
with open(os.path.join(here, project_slug, '__version__.py')) as f:
exec(f.read(), about)
else:
about['__version__'] = VERSION
class UploadCommand(Command):
"""Support setup.py upload."""
description = 'Build and publish the package.'
user_options = []
@staticmethod
def status(s):
"""Prints things in bold."""
print('\033[1m{0}\033[0m'.format(s))
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
try:
self.status('Removing previous builds…')
rmtree(os.path.join(here, 'dist'))
except OSError:
pass
self.status('Building Source and Wheel (universal) distribution…')
os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable))
self.status('Uploading the package to PyPI via Twine…')
os.system('twine upload dist/*')
self.status('Pushing git tags…')
os.system('git tag v{0}'.format(about['__version__']))
os.system('git push --tags')
sys.exit()
# Where the magic happens:
setup(
name=NAME,
version=about['__version__'],
description=DESCRIPTION,
long_description=long_description,
long_description_content_type='text/markdown',
author=AUTHOR,
author_email=EMAIL,
python_requires=REQUIRES_PYTHON,
url=URL,
packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]),
# If your package is a single module, use this instead of 'packages':
# py_modules=['mypackage'],
# entry_points={
# 'console_scripts': ['mycli=mymodule:cli'],
# },
install_requires=REQUIRED,
extras_require=EXTRAS,
include_package_data=True,
license='MIT',
classifiers=[
# Trove classifiers
# Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy'
],
# $ setup.py publish support.
cmdclass={
'upload': UploadCommand,
},
)

3
wxhook/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .core import Bot
version = "0.0.1"

461
wxhook/core.py Normal file
View File

@ -0,0 +1,461 @@
import os
import json
import typing
import psutil
import pyee
import traceback
import socketserver
import requests
from .events import ALL_MESSAGE, SYSTEM_MESSAGE
from .logger import logger
from .model import RawData, Event, Account, Contact, ContactDetail, Room, RoomMembers, Table, DB, Response
from .utils import WeChatManager, start_wechat_with_inject, fake_wechat_version, parse_event
class RequestHandler(socketserver.BaseRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def handle(self):
try:
data = b""
while True:
chunk = self.request.recv(1024)
data += chunk
if len(chunk) == 0 or chunk[-1] == 0xA:
break
bot = getattr(self.server, "bot")
bot.on_event(data)
self.request.sendall("200 OK".encode())
except Exception:
logger.error(traceback.format_exc())
finally:
self.request.close()
class Bot:
def __init__(
self,
on_login: typing.Callable = None,
on_before_message: typing.Callable = None,
on_after_message: typing.Callable = None,
on_start: typing.Callable = None,
on_stop: typing.Callable = None,
faked_version: typing.Union[str, None] = None
):
self.version = "3.9.5.81"
self.server_host = "127.0.0.1"
self.remote_host = "127.0.0.1"
self.on_start = on_start
self.on_login = on_login
self.on_before_message = on_before_message
self.on_after_message = on_after_message
self.on_stop = on_stop
self.faked_version = faked_version
self.event_emitter = pyee.EventEmitter()
self.wechat_manager = WeChatManager()
self.remote_port, self.server_port = self.wechat_manager.get_port()
self.BASE_URL = f"http://{self.remote_host}:{self.remote_port}"
self.webhook_url = None
self.info = None
self.DATA_SAVE_PATH = None
self.FILE_SAVE_PATH = None
self.IMAGE_SAVE_PATH = None
self.VIDEO_SAVE_PATH = None
code, output = start_wechat_with_inject(self.remote_port)
if code == 1:
raise Exception(output)
self.process = psutil.Process(int(output))
if self.faked_version is not None:
if fake_wechat_version(self.process.pid, self.version, faked_version) == 0:
logger.info(f"wechat version faked: {self.version} -> {faked_version}")
else:
logger.info(f"wechat version fake failed.")
self.wechat_manager.add(self.process.pid, self.remote_port, self.server_port)
self.call_hook_func(self.on_start, self)
self.handle(SYSTEM_MESSAGE, once=True)(self.init_bot)
self.hook_sync_msg(self.server_host, self.server_port)
@staticmethod
def call_hook_func(func: typing.Callable, *args, **kwargs) -> typing.Any:
if callable(func):
return func(*args, **kwargs)
def init_bot(self, bot: "Bot", event: Event) -> None:
if event.content["sysmsg"]["@type"] == "SafeModuleCfg":
bot.info = bot.get_self_info()
self.DATA_SAVE_PATH = bot.info.dataSavePath
self.FILE_SAVE_PATH = os.path.join(self.DATA_SAVE_PATH, "wxhelper/file")
self.IMAGE_SAVE_PATH = os.path.join(self.DATA_SAVE_PATH, "wxhelper/image")
self.VIDEO_SAVE_PATH = os.path.join(self.DATA_SAVE_PATH, "wxhelper/video")
logger.info(bot.info)
self.call_hook_func(self.on_login, bot)
def set_webhook_url(self, webhook_url: str) -> None:
self.webhook_url = webhook_url
def webhook(self, event: dict) -> None:
if self.webhook_url is not None:
try:
requests.post(self.webhook_url, json=event)
except Exception:
pass
def call_api(self, api: str, *args, **kwargs) -> dict:
return requests.request("POST", self.BASE_URL + api, *args, **kwargs).json()
def hook_sync_msg(
self,
ip: str,
port: int,
enable_http: int = 0,
url: str = "http://127.0.0.1:8000",
timeout: int = 30
) -> Response:
"""hook同步消息"""
data = {
"port": port,
"ip": ip,
"enableHttp": enable_http,
"url": url,
"timeout": timeout
}
return Response(**self.call_api("/api/hookSyncMsg", json=data))
def unhook_sync_msg(self) -> Response:
"""取消hook同步消息"""
return Response(**self.call_api("/api/unhookSyncMsg"))
def hook_log(self) -> Response:
"""hook日志"""
return Response(**self.call_api("/api/hookLog"))
def unhook_log(self) -> Response:
"""取消hook日志"""
return Response(**self.call_api("/api/unhookLog"))
def check_login(self) -> Response:
"""检查登录状态"""
return Response(**self.call_api("/api/checkLogin"))
def get_self_info(self) -> Account:
"""获取用户信息"""
return Account(**self.call_api("/api/userInfo")["data"])
def send_text(self, wxid: str, msg: str) -> Response:
"""发送文本消息"""
data = {
"wxid": wxid,
"msg": msg
}
return Response(**self.call_api("/api/sendTextMsg", json=data))
def send_image(self, wxid: str, image_path: str) -> Response:
"""发送图片消息"""
data = {
"wxid": wxid,
"imagePath": image_path
}
return Response(**self.call_api("/api/sendImagesMsg", json=data))
def send_emotion(self, wxid: str, file_path: str) -> Response:
"""发送表情消息"""
data = {
"wxid": wxid,
"filePath": file_path
}
return Response(**self.call_api("/api/sendCustomEmotion", json=data))
def send_file(self, wxid: str, file_path: str) -> Response:
"""发送文件消息"""
data = {
"wxid": wxid,
"filePath": file_path
}
return Response(**self.call_api("/api/sendFileMsg", json=data))
def send_applet(
self,
wxid: str,
waid_contact: str,
waid: str,
applet_wxid: str,
json_param: str,
head_img_url: str,
main_img: str,
index_page: str
) -> Response:
"""发送小程序消息"""
data = {
"wxid": wxid,
"waidConcat": waid_contact,
"waid": waid,
"appletWxid": applet_wxid,
"jsonParam": json_param,
"headImgUrl": head_img_url,
"mainImg": main_img,
"indexPage": index_page
}
return Response(**self.call_api("/api/sendApplet", json=data))
def send_room_at(self, room_id: str, wxids: list[str], msg: str) -> Response:
"""发送群@消息"""
data = {
"chatRoomId": room_id,
"wxids": ",".join(wxids),
"msg": msg
}
return Response(**self.call_api("/api/sendAtText", json=data))
def send_pat(self, room_id: str, wxid: str) -> Response:
"""发送拍一拍消息"""
data = {
"receiver": room_id,
"wxid": wxid
}
return Response(**self.call_api("/api/sendPatMsg", json=data))
def get_contacts(self) -> list[Contact]:
"""获取联系人列表"""
return [Contact(**item) for item in self.call_api("/api/getContactList")["data"]]
def get_contact(self, wxid: str) -> ContactDetail:
"""获取联系人详情"""
data = {
"wxid": wxid
}
return ContactDetail(self.call_api("/api/getContactProfile", json=data)["data"])
def create_room(self, member_ids: list[str]) -> Response:
"""创建群聊"""
data = {
"memberIds": ",".join(member_ids)
}
return Response(**self.call_api("/api/createChatRoom", json=data))
def quit_room(self, room_id: str) -> Response:
"""退出群聊"""
data = {
"chatRoomId": room_id
}
return Response(**self.call_api("/api/quitChatRoom", json=data))
def get_room(self, room_id: str) -> Room:
"""获取群详情"""
data = {
"chatRoomId": room_id
}
return Room(**self.call_api("/api/getChatRoomDetailInfo", json=data)["data"])
def get_room_members(self, room_id: str) -> RoomMembers:
"""获取群成员列表"""
data = {
"chatRoomId": room_id
}
return RoomMembers(**self.call_api("/api/getMemberFromChatRoom", json=data)["data"])
def add_room_member(self, room_id: str, member_ids: list[str]) -> Response:
"""添加群成员"""
data = {
"chatRoomId": room_id,
"memberIds": ",".join(member_ids)
}
return Response(**self.call_api("/api/addMemberToChatRoom", json=data))
def delete_room_member(self, room_id: str, member_ids: list[str]) -> Response:
"""删除群成员"""
data = {
"chatRoomId": room_id,
"memberIds": ",".join(member_ids)
}
return Response(**self.call_api("/api/delMemberFromChatRoom", json=data))
def invite_room_member(self, room_id: str, member_ids: list[str]) -> Response:
"""邀请群成员"""
data = {
"chatRoomId": room_id,
"memberIds": ",".join(member_ids)
}
return Response(**self.call_api("/api/InviteMemberToChatRoom", json=data))
def modify_member_nickname(self, room_id: str, wxid: str, nickname: str) -> Response:
"""修改群成员昵称"""
data = {
"chatRoomId": room_id,
"wxid": wxid,
"nickName": nickname
}
return Response(**self.call_api("/api/modifyNickname", json=data))
def top_msg(self, msg_id: int) -> Response:
"""设置群置顶消息"""
data = {
"msgId": msg_id
}
return Response(**self.call_api("/api/topMsg", json=data))
def remove_top_msg(self, room_id: str, msg_id: int) -> Response:
"""移除群置顶消息"""
data = {
"chatRoomId": room_id,
"msgId": msg_id
}
return Response(**self.call_api("/api/removeTopMsg", json=data))
def forward_msg(self, msg_id: int, wxid: str) -> Response:
"""转发消息"""
data = {
"msgId": msg_id,
"wxid": wxid
}
return Response(**self.call_api("/api/forwardMsg", json=data))
def get_sns_first_page(self) -> Response:
"""获取朋友圈首页"""
return Response(**self.call_api("/api/getSNSFirstPage"))
def get_sns_next_page(self, sns_id: int) -> Response:
"""获取朋友圈下一页"""
data = {
"snsId": sns_id
}
return Response(**self.call_api("/api/getSNSNextPage", json=data))
def collect_msg(self, msg_id: int) -> Response:
"""收藏消息"""
data = {
"msgId": msg_id
}
return Response(**self.call_api("/api/addFavFromMsg", json=data))
def collect_image(self, wxid: str, image_path: str) -> Response:
"""收藏图片"""
data = {
"wxid": wxid,
"imagePath": image_path
}
return Response(**self.call_api("/api/addFavFromImage", json=data))
def download_attachment(self, msg_id: int) -> Response:
"""下载附件"""
data = {
"msgId": msg_id
}
return Response(**self.call_api("/api/downloadAttach", json=data))
def forward_public_msg(
self,
wxid: str,
app_name: str,
username: str,
title: str,
url: str,
thumb_url: str,
digest: str
) -> Response:
"""转发公众号消息"""
data = {
"wxid": wxid,
"appName": app_name,
"userName": username,
"title": title,
"url": url,
"thumbUrl": thumb_url,
"digest": digest,
}
return Response(**self.call_api("/api/forwardPublicMsg", json=data))
def forward_public_msg_by_msg_id(self, wxid: str, msg_id: int) -> Response:
"""转发公众号消息通过消息ID"""
data = {
"wxid": wxid,
"msg_id": msg_id
}
return Response(**self.call_api("/api/forwardPublicMsgByMsgId", json=data))
def decode_image(self, file_path: str, store_dir: str) -> Response:
"""解码图片"""
data = {
"filePath": file_path,
"storeDir": store_dir
}
return Response(**self.call_api("/api/decodeImage", json=data))
def get_voice_by_msg_id(self, msg_id: int, store_dir: str) -> Response:
"""获取语音通过消息ID"""
data = {
"msgId": msg_id,
"storeDir": store_dir
}
return Response(**self.call_api("/api/getVoiceByMsgId", json=data))
def ocr(self, image_path: str) -> Response:
"""图片文本识别"""
data = {
"imagePath": image_path
}
return Response(**self.call_api("/api/ocr", json=data))
def get_db_info(self) -> list[DB]:
"""获取数据库句柄"""
return [DB(databaseName=item["databaseName"], handle=item["handle"],
tables=[Table(**subitem) for subitem in item["tables"]]) for item in self.call_api("/api/getDBInfo")]
def exec_sql(self, db_handle: int, sql: str) -> Response:
"""执行SQL命令"""
data = {
"dbHandle": db_handle,
"sql": sql
}
return Response(**self.call_api("/api/execSql", json=data))
def test(self) -> Response:
"""测试"""
return Response(**self.call_api("/api/test"))
def on_event(self, raw_data: bytes):
try:
data = json.loads(raw_data)
event = Event(**parse_event(data), rawData=RawData(raw_data))
logger.debug(event)
self.call_hook_func(self.on_before_message, self, event)
self.event_emitter.emit(str(ALL_MESSAGE), self, event)
self.event_emitter.emit(str(event.type), self, event)
self.call_hook_func(self.on_after_message, self, event)
self.webhook(data)
except Exception:
logger.error(traceback.format_exc())
def handle(self, events: typing.Union[list[str], str, None] = None, once: bool = False):
def wrapper(func):
listen = self.event_emitter.on if not once else self.event_emitter.once
if not events:
listen(str(ALL_MESSAGE), func)
else:
for event in events if isinstance(events, list) else [events]:
listen(str(event), func)
return wrapper
def exit(self):
self.call_hook_func(self.on_stop, self)
self.process.terminate()
def run(self):
try:
server = socketserver.ThreadingTCPServer((self.server_host, self.server_port), RequestHandler)
server.bot = self
logger.info(f"{self.server_host}:{self.server_port}")
server.serve_forever()
except (KeyboardInterrupt, SystemExit):
self.exit()

28
wxhook/events.py Normal file
View File

@ -0,0 +1,28 @@
# 通知消息事件
NOTICE_MESSAGE = 10000
# 系统消息事件
SYSTEM_MESSAGE = 10002
# 全部消息事件
ALL_MESSAGE = 99999
# 文本消息事件
TEXT_MESSAGE = 1
# 图片消息事件
IMAGE_MESSAGE = 3
# 语音消息事件
VOICE_MESSAGE = 34
# 好有验证请求消息事件
FRIEND_VERIFY_MESSAGE = 37
# 卡片消息事件
CARD_MESSAGE = 42
# 视频消息事件
VIDEO_MESSAGE = 43
# 表情消息事件
EMOJI_MESSAGE = 47
# 位置消息事件
LOCATION_MESSAGE = 48
# xml消息事件
XML_MESSAGE = 49
# 视频/语音通话消息事件
VOIP_MESSAGE = 50
# 手机端同步消息事件
PHONE_MESSAGE = 51

11
wxhook/logger.py Normal file
View File

@ -0,0 +1,11 @@
import os
import sys
from loguru import logger
logger.remove()
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>",
level=os.environ.get("WXHOOK_LOG_LEVEL", "DEBUG")
)

117
wxhook/model.py Normal file
View File

@ -0,0 +1,117 @@
import typing
from dataclasses import dataclass
from typing import List
@dataclass
class Account:
"""用户"""
account: str # 账号名
city: str # 所在城市
country: str # 所在国家代码
currentDataPath: str # 当前数据路径通常指向用户的WeChat文件夹
dataSavePath: str # 数据保存路径通常指向用户的WeChat文件夹
dbKey: str # 数据库密钥,用于加密本地数据库
headImage: str # 头像图片URL
mobile: str # 手机号码
name: str # 昵称
province: str # 所在省份
signature: str # 用户个性签名
wxid: str # 微信ID
@dataclass
class Contact:
"""联系人"""
customAccount: str # 用户自定义的账号
encryptName: str # 加密名称,如果有的话
nickname: str # 用户的昵称
pinyin: str # 用户昵称的拼音首字母
pinyinAll: str # 用户昵称的完整拼音
reserved1: int # 预留字段1具体用途未知
reserved2: int # 预留字段2具体用途未知
type: int # 联系人类型
verifyFlag: int # 验证标志,用于表示用户的验证状态
wxid: str # 用户的微信ID
@dataclass
class ContactDetail:
"""联系人详情"""
account: str # 用户账号,如果未设置则为空字符串
headImage: str # 用户的头像图片URL如果未设置则为空字符串
nickname: str # 用户昵称
v3: str # 用户的V3信息通常用于加密或验证可能包含特定的加密字符串
wxid: str # 用户的微信ID
@dataclass
class Room:
"""群聊"""
admin: str # 管理员的用户ID如果没有管理员则为空字符串
chatRoomId: str # 聊天室ID如果没有指定聊天室则为空字符串
notice: str # 聊天室公告内容,如果没有设置公告则为空字符串
xml: str # 聊天室相关的XML信息通常包含聊天室的详细配置信息如果没有则为空字符串
@dataclass
class RoomMembers:
"""群成员"""
admin: str # 聊天室管理员的微信ID
adminNickname: str # 聊天室管理员的昵称
chatRoomId: str # 聊天室的ID
memberNickname: str # 正在提及的成员昵称,可能包含特殊字符作为昵称的一部分
members: str # 聊天室成员的微信ID列表各ID之间使用特定字符分隔
@dataclass
class RawData:
"""原始数据"""
data: bytes
def __repr__(self):
return "<RawData>"
__str__ = __repr__
@dataclass
class Event:
"""消息事件"""
content: typing.Any # 消息内容可能包含用户ID和冒号之后的文本内容
createTime: int # 消息创建时间的UNIX时间戳
displayFullContent: str # 完整的消息内容,如果有的话
fromUser: str # 发送消息的用户或群组ID
msgId: int # 消息的唯一标识符
msgSequence: int # 消息序列号
pid: int # 消息的PID
signature: str # 消息签名,包含一系列的配置信息
toUser: str # 消息接收者的用户ID
type: int # 消息类型
rawData: RawData # 原始数据
base64Img: typing.Union[str, None] = None # 图片base64
@dataclass
class Table:
"""表结构"""
name: str # 任务名称
rootpage: str # 根页面
sql: str # SQL 创建表的语句
tableName: str # 表名称
@dataclass
class DB:
"""数据库"""
databaseName: str # 数据库名称
handle: int # 句柄
tables: List[Table] # 表列表
@dataclass
class Response:
"""响应"""
code: int # 状态码,例如 200
data: dict # 用户数据,当前为空对象
msg: str # 响应消息,例如 "success"

BIN
wxhook/tools/faker.exe Normal file

Binary file not shown.

Binary file not shown.

BIN
wxhook/tools/wxhook.dll Normal file

Binary file not shown.

113
wxhook/utils.py Normal file
View File

@ -0,0 +1,113 @@
import os
import json
import pathlib
import subprocess
import psutil
import xmltodict
BASE_DIR = pathlib.Path(__file__).resolve().parent
TOOLS = BASE_DIR / "tools"
DLL = TOOLS / "wxhook.dll"
START_WECHAT = TOOLS / "start-wechat.exe"
FAKER = TOOLS / "faker.exe"
def start_wechat_with_inject(port: int):
result = subprocess.run(f"{START_WECHAT} {DLL} {port}", capture_output=True, text=True)
code, output = result.stdout.split(",")
return int(code), output
def fake_wechat_version(pid: int, old_version: str, new_version: str):
result = subprocess.run(f"{FAKER} {pid} {old_version} {new_version}", capture_output=True, text=True)
return int(result.stdout)
def get_processes(process_name):
processes = []
for process in psutil.process_iter():
if process.name().lower() == process_name.lower():
processes.append(process)
return processes
def parse_xml(xml):
return xmltodict.parse(xml)
def parse_event(event, fields=None):
for field in fields or ["content", "signature"]:
try:
if field in event:
event[field] = parse_xml(event[field])
except Exception:
pass
return event
class WeChatManager:
def __init__(self):
# remote port: 19001 ~ 37999
# socket port: 18999 ~ 1
# http port: 38999 ~ 57997
self.filename = BASE_DIR / "tools" / "wxhook.json"
if not os.path.exists(self.filename):
self.init_file()
else:
self.clean()
def init_file(self):
with open(self.filename, "w", encoding="utf-8") as file:
json.dump({
"increase_remote_port": 19000,
"wechat": []
}, file)
def read(self):
with open(self.filename, "r", encoding="utf-8") as file:
data = json.load(file)
return data
def write(self, data):
with open(self.filename, "w", encoding="utf-8") as file:
json.dump(data, file)
def refresh(self, pid_list):
data = self.read()
cleaned_data = []
remote_port_list = [19000]
for item in data["wechat"]:
if item["pid"] in pid_list:
remote_port_list.append(item["remote_port"])
cleaned_data.append(item)
data["increase_remote_port"] = max(remote_port_list)
data["wechat"] = cleaned_data
self.write(data)
def clean(self):
pid_list = [process.pid for process in get_processes("WeChat.exe")]
self.refresh(pid_list)
def get_remote_port(self):
data = self.read()
return data["increase_remote_port"] + 1
def get_listen_port(self, remote_port):
return 19000 - (remote_port - 19000)
def get_port(self):
remote_port = self.get_remote_port()
return remote_port, self.get_listen_port(remote_port)
def add(self, pid, remote_port, server_port):
data = self.read()
data["increase_remote_port"] = remote_port
data["wechat"].append({
"pid": pid,
"remote_port": remote_port,
"server_port": server_port
})
self.write(data)