Merge pull request #160 from sglmsn/sgssxln

使用多线程处理消息,增加简单的交互
This commit is contained in:
ttttupup 2023-06-01 17:52:12 +08:00 committed by GitHub
commit a2ebb527f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 13 deletions

View File

@ -11,6 +11,8 @@ com.example.wxhk.tcp.vertx.VertxTcp 这个是tcp服务端,接受信息
com.example.wxhk.tcp.vertx.InitWeChat 微信环境初始化
com.example.wxhk.tcp.vertx.ArrHandle 循环消息处理
![image](https://github.com/sglmsn/wxhelper/assets/36943585/59d49401-a492-46a9-8ed9-dab7fb1822b4)
启动项目需要去修改配置文件的微信路径

View File

@ -7,6 +7,8 @@ import com.example.wxhk.util.HttpAsyncUtil;
import com.example.wxhk.util.HttpSendUtil;
import com.example.wxhk.util.HttpSyncUtil;
import io.vertx.core.json.JsonObject;
import jakarta.annotation.PostConstruct;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.util.XmlUtil;
import org.dromara.hutool.log.Log;
import org.springframework.stereotype.Component;
@ -21,28 +23,53 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Component
public class WxMsgHandle {
public static final ConcurrentHashMap<Integer, Handle> map = new ConcurrentHashMap<>(32);
protected static final Log log = Log.get();
public static ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
/**
* 文件传输助手
*/
public static final String FILEHELPER = "filehelper";
/**
* 收款码缓存 因为有2段信息,一段是交易id,里面可以解析出来源方,二段解析出金额
*/
public static ConcurrentHashMap<String, String> collection_code_caching = new ConcurrentHashMap<>();
/**
*
*/
public static final ReentrantReadWriteLock LOOK = new ReentrantReadWriteLock();
public WxMsgHandle() {
@PostConstruct
public void init() {
add(chatMsg -> {
if(Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)){
log.info("手机端对:{}发出:{}",chatMsg.getFromUser(),chatMsg.getContent());
if (Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)) {
log.info("手机端对:{}发出:{}", chatMsg.getFromUser(), chatMsg.getContent());
return 1;
}
return 1;
}, WxMsgType.私聊信息);
add(chatMsg -> {
if("filehelper".equals(chatMsg.getFromUser())){
log.info("文件助手:{},",chatMsg.getContent());
if (FILEHELPER.equals(chatMsg.getFromUser())) {
log.info("文件助手:{},", chatMsg.getContent());
}
return 1;
}, WxMsgType.收到转账之后或者文件助手等信息);
add(chatMsg -> {
if (FILEHELPER.equals(chatMsg.getFromUser())) {
Document document = XmlUtil.parseXml(chatMsg.getContent());
Element documentElement = document.getDocumentElement();
String username = documentElement.getAttribute("username");
if (StrUtil.isNotBlank(username)) {
HttpSendUtil.发送文本(username);
}
}
return 1;
}, WxMsgType.收到名片);
add(chatMsg -> {
HttpSendUtil.通过好友请求(chatMsg);
return 1;
@ -61,8 +88,6 @@ public class WxMsgHandle {
boolean f = 解析扫码支付第一段(chatMsg);
return null;
}, WxMsgType.扫码触发);
}
/**
@ -83,7 +108,7 @@ public class WxMsgHandle {
if (outtradeno.getLength() > 0) {
String textContent = outtradeno.item(0).getTextContent();
String textContent1 = documentElement.getElementsByTagName("username").item(0).getTextContent();
cache.put(textContent, textContent1);
collection_code_caching.put(textContent, textContent1);
return false;
}
}
@ -110,7 +135,7 @@ public class WxMsgHandle {
NodeList outtradeno = documentElement.getElementsByTagName("weapp_path");
if (outtradeno.getLength() > 1) {
String textContent = outtradeno.item(1).getTextContent();
Set<Map.Entry<String, String>> entries = cache.entrySet();
Set<Map.Entry<String, String>> entries = collection_code_caching.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> next = iterator.next();

View File

@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
@Component
public class ArrHandle {
public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(1, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false));
public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(4, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false));
public static final ThreadLocal<PrivateChatMsg> chatMsgThreadLocal = new InheritableThreadLocal<>();
protected static final Log log = Log.get();
@ -37,7 +37,7 @@ public class ArrHandle {
@PostConstruct
public void exec() {
for (int i = 0; i < sub.getCorePoolSize(); i++) {
for (int i = 0; i < sub.getCorePoolSize()-1; i++) {
sub.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
@ -59,6 +59,26 @@ public class ArrHandle {
log.error("退出线程了");
});
}
sub.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
JsonObject take = VertxTcp.LINKED_BLOCKING_QUEUE_MON.take();
log.info("{}", take.encode());
PrivateChatMsg privateChatMsg = take.mapTo(PrivateChatMsg.class);
chatMsgThreadLocal.set(privateChatMsg);
if ("weixin".equals(privateChatMsg.getFromUser())) {
String s = HttpSendUtil.获取当前登陆微信id();
InitWeChat.WXID_MAP.add(s);
continue;
}
WxMsgHandle.exec(privateChatMsg);
chatMsgThreadLocal.remove();
} catch (Exception e) {
log.error(e);
}
}
log.error("退出线程了");
});
}

View File

@ -1,6 +1,7 @@
package com.example.wxhk.tcp.vertx;
import com.example.wxhk.WxhkApplication;
import com.example.wxhk.constant.WxMsgType;
import com.example.wxhk.util.HttpAsyncUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
@ -15,6 +16,7 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
/**
@ -27,6 +29,10 @@ import java.util.concurrent.LinkedBlockingQueue;
@Order()
public class VertxTcp extends AbstractVerticle implements CommandLineRunner {
public final static LinkedBlockingQueue<JsonObject> LINKED_BLOCKING_QUEUE = new LinkedBlockingQueue<>();
/**
* 这个只保留交易相关的类型
*/
public final static LinkedBlockingQueue<JsonObject> LINKED_BLOCKING_QUEUE_MON = new LinkedBlockingQueue<>();
protected static final Log log = Log.get();
NetServer netServer;
@ -51,7 +57,14 @@ public class VertxTcp extends AbstractVerticle implements CommandLineRunner {
case END_ARRAY -> {
}
case VALUE -> {
LINKED_BLOCKING_QUEUE.add(event.objectValue());
JsonObject entries = event.objectValue();
if(Objects.equals(entries.getInteger("type"), WxMsgType.扫码触发.getType()) ||
Objects.equals(entries.getInteger("type"), WxMsgType.转账和收款.getType())){
LINKED_BLOCKING_QUEUE_MON.add(entries);
}else{
LINKED_BLOCKING_QUEUE.add(entries);
}
}
}
});

View File

@ -69,4 +69,16 @@ public class XmlTest {
WxMsgHandle.解析扫码支付第二段(new JsonObject(smg).mapTo(PrivateChatMsg.class));
}
@Test
void 解析名片(){
String con = "{\"content\":\"<?xml version=\\\"1.0\\\"?>\\n<msg bigheadimgurl=\\\"http://wx.qlogo.cn/mmhead/ver_1/TMa2Yt3QnCWSbWM3SAwQkCuxYkJXou9II9q7kfuQvqiapESialRXicibxUekL4tyVoic4qJMmmsFNxZaLDzusSUj44wvErxkk89RZrPKhtibkIbTw/0\\\" smallheadimgurl=\\\"http://wx.qlogo.cn/mmhead/ver_1/TMa2Yt3QnCWSbWM3SAwQkCuxYkJXou9II9q7kfuQvqiapESialRXicibxUekL4tyVoic4qJMmmsFNxZaLDzusSUj44wvErxkk89RZrPKhtibkIbTw/132\\\" username=\\\"wxid_gf1fogt5a0pq22\\\" nickname=\\\"时光似水戏流年\\\" fullpy=\\\"时光似水戏流年\\\" shortpy=\\\"SGSSHLN\\\" alias=\\\"jys-wt\\\" imagestatus=\\\"3\\\" scene=\\\"17\\\" province=\\\"广东\\\" city=\\\"深圳\\\" sign=\\\"\\\" sex=\\\"1\\\" certflag=\\\"0\\\" certinfo=\\\"\\\" brandIconUrl=\\\"\\\" brandHomeUrl=\\\"\\\" brandSubscriptConfigUrl=\\\"\\\" brandFlags=\\\"0\\\" regionCode=\\\"CN_Guangdong_Shenzhen\\\" biznamecardinfo=\\\"\\\" />\\n\",\"fromGroup\":\"filehelper\",\"fromUser\":\"filehelper\",\"isSendByPhone\":1,\"isSendMsg\":1,\"msgId\":3235211232446491438,\"pid\":21868,\"sign\":\"bfb1db52fe99dc947586af50e6964c37\",\"signature\":\"<msgsource>\\n\\t<signature>v1_aebFg5gw</signature>\\n\\t<tmp_node>\\n\\t\\t<publisher-id>&lt;![CDATA[]]&gt;</publisher-id>\\n\\t</tmp_node>\\n</msgsource>\\n\",\"time\":\"2023-06-01 16:48:39\",\"timestamp\":1685609319,\"type\":42}";
PrivateChatMsg privateChatMsg = new JsonObject(con).mapTo(PrivateChatMsg.class);
Document document = XmlUtil.parseXml(privateChatMsg.getContent());
Element documentElement = document.getDocumentElement();
String username = documentElement.getAttribute("username");
String alias = documentElement.getAttribute("alias");
Console.log(alias,username);
}
}