Merge e2e47798169ad007b74b8a85a829baea3805f711 into 3b8bf35c651fbd14b4b982d5227b48f7aa1a9be4

This commit is contained in:
Clivia 2025-01-25 03:24:50 +00:00 committed by GitHub
commit edbe449df6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 476 additions and 165 deletions

View File

@ -87,7 +87,7 @@ async function requestToken(refreshToken: string) {
timeout: 15000,
validateStatus: () => true
});
if(!userResult.data.id)
if (!userResult.data.id)
throw new APIException(EX.API_REQUEST_FAILED, '获取用户信息失败');
return {
userId: userResult.data.id,
@ -317,7 +317,7 @@ async function createCompletion(model = MODEL_NAME, messages: any[], refreshToke
.catch(err => logger.error(err));
tokenSize(sendMessages[0].content, refs, refreshToken, convId)
.catch(err => logger.error(err));
const isMath = model.indexOf('math') != -1;
const isSearchModel = model.indexOf('search') != -1;
const isResearchModel = model.indexOf('research') != -1;
@ -325,22 +325,22 @@ async function createCompletion(model = MODEL_NAME, messages: any[], refreshToke
logger.info(`使用模型: ${model},是否联网检索: ${isSearchModel},是否探索版: ${isResearchModel}是否K1模型: ${isK1Model},是否数学模型: ${isMath}`);
if(segmentId)
if (segmentId)
logger.info(`继续请求segmentId: ${segmentId}`);
// 检查探索版使用量
if(isResearchModel) {
if (isResearchModel) {
const {
total,
used
} = await getResearchUsage(refreshToken);
if(used >= total)
if (used >= total)
throw new APIException(EX.API_RESEARCH_EXCEEDS_LIMIT, `探索版使用量已达到上限`);
logger.info(`探索版当前额度: ${used}/${total}`);
}
const kimiplusId = isK1Model ? 'crm40ee9e5jvhsn7ptcg' : (/^[0-9a-z]{20}$/.test(model) ? model : 'kimi');
// 请求补全流
const stream = await request('POST', `/api/chat/${convId}/completion/stream`, refreshToken, {
data: segmentId ? {
@ -368,14 +368,14 @@ async function createCompletion(model = MODEL_NAME, messages: any[], refreshToke
const streamStartTime = util.timestamp();
// 接收流为输出文本
const answer = await receiveStream(model, convId, stream);
const answer = await receiveStream(model, convId, refreshToken, stream);
// 如果上次请求生成长度超限,则继续请求
if(answer.choices[0].finish_reason == 'length' && answer.segment_id) {
if (answer.choices[0].finish_reason == 'length' && answer.segment_id) {
const continueAnswer = await createCompletion(model, [], refreshToken, convId, retryCount, answer.segment_id);
answer.choices[0].message.content += continueAnswer.choices[0].message.content;
}
logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`);
// 异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略
@ -443,7 +443,7 @@ async function createCompletionStream(model = MODEL_NAME, messages: any[], refre
.catch(err => logger.error(err));
tokenSize(sendMessages[0].content, refs, refreshToken, convId)
.catch(err => logger.error(err));
const isMath = model.indexOf('math') != -1;
const isSearchModel = model.indexOf('search') != -1;
const isResearchModel = model.indexOf('research') != -1;
@ -452,12 +452,12 @@ async function createCompletionStream(model = MODEL_NAME, messages: any[], refre
logger.info(`使用模型: ${model},是否联网检索: ${isSearchModel},是否探索版: ${isResearchModel}是否K1模型: ${isK1Model},是否数学模型: ${isMath}`);
// 检查探索版使用量
if(isResearchModel) {
if (isResearchModel) {
const {
total,
used
} = await getResearchUsage(refreshToken);
if(used >= total)
if (used >= total)
throw new APIException(EX.API_RESEARCH_EXCEEDS_LIMIT, `探索版使用量已达到上限`);
logger.info(`探索版当前额度: ${used}/${total}`);
}
@ -484,7 +484,7 @@ async function createCompletionStream(model = MODEL_NAME, messages: any[], refre
const streamStartTime = util.timestamp();
// 创建转换流将消息格式转换为gpt兼容格式
return createTransStream(model, convId, stream, () => {
return createTransStream(model, convId, stream, refreshToken, () => {
logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`);
// 流传输结束后异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略
// 如果引用会话将不会清除,因为我们不知道什么时候你会结束会话
@ -834,86 +834,271 @@ function checkResult(result: AxiosResponse, refreshToken: string) {
}
/**
*
*
*
* @param model
* @param text_buffer
* @param refs
* @param convId ID
* @param stream
* @param sid SID
* @param refreshToken access_token的refresh_token
* @param request
* @param logger
*/
async function receiveStream(model: string, convId: string, stream: any): Promise<IStreamMessage> {
let webSearchCount = 0;
let temp = Buffer.from('');
return new Promise((resolve, reject) => {
// 消息初始化
const data = {
id: convId,
model,
object: 'chat.completion',
choices: [
{ index: 0, message: { role: 'assistant', content: '' }, finish_reason: 'stop' }
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
segment_id: '',
created: util.unixTimestamp()
};
let refContent = '';
const silentSearch = model.indexOf('silent') != -1;
const parser = createParser(event => {
try {
if (event.type !== "event") return;
// 解析JSON
const result = _.attempt(() => JSON.parse(event.data));
if (_.isError(result))
throw new Error(`Stream response invalid: ${event.data}`);
// 处理消息
if (result.event == 'cmpl' && result.text) {
data.choices[0].message.content += result.text;
}
// 处理请求ID
else if(result.event == 'req') {
data.segment_id = result.id;
}
// 处理超长文本
else if(result.event == 'length') {
logger.warn('此次生成达到max_tokens稍候将继续请求拼接完整响应');
data.choices[0].finish_reason = 'length';
}
// 处理结束或错误
else if (result.event == 'all_done' || result.event == 'error') {
data.choices[0].message.content += (result.event == 'error' ? '\n[内容由于不合规被停止生成,我们换个话题吧]' : '') + (refContent ? `\n\n搜索结果来自\n${refContent}` : '');
refContent = '';
resolve(data);
}
// 处理联网搜索
else if (!silentSearch && result.event == 'search_plus' && result.msg && result.msg.type == 'get_res') {
webSearchCount += 1;
refContent += `【检索 ${webSearchCount}】 [${result.msg.title}](${result.msg.url})\n\n`;
}
// else
// logger.warn(result.event, result);
async function processReferences(text_buffer, refs, convId, sid, refreshToken, request, logger) {
const findRefUrl = (refId, refs, logger) => {
for (const ref of refs) {
if (ref.ref_id == refId) {
logger.debug(`ref_id: ${ref.ref_id}, match: ${refId}, url: ${ref.ref_doc.url}`);
return ref.ref_doc.url;
}
catch (err) {
logger.error(err);
}
return null;
};
let newRefs = [...refs];
let resultText = "";
let lastIndex = 0;
logger.debug(`text_buffer: ${text_buffer}`);
for (const match of text_buffer.matchAll(/\[[^\]]+\]/g)) {
const matchText = match[0];
resultText += text_buffer.substring(lastIndex, match.index);
lastIndex = match.index + matchText.length;
if (/^\[\^\d+\^\]$/.test(matchText)) {
const refId = matchText.slice(2, -2);
let is_search_url = findRefUrl(refId, newRefs, logger);
if (!is_search_url) {
try {
const res = await request('POST', `/api/chat/segment/v3/rag-refs`, refreshToken, {
data: {
"queries": [
{
"chat_id": convId,
"sid": sid,
"z_idx": 0
}
]
}
});
const fetchedRefs = res?.items[0]?.refs || [];
newRefs = [...fetchedRefs];
is_search_url = findRefUrl(refId, newRefs, logger);
}
catch (err) {
logger.error(err);
is_search_url = '';
}
}
if (is_search_url) {
resultText += ` [[${refId}]](${is_search_url})`;
} else {
resultText += matchText;
}
} else {
resultText += matchText;
}
}
resultText += text_buffer.substring(lastIndex) // 添加剩余的字符串
return { text: resultText, refs: newRefs };
}
export async function receiveStream(
model: string,
convId: string,
refreshToken: string,
stream: NodeJS.ReadableStream
): Promise<IStreamMessage> {
let webSearchCount = 0;
let text_buffer = '';
let is_buffer_search = false;
let is_search_url = '';
let temp = Buffer.from('');
let refContent = '';
let sid = '';
let refs = [];
const showLink = model.indexOf('link') != -1;
const data: IStreamMessage = {
id: convId,
model,
object: 'chat.completion',
choices: [
{ index: 0, message: { role: 'assistant', content: '' }, finish_reason: 'stop' }
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
segment_id: '',
created: util.unixTimestamp()
};
// 是否静默搜索
const silentSearch = model.indexOf('silent') !== -1;
let finished = false;
return new Promise<IStreamMessage>((resolve, reject) => {
function safeResolve(value: IStreamMessage) {
if (!finished) {
finished = true;
resolve(value);
}
}
function safeReject(err: any) {
if (!finished) {
finished = true;
reject(err);
}
}
/************************************************
* 1) 队列相关: eventQueue & isProcessing
************************************************/
const eventQueue: any[] = [];
let isProcessing = false;
// 往队列里推送事件
function queueEvent(evt: any) {
eventQueue.push(evt);
if (!isProcessing) {
processQueue();
}
}
// 按顺序处理队列
async function processQueue() {
isProcessing = true;
while (eventQueue.length > 0) {
const evt = eventQueue.shift();
try {
await handleEvent(evt);
} catch (error) {
logger.error(error);
safeReject(error);
return;
}
}
isProcessing = false;
}
/************************************************
* 2) handleEvent(event)
************************************************/
async function handleEvent(event: any) {
if (event.type !== 'event') return;
// 解析JSON
const result = _.attempt(() => JSON.parse(event.data));
if (_.isError(result)) {
throw new Error(`Stream response invalid: ${event.data}`);
}
// 根据不同的 result.event 做出不同处理
if (result.event === 'cmpl') {
if (showLink) {
// 检测 [ 引用标记
if (result.text === '[' && !is_buffer_search) {
text_buffer += result.text;
is_buffer_search = true;
return;
} else if (is_buffer_search) {
text_buffer += result.text;
// 如果遇到 ']' 说明搜索引用结束
if (result.text === ']' && text_buffer.endsWith("]")) {
is_buffer_search = false;
// 处理引文
const { text, refs: newRefs } = await processReferences(
text_buffer, refs, convId, sid, refreshToken, request, logger
);
// 将替换后的内容拼回 result
result.text = text;
refs = newRefs;
text_buffer = '';
} else {
// 如果还没遇到完整的 ']', 先return 等后续数据
return;
}
}
}
// 将文本加到最终返回数据
data.choices[0].message.content += result.text;
}
else if (result.event === 'req') {
// 请求ID
data.segment_id = result.id;
}
else if (result.event === 'resp') {
// 响应ID (用于请求后续引用链接)
sid = result.id || '';
}
else if (result.event === 'length') {
logger.warn('此次生成达到max_tokens稍候将继续请求拼接完整响应');
data.choices[0].finish_reason = 'length';
}
else if (result.event === 'all_done' || result.event === 'error') {
// 拼上搜索结果的来源
if (result.event === 'error') {
data.choices[0].message.content += '\n[内容由于不合规被停止生成,我们换个话题吧]';
}
if (refContent) {
data.choices[0].message.content += `\n\n搜索结果来自\n${refContent}`;
refContent = '';
}
// 触发返回
safeResolve(data);
}
// 网络搜索
else if (!silentSearch && result.event === 'search_plus' && result.msg && result.msg.type === 'get_res') {
webSearchCount += 1;
// 累计搜索来源
refContent += `检索【${webSearchCount}】 [${result.msg.title}](${result.msg.url})\n\n`;
}
else if (result.event === 'ref_docs' && result.ref_cards) {
is_search_url = result.ref_cards.map(card => card.url)[0];
logger.info(is_search_url);
}
}
/************************************************
* 3) parser
************************************************/
const parser = createParser((evt) => {
// 不在回调里直接处理,而是入队
queueEvent(evt);
});
// 将流数据喂给SSE转换器
stream.on("data", buffer => {
// 检查buffer是否以完整UTF8字符结尾
if (buffer.toString().indexOf('<27>') != -1) {
// 如果不完整则累积buffer直到收到完整字符
/************************************************
* 4) + parser.feed
************************************************/
stream.on('data', (buffer: Buffer) => {
// 简单的 UTF8 完整性检查逻辑
if (buffer.toString().indexOf('<27>') !== -1) {
// 如果出现 <20>, 表示有可能 UTF-8 不完整;先累积
temp = Buffer.concat([temp, buffer]);
return;
}
// 将之前累积的不完整buffer拼接
// 如果之前累积过不完整数据,就拼接
if (temp.length > 0) {
buffer = Buffer.concat([temp, buffer]);
temp = Buffer.from('');
}
parser.feed(buffer.toString());
});
stream.once("error", err => reject(err));
stream.once("close", () => resolve(data));
// 当流出错,直接 reject
stream.once('error', (err: any) => {
safeReject(err);
});
// 当流关闭,如果尚未 resolve就安全结束
stream.once('close', () => {
// 有些场景下close可能比all_done先到如果还没结束就安全resolve
safeResolve(data);
});
});
}
@ -927,17 +1112,21 @@ async function receiveStream(model: string, convId: string, stream: any): Promis
* @param stream
* @param endCallback
*/
function createTransStream(model: string, convId: string, stream: any, endCallback?: Function) {
function createTransStream(model, convId, stream, refreshToken, endCallback) {
// 消息创建时间
const created = util.unixTimestamp();
// 创建转换流
// 创建转换流,最终返回给调用方(如前端)
const transStream = new PassThrough();
let webSearchCount = 0;
let searchFlag = false;
let lengthExceed = false;
let segmentId = '';
const silentSearch = model.indexOf('silent') != -1;
!transStream.closed && transStream.write(`data: ${JSON.stringify({
const showLink = model.indexOf('link') != -1;
writeChunkToTransStream(transStream, {
id: convId,
model,
object: 'chat.completion.chunk',
@ -946,97 +1135,194 @@ function createTransStream(model: string, convId: string, stream: any, endCallba
],
segment_id: '',
created
})}\n\n`);
const parser = createParser(event => {
try {
if (event.type !== "event") return;
// 解析JSON
const result = _.attempt(() => JSON.parse(event.data));
if (_.isError(result))
throw new Error(`Stream response invalid: ${event.data}`);
// 处理消息
if (result.event == 'cmpl') {
const exceptCharIndex = result.text.indexOf("<22>");
const chunk = result.text.substring(0, exceptCharIndex == -1 ? result.text.length : exceptCharIndex);
const data = `data: ${JSON.stringify({
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{ index: 0, delta: { content: (searchFlag ? '\n' : '') + chunk }, finish_reason: null }
],
segment_id: segmentId,
created
})}\n\n`;
if (searchFlag)
searchFlag = false;
!transStream.closed && transStream.write(data);
}
// 处理请求ID
else if(result.event == 'req') {
segmentId = result.id;
}
// 处理超长文本
else if (result.event == 'length') {
lengthExceed = true;
}
// 处理结束或错误
else if (result.event == 'all_done' || result.event == 'error') {
const data = `data: ${JSON.stringify({
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{
index: 0, delta: result.event == 'error' ? {
content: '\n[内容由于不合规被停止生成,我们换个话题吧]'
} : {}, finish_reason: lengthExceed ? 'length' : 'stop'
}
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
segment_id: segmentId,
created
})}\n\n`;
!transStream.closed && transStream.write(data);
!transStream.closed && transStream.end('data: [DONE]\n\n');
endCallback && endCallback();
}
// 处理联网搜索
else if (!silentSearch && result.event == 'search_plus' && result.msg && result.msg.type == 'get_res') {
if (!searchFlag)
searchFlag = true;
webSearchCount += 1;
const data = `data: ${JSON.stringify({
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{
index: 0, delta: {
content: `【检索 ${webSearchCount}】 [${result.msg.title}](${result.msg.url})\n`
}, finish_reason: null
}
],
segment_id: segmentId,
created
})}\n\n`;
!transStream.closed && transStream.write(data);
}
// else
// logger.warn(result.event, result);
}
catch (err) {
logger.error(err);
!transStream.closed && transStream.end('\n\n');
}
});
// 将流数据喂给SSE转换器
// 下面一些在解析中会用到的缓存变量
let text_buffer = '';
let is_buffer_search = false;
let sid = '';
let refs = [];
/************************************************
* 事件队列: 存储从 parser
************************************************/
const eventQueue = [];
let isProcessing = false; // 是否正在处理队列
// 把“往队列里添加事件”与“触发处理”封装成函数
function queueEvent(evt) {
eventQueue.push(evt);
if (!isProcessing) {
processQueue();
}
}
// 真正的“串行处理”逻辑:一个事件处理完,再处理下一个
async function processQueue() {
isProcessing = true;
while (eventQueue.length > 0) {
const evt = eventQueue.shift();
try {
await handleEvent(evt);
} catch (err) {
logger.error('处理事件时出错:', err);
}
}
isProcessing = false;
}
/************************************************
* handleEvent
************************************************/
async function handleEvent(event) {
// 如果不是 "event" 类型,直接跳过
if (event.type !== 'event') return;
// event.data 是 parser.feed 后得到的 SSE 数据文本
// 尝试解析成 JSON
let result;
try {
result = JSON.parse(event.data);
} catch (err) {
logger.error(`Stream response invalid: ${event.data}`);
return;
}
// 根据不同的 result.event 做不同的处理
if (result.event === 'cmpl') {
// 处理 cmpl 事件中带有 [ ... ] 的搜索引用
if (showLink) {
if (result.text === '[' && !is_buffer_search) {
text_buffer += result.text;
is_buffer_search = true;
return;
} else if (is_buffer_search) {
text_buffer += result.text;
if (result.text === ']' && text_buffer.endsWith("]")) {
is_buffer_search = false;
// 处理引文
const { text, refs: newRefs } = await processReferences(
text_buffer, refs, convId, sid, refreshToken, request, logger
);
result.text = text;
refs = newRefs;
text_buffer = '';
} else {
return;
}
}
}
// 把结果写到 transStream
writeChunkToTransStream(transStream, {
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{
index: 0,
delta: {
// 如果之前是 searchFlag就额外加一个换行
content: (searchFlag ? '\n' : '') + result.text
},
finish_reason: null
}
],
segment_id: segmentId,
created
});
// 写完后重置 searchFlag
if (searchFlag) {
searchFlag = false;
}
}
else if (result.event === 'req') {
// 处理请求ID
segmentId = result.id;
}
else if (result.event === 'resp') {
// 处理响应ID
sid = result.id;
}
else if (result.event === 'length') {
// 超长标记
lengthExceed = true;
}
else if (result.event === 'all_done' || result.event === 'error') {
// 处理结束或错误
writeChunkToTransStream(transStream, {
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{
index: 0,
delta: result.event === 'error'
? { content: '\n[内容由于不合规被停止生成,我们换个话题吧]' }
: {},
finish_reason: lengthExceed ? 'length' : 'stop'
}
],
usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 },
segment_id: segmentId,
created
});
!transStream.closed && transStream.end('data: [DONE]\n\n');
endCallback && endCallback();
}
else if (!silentSearch && result.event === 'search_plus' && result.msg && result.msg.type === 'get_res') {
// 处理联网搜索
if (!searchFlag) {
searchFlag = true;
}
webSearchCount += 1;
const chunkText = `检索【${webSearchCount}】 [${result.msg.title}](${result.msg.url})\n`;
writeChunkToTransStream(transStream, {
id: convId,
model,
object: 'chat.completion.chunk',
choices: [
{
index: 0,
delta: { content: chunkText },
finish_reason: null
}
],
segment_id: segmentId,
created
});
}
}
/************************************************
* stream.on("data", ...)
* parser.feed parser callback
* queueEvent
************************************************/
const parser = createParser((event) => {
// 这里不直接处理 event而是推入队列
queueEvent(event);
});
stream.on("data", buffer => parser.feed(buffer.toString()));
stream.once("error", () => !transStream.closed && transStream.end('data: [DONE]\n\n'));
stream.once("close", () => !transStream.closed && transStream.end('data: [DONE]\n\n'));
// 返回给上层使用
return transStream;
}
/************************************************
* writeChunkToTransStream:
* JSON SSE
************************************************/
function writeChunkToTransStream(transStream, chunkObject) {
if (!transStream.closed) {
const dataStr = `data: ${JSON.stringify(chunkObject)}\n\n`;
transStream.write(dataStr);
}
}
/**
* Token切分
*

View File

@ -32,7 +32,32 @@ export default {
"id": "moonshot-v1-vision",
"object": "model",
"owned_by": "kimi-free-api"
}
},
{
"id": "kimi-search",
"object": "model",
"owned_by": "kimi-free-api"
},
{
"id": "kimi-research",
"object": "model",
"owned_by": "kimi-free-api"
},
{
"id": "kimi-k1",
"object": "model",
"owned_by": "kimi-free-api"
},
{
"id": "kimi-math",
"object": "model",
"owned_by": "kimi-free-api"
},
{
"id": "kimi-silent",
"object": "model",
"owned_by": "kimi-free-api"
},
]
};
}