🎉 我觉得我搞完了

This commit is contained in:
李寻欢 2025-04-17 15:00:18 +08:00
parent 47fe5cead3
commit 1ed4ba6c08
14 changed files with 0 additions and 10228 deletions

View File

@ -1,49 +0,0 @@
from WechatAPI.errors import *
from .base import WechatAPIClientBase, Proxy, Section
from .chatroom import ChatroomMixin
from .friend import FriendMixin
from .hongbao import HongBaoMixin
from .login import LoginMixin
from .message import MessageMixin
from .protect import protector
from .protect import protector
from .tool import ToolMixin
from .user import UserMixin
class WechatAPIClient(LoginMixin, MessageMixin, FriendMixin, ChatroomMixin, UserMixin,
ToolMixin, HongBaoMixin):
# 这里都是需要结合多个功能的方法
async def send_at_message(self, wxid: str, content: str, at: list[str]) -> tuple[int, int, int]:
"""发送@消息
Args:
wxid (str): 接收人
content (str): 消息内容
at (list[str]): @的用户ID列表
Returns:
tuple[int, int, int]: 包含以下三个值的元组:
- ClientMsgid (int): 客户端消息ID
- CreateTime (int): 创建时间
- NewMsgId (int): 新消息ID
Raises:
UserLoggedOut: 用户未登录时抛出
BanProtection: 新设备登录4小时内操作时抛出
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
output = ""
for id in at:
nickname = await self.get_nickname(id)
output += f"@{nickname}\u2005"
output += content
return await self.send_text_message(wxid, output, at)

View File

@ -1,106 +0,0 @@
from dataclasses import dataclass
from WechatAPI.errors import *
@dataclass
class Proxy:
"""代理(无效果,别用!)
Args:
ip (str): 代理服务器IP地址
port (int): 代理服务器端口
username (str, optional): 代理认证用户名. 默认为空字符串
password (str, optional): 代理认证密码. 默认为空字符串
"""
ip: str
port: int
username: str = ""
password: str = ""
@dataclass
class Section:
"""数据段配置类
Args:
data_len (int): 数据长度
start_pos (int): 起始位置
"""
data_len: int
start_pos: int
class WechatAPIClientBase:
"""微信API客户端基类
Args:
ip (str): 服务器IP地址
port (int): 服务器端口
Attributes:
wxid (str): 微信ID
nickname (str): 昵称
alias (str): 别名
phone (str): 手机号
ignore_protect (bool): 是否忽略保护机制
"""
def __init__(self, ip: str, port: int):
self.ip = ip
self.port = port
self.wxid = ""
self.nickname = ""
self.alias = ""
self.phone = ""
self.ignore_protect = False
# 调用所有 Mixin 的初始化方法
super().__init__()
@staticmethod
def error_handler(json_resp):
"""处理API响应中的错误码
Args:
json_resp (dict): API响应的JSON数据
Raises:
ValueError: 参数错误时抛出
MarshallingError: 序列化错误时抛出
UnmarshallingError: 反序列化错误时抛出
MMTLSError: MMTLS初始化错误时抛出
PacketError: 数据包长度错误时抛出
UserLoggedOut: 用户已退出登录时抛出
ParsePacketError: 解析数据包错误时抛出
DatabaseError: 数据库错误时抛出
Exception: 其他类型错误时抛出
"""
code = json_resp.get("Code")
if code == -1: # 参数错误
raise ValueError(json_resp.get("Message"))
elif code == -2: # 其他错误
raise Exception(json_resp.get("Message"))
elif code == -3: # 序列化错误
raise MarshallingError(json_resp.get("Message"))
elif code == -4: # 反序列化错误
raise UnmarshallingError(json_resp.get("Message"))
elif code == -5: # MMTLS初始化错误
raise MMTLSError(json_resp.get("Message"))
elif code == -6: # 收到的数据包长度错误
raise PacketError(json_resp.get("Message"))
elif code == -7: # 已退出登录
raise UserLoggedOut("Already logged out")
elif code == -8: # 链接过期
raise Exception(json_resp.get("Message"))
elif code == -9: # 解析数据包错误
raise ParsePacketError(json_resp.get("Message"))
elif code == -10: # 数据库错误
raise DatabaseError(json_resp.get("Message"))
elif code == -11: # 登陆异常
raise UserLoggedOut(json_resp.get("Message"))
elif code == -12: # 操作过于频繁
raise Exception(json_resp.get("Message"))
elif code == -13: # 上传失败
raise Exception(json_resp.get("Message"))

View File

@ -1,157 +0,0 @@
from typing import Union, Any
import aiohttp
from .base import *
from .protect import protector
from ..errors import *
class ChatroomMixin(WechatAPIClientBase):
async def add_chatroom_member(self, chatroom: str, wxid: str) -> bool:
"""添加群成员(群聊最多40人)
Args:
chatroom: 群聊wxid
wxid: 要添加的wxid
Returns:
bool: 成功返回True, 失败False或者报错
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AddChatroomMember', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_chatroom_announce(self, chatroom: str) -> dict:
"""获取群聊公告
Args:
chatroom: 群聊id
Returns:
dict: 群聊信息字典
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomInfo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
data = dict(json_resp.get("Data"))
data.pop("BaseResponse")
return data
else:
self.error_handler(json_resp)
async def get_chatroom_info(self, chatroom: str) -> dict:
"""获取群聊信息
Args:
chatroom: 群聊id
Returns:
dict: 群聊信息字典
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomInfoNoAnnounce', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("ContactList")[0]
else:
self.error_handler(json_resp)
async def get_chatroom_member_list(self, chatroom: str) -> list[dict]:
"""获取群聊成员列表
Args:
chatroom: 群聊id
Returns:
list[dict]: 群聊成员列表
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomMemberDetail', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("NewChatroomData").get("ChatRoomMember")
else:
self.error_handler(json_resp)
async def get_chatroom_qrcode(self, chatroom: str) -> dict[str, Any]:
"""获取群聊二维码
Args:
chatroom: 群聊id
Returns:
dict: {"base64": 二维码的base64, "description": 二维码描述}
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(86400):
raise BanProtection("获取二维码需要在登录后24小时才可使用")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetChatroomQRCode', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
data = json_resp.get("Data")
return {"base64": data.get("qrcode").get("buffer"), "description": data.get("revokeQrcodeWording")}
else:
self.error_handler(json_resp)
async def invite_chatroom_member(self, wxid: Union[str, list], chatroom: str) -> bool:
"""邀请群聊成员(群聊大于40人)
Args:
wxid: 要邀请的用户wxid或wxid列表
chatroom: 群聊id
Returns:
bool: 成功返回True, 失败False或者报错
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
if isinstance(wxid, list):
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Chatroom": chatroom, "InviteWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/InviteChatroomMember', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.0 KiB

View File

@ -1,152 +0,0 @@
from typing import Union
import aiohttp
from .base import *
from .protect import protector
from ..errors import *
class FriendMixin(WechatAPIClientBase):
async def accept_friend(self, scene: int, v1: str, v2: str) -> bool:
"""接受好友请求
主动添加好友单天上限如下所示1小时内上限为 5超过上限时无法发出好友请求也收不到好友请求
- 新账号5/
- 注册超过7天10/
- 注册满3个月&&近期登录过该电脑15/
- 注册满6个月&&近期经常登录过该电脑20/
- 注册满6个月&&近期频繁登陆过该电脑30/
- 注册1年以上&&一直登录50/
- 上一次通过好友到下一次通过间隔20-40s
- 收到加人申请到通过好友申请每天最多通过300个好友申请间隔30s+随机时间
Args:
scene: 来源 在消息的xml获取
v1: v1key
v2: v2key
Returns:
bool: 操作是否成功
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Scene": scene, "V1": v1, "V2": v2}
response = await session.post(f'http://{self.ip}:{self.port}/AcceptFriend', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_contact(self, wxid: Union[str, list[str]]) -> Union[dict, list[dict]]:
"""获取联系人信息
Args:
wxid: 联系人wxid, 可以是多个wxid在list里也可查询chatroom
Returns:
Union[dict, list[dict]]: 单个联系人返回dict多个联系人返回list[dict]
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(wxid, list):
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "RequestWxids": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/GetContact', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
contact_list = json_resp.get("Data").get("ContactList")
if len(contact_list) == 1:
return contact_list[0]
else:
return contact_list
else:
self.error_handler(json_resp)
async def get_contract_detail(self, wxid: Union[str, list[str]], chatroom: str = "") -> list:
"""获取联系人详情
Args:
wxid: 联系人wxid
chatroom: 群聊wxid
Returns:
list: 联系人详情列表
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
if isinstance(wxid, list):
if len(wxid) > 20:
raise ValueError("一次最多查询20个联系人")
wxid = ",".join(wxid)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "RequestWxids": wxid, "Chatroom": chatroom}
response = await session.post(f'http://{self.ip}:{self.port}/GetContractDetail', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("ContactList")
else:
self.error_handler(json_resp)
async def get_contract_list(self, wx_seq: int = 0, chatroom_seq: int = 0) -> dict:
"""获取联系人列表
Args:
wx_seq: 联系人序列
chatroom_seq: 群聊序列
Returns:
dict: 联系人列表数据
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "CurrentWxcontactSeq": wx_seq, "CurrentChatroomContactSeq": chatroom_seq}
response = await session.post(f'http://{self.ip}:{self.port}/GetContractList', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
self.error_handler(json_resp)
async def get_nickname(self, wxid: Union[str, list[str]]) -> Union[str, list[str]]:
"""获取用户昵称
Args:
wxid: 用户wxid可以是单个wxid或最多20个wxid的列表
Returns:
Union[str, list[str]]: 如果输入单个wxid返回str如果输入wxid列表则返回对应的昵称列表
"""
data = await self.get_contract_detail(wxid)
if isinstance(wxid, str):
try:
return data[0].get("NickName").get("string")
except:
return ""
else:
result = []
for contact in data:
try:
result.append(contact.get("NickName").get("string"))
except:
result.append("")
return result

View File

@ -1,30 +0,0 @@
import aiohttp
from .base import *
from ..errors import *
class HongBaoMixin(WechatAPIClientBase):
async def get_hongbao_detail(self, xml: str, encrypt_key: str, encrypt_userinfo: str) -> dict:
"""获取红包详情
Args:
xml: 红包 XML 数据
encrypt_key: 加密密钥
encrypt_userinfo: 加密的用户信息
Returns:
dict: 红包详情数据
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Xml": xml, "EncryptKey": encrypt_key, "EncryptUserinfo": encrypt_userinfo}
response = await session.post(f'http://{self.ip}:{self.port}/GetHongBaoDetail', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
self.error_handler(json_resp)

View File

@ -1,315 +0,0 @@
import hashlib
import io
import string
from random import choice
from typing import Union
import aiohttp
import qrcode
from .base import *
from .protect import protector
from ..errors import *
class LoginMixin(WechatAPIClientBase):
async def is_running(self) -> bool:
"""检查WechatAPI是否在运行。
Returns:
bool: 如果WechatAPI正在运行返回True否则返回False
"""
try:
async with aiohttp.ClientSession() as session:
response = await session.get(f'http://{self.ip}:{self.port}/IsRunning')
return await response.text() == 'OK'
except aiohttp.client_exceptions.ClientConnectorError:
return False
async def get_qr_code(self, device_name: str, device_id: str = "", proxy: Proxy = None) -> (
str, str):
"""获取登录二维码。
Args:
device_name (str): 设备名称
device_id (str, optional): 设备ID. Defaults to "".
proxy (Proxy, optional): 代理信息. Defaults to None.
Returns:
tuple[str, str, str]: 返回UUIDURL登录二维码
Raises:
根据error_handler处理错误
"""
async with aiohttp.ClientSession() as session:
json_param = {'DeviceName': device_name, 'DeviceID': device_id}
if proxy:
json_param['ProxyInfo'] = {'ProxyIp': f'{proxy.ip}:{proxy.port}',
'ProxyPassword': proxy.password,
'ProxyUser': proxy.username}
response = await session.post(f'http://{self.ip}:{self.port}/GetQRCode', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
qr = qrcode.QRCode(
version=1,
box_size=10,
border=4,
)
qr.add_data(f'http://weixin.qq.com/x/{json_resp.get("Data").get("Uuid")}')
qr.make(fit=True)
f = io.StringIO()
qr.print_ascii(out=f)
f.seek(0)
return json_resp.get("Data").get("Uuid"), json_resp.get("Data").get("QRCodeURL"), f.read()
else:
self.error_handler(json_resp)
async def check_login_uuid(self, uuid: str, device_id: str = "") -> tuple[bool, Union[dict, int]]:
"""检查登录的UUID状态。
Args:
uuid (str): 登录的UUID
device_id (str, optional): 设备ID. Defaults to "".
Returns:
tuple[bool, Union[dict, int]]: 如果登录成功返回(True, 用户信息)否则返回(False, 过期时间)
Raises:
根据error_handler处理错误
"""
async with aiohttp.ClientSession() as session:
json_param = {"Uuid": uuid}
response = await session.post(f'http://{self.ip}:{self.port}/CheckUuid', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
if json_resp.get("Data").get("acctSectResp", ""):
self.wxid = json_resp.get("Data").get("acctSectResp").get("userName")
self.nickname = json_resp.get("Data").get("acctSectResp").get("nickName")
protector.update_login_status(device_id=device_id)
return True, json_resp.get("Data")
else:
return False, json_resp.get("Data").get("expiredTime")
else:
self.error_handler(json_resp)
async def log_out(self) -> bool:
"""登出当前账号。
Returns:
bool: 登出成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid}
response = await session.post(f'http://{self.ip}:{self.port}/Logout', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
elif json_resp.get("Success"):
return False
else:
self.error_handler(json_resp)
async def awaken_login(self, wxid: str = "") -> str:
"""唤醒登录。
Args:
wxid (str, optional): 要唤醒的微信ID. Defaults to "".
Returns:
str: 返回新的登录UUID
Raises:
Exception: 如果未提供wxid且未登录
LoginError: 如果无法获取UUID
根据error_handler处理错误
"""
if not wxid and not self.wxid:
raise Exception("Please login using QRCode first")
if not wxid and self.wxid:
wxid = self.wxid
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AwakenLogin', json=json_param)
json_resp = await response.json()
if json_resp.get("Success") and json_resp.get("Data").get("QrCodeResponse").get("Uuid"):
return json_resp.get("Data").get("QrCodeResponse").get("Uuid")
elif not json_resp.get("Data").get("QrCodeResponse").get("Uuid"):
raise LoginError("Please login using QRCode first")
else:
self.error_handler(json_resp)
async def get_cached_info(self, wxid: str = None) -> dict:
"""获取登录缓存信息。
Args:
wxid (str, optional): 要查询的微信ID. Defaults to None.
Returns:
dict: 返回缓存信息如果未提供wxid且未登录返回空字典
"""
if not wxid:
wxid = self.wxid
if not wxid:
return {}
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/GetCachedInfo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
return {}
async def heartbeat(self) -> bool:
"""发送心跳包。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid}
response = await session.post(f'http://{self.ip}:{self.port}/Heartbeat', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def start_auto_heartbeat(self) -> bool:
"""开始自动心跳。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AutoHeartbeatStart', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def stop_auto_heartbeat(self) -> bool:
"""停止自动心跳。
Returns:
bool: 成功返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AutoHeartbeatStop', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def get_auto_heartbeat_status(self) -> bool:
"""获取自动心跳状态。
Returns:
bool: 如果正在运行返回True否则返回False
Raises:
UserLoggedOut: 如果未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid}
response = await session.post(f'http://{self.ip}:{self.port}/AutoHeartbeatStatus', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("Running")
else:
return self.error_handler(json_resp)
@staticmethod
def create_device_name() -> str:
"""生成一个随机的设备名。
Returns:
str: 返回生成的设备名
"""
first_names = [
"Oliver", "Emma", "Liam", "Ava", "Noah", "Sophia", "Elijah", "Isabella",
"James", "Mia", "William", "Amelia", "Benjamin", "Harper", "Lucas", "Evelyn",
"Henry", "Abigail", "Alexander", "Ella", "Jackson", "Scarlett", "Sebastian",
"Grace", "Aiden", "Chloe", "Matthew", "Zoey", "Samuel", "Lily", "David",
"Aria", "Joseph", "Riley", "Carter", "Nora", "Owen", "Luna", "Daniel",
"Sofia", "Gabriel", "Ellie", "Matthew", "Avery", "Isaac", "Mila", "Leo",
"Julian", "Layla"
]
last_names = [
"Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis",
"Rodriguez", "Martinez", "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson",
"Thomas", "Taylor", "Moore", "Jackson", "Martin", "Lee", "Perez", "Thompson",
"White", "Harris", "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson", "Walker",
"Young", "Allen", "King", "Wright", "Scott", "Torres", "Nguyen", "Hill",
"Flores", "Green", "Adams", "Nelson", "Baker", "Hall", "Rivera", "Campbell",
"Mitchell", "Carter", "Roberts", "Gomez", "Phillips", "Evans"
]
return choice(first_names) + " " + choice(last_names) + "'s Pad"
@staticmethod
def create_device_id(s: str = "") -> str:
"""生成设备ID。
Args:
s (str, optional): 用于生成ID的字符串. Defaults to "".
Returns:
str: 返回生成的设备ID
"""
if s == "" or s == "string":
s = ''.join(choice(string.ascii_letters) for _ in range(15))
md5_hash = hashlib.md5(s.encode()).hexdigest()
return "49" + md5_hash[2:]

View File

@ -1,4 +0,0 @@
{
"login_time": 0,
"device_id": ""
}

View File

@ -1,641 +0,0 @@
import asyncio
import base64
import os
from asyncio import Future
from asyncio import Queue, sleep
from io import BytesIO
from pathlib import Path
from typing import Union
import aiohttp
import pysilk
from loguru import logger
from pydub import AudioSegment
from pymediainfo import MediaInfo
from .base import *
from .protect import protector
from ..errors import *
class MessageMixin(WechatAPIClientBase):
def __init__(self, ip: str, port: int):
# 初始化消息队列
super().__init__(ip, port)
self._message_queue = Queue()
self._is_processing = False
async def _process_message_queue(self):
"""
处理消息队列的异步方法
"""
if self._is_processing:
return
self._is_processing = True
while True:
if self._message_queue.empty():
self._is_processing = False
break
func, args, kwargs, future = await self._message_queue.get()
try:
result = await func(*args, **kwargs)
future.set_result(result)
except Exception as e:
future.set_exception(e)
finally:
self._message_queue.task_done()
await sleep(1) # 消息发送间隔1秒
async def _queue_message(self, func, *args, **kwargs):
"""
将消息添加到队列
"""
future = Future()
await self._message_queue.put((func, args, kwargs, future))
if not self._is_processing:
asyncio.create_task(self._process_message_queue())
return await future
async def revoke_message(self, wxid: str, client_msg_id: int, create_time: int, new_msg_id: int) -> bool:
"""撤回消息。
Args:
wxid (str): 接收人wxid
client_msg_id (int): 发送消息的返回值
create_time (int): 发送消息的返回值
new_msg_id (int): 发送消息的返回值
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "ClientMsgId": client_msg_id, "CreateTime": create_time,
"NewMsgId": new_msg_id}
response = await session.post(f'http://{self.ip}:{self.port}/RevokeMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("消息撤回成功: 对方wxid:{} ClientMsgId:{} CreateTime:{} NewMsgId:{}",
wxid,
client_msg_id,
new_msg_id)
return True
else:
self.error_handler(json_resp)
async def send_text_message(self, wxid: str, content: str, at: Union[list, str] = "") -> tuple[int, int, int]:
"""发送文本消息。
Args:
wxid (str): 接收人wxid
content (str): 消息内容
at (list, str, optional): @的用户
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_text_message, wxid, content, at)
async def _send_text_message(self, wxid: str, content: str, at: list[str] = None) -> tuple[int, int, int]:
"""
实际发送文本消息的方法
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
if isinstance(at, str):
at_str = at
elif isinstance(at, list):
if at is None:
at = []
at_str = ",".join(at)
else:
raise ValueError("Argument 'at' should be str or list")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": content, "Type": 1, "At": at_str}
response = await session.post(f'http://{self.ip}:{self.port}/SendTextMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("发送文字消息: 对方wxid:{} at:{} 内容:{}", wxid, at, content)
data = json_resp.get("Data")
return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[
0].get("NewMsgId")
else:
self.error_handler(json_resp)
async def send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[int, int, int]:
"""发送图片消息。
Args:
wxid (str): 接收人wxid
image (str, byte, os.PathLike): 图片支持base64字符串图片byte图片路径
Returns:
tuple[int, int, int]: 返回(ClientImgId, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: image_path和image_base64都为空或都不为空时
根据error_handler处理错误
"""
return await self._queue_message(self._send_image_message, wxid, image)
async def _send_image_message(self, wxid: str, image: Union[str, bytes, os.PathLike]) -> tuple[
int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
if isinstance(image, str):
pass
elif isinstance(image, bytes):
image = base64.b64encode(image).decode()
elif isinstance(image, os.PathLike):
with open(image, 'rb') as f:
image = base64.b64encode(f.read()).decode()
else:
raise ValueError("Argument 'image' can only be str, bytes, or os.PathLike")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": image}
response = await session.post(f'http://{self.ip}:{self.port}/SendImageMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
json_param.pop('Base64')
logger.info("发送图片消息: 对方wxid:{} 图片base64略", wxid)
data = json_resp.get("Data")
return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid")
else:
self.error_handler(json_resp)
async def send_video_message(self, wxid: str, video: Union[str, bytes, os.PathLike],
image: [str, bytes, os.PathLike] = None):
"""发送视频消息。不推荐使用上传速度很慢300KB/s。如要使用可压缩视频或者发送链接卡片而不是视频。
Args:
wxid (str): 接收人wxid
video (str, bytes, os.PathLike): 视频 接受base64字符串字节文件路径
image (str, bytes, os.PathLike): 视频封面图片 接受base64字符串字节文件路径
Returns:
tuple[int, int]: 返回(ClientMsgid, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: 视频或图片参数都为空或都不为空时
根据error_handler处理错误
"""
if not image:
image = Path(os.path.join(Path(__file__).resolve().parent, "fallback.png"))
# get video base64 and duration
if isinstance(video, str):
vid_base64 = video
video = base64.b64decode(video)
file_len = len(video)
media_info = MediaInfo.parse(BytesIO(video))
elif isinstance(video, bytes):
vid_base64 = base64.b64encode(video).decode()
file_len = len(video)
media_info = MediaInfo.parse(BytesIO(video))
elif isinstance(video, os.PathLike):
with open(video, "rb") as f:
file_len = len(f.read())
vid_base64 = base64.b64encode(f.read()).decode()
media_info = MediaInfo.parse(video)
else:
raise ValueError("video should be str, bytes, or path")
duration = media_info.tracks[0].duration
# get image base64
if isinstance(image, str):
image_base64 = image
elif isinstance(image, bytes):
image_base64 = base64.b64encode(image).decode()
elif isinstance(image, os.PathLike):
with open(image, "rb") as f:
image_base64 = base64.b64encode(f.read()).decode()
else:
raise ValueError("image should be str, bytes, or path")
# 打印预估时间300KB/s
predict_time = int(file_len / 1024 / 300)
logger.info("开始发送视频: 对方wxid:{} 视频base64略 图片base64略 预计耗时:{}", wxid, predict_time)
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": vid_base64, "ImageBase64": image_base64,
"PlayLength": duration}
async with session.post(f'http://{self.ip}:{self.port}/SendVideoMsg', json=json_param) as resp:
json_resp = await resp.json()
if json_resp.get("Success"):
json_param.pop('Base64')
json_param.pop('ImageBase64')
logger.info("发送视频成功: 对方wxid:{} 时长:{} 视频base64略 图片base64略", wxid, duration)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \
tuple[int, int, int]:
"""发送语音消息。
Args:
wxid (str): 接收人wxid
voice (str, bytes, os.PathLike): 语音 接受base64字符串字节文件路径
format (str, optional): 语音格式支持amr/wav/mp3. Defaults to "amr".
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
ValueError: voice_path和voice_base64都为空或都不为空时或format不支持时
根据error_handler处理错误
"""
return await self._queue_message(self._send_voice_message, wxid, voice, format)
async def _send_voice_message(self, wxid: str, voice: Union[str, bytes, os.PathLike], format: str = "amr") -> \
tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
elif format not in ["amr", "wav", "mp3"]:
raise ValueError("format must be one of amr, wav, mp3")
# read voice to byte
if isinstance(voice, str):
voice_byte = base64.b64decode(voice)
elif isinstance(voice, bytes):
voice_byte = voice
elif isinstance(voice, os.PathLike):
with open(voice, "rb") as f:
voice_byte = f.read()
else:
raise ValueError("voice should be str, bytes, or path")
# get voice duration and b64
if format.lower() == "amr":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="amr")
voice_base64 = base64.b64encode(voice_byte).decode()
elif format.lower() == "wav":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="wav").set_channels(1)
audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate))
voice_base64 = base64.b64encode(
await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode()
elif format.lower() == "mp3":
audio = AudioSegment.from_file(BytesIO(voice_byte), format="mp3").set_channels(1)
audio = audio.set_frame_rate(self._get_closest_frame_rate(audio.frame_rate))
voice_base64 = base64.b64encode(
await pysilk.async_encode(audio.raw_data, sample_rate=audio.frame_rate)).decode()
else:
raise ValueError("format must be one of amr, wav, mp3")
duration = len(audio)
format_dict = {"amr": 0, "wav": 4, "mp3": 4}
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Base64": voice_base64, "VoiceTime": duration,
"Type": format_dict[format]}
response = await session.post(f'http://{self.ip}:{self.port}/SendVoiceMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
json_param.pop('Base64')
logger.info("发送语音消息: 对方wxid:{} 时长:{} 格式:{} 音频base64略", wxid, duration, format)
data = json_resp.get("Data")
return int(data.get("ClientMsgId")), data.get("CreateTime"), data.get("NewMsgId")
else:
self.error_handler(json_resp)
@staticmethod
def _get_closest_frame_rate(frame_rate: int) -> int:
supported = [8000, 12000, 16000, 24000]
closest_rate = None
smallest_diff = float('inf')
for num in supported:
diff = abs(frame_rate - num)
if diff < smallest_diff:
smallest_diff = diff
closest_rate = num
return closest_rate
async def send_link_message(self, wxid: str, url: str, title: str = "", description: str = "",
thumb_url: str = "") -> tuple[str, int, int]:
"""发送链接消息。
Args:
wxid (str): 接收人wxid
url (str): 跳转链接
title (str, optional): 标题. Defaults to "".
description (str, optional): 描述. Defaults to "".
thumb_url (str, optional): 缩略图链接. Defaults to "".
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_link_message, wxid, url, title, description, thumb_url)
async def _send_link_message(self, wxid: str, url: str, title: str = "", description: str = "",
thumb_url: str = "") -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Url": url, "Title": title, "Desc": description,
"ThumbUrl": thumb_url}
response = await session.post(f'http://{self.ip}:{self.port}/SendShareLink', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("发送链接消息: 对方wxid:{} 链接:{} 标题:{} 描述:{} 缩略图链接:{}",
wxid,
url,
title,
description,
thumb_url)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_emoji_message(self, wxid: str, md5: str, total_length: int) -> list[dict]:
"""发送表情消息。
Args:
wxid (str): 接收人wxid
md5 (str): 表情md5值
total_length (int): 表情总长度
Returns:
list[dict]: 返回表情项列表(list of emojiItem)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_emoji_message, wxid, md5, total_length)
async def _send_emoji_message(self, wxid: str, md5: str, total_length: int) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Md5": md5, "TotalLen": total_length}
response = await session.post(f'http://{self.ip}:{self.port}/SendEmojiMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("发送表情消息: 对方wxid:{} md5:{} 总长度:{}", wxid, md5, total_length)
return json_resp.get("Data").get("emojiItem")
else:
self.error_handler(json_resp)
async def send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[
int, int, int]:
"""发送名片消息。
Args:
wxid (str): 接收人wxid
card_wxid (str): 名片用户的wxid
card_nickname (str): 名片用户的昵称
card_alias (str, optional): 名片用户的备注. Defaults to "".
Returns:
tuple[int, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_card_message, wxid, card_wxid, card_nickname, card_alias)
async def _send_card_message(self, wxid: str, card_wxid: str, card_nickname: str, card_alias: str = "") -> tuple[
int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "CardWxid": card_wxid, "CardAlias": card_alias,
"CardNickname": card_nickname}
response = await session.post(f'http://{self.ip}:{self.port}/SendCardMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("发送名片消息: 对方wxid:{} 名片wxid:{} 名片备注:{} 名片昵称:{}", wxid,
card_wxid,
card_alias,
card_nickname)
data = json_resp.get("Data")
return data.get("List")[0].get("ClientMsgid"), data.get("List")[0].get("Createtime"), data.get("List")[
0].get("NewMsgId")
else:
self.error_handler(json_resp)
async def send_app_message(self, wxid: str, xml: str, type: int) -> tuple[str, int, int]:
"""发送应用消息。
Args:
wxid (str): 接收人wxid
xml (str): 应用消息的xml内容
type (int): 应用消息类型
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_app_message, wxid, xml, type)
async def _send_app_message(self, wxid: str, xml: str, type: int) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Xml": xml, "Type": type}
response = await session.post(f'http://{self.ip}:{self.port}/SendAppMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
json_param["Xml"] = json_param["Xml"].replace("\n", "")
logger.info("发送app消息: 对方wxid:{} 类型:{} xml:{}", wxid, type, json_param["Xml"])
return json_resp.get("Data").get("clientMsgId"), json_resp.get("Data").get(
"createTime"), json_resp.get("Data").get("newMsgId")
else:
self.error_handler(json_resp)
async def send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[str, int, int]:
"""转发文件消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的文件消息xml内容
Returns:
tuple[str, int, int]: 返回(ClientMsgid, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_file_msg, wxid, xml)
async def _send_cdn_file_msg(self, wxid: str, xml: str) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/SendCDNFileMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("转发文件消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("createTime"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[str, int, int]:
"""转发图片消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的图片消息xml内容
Returns:
tuple[str, int, int]: 返回(ClientImgId, CreateTime, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_img_msg, wxid, xml)
async def _send_cdn_img_msg(self, wxid: str, xml: str) -> tuple[int, int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/SendCDNImgMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("转发图片消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("ClientImgId").get("string"), data.get("CreateTime"), data.get("Newmsgid")
else:
self.error_handler(json_resp)
async def send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[str, int]:
"""转发视频消息。
Args:
wxid (str): 接收人wxid
xml (str): 要转发的视频消息xml内容
Returns:
tuple[str, int]: 返回(ClientMsgid, NewMsgId)
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 登录新设备后4小时内操作
根据error_handler处理错误
"""
return await self._queue_message(self._send_cdn_video_msg, wxid, xml)
async def _send_cdn_video_msg(self, wxid: str, xml: str) -> tuple[int, int]:
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "ToWxid": wxid, "Content": xml}
response = await session.post(f'http://{self.ip}:{self.port}/SendCDNVideoMsg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
logger.info("转发视频消息: 对方wxid:{} xml:{}", wxid, xml)
data = json_resp.get("Data")
return data.get("clientMsgId"), data.get("newMsgId")
else:
self.error_handler(json_resp)
async def sync_message(self) -> dict:
"""同步消息。
Returns:
dict: 返回同步到的消息数据
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
json_param = {"Wxid": self.wxid, "Scene": 0, "Synckey": ""}
response = await session.post(f'http://{self.ip}:{self.port}/Sync', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
self.error_handler(json_resp)

View File

@ -1,94 +0,0 @@
import json
import os
from datetime import datetime
class Singleton(type):
"""单例模式的元类。
用于确保一个类只有一个实例
Attributes:
_instances (dict): 存储类的实例的字典
"""
_instances = {}
def __call__(cls, *args, **kwargs):
"""创建或返回类的单例实例。
Args:
*args: 位置参数
**kwargs: 关键字参数
Returns:
object: 类的单例实例
"""
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Protect(metaclass=Singleton):
"""保护类,风控保护机制。
使用单例模式确保全局只有一个实例
Attributes:
login_stat_path (str): 登录状态文件的路径
login_stat (dict): 登录状态信息
login_time (int): 最后登录时间戳
login_device_id (str): 最后登录的设备ID
"""
def __init__(self):
"""初始化保护类实例。
创建或加载登录状态文件初始化登录时间和设备ID
"""
self.login_stat_path = os.path.join(os.path.dirname(__file__), "login_stat.json")
if not os.path.exists(self.login_stat_path):
default_config = {
"login_time": 0,
"device_id": ""
}
with open(self.login_stat_path, "w", encoding="utf-8") as f:
f.write(json.dumps(default_config, indent=4, ensure_ascii=False))
self.login_stat = default_config
else:
with open(self.login_stat_path, "r", encoding="utf-8") as f:
self.login_stat = json.loads(f.read())
self.login_time = self.login_stat.get("login_time", 0)
self.login_device_id = self.login_stat.get("device_id", "")
def check(self, second: int) -> bool:
"""检查是否在指定时间内,风控保护。
Args:
second (int): 指定的秒数
Returns:
bool: 如果当前时间与上次登录时间的差小于指定秒数返回True否则返回False
"""
now = datetime.now().timestamp()
return now - self.login_time < second
def update_login_status(self, device_id: str = ""):
"""更新登录状态。
如果设备ID发生变化更新登录时间和设备ID并保存到文件
Args:
device_id (str, optional): 设备ID. Defaults to "".
"""
if device_id == self.login_device_id:
return
self.login_time = int(datetime.now().timestamp())
self.login_stat["login_time"] = self.login_time
self.login_stat["device_id"] = device_id
with open(self.login_stat_path, "w", encoding="utf-8") as f:
f.write(json.dumps(self.login_stat, indent=4, ensure_ascii=False))
protector = Protect()

View File

@ -1,360 +0,0 @@
import base64
import io
import os
import aiohttp
import pysilk
from pydub import AudioSegment
from .base import *
from .protect import protector
from ..errors import *
class ToolMixin(WechatAPIClientBase):
async def download_image(self, aeskey: str, cdnmidimgurl: str) -> str:
"""CDN下载高清图片。
Args:
aeskey (str): 图片的AES密钥
cdnmidimgurl (str): 图片的CDN URL
Returns:
str: 图片的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "AesKey": aeskey, "Cdnmidimgurl": cdnmidimgurl}
response = await session.post(f'http://{self.ip}:{self.port}/CdnDownloadImg', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data")
else:
self.error_handler(json_resp)
async def download_voice(self, msg_id: str, voiceurl: str, length: int) -> str:
"""下载语音文件。
Args:
msg_id (str): 消息的msgid
voiceurl (str): 语音的url从xml获取
length (int): 语音长度从xml获取
Returns:
str: 语音的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "MsgId": msg_id, "Voiceurl": voiceurl, "Length": length}
response = await session.post(f'http://{self.ip}:{self.port}/DownloadVoice', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def download_attach(self, attach_id: str) -> dict:
"""下载附件。
Args:
attach_id (str): 附件ID
Returns:
dict: 附件数据
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "AttachId": attach_id}
response = await session.post(f'http://{self.ip}:{self.port}/DownloadAttach', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def download_video(self, msg_id) -> str:
"""下载视频。
Args:
msg_id (str): 消息的msg_id
Returns:
str: 视频的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "MsgId": msg_id}
response = await session.post(f'http://{self.ip}:{self.port}/DownloadVideo', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("data").get("buffer")
else:
self.error_handler(json_resp)
async def set_step(self, count: int) -> bool:
"""设置步数。
Args:
count (int): 要设置的步数
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 风控保护: 新设备登录后4小时内请挂机
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif not self.ignore_protect and protector.check(14400):
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "StepCount": count}
response = await session.post(f'http://{self.ip}:{self.port}/SetStep', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def set_proxy(self, proxy: Proxy) -> bool:
"""设置代理。
Args:
proxy (Proxy): 代理配置对象
Returns:
bool: 成功返回True失败返回False
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid,
"Proxy": {"ProxyIp": f"{proxy.ip}:{proxy.port}",
"ProxyUser": proxy.username,
"ProxyPassword": proxy.password}}
response = await session.post(f'http://{self.ip}:{self.port}/SetProxy', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return True
else:
self.error_handler(json_resp)
async def check_database(self) -> bool:
"""检查数据库状态。
Returns:
bool: 数据库正常返回True否则返回False
"""
async with aiohttp.ClientSession() as session:
response = await session.get(f'http://{self.ip}:{self.port}/CheckDatabaseOK')
json_resp = await response.json()
if json_resp.get("Running"):
return True
else:
return False
@staticmethod
def base64_to_file(base64_str: str, file_name: str, file_path: str) -> bool:
"""将base64字符串转换为文件并保存。
Args:
base64_str (str): base64编码的字符串
file_name (str): 要保存的文件名
file_path (str): 文件保存路径
Returns:
bool: 转换成功返回True失败返回False
"""
try:
os.makedirs(file_path, exist_ok=True)
# 拼接完整的文件路径
full_path = os.path.join(file_path, file_name)
# 移除可能存在的 base64 头部信息
if ',' in base64_str:
base64_str = base64_str.split(',')[1]
# 解码 base64 并写入文件
with open(full_path, 'wb') as f:
f.write(base64.b64decode(base64_str))
return True
except Exception as e:
return False
@staticmethod
def file_to_base64(file_path: str) -> str:
"""将文件转换为base64字符串。
Args:
file_path (str): 文件路径
Returns:
str: base64编码的字符串
"""
with open(file_path, 'rb') as f:
return base64.b64encode(f.read()).decode()
@staticmethod
def base64_to_byte(base64_str: str) -> bytes:
"""将base64字符串转换为bytes。
Args:
base64_str (str): base64编码的字符串
Returns:
bytes: 解码后的字节数据
"""
# 移除可能存在的 base64 头部信息
if ',' in base64_str:
base64_str = base64_str.split(',')[1]
return base64.b64decode(base64_str)
@staticmethod
def byte_to_base64(byte: bytes) -> str:
"""将bytes转换为base64字符串。
Args:
byte (bytes): 字节数据
Returns:
str: base64编码的字符串
"""
return base64.b64encode(byte).decode("utf-8")
@staticmethod
async def silk_byte_to_byte_wav_byte(silk_byte: bytes) -> bytes:
"""将silk字节转换为wav字节。
Args:
silk_byte (bytes): silk格式的字节数据
Returns:
bytes: wav格式的字节数据
"""
return await pysilk.async_decode(silk_byte, to_wav=True)
@staticmethod
def wav_byte_to_amr_byte(wav_byte: bytes) -> bytes:
"""将WAV字节数据转换为AMR格式。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
bytes: AMR格式的字节数据
Raises:
Exception: 转换失败时抛出异常
"""
try:
# 从字节数据创建 AudioSegment 对象
audio = AudioSegment.from_wav(io.BytesIO(wav_byte))
# 设置 AMR 编码的标准参数
audio = audio.set_frame_rate(8000).set_channels(1)
# 创建一个字节缓冲区来存储 AMR 数据
output = io.BytesIO()
# 导出为 AMR 格式
audio.export(output, format="amr")
# 获取字节数据
return output.getvalue()
except Exception as e:
raise Exception(f"转换WAV到AMR失败: {str(e)}")
@staticmethod
def wav_byte_to_amr_base64(wav_byte: bytes) -> str:
"""将WAV字节数据转换为AMR格式的base64字符串。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
str: AMR格式的base64编码字符串
"""
return base64.b64encode(ToolMixin.wav_byte_to_amr_byte(wav_byte)).decode()
@staticmethod
async def wav_byte_to_silk_byte(wav_byte: bytes) -> bytes:
"""将WAV字节数据转换为silk格式。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
bytes: silk格式的字节数据
"""
# get pcm data
audio = AudioSegment.from_wav(io.BytesIO(wav_byte))
pcm = audio.raw_data
return await pysilk.async_encode(pcm, data_rate=audio.frame_rate, sample_rate=audio.frame_rate)
@staticmethod
async def wav_byte_to_silk_base64(wav_byte: bytes) -> str:
"""将WAV字节数据转换为silk格式的base64字符串。
Args:
wav_byte (bytes): WAV格式的字节数据
Returns:
str: silk格式的base64编码字符串
"""
return base64.b64encode(await ToolMixin.wav_byte_to_silk_byte(wav_byte)).decode()
@staticmethod
async def silk_base64_to_wav_byte(silk_base64: str) -> bytes:
"""将silk格式的base64字符串转换为WAV字节数据。
Args:
silk_base64 (str): silk格式的base64编码字符串
Returns:
bytes: WAV格式的字节数据
"""
return await ToolMixin.silk_byte_to_byte_wav_byte(base64.b64decode(silk_base64))

View File

@ -1,82 +0,0 @@
import aiohttp
from .base import *
from .protect import protector
from ..errors import *
class UserMixin(WechatAPIClientBase):
async def get_profile(self, wxid: str = None) -> dict:
"""获取用户信息。
Args:
wxid (str, optional): 用户wxid. Defaults to None.
Returns:
dict: 用户信息字典
Raises:
UserLoggedOut: 未登录时调用
根据error_handler处理错误
"""
if not self.wxid and not wxid:
raise UserLoggedOut("请先登录")
if not wxid:
wxid = self.wxid
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": wxid}
response = await session.post(f'http://{self.ip}:{self.port}/GetProfile', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("userInfo")
else:
self.error_handler(json_resp)
async def get_my_qrcode(self, style: int = 0) -> str:
"""获取个人二维码。
Args:
style (int, optional): 二维码样式. Defaults to 0.
Returns:
str: 图片的base64编码字符串
Raises:
UserLoggedOut: 未登录时调用
BanProtection: 风控保护: 新设备登录后4小时内请挂机
根据error_handler处理错误
"""
if not self.wxid:
raise UserLoggedOut("请先登录")
elif protector.check(14400) and not self.ignore_protect:
raise BanProtection("风控保护: 新设备登录后4小时内请挂机")
async with aiohttp.ClientSession() as session:
json_param = {"Wxid": self.wxid, "Style": style}
response = await session.post(f'http://{self.ip}:{self.port}/GetMyQRCode', json=json_param)
json_resp = await response.json()
if json_resp.get("Success"):
return json_resp.get("Data").get("qrcode").get("buffer")
else:
self.error_handler(json_resp)
async def is_logged_in(self, wxid: str = None) -> bool:
"""检查是否登录。
Args:
wxid (str, optional): 用户wxid. Defaults to None.
Returns:
bool: 已登录返回True未登录返回False
"""
if not wxid:
wxid = self.wxid
try:
await self.get_profile(wxid)
return True
except:
return False

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff