From e39e4362fe88be4f205b482e5fd93d3f1ff608f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=B6=9B?= <2450572350@qq.com> Date: Thu, 1 Jun 2023 17:24:20 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E6=9E=90=E5=90=8D=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- java_client/README.md | 2 + .../com/example/wxhk/msg/WxMsgHandle.java | 41 +++++++++++++------ .../com/example/wxhk/tcp/vertx/ArrHandle.java | 24 ++++++++++- .../com/example/wxhk/tcp/vertx/VertxTcp.java | 15 ++++++- 4 files changed, 66 insertions(+), 16 deletions(-) diff --git a/java_client/README.md b/java_client/README.md index d41a462..03ce742 100644 --- a/java_client/README.md +++ b/java_client/README.md @@ -13,4 +13,6 @@ com.example.wxhk.tcp.vertx.InitWeChat 微信环境初始化 com.example.wxhk.tcp.vertx.ArrHandle 循环消息处理 +对于收款码收款的,因为收款码会产生3次事件,其中第二次有交易id和里面有付款人信息,第三次里面有收款金额和交易id,如果要使用多线程处理消息,则需要额外想办法处理这个数据,目前我的办法是对于扫码收款的这几个type类型单独做一个线程处理,其他的多线程处理 + 启动项目需要去修改配置文件的微信路径 diff --git a/java_client/src/main/java/com/example/wxhk/msg/WxMsgHandle.java b/java_client/src/main/java/com/example/wxhk/msg/WxMsgHandle.java index a2872cb..0c4f08a 100644 --- a/java_client/src/main/java/com/example/wxhk/msg/WxMsgHandle.java +++ b/java_client/src/main/java/com/example/wxhk/msg/WxMsgHandle.java @@ -7,6 +7,8 @@ import com.example.wxhk.util.HttpAsyncUtil; import com.example.wxhk.util.HttpSendUtil; import com.example.wxhk.util.HttpSyncUtil; import io.vertx.core.json.JsonObject; +import jakarta.annotation.PostConstruct; +import org.dromara.hutool.core.text.StrUtil; import org.dromara.hutool.core.util.XmlUtil; import org.dromara.hutool.log.Log; import org.springframework.stereotype.Component; @@ -21,35 +23,50 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; @Component public class WxMsgHandle { public static final ConcurrentHashMap map = new ConcurrentHashMap<>(32); protected static final Log log = Log.get(); - public static ConcurrentHashMap cache = new ConcurrentHashMap<>(); + /** + * 文件传输助手 + */ + public static final String FILEHELPER = "filehelper"; + /** + * 收款码缓存 因为有2段信息,一段是交易id,里面可以解析出来源方,二段解析出金额 + */ + public static ConcurrentHashMap collection_code_caching = new ConcurrentHashMap<>(); + + /** + * 看 + */ + public static final ReentrantReadWriteLock LOOK = new ReentrantReadWriteLock(); - public WxMsgHandle() { + @PostConstruct + public void init() { add(chatMsg -> { - if(Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)){ - log.info("手机端对:{}发出:{}",chatMsg.getFromUser(),chatMsg.getContent()); + if (Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)) { + log.info("手机端对:{}发出:{}", chatMsg.getFromUser(), chatMsg.getContent()); return 1; } return 1; }, WxMsgType.私聊信息); add(chatMsg -> { - if("filehelper".equals(chatMsg.getFromUser())){ - log.info("文件助手:{},",chatMsg.getContent()); + if (FILEHELPER.equals(chatMsg.getFromUser())) { + log.info("文件助手:{},", chatMsg.getContent()); } return 1; }, WxMsgType.收到转账之后或者文件助手等信息); add(chatMsg -> { - if("filehelper".equals(chatMsg.getFromUser())){ + if (FILEHELPER.equals(chatMsg.getFromUser())) { Document document = XmlUtil.parseXml(chatMsg.getContent()); Element documentElement = document.getDocumentElement(); String username = documentElement.getAttribute("username"); - String alias = documentElement.getAttribute("alias"); - HttpSendUtil.发送文本(username); + if (StrUtil.isNotBlank(username)) { + HttpSendUtil.发送文本(username); + } } return 1; }, WxMsgType.收到名片); @@ -71,8 +88,6 @@ public class WxMsgHandle { boolean f = 解析扫码支付第一段(chatMsg); return null; }, WxMsgType.扫码触发); - - } /** @@ -93,7 +108,7 @@ public class WxMsgHandle { if (outtradeno.getLength() > 0) { String textContent = outtradeno.item(0).getTextContent(); String textContent1 = documentElement.getElementsByTagName("username").item(0).getTextContent(); - cache.put(textContent, textContent1); + collection_code_caching.put(textContent, textContent1); return false; } } @@ -120,7 +135,7 @@ public class WxMsgHandle { NodeList outtradeno = documentElement.getElementsByTagName("weapp_path"); if (outtradeno.getLength() > 1) { String textContent = outtradeno.item(1).getTextContent(); - Set> entries = cache.entrySet(); + Set> entries = collection_code_caching.entrySet(); Iterator> iterator = entries.iterator(); while (iterator.hasNext()) { Map.Entry next = iterator.next(); diff --git a/java_client/src/main/java/com/example/wxhk/tcp/vertx/ArrHandle.java b/java_client/src/main/java/com/example/wxhk/tcp/vertx/ArrHandle.java index d9fa2f6..67943a1 100644 --- a/java_client/src/main/java/com/example/wxhk/tcp/vertx/ArrHandle.java +++ b/java_client/src/main/java/com/example/wxhk/tcp/vertx/ArrHandle.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; @Component public class ArrHandle { - public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(1, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false)); + public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(4, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false)); public static final ThreadLocal chatMsgThreadLocal = new InheritableThreadLocal<>(); protected static final Log log = Log.get(); @@ -37,7 +37,7 @@ public class ArrHandle { @PostConstruct public void exec() { - for (int i = 0; i < sub.getCorePoolSize(); i++) { + for (int i = 0; i < sub.getCorePoolSize()-1; i++) { sub.submit(() -> { while (!Thread.currentThread().isInterrupted()) { try { @@ -59,6 +59,26 @@ public class ArrHandle { log.error("退出线程了"); }); } + sub.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + JsonObject take = VertxTcp.LINKED_BLOCKING_QUEUE_MON.take(); + log.info("{}", take.encode()); + PrivateChatMsg privateChatMsg = take.mapTo(PrivateChatMsg.class); + chatMsgThreadLocal.set(privateChatMsg); + if ("weixin".equals(privateChatMsg.getFromUser())) { + String s = HttpSendUtil.获取当前登陆微信id(); + InitWeChat.WXID_MAP.add(s); + continue; + } + WxMsgHandle.exec(privateChatMsg); + chatMsgThreadLocal.remove(); + } catch (Exception e) { + log.error(e); + } + } + log.error("退出线程了"); + }); } diff --git a/java_client/src/main/java/com/example/wxhk/tcp/vertx/VertxTcp.java b/java_client/src/main/java/com/example/wxhk/tcp/vertx/VertxTcp.java index 4a9bd8b..069f603 100644 --- a/java_client/src/main/java/com/example/wxhk/tcp/vertx/VertxTcp.java +++ b/java_client/src/main/java/com/example/wxhk/tcp/vertx/VertxTcp.java @@ -1,6 +1,7 @@ package com.example.wxhk.tcp.vertx; import com.example.wxhk.WxhkApplication; +import com.example.wxhk.constant.WxMsgType; import com.example.wxhk.util.HttpAsyncUtil; import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; @@ -15,6 +16,7 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; /** @@ -27,6 +29,10 @@ import java.util.concurrent.LinkedBlockingQueue; @Order() public class VertxTcp extends AbstractVerticle implements CommandLineRunner { public final static LinkedBlockingQueue LINKED_BLOCKING_QUEUE = new LinkedBlockingQueue<>(); + /** + * 这个只保留交易相关的类型 + */ + public final static LinkedBlockingQueue LINKED_BLOCKING_QUEUE_MON = new LinkedBlockingQueue<>(); protected static final Log log = Log.get(); NetServer netServer; @@ -51,7 +57,14 @@ public class VertxTcp extends AbstractVerticle implements CommandLineRunner { case END_ARRAY -> { } case VALUE -> { - LINKED_BLOCKING_QUEUE.add(event.objectValue()); + JsonObject entries = event.objectValue(); + + if(Objects.equals(entries.getInteger("type"), WxMsgType.扫码触发.getType()) || + Objects.equals(entries.getInteger("type"), WxMsgType.转账和收款.getType())){ + LINKED_BLOCKING_QUEUE_MON.add(entries); + }else{ + LINKED_BLOCKING_QUEUE.add(entries); + } } } });