diff --git a/src/api/controllers/chat.ts b/src/api/controllers/chat.ts index e09dbfb..63a4dfb 100644 --- a/src/api/controllers/chat.ts +++ b/src/api/controllers/chat.ts @@ -132,6 +132,7 @@ async function createCompletion( params: { "fileUploadBatchId": util.uuid(), "searchType": searchType, + "deepThink": model === "qwq", }, contents: messagesPrepare(messages, refs, !!refConvId), }) @@ -225,6 +226,7 @@ async function createCompletionStream( params: { "fileUploadBatchId": util.uuid(), "searchType": searchType, + "deepThink": model === "qwq", }, contents: messagesPrepare(messages, refs, !!refConvId), }) @@ -444,13 +446,22 @@ async function receiveStream(stream: any): Promise { choices: [ { index: 0, - message: { role: "assistant", content: "" }, + message: { + role: "assistant", + content: "", + reasoning_content: "" // 存储think类型内容 + }, finish_reason: "stop", }, ], usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 }, created: util.unixTimestamp(), }; + + // 用于跟踪已处理的think内容 + let processedThinkContent = ""; + let hasCompletedThinking = false; // 添加标志,标记是否已经完成了完整的思考内容处理 + const parser = createParser((event) => { try { if (event.type !== "event") return; @@ -461,12 +472,59 @@ async function receiveStream(stream: any): Promise { throw new Error(`Stream response invalid: ${event.data}`); if (!data.id && result.sessionId && result.msgId) data.id = `${result.sessionId}-${result.msgId}`; + + // 处理think类型内容 + if (result.contentType === "think") { + // 如果已经处理过完整的思考内容,则不再处理 + if (result.msgStatus === "finished" && hasCompletedThinking) { + // 跳过重复的完整思考内容 + } else { + const thinkContents = (result.contents || []).filter(part => part.contentType === "think"); + for (const part of thinkContents) { + try { + const thinkContent = JSON.parse(part.content); + if (thinkContent && thinkContent.content) { + // 只添加新的内容,避免重复 + const newContent = thinkContent.content; + if (!processedThinkContent.includes(newContent)) { + // 如果是增量更新,只添加新的部分 + if (part.incremental && processedThinkContent) { + const uniquePart = newContent.substring(processedThinkContent.length); + if (uniquePart) { + data.choices[0].message.reasoning_content += uniquePart; + processedThinkContent = newContent; + } + } else { + data.choices[0].message.reasoning_content += newContent; + processedThinkContent = newContent; + } + } + + // 如果状态是finished,标记思考内容已完成 + if (part.status === "finished") { + hasCompletedThinking = true; + } + } + } catch (e) { + // 如果JSON解析失败,直接添加原始内容 + if (!processedThinkContent.includes(part.content)) { + data.choices[0].message.reasoning_content += part.content; + processedThinkContent += part.content; + } + } + } + } + } + + // 处理text类型内容 const text = (result.contents || []).reduce((str, part) => { const { contentType, role, content } = part; if (contentType != "text" && contentType != "text2image") return str; if (role != "assistant" && !_.isString(content)) return str; return str + content; }, ""); + + // ... 其余代码保持不变 ... const exceptCharIndex = text.indexOf("�"); let chunk = text.substring( exceptCharIndex != -1 @@ -523,6 +581,10 @@ function createTransStream(stream: any, endCallback?: Function) { // 创建转换流 const transStream = new PassThrough(); let content = ""; + let reasoningContent = ""; // 存储已处理的think内容 + let hasCompletedThinking = false; // 添加标志,标记是否已经完成了完整的思考内容处理 + + // 初始化响应 !transStream.closed && transStream.write( `data: ${JSON.stringify({ @@ -532,13 +594,14 @@ function createTransStream(stream: any, endCallback?: Function) { choices: [ { index: 0, - delta: { role: "assistant", content: "" }, + delta: { role: "assistant", content: "", reasoning_content: "" }, finish_reason: null, }, ], created, })}\n\n` ); + const parser = createParser((event) => { try { if (event.type !== "event") return; @@ -547,12 +610,76 @@ function createTransStream(stream: any, endCallback?: Function) { const result = _.attempt(() => JSON.parse(event.data)); if (_.isError(result)) throw new Error(`Stream response invalid: ${event.data}`); + + // 处理think类型内容 + if (result.contentType === "think") { + // 如果已经处理过完整的思考内容,则不再处理 + if (result.msgStatus === "finished" && hasCompletedThinking) { + // 跳过重复的完整思考内容 + } else { + const thinkContents = (result.contents || []).filter(part => part.contentType === "think"); + let newReasoningChunk = ""; + + for (const part of thinkContents) { + try { + const thinkContent = JSON.parse(part.content); + if (thinkContent && thinkContent.content) { + const fullThinkContent = thinkContent.content; + + // 确定新增的内容部分 + let newPart = ""; + if (part.incremental && reasoningContent && fullThinkContent.startsWith(reasoningContent)) { + // 如果是增量更新且内容是以前面内容开头,只取新增部分 + newPart = fullThinkContent.substring(reasoningContent.length); + } else if (!reasoningContent.includes(fullThinkContent)) { + // 如果是全新内容 + newPart = fullThinkContent; + } + + if (newPart) { + newReasoningChunk += newPart; + reasoningContent = fullThinkContent; // 更新已处理内容 + } + + // 如果状态是finished,标记思考内容已完成 + if (part.status === "finished") { + hasCompletedThinking = true; + } + } + } catch (e) { + // JSON解析失败,检查是否有新内容 + const rawContent = part.content; + if (!reasoningContent.includes(rawContent)) { + newReasoningChunk += rawContent; + reasoningContent += rawContent; + } + } + } + + // 如果有新的推理内容,发送增量更新 + if (newReasoningChunk) { + const data = `data: ${JSON.stringify({ + id: `${result.sessionId}-${result.msgId}`, + model: MODEL_NAME, + object: "chat.completion.chunk", + choices: [ + { index: 0, delta: { reasoning_content: newReasoningChunk }, finish_reason: null }, + ], + created, + })}\n\n`; + !transStream.closed && transStream.write(data); + } + } + } + + // 处理text类型内容 const text = (result.contents || []).reduce((str, part) => { const { contentType, role, content } = part; if (contentType != "text" && contentType != "text2image") return str; if (role != "assistant" && !_.isString(content)) return str; return str + content; }, ""); + const exceptCharIndex = text.indexOf("�"); let chunk = text.substring( exceptCharIndex != -1 @@ -560,6 +687,7 @@ function createTransStream(stream: any, endCallback?: Function) { : content.length, exceptCharIndex == -1 ? text.length : exceptCharIndex ); + if (chunk && result.contentType == "text2image") { chunk = chunk.replace( /https?:\/\/[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=\,]*)/gi, @@ -570,6 +698,7 @@ function createTransStream(stream: any, endCallback?: Function) { } ); } + if (result.msgStatus != "finished") { if (chunk && result.contentType == "text") { content += chunk; @@ -607,15 +736,15 @@ function createTransStream(stream: any, endCallback?: Function) { !transStream.closed && transStream.write(data); !transStream.closed && transStream.end("data: [DONE]\n\n"); content = ""; + reasoningContent = ""; endCallback && endCallback(result.sessionId); } - // else - // logger.warn(result.event, result); } catch (err) { logger.error(err); !transStream.closed && transStream.end("\n\n"); } }); + // 将流数据喂给SSE转换器 stream.on("data", (buffer) => parser.feed(buffer.toString())); stream.once(