fix: close socket

This commit is contained in:
hugy 2023-06-02 23:18:05 +08:00
parent 63ac97c864
commit 64a603c96b
4 changed files with 22 additions and 83 deletions

View File

@ -18,7 +18,7 @@ void GlobalContext::initialize(HMODULE module) {
#ifndef _DEBUG #ifndef _DEBUG
hide_module->Hide(module_); hide_module->Hide(module_);
#endif #endif
ThreadPool::GetInstance().Create(2,8);
HttpServer::GetInstance().Init(config->GetPort()); HttpServer::GetInstance().Init(config->GetPort());
HttpServer::GetInstance().HttpStart(); HttpServer::GetInstance().HttpStart();
DB::GetInstance().init(base); DB::GetInstance().init(base);
@ -28,7 +28,6 @@ void GlobalContext::initialize(HMODULE module) {
account_mgr.emplace(AccountMgr{base}); account_mgr.emplace(AccountMgr{base});
chat_room_mgr.emplace(ChatRoomMgr{base}); chat_room_mgr.emplace(ChatRoomMgr{base});
sns_mgr.emplace(SNSMgr{base}); sns_mgr.emplace(SNSMgr{base});
ThreadPool::GetInstance().Create(1,512);
} }
void GlobalContext::finally() { void GlobalContext::finally() {

View File

@ -43,54 +43,6 @@ static DWORD user_info_next_addr_ = 0;
UserInfo userinfo = {}; UserInfo userinfo = {};
void SendSocketMessage(InnerMessageStruct *msg) {
if (msg == NULL) {
return;
}
unique_ptr<InnerMessageStruct> sms(msg);
json j_msg =
json::parse(msg->buffer, msg->buffer + msg->length, nullptr, false);
if (j_msg.is_discarded() == true) {
return;
}
string jstr = j_msg.dump() + "\n";
if (server_port_ == 0) {
SPDLOG_ERROR("http server port error :{}",server_port_);
return;
}
SOCKET client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (client_socket < 0) {
SPDLOG_ERROR("socket init fail");
return;
}
BOOL status = false;
sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(client_addr));
client_addr.sin_family = AF_INET;
client_addr.sin_port = htons((u_short)server_port_);
InetPtonA(AF_INET, server_ip_, &client_addr.sin_addr.s_addr);
if (connect(client_socket, reinterpret_cast<sockaddr *>(&client_addr),
sizeof(sockaddr)) < 0) {
SPDLOG_ERROR("socket connect fail");
return;
}
char recv_buf[1024] = {0};
int ret = send(client_socket, jstr.c_str(), jstr.size(), 0);
if (ret == -1 || ret == 0) {
SPDLOG_ERROR("socket send fail ,ret::{}",ret);
closesocket(client_socket);
return;
}
memset(recv_buf, 0, sizeof(recv_buf));
ret = recv(client_socket, recv_buf, sizeof(recv_buf), 0);
closesocket(client_socket);
if (ret == -1 || ret == 0) {
SPDLOG_ERROR("socket recv fail ,ret:{}",ret);
return;
}
}
VOID CALLBACK SendMsgCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, VOID CALLBACK SendMsgCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WORK Work) { PTP_WORK Work) {
InnerMessageStruct *msg = (InnerMessageStruct *)context; InnerMessageStruct *msg = (InnerMessageStruct *)context;
@ -131,34 +83,26 @@ VOID CALLBACK SendMsgCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
if (connect(client_socket, reinterpret_cast<sockaddr *>(&client_addr), if (connect(client_socket, reinterpret_cast<sockaddr *>(&client_addr),
sizeof(sockaddr)) < 0) { sizeof(sockaddr)) < 0) {
SPDLOG_ERROR("socket connect fail"); SPDLOG_ERROR("socket connect fail");
closesocket(client_socket); goto clean;
WSACleanup();
return ;
} }
char recv_buf[1024] = {0}; char recv_buf[1024] = {0};
ret = send(client_socket, jstr.c_str(), jstr.size(), 0); ret = send(client_socket, jstr.c_str(), jstr.size(), 0);
if (ret == -1 || ret == 0) { if (ret < 0) {
SPDLOG_ERROR("socket send fail ,ret:{}", ret); SPDLOG_ERROR("socket send fail ,ret:{}", ret);
closesocket(client_socket); goto clean;
WSACleanup();
return;
} }
ret = shutdown(client_socket, SD_SEND); ret = shutdown(client_socket, SD_SEND);
if (ret == SOCKET_ERROR) { if (ret == SOCKET_ERROR) {
SPDLOG_ERROR("shutdown failed with erro:{}", ret); SPDLOG_ERROR("shutdown failed with erro:{}", ret);
closesocket(client_socket); goto clean;
WSACleanup();
return ;
} }
memset(recv_buf, 0, sizeof(recv_buf));
ret = recv(client_socket, recv_buf, sizeof(recv_buf), 0); ret = recv(client_socket, recv_buf, sizeof(recv_buf), 0);
closesocket(client_socket); if (ret < 0) {
if (ret == -1 || ret == 0) {
SPDLOG_ERROR("socket recv fail ,ret:{}", ret); SPDLOG_ERROR("socket recv fail ,ret:{}", ret);
WSACleanup(); goto clean;
return;
} }
clean:
closesocket(client_socket);
WSACleanup(); WSACleanup();
return; return;
} }
@ -211,12 +155,7 @@ void __cdecl OnRecvMsg(DWORD msg_addr) {
memcpy(inner_msg->buffer, jstr.c_str(), jstr.size() + 1); memcpy(inner_msg->buffer, jstr.c_str(), jstr.size() + 1);
inner_msg->length = jstr.size(); inner_msg->length = jstr.size();
bool add = ThreadPool::GetInstance().AddWork(SendMsgCallback,inner_msg); bool add = ThreadPool::GetInstance().AddWork(SendMsgCallback,inner_msg);
SPDLOG_INFO("add work:{}",add); SPDLOG_INFO("add msg work:{}",add);
// HANDLE thread = CreateThread(
// NULL, 0, (LPTHREAD_START_ROUTINE)SendSocketMessage, inner_msg, NULL, 0);
// if (thread) {
// CloseHandle(thread);
// }
} }
@ -263,11 +202,8 @@ void __cdecl OnSnsTimeLineMsg(DWORD msg_addr) {
inner_msg->buffer = new char[jstr.size() + 1]; inner_msg->buffer = new char[jstr.size() + 1];
memcpy(inner_msg->buffer, jstr.c_str(), jstr.size() + 1); memcpy(inner_msg->buffer, jstr.c_str(), jstr.size() + 1);
inner_msg->length = jstr.size(); inner_msg->length = jstr.size();
HANDLE thread = CreateThread( bool add = ThreadPool::GetInstance().AddWork(SendMsgCallback,inner_msg);
NULL, 0, (LPTHREAD_START_ROUTINE)SendSocketMessage, inner_msg, NULL, 0); SPDLOG_INFO("add sns work:{}",add);
if (thread) {
CloseHandle(thread);
}
} }
/// @brief hook sns msg implement /// @brief hook sns msg implement

