解析名片

This commit is contained in:
王涛 2023-06-01 17:24:20 +08:00
parent 292b9da378
commit e39e4362fe
4 changed files with 66 additions and 16 deletions

View File

@ -13,4 +13,6 @@ com.example.wxhk.tcp.vertx.InitWeChat 微信环境初始化
com.example.wxhk.tcp.vertx.ArrHandle 循环消息处理 com.example.wxhk.tcp.vertx.ArrHandle 循环消息处理
<b>对于收款码收款的,因为收款码会产生3次事件,其中第二次有交易id和里面有付款人信息,第三次里面有收款金额和交易id,如果要使用多线程处理消息,则需要额外想办法处理这个数据,目前我的办法是对于扫码收款的这几个type类型单独做一个线程处理,其他的多线程处理</b>
启动项目需要去修改配置文件的微信路径 启动项目需要去修改配置文件的微信路径

View File

@ -7,6 +7,8 @@ import com.example.wxhk.util.HttpAsyncUtil;
import com.example.wxhk.util.HttpSendUtil; import com.example.wxhk.util.HttpSendUtil;
import com.example.wxhk.util.HttpSyncUtil; import com.example.wxhk.util.HttpSyncUtil;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import jakarta.annotation.PostConstruct;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.util.XmlUtil; import org.dromara.hutool.core.util.XmlUtil;
import org.dromara.hutool.log.Log; import org.dromara.hutool.log.Log;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -21,15 +23,29 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Component @Component
public class WxMsgHandle { public class WxMsgHandle {
public static final ConcurrentHashMap<Integer, Handle> map = new ConcurrentHashMap<>(32); public static final ConcurrentHashMap<Integer, Handle> map = new ConcurrentHashMap<>(32);
protected static final Log log = Log.get(); protected static final Log log = Log.get();
public static ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>(); /**
* 文件传输助手
*/
public static final String FILEHELPER = "filehelper";
/**
* 收款码缓存 因为有2段信息,一段是交易id,里面可以解析出来源方,二段解析出金额
*/
public static ConcurrentHashMap<String, String> collection_code_caching = new ConcurrentHashMap<>();
/**
*
*/
public static final ReentrantReadWriteLock LOOK = new ReentrantReadWriteLock();
public WxMsgHandle() { @PostConstruct
public void init() {
add(chatMsg -> { add(chatMsg -> {
if (Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)) { if (Objects.equals(chatMsg.getIsSendMsg(), 1) && Objects.equals(chatMsg.getIsSendByPhone(), 1)) {
log.info("手机端对:{}发出:{}", chatMsg.getFromUser(), chatMsg.getContent()); log.info("手机端对:{}发出:{}", chatMsg.getFromUser(), chatMsg.getContent());
@ -38,19 +54,20 @@ public class WxMsgHandle {
return 1; return 1;
}, WxMsgType.私聊信息); }, WxMsgType.私聊信息);
add(chatMsg -> { add(chatMsg -> {
if("filehelper".equals(chatMsg.getFromUser())){ if (FILEHELPER.equals(chatMsg.getFromUser())) {
log.info("文件助手:{},", chatMsg.getContent()); log.info("文件助手:{},", chatMsg.getContent());
} }
return 1; return 1;
}, WxMsgType.收到转账之后或者文件助手等信息); }, WxMsgType.收到转账之后或者文件助手等信息);
add(chatMsg -> { add(chatMsg -> {
if("filehelper".equals(chatMsg.getFromUser())){ if (FILEHELPER.equals(chatMsg.getFromUser())) {
Document document = XmlUtil.parseXml(chatMsg.getContent()); Document document = XmlUtil.parseXml(chatMsg.getContent());
Element documentElement = document.getDocumentElement(); Element documentElement = document.getDocumentElement();
String username = documentElement.getAttribute("username"); String username = documentElement.getAttribute("username");
String alias = documentElement.getAttribute("alias"); if (StrUtil.isNotBlank(username)) {
HttpSendUtil.发送文本(username); HttpSendUtil.发送文本(username);
} }
}
return 1; return 1;
}, WxMsgType.收到名片); }, WxMsgType.收到名片);
add(chatMsg -> { add(chatMsg -> {
@ -71,8 +88,6 @@ public class WxMsgHandle {
boolean f = 解析扫码支付第一段(chatMsg); boolean f = 解析扫码支付第一段(chatMsg);
return null; return null;
}, WxMsgType.扫码触发); }, WxMsgType.扫码触发);
} }
/** /**
@ -93,7 +108,7 @@ public class WxMsgHandle {
if (outtradeno.getLength() > 0) { if (outtradeno.getLength() > 0) {
String textContent = outtradeno.item(0).getTextContent(); String textContent = outtradeno.item(0).getTextContent();
String textContent1 = documentElement.getElementsByTagName("username").item(0).getTextContent(); String textContent1 = documentElement.getElementsByTagName("username").item(0).getTextContent();
cache.put(textContent, textContent1); collection_code_caching.put(textContent, textContent1);
return false; return false;
} }
} }
@ -120,7 +135,7 @@ public class WxMsgHandle {
NodeList outtradeno = documentElement.getElementsByTagName("weapp_path"); NodeList outtradeno = documentElement.getElementsByTagName("weapp_path");
if (outtradeno.getLength() > 1) { if (outtradeno.getLength() > 1) {
String textContent = outtradeno.item(1).getTextContent(); String textContent = outtradeno.item(1).getTextContent();
Set<Map.Entry<String, String>> entries = cache.entrySet(); Set<Map.Entry<String, String>> entries = collection_code_caching.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator(); Iterator<Map.Entry<String, String>> iterator = entries.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Map.Entry<String, String> next = iterator.next(); Map.Entry<String, String> next = iterator.next();

View File

@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class ArrHandle { public class ArrHandle {
public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(1, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false)); public static final ThreadPoolExecutor sub = new ThreadPoolExecutor(4, 10, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new NamedThreadFactory("sub", false));
public static final ThreadLocal<PrivateChatMsg> chatMsgThreadLocal = new InheritableThreadLocal<>(); public static final ThreadLocal<PrivateChatMsg> chatMsgThreadLocal = new InheritableThreadLocal<>();
protected static final Log log = Log.get(); protected static final Log log = Log.get();
@ -37,7 +37,7 @@ public class ArrHandle {
@PostConstruct @PostConstruct
public void exec() { public void exec() {
for (int i = 0; i < sub.getCorePoolSize(); i++) { for (int i = 0; i < sub.getCorePoolSize()-1; i++) {
sub.submit(() -> { sub.submit(() -> {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
try { try {
@ -59,6 +59,26 @@ public class ArrHandle {
log.error("退出线程了"); log.error("退出线程了");
}); });
} }
sub.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
JsonObject take = VertxTcp.LINKED_BLOCKING_QUEUE_MON.take();
log.info("{}", take.encode());
PrivateChatMsg privateChatMsg = take.mapTo(PrivateChatMsg.class);
chatMsgThreadLocal.set(privateChatMsg);
if ("weixin".equals(privateChatMsg.getFromUser())) {
String s = HttpSendUtil.获取当前登陆微信id();
InitWeChat.WXID_MAP.add(s);
continue;
}
WxMsgHandle.exec(privateChatMsg);
chatMsgThreadLocal.remove();
} catch (Exception e) {
log.error(e);
}
}
log.error("退出线程了");
});
} }

View File

@ -1,6 +1,7 @@
package com.example.wxhk.tcp.vertx; package com.example.wxhk.tcp.vertx;
import com.example.wxhk.WxhkApplication; import com.example.wxhk.WxhkApplication;
import com.example.wxhk.constant.WxMsgType;
import com.example.wxhk.util.HttpAsyncUtil; import com.example.wxhk.util.HttpAsyncUtil;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions; import io.vertx.core.DeploymentOptions;
@ -15,6 +16,7 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
/** /**
@ -27,6 +29,10 @@ import java.util.concurrent.LinkedBlockingQueue;
@Order() @Order()
public class VertxTcp extends AbstractVerticle implements CommandLineRunner { public class VertxTcp extends AbstractVerticle implements CommandLineRunner {
public final static LinkedBlockingQueue<JsonObject> LINKED_BLOCKING_QUEUE = new LinkedBlockingQueue<>(); public final static LinkedBlockingQueue<JsonObject> LINKED_BLOCKING_QUEUE = new LinkedBlockingQueue<>();
/**
* 这个只保留交易相关的类型
*/
public final static LinkedBlockingQueue<JsonObject> LINKED_BLOCKING_QUEUE_MON = new LinkedBlockingQueue<>();
protected static final Log log = Log.get(); protected static final Log log = Log.get();
NetServer netServer; NetServer netServer;
@ -51,7 +57,14 @@ public class VertxTcp extends AbstractVerticle implements CommandLineRunner {
case END_ARRAY -> { case END_ARRAY -> {
} }
case VALUE -> { case VALUE -> {
LINKED_BLOCKING_QUEUE.add(event.objectValue()); JsonObject entries = event.objectValue();
if(Objects.equals(entries.getInteger("type"), WxMsgType.扫码触发.getType()) ||
Objects.equals(entries.getInteger("type"), WxMsgType.转账和收款.getType())){
LINKED_BLOCKING_QUEUE_MON.add(entries);
}else{
LINKED_BLOCKING_QUEUE.add(entries);
}
} }
} }
}); });