92 lines
3.3 KiB
Python
92 lines
3.3 KiB
Python
import re
|
||
from typing import Optional, Tuple
|
||
from datetime import datetime, timedelta
|
||
from sqlmodel import select, Session
|
||
from openai import OpenAI
|
||
from backend.config.settings import settings
|
||
from backend.models.sms import SMSRecord, engine
|
||
|
||
|
||
def extract_code_with_context(sms_content: str) -> Optional[str]:
|
||
# 判断AI相关的三个环境变量是否都有值
|
||
# 如果有值,调用AI接口
|
||
# 如果没有值,调用正则表达式
|
||
if settings.ai_base_url != "" and settings.ai_api_key != "" and settings.ai_model != "":
|
||
return _extract_code_with_ai(sms_content)
|
||
else:
|
||
return _extract_code_with_reg(sms_content)
|
||
|
||
|
||
def _extract_code_with_reg(sms_content: str) -> Optional[str]:
|
||
"""
|
||
从短信内容中提取验证码
|
||
"""
|
||
pattern = r'(?:验证码|auth|code)[^0-9]{0,20}(\d{4,8})'
|
||
match = re.search(pattern, sms_content, re.IGNORECASE)
|
||
if match:
|
||
return match.group(1)
|
||
fallback = re.findall(r'(\d{4,8})', sms_content)
|
||
return fallback[0] if fallback else None
|
||
|
||
|
||
def _extract_code_with_ai(sms_content: str) -> Optional[str]:
|
||
"""
|
||
通过AI提取短信中的验证码
|
||
"""
|
||
client = OpenAI(api_key=settings.ai_api_key, base_url=settings.ai_base_url)
|
||
response = client.chat.completions.create(
|
||
messages=[
|
||
{
|
||
"role": "system",
|
||
"content": "你是一个验证码提取助手,我会告诉你短信内容,请将其中的验证码提取给我,只需要返回验证码即可,如果没有验证码,请返回null,不需要返回任何其他无关的内容。",
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": sms_content,
|
||
}
|
||
],
|
||
model=settings.ai_model,
|
||
)
|
||
code = response.choices[0].message.content
|
||
return code if code != "null" else None
|
||
|
||
|
||
def extract_phone_from_sim_slot(sim_slot: Optional[str]) -> Optional[str]:
|
||
"""从SIM卡信息中提取手机号"""
|
||
if not sim_slot:
|
||
return None
|
||
# 尝试提取11位手机号
|
||
match = re.search(r'1\d{10}', sim_slot)
|
||
if match:
|
||
return match.group(0)
|
||
return None
|
||
|
||
|
||
def query_latest_code(phone_number: str, platform_keyword: Optional[str]) -> Tuple[Optional[str], Optional[SMSRecord]]:
|
||
"""查询最新的验证码"""
|
||
ten_minutes_ago = datetime.utcnow() - timedelta(minutes=10)
|
||
|
||
with Session(engine) as session:
|
||
query = select(SMSRecord).where(
|
||
SMSRecord.receive_time >= ten_minutes_ago
|
||
)
|
||
|
||
primary_query = query.where(SMSRecord.phone_number == phone_number)
|
||
if platform_keyword:
|
||
primary_query = primary_query.where(SMSRecord.sms.contains(platform_keyword))
|
||
|
||
primary_records = session.exec(primary_query.order_by(SMSRecord.receive_time.desc()).limit(5)).all()
|
||
|
||
if not primary_records:
|
||
fallback_query = query.where(SMSRecord.sim_slot.contains(phone_number))
|
||
if platform_keyword:
|
||
fallback_query = fallback_query.where(SMSRecord.sms.contains(platform_keyword))
|
||
|
||
primary_records = session.exec(fallback_query.order_by(SMSRecord.receive_time.desc()).limit(5)).all()
|
||
|
||
for record in primary_records:
|
||
if record.extracted_code:
|
||
return record.extracted_code, record
|
||
|
||
return None, None
|