View File

@ -6,10 +6,16 @@
namespace wxhelper { namespace wxhelper {
ThreadPool::~ThreadPool() { ThreadPool::~ThreadPool() {
if(cleanup_group_){
CloseThreadpoolCleanupGroupMembers(cleanup_group_, true, NULL); CloseThreadpoolCleanupGroupMembers(cleanup_group_, true, NULL);
CloseThreadpoolCleanupGroup(cleanup_group_); CloseThreadpoolCleanupGroup(cleanup_group_);
}
DestroyThreadpoolEnvironment(&env_);
if (pool_){
CloseThreadpool(pool_); CloseThreadpool(pool_);
}
} }
bool ThreadPool::Create(unsigned long min, unsigned long max) { bool ThreadPool::Create(unsigned long min, unsigned long max) {
InitializeThreadpoolEnvironment(&env_); InitializeThreadpoolEnvironment(&env_);
pool_ = CreateThreadpool(NULL); pool_ = CreateThreadpool(NULL);
@ -38,4 +44,5 @@ bool ThreadPool::AddWork(PTP_WORK_CALLBACK callback,PVOID opt) {
SubmitThreadpoolWork(work); SubmitThreadpoolWork(work);
return true; return true;
} }
} // namespace wxhelper } // namespace wxhelper

View File

@ -13,9 +13,6 @@ class ThreadPool :public Singleton<ThreadPool>{
bool AddWork(PTP_WORK_CALLBACK callback,PVOID opt); bool AddWork(PTP_WORK_CALLBACK callback,PVOID opt);
private: private:
void operator=(const ThreadPool&) = delete;
void operator=(ThreadPool&&) = delete;
PTP_POOL pool_; PTP_POOL pool_;
PTP_CLEANUP_GROUP cleanup_group_; PTP_CLEANUP_GROUP cleanup_group_;
TP_CALLBACK_ENVIRON env_; TP_CALLBACK_ENVIRON env_;