mirror of
https://github.com/miloira/wxhook.git
synced 2024-11-22 10:29:25 +08:00
version 0.0.5
This commit is contained in:
parent
95b6af8b79
commit
4b481252f9
11
README.md
11
README.md
@ -60,12 +60,13 @@ pip install wxhook
|
|||||||
```python
|
```python
|
||||||
# import os
|
# import os
|
||||||
# os.environ["WXHOOK_LOG_LEVEL"] = "INFO" # 修改日志输出级别
|
# os.environ["WXHOOK_LOG_LEVEL"] = "INFO" # 修改日志输出级别
|
||||||
|
# os.environ["WXHOOK_LOG_FORMAT"] = "INFO" # 修改日志输出格式
|
||||||
from wxhook import Bot
|
from wxhook import Bot
|
||||||
from wxhook import events
|
from wxhook import events
|
||||||
from wxhook.model import Event
|
from wxhook.model import Event
|
||||||
|
|
||||||
|
|
||||||
def on_login(bot: Bot):
|
def on_login(bot: Bot, event: Event):
|
||||||
print("登录成功之后会触发这个函数")
|
print("登录成功之后会触发这个函数")
|
||||||
|
|
||||||
|
|
||||||
@ -115,11 +116,9 @@ from wxhook import Bot
|
|||||||
from wxhook import events
|
from wxhook import events
|
||||||
from wxhook.model import Event
|
from wxhook.model import Event
|
||||||
|
|
||||||
# faked_version="3.9.10.19"解除微信低版本登录限制
|
|
||||||
bot = Bot()
|
bot = Bot()
|
||||||
|
|
||||||
msgid_list = []
|
msg_id_list = []
|
||||||
|
|
||||||
|
|
||||||
@bot.handle(events.TEXT_MESSAGE)
|
@bot.handle(events.TEXT_MESSAGE)
|
||||||
def on_text_message(bot: Bot, event: Event):
|
def on_text_message(bot: Bot, event: Event):
|
||||||
@ -164,9 +163,9 @@ def on_text_message(bot: Bot, event: Event):
|
|||||||
bot.send_pat(sender, sender)
|
bot.send_pat(sender, sender)
|
||||||
elif content.find("置顶消息") != -1:
|
elif content.find("置顶消息") != -1:
|
||||||
bot.top_msg(msg_id)
|
bot.top_msg(msg_id)
|
||||||
msgid_list.append(msg_id)
|
msg_id_list.append(msg_id)
|
||||||
elif content.find("取消置顶的消息") != -1:
|
elif content.find("取消置顶的消息") != -1:
|
||||||
bot.remove_top_msg(sender, msgid_list.pop())
|
bot.remove_top_msg(sender, msg_id_list.pop())
|
||||||
elif content.find("获取联系人列表") != -1:
|
elif content.find("获取联系人列表") != -1:
|
||||||
bot.send_text(sender, json.dumps(bot.get_contacts()))
|
bot.send_text(sender, json.dumps(bot.get_contacts()))
|
||||||
elif content.find("获取联系人详情") != -1:
|
elif content.find("获取联系人详情") != -1:
|
||||||
|
2
setup.py
2
setup.py
@ -18,7 +18,7 @@ URL = 'https://github.com/miloira/wxhook'
|
|||||||
EMAIL = '690126048@qq.com'
|
EMAIL = '690126048@qq.com'
|
||||||
AUTHOR = 'Msky'
|
AUTHOR = 'Msky'
|
||||||
REQUIRES_PYTHON = '>=3.8.0'
|
REQUIRES_PYTHON = '>=3.8.0'
|
||||||
VERSION = '0.0.4'
|
VERSION = '0.0.5'
|
||||||
|
|
||||||
# What packages are required for this module to be executed?
|
# What packages are required for this module to be executed?
|
||||||
REQUIRED = [
|
REQUIRED = [
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
from .core import Bot
|
from .core import Bot
|
||||||
|
|
||||||
version = "0.0.4"
|
version = "0.0.5"
|
||||||
|
@ -9,8 +9,8 @@ import psutil
|
|||||||
import pyee
|
import pyee
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from .events import ALL_MESSAGE
|
|
||||||
from .logger import logger
|
from .logger import logger
|
||||||
|
from .events import ALL_MESSAGE
|
||||||
from .model import Event, Account, Contact, ContactDetail, Room, RoomMembers, Table, DB, Response
|
from .model import Event, Account, Contact, ContactDetail, Room, RoomMembers, Table, DB, Response
|
||||||
from .utils import WeChatManager, start_wechat_with_inject, fake_wechat_version, get_pid, parse_event
|
from .utils import WeChatManager, start_wechat_with_inject, fake_wechat_version, get_pid, parse_event
|
||||||
|
|
||||||
@ -429,7 +429,7 @@ class Bot:
|
|||||||
def info(self) -> Account:
|
def info(self) -> Account:
|
||||||
return self.get_self_info()
|
return self.get_self_info()
|
||||||
|
|
||||||
def on_event(self, raw_data: bytes):
|
def on_event(self, raw_data: bytes) -> None:
|
||||||
try:
|
try:
|
||||||
data = json.loads(raw_data)
|
data = json.loads(raw_data)
|
||||||
event = Event(**parse_event(data))
|
event = Event(**parse_event(data))
|
||||||
@ -443,7 +443,7 @@ class Bot:
|
|||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
logger.error(raw_data)
|
logger.error(raw_data)
|
||||||
|
|
||||||
def handle(self, events: typing.Union[list[str], str, None] = None, once: bool = False):
|
def handle(self, events: typing.Union[list[str], str, None] = None, once: bool = False) -> typing.Callable[[typing.Callable], None]:
|
||||||
def wrapper(func):
|
def wrapper(func):
|
||||||
listen = self.event_emitter.on if not once else self.event_emitter.once
|
listen = self.event_emitter.on if not once else self.event_emitter.once
|
||||||
if not events:
|
if not events:
|
||||||
@ -454,11 +454,11 @@ class Bot:
|
|||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
def exit(self):
|
def exit(self) -> None:
|
||||||
self.call_hook_func(self.on_stop, self)
|
self.call_hook_func(self.on_stop, self)
|
||||||
self.process.terminate()
|
self.process.terminate()
|
||||||
|
|
||||||
def run(self):
|
def run(self) -> None:
|
||||||
try:
|
try:
|
||||||
server = socketserver.ThreadingTCPServer((self.server_host, self.server_port), RequestHandler)
|
server = socketserver.ThreadingTCPServer((self.server_host, self.server_port), RequestHandler)
|
||||||
server.bot = self
|
server.bot = self
|
||||||
|
@ -6,6 +6,6 @@ from loguru import logger
|
|||||||
logger.remove()
|
logger.remove()
|
||||||
logger.add(
|
logger.add(
|
||||||
sink=sys.stdout,
|
sink=sys.stdout,
|
||||||
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>",
|
format=os.environ.get("WXHOOK_LOG_FORMAT", "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>"),
|
||||||
level=os.environ.get("WXHOOK_LOG_LEVEL", "DEBUG")
|
level=os.environ.get("WXHOOK_LOG_LEVEL", "DEBUG")
|
||||||
)
|
)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
import typing
|
||||||
import pathlib
|
import pathlib
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
@ -13,18 +14,18 @@ START_WECHAT = TOOLS / "start-wechat.exe"
|
|||||||
FAKER = TOOLS / "faker.exe"
|
FAKER = TOOLS / "faker.exe"
|
||||||
|
|
||||||
|
|
||||||
def start_wechat_with_inject(port: int):
|
def start_wechat_with_inject(port: int) -> typing.Tuple[int, str]:
|
||||||
result = subprocess.run(f"{START_WECHAT} {DLL} {port}", capture_output=True, text=True)
|
result = subprocess.run(f"{START_WECHAT} {DLL} {port}", capture_output=True, text=True)
|
||||||
code, output = result.stdout.split(",")
|
code, output = result.stdout.split(",")
|
||||||
return int(code), output
|
return int(code), output
|
||||||
|
|
||||||
|
|
||||||
def fake_wechat_version(pid: int, old_version: str, new_version: str):
|
def fake_wechat_version(pid: int, old_version: str, new_version: str) -> int:
|
||||||
result = subprocess.run(f"{FAKER} {pid} {old_version} {new_version}", capture_output=True, text=True)
|
result = subprocess.run(f"{FAKER} {pid} {old_version} {new_version}", capture_output=True, text=True)
|
||||||
return int(result.stdout)
|
return int(result.stdout)
|
||||||
|
|
||||||
|
|
||||||
def get_processes(process_name: str):
|
def get_processes(process_name: str) -> list[psutil.Process]:
|
||||||
processes = []
|
processes = []
|
||||||
for process in psutil.process_iter():
|
for process in psutil.process_iter():
|
||||||
if process.name().lower() == process_name.lower():
|
if process.name().lower() == process_name.lower():
|
||||||
@ -32,16 +33,16 @@ def get_processes(process_name: str):
|
|||||||
return processes
|
return processes
|
||||||
|
|
||||||
|
|
||||||
def get_pid(port: int):
|
def get_pid(port: int) -> int:
|
||||||
output = subprocess.run(f"netstat -ano | findStr \"{port}\"", capture_output=True, text=True, shell=True).stdout
|
output = subprocess.run(f"netstat -ano | findStr \"{port}\"", capture_output=True, text=True, shell=True).stdout
|
||||||
return int(output.split("\n")[0].split("LISTENING")[-1])
|
return int(output.split("\n")[0].split("LISTENING")[-1])
|
||||||
|
|
||||||
|
|
||||||
def parse_xml(xml: str):
|
def parse_xml(xml: str) -> dict:
|
||||||
return xmltodict.parse(xml)
|
return xmltodict.parse(xml)
|
||||||
|
|
||||||
|
|
||||||
def parse_event(event: dict, fields=None):
|
def parse_event(event: dict, fields=None) -> dict:
|
||||||
for field in fields or ["content", "signature"]:
|
for field in fields or ["content", "signature"]:
|
||||||
try:
|
try:
|
||||||
if field in event:
|
if field in event:
|
||||||
@ -63,23 +64,23 @@ class WeChatManager:
|
|||||||
else:
|
else:
|
||||||
self.clean()
|
self.clean()
|
||||||
|
|
||||||
def init_file(self):
|
def init_file(self) -> None:
|
||||||
with open(self.filename, "w", encoding="utf-8") as file:
|
with open(self.filename, "w", encoding="utf-8") as file:
|
||||||
json.dump({
|
json.dump({
|
||||||
"increase_remote_port": 19000,
|
"increase_remote_port": 19000,
|
||||||
"wechat": []
|
"wechat": []
|
||||||
}, file)
|
}, file)
|
||||||
|
|
||||||
def read(self):
|
def read(self) -> dict:
|
||||||
with open(self.filename, "r", encoding="utf-8") as file:
|
with open(self.filename, "r", encoding="utf-8") as file:
|
||||||
data = json.load(file)
|
data = json.load(file)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def write(self, data: dict):
|
def write(self, data: dict) -> None:
|
||||||
with open(self.filename, "w", encoding="utf-8") as file:
|
with open(self.filename, "w", encoding="utf-8") as file:
|
||||||
json.dump(data, file)
|
json.dump(data, file)
|
||||||
|
|
||||||
def refresh(self, pid_list: list[int]):
|
def refresh(self, pid_list: list[int]) -> None:
|
||||||
data = self.read()
|
data = self.read()
|
||||||
cleaned_data = []
|
cleaned_data = []
|
||||||
remote_port_list = [19000]
|
remote_port_list = [19000]
|
||||||
@ -92,22 +93,22 @@ class WeChatManager:
|
|||||||
data["wechat"] = cleaned_data
|
data["wechat"] = cleaned_data
|
||||||
self.write(data)
|
self.write(data)
|
||||||
|
|
||||||
def clean(self):
|
def clean(self) -> None:
|
||||||
pid_list = [process.pid for process in get_processes("WeChat.exe")]
|
pid_list = [process.pid for process in get_processes("WeChat.exe")]
|
||||||
self.refresh(pid_list)
|
self.refresh(pid_list)
|
||||||
|
|
||||||
def get_remote_port(self):
|
def get_remote_port(self) -> int:
|
||||||
data = self.read()
|
data = self.read()
|
||||||
return data["increase_remote_port"] + 1
|
return data["increase_remote_port"] + 1
|
||||||
|
|
||||||
def get_listen_port(self, remote_port: int):
|
def get_listen_port(self, remote_port: int) -> int:
|
||||||
return 19000 - (remote_port - 19000)
|
return 19000 - (remote_port - 19000)
|
||||||
|
|
||||||
def get_port(self):
|
def get_port(self) -> typing.Tuple[int, int]:
|
||||||
remote_port = self.get_remote_port()
|
remote_port = self.get_remote_port()
|
||||||
return remote_port, self.get_listen_port(remote_port)
|
return remote_port, self.get_listen_port(remote_port)
|
||||||
|
|
||||||
def add(self, pid: int, remote_port: int, server_port: int):
|
def add(self, pid: int, remote_port: int, server_port: int) -> None:
|
||||||
data = self.read()
|
data = self.read()
|
||||||
data["increase_remote_port"] = remote_port
|
data["increase_remote_port"] = remote_port
|
||||||
data["wechat"].append({
|
data["wechat"].append({
|
||||||
|
Loading…
Reference in New Issue
Block a user