增加流响应错误时重试机制

This commit is contained in:
Vinlic 2024-03-16 04:46:39 +08:00
parent 08a4b2e720
commit 1395278a6e
2 changed files with 101 additions and 64 deletions

View File

@ -14,6 +14,10 @@ import util from '@/lib/util.ts';
const MODEL_NAME = 'kimi'; const MODEL_NAME = 'kimi';
// access_token有效期 // access_token有效期
const ACCESS_TOKEN_EXPIRES = 300; const ACCESS_TOKEN_EXPIRES = 300;
// 最大重试次数
const MAX_RETRY_COUNT = 3;
// 重试延迟
const RETRY_DELAY = 5000;
// 伪装headers // 伪装headers
const FAKE_HEADERS = { const FAKE_HEADERS = {
'Accept': '*/*', 'Accept': '*/*',
@ -163,45 +167,59 @@ async function removeConversation(convId: string, refreshToken: string) {
* @param messages gpt系列消息格式 * @param messages gpt系列消息格式
* @param refreshToken access_token的refresh_token * @param refreshToken access_token的refresh_token
* @param useSearch * @param useSearch
* @param retryCount
*/ */
async function createCompletion(messages: any[], refreshToken: string, useSearch = true) { async function createCompletion(messages: any[], refreshToken: string, useSearch = true, retryCount = 0) {
logger.info(messages); return (async () => {
logger.info(messages);
// 提取引用文件URL并上传kimi获得引用的文件ID列表 // 提取引用文件URL并上传kimi获得引用的文件ID列表
const refFileUrls = extractRefFileUrls(messages); const refFileUrls = extractRefFileUrls(messages);
const refs = refFileUrls.length ? await Promise.all(refFileUrls.map(fileUrl => uploadFile(fileUrl, refreshToken))) : []; const refs = refFileUrls.length ? await Promise.all(refFileUrls.map(fileUrl => uploadFile(fileUrl, refreshToken))) : [];
// 创建会话 // 创建会话
const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken); const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken);
// 请求流 // 请求流
const token = await acquireToken(refreshToken); const token = await acquireToken(refreshToken);
const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, { const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, {
messages: messagesPrepare(messages), messages: messagesPrepare(messages),
refs, refs,
use_search: useSearch use_search: useSearch
}, { }, {
headers: { headers: {
Authorization: `Bearer ${token}`, Authorization: `Bearer ${token}`,
Referer: `https://kimi.moonshot.cn/chat/${convId}`, Referer: `https://kimi.moonshot.cn/chat/${convId}`,
...FAKE_HEADERS ...FAKE_HEADERS
}, },
// 120秒超时 // 120秒超时
timeout: 120000, timeout: 120000,
validateStatus: () => true, validateStatus: () => true,
responseType: 'stream' responseType: 'stream'
}); });
const streamStartTime = util.timestamp(); const streamStartTime = util.timestamp();
// 接收流为输出文本 // 接收流为输出文本
const answer = await receiveStream(convId, result.data); const answer = await receiveStream(convId, result.data);
logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`); logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`);
// 异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略 // 异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略
removeConversation(convId, refreshToken) removeConversation(convId, refreshToken)
.catch(err => console.error(err)); .catch(err => console.error(err));
return answer; return answer;
})()
.catch(err => {
if(retryCount < MAX_RETRY_COUNT) {
logger.error(`Stream response error: ${err.message}`);
logger.warn(`Try again after ${RETRY_DELAY / 1000}s...`);
return (async () => {
await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
return createCompletion(messages, refreshToken, useSearch, retryCount + 1);
})();
}
throw err;
});
} }
/** /**
@ -210,42 +228,56 @@ async function createCompletion(messages: any[], refreshToken: string, useSearch
* @param messages gpt系列消息格式 * @param messages gpt系列消息格式
* @param refreshToken access_token的refresh_token * @param refreshToken access_token的refresh_token
* @param useSearch * @param useSearch
* @param retryCount
*/ */
async function createCompletionStream(messages: any[], refreshToken: string, useSearch = true) { async function createCompletionStream(messages: any[], refreshToken: string, useSearch = true, retryCount = 0) {
logger.info(messages); return (async () => {
logger.info(messages);
// 提取引用文件URL并上传kimi获得引用的文件ID列表 // 提取引用文件URL并上传kimi获得引用的文件ID列表
const refFileUrls = extractRefFileUrls(messages); const refFileUrls = extractRefFileUrls(messages);
const refs = refFileUrls.length ? await Promise.all(refFileUrls.map(fileUrl => uploadFile(fileUrl, refreshToken))) : []; const refs = refFileUrls.length ? await Promise.all(refFileUrls.map(fileUrl => uploadFile(fileUrl, refreshToken))) : [];
// 创建会话 // 创建会话
const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken); const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken);
// 请求流 // 请求流
const token = await acquireToken(refreshToken); const token = await acquireToken(refreshToken);
const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, { const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, {
messages: messagesPrepare(messages), messages: messagesPrepare(messages),
refs, refs,
use_search: useSearch use_search: useSearch
}, { }, {
// 120秒超时 // 120秒超时
timeout: 120000, timeout: 120000,
headers: { headers: {
Authorization: `Bearer ${token}`, Authorization: `Bearer ${token}`,
Referer: `https://kimi.moonshot.cn/chat/${convId}`, Referer: `https://kimi.moonshot.cn/chat/${convId}`,
...FAKE_HEADERS ...FAKE_HEADERS
}, },
validateStatus: () => true, validateStatus: () => true,
responseType: 'stream' responseType: 'stream'
}); });
const streamStartTime = util.timestamp(); const streamStartTime = util.timestamp();
// 创建转换流将消息格式转换为gpt兼容格式 // 创建转换流将消息格式转换为gpt兼容格式
return createTransStream(convId, result.data, () => { return createTransStream(convId, result.data, () => {
logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`); logger.success(`Stream has completed transfer ${util.timestamp() - streamStartTime}ms`);
// 流传输结束后异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略 // 流传输结束后异步移除会话,如果消息不合规,此操作可能会抛出数据库错误异常,请忽略
removeConversation(convId, refreshToken) removeConversation(convId, refreshToken)
.catch(err => console.error(err)); .catch(err => console.error(err));
}); });
})()
.catch(err => {
if(retryCount < MAX_RETRY_COUNT) {
logger.error(`Stream response error: ${err.message}`);
logger.warn(`Try again after ${RETRY_DELAY / 1000}s...`);
return (async () => {
await new Promise(resolve => setTimeout(resolve, RETRY_DELAY));
return createCompletionStream(messages, refreshToken, useSearch, retryCount + 1);
})();
}
throw err;
});
} }
/** /**

View File

@ -29,6 +29,11 @@ export default class Exception extends Error {
this.errmsg = _errmsg || errmsg; this.errmsg = _errmsg || errmsg;
} }
compare(exception: (string | number)[]) {
const [errcode] = exception as [number, string];
return this.errcode == errcode;
}
setHTTPStatusCode(value: number) { setHTTPStatusCode(value: number) {
this.httpStatusCode = value; this.httpStatusCode = value;
return this; return this;