diff --git a/package.json b/package.json index cf292f0..96901ff 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ ], "scripts": { "dev": "tsup src/index.ts --format cjs,esm --sourcemap --dts --publicDir public --watch --onSuccess \"node dist/index.js\"", + "start": "node dist/index.js", "build": "tsup src/index.ts --format cjs,esm --sourcemap --dts --clean --publicDir public" }, "author": "Vinlic", diff --git a/src/api/consts/exceptions.ts b/src/api/consts/exceptions.ts index c61966b..11a5f2e 100644 --- a/src/api/consts/exceptions.ts +++ b/src/api/consts/exceptions.ts @@ -2,4 +2,5 @@ export default { API_TEST: [-9999, 'API异常错误'], API_REQUEST_PARAMS_INVALID: [-2000, '请求参数非法'], API_REQUEST_FAILED: [-2001, '请求失败'], + API_TOKEN_EXPIRES: [-2002, 'Token已失效'] } \ No newline at end of file diff --git a/src/api/controllers/chat.ts b/src/api/controllers/chat.ts index 051df5c..a2bc0fc 100644 --- a/src/api/controllers/chat.ts +++ b/src/api/controllers/chat.ts @@ -8,69 +8,70 @@ import { createParser } from 'eventsource-parser' import logger from '@/lib/logger.ts'; import util from '@/lib/util.ts'; -const TOKEN_EXPIRES = 120; -let currentAccessToken: string | null = null; -let currentRefreshToken: string | null = null; -let latestRefreshTime = 0; +const ACCESS_TOKEN_EXPIRES = 300; +const accessTokenMap = new Map(); -function setRefreshToken(refreshToken: string) { - currentRefreshToken = refreshToken || 'eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ1c2VyLWNlbnRlciIsImV4cCI6MTcxNzY2NzkzMSwiaWF0IjoxNzA5ODkxOTMxLCJqdGkiOiJjbmxlMm1wcmRpamFpbGxzcHJuMCIsInR5cCI6InJlZnJlc2giLCJzdWIiOiJjbmVyMGgybG5sOTU3N3MzMmluZyIsInNwYWNlX2lkIjoiY25lcXA1ODNyMDdkajd1a3JqcjAiLCJhYnN0cmFjdF91c2VyX2lkIjoiY25lcXA1ODNyMDdkajd1a3JqcWcifQ.XMDecAmBq817_n3xtRqIwIlS9QQLIClS1PaVh4EY8bqhiHr8SxFxbiTEyuRuPPTnCB90eUJNc_LchLMjUo8cKA'; -} - -async function refreshToken() { - const refreshToken = currentRefreshToken; +async function requestToken(refreshToken: string) { const result = await axios.get('https://kimi.moonshot.cn/api/auth/token/refresh', { headers: { Authorization: `Bearer ${refreshToken}` - } + }, + validateStatus: () => true }); const { access_token, refresh_token - } = checkResult(result); - currentAccessToken = access_token; - currentRefreshToken = refresh_token; - logger.info(`Current access_token: ${currentAccessToken}`); - logger.info(`Current refresh_token: ${currentRefreshToken}`); - logger.success('Token refresh completed'); + } = checkResult(result, refreshToken); + return { + accessToken: access_token, + refreshToken: refresh_token, + refreshTime: util.unixTimestamp() + ACCESS_TOKEN_EXPIRES + } } -async function requestToken() { - if (util.unixTimestamp() - latestRefreshTime > TOKEN_EXPIRES) - await refreshToken(); - return currentAccessToken; +async function acquireToken(refreshToken: string): Promise { + let result = accessTokenMap.get(refreshToken); + if (!result) { + result = await requestToken(refreshToken); + accessTokenMap.set(refreshToken, result); + } + if (util.unixTimestamp() > result.refreshTime) + result = await requestToken(refreshToken); + return result.accessToken; } -async function createConversation(name: string) { - const token = await requestToken(); +async function createConversation(name: string, refreshToken: string) { + const token = await acquireToken(refreshToken); const result = await axios.post('https://kimi.moonshot.cn/api/chat', { name, is_example: false }, { headers: { Authorization: `Bearer ${token}` - } + }, + validateStatus: () => true }); const { id: convId - } = checkResult(result); + } = checkResult(result, refreshToken); return convId; } -async function removeConversation(convId: string) { - const token = await requestToken(); +async function removeConversation(convId: string, refreshToken: string) { + const token = await acquireToken(refreshToken); const result = await axios.delete(`https://kimi.moonshot.cn/api/chat/${convId}`, { headers: { Authorization: `Bearer ${token}` - } + }, + validateStatus: () => true }); - checkResult(result); + checkResult(result, refreshToken); } -async function createCompletionStream(messages: any[], useSearch = true) { - console.log(messages); - const convId = await createConversation(`cmpl-${util.uuid(false)}`); - const token = await requestToken(); +async function createCompletion(messages: any[], refreshToken: string, useSearch = true) { + logger.info(messages); + const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken); + const token = await acquireToken(refreshToken); const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, { messages, use_search: useSearch @@ -78,25 +79,87 @@ async function createCompletionStream(messages: any[], useSearch = true) { headers: { Authorization: `Bearer ${token}` }, + validateStatus: () => true, responseType: 'stream' }); - return createTransStream(convId, result.data); + const answer = await receiveStream(convId, result.data); + removeConversation(convId, refreshToken) + .catch(err => console.error(err)); + return answer; } -function checkResult(result: AxiosResponse) { - if(!result.data) +async function createCompletionStream(messages: any[], refreshToken: string, useSearch = true) { + logger.info(messages); + const convId = await createConversation(`cmpl-${util.uuid(false)}`, refreshToken); + const token = await acquireToken(refreshToken); + const result = await axios.post(`https://kimi.moonshot.cn/api/chat/${convId}/completion/stream`, { + messages, + use_search: useSearch + }, { + headers: { + Authorization: `Bearer ${token}` + }, + validateStatus: () => true, + responseType: 'stream' + }); + return createTransStream(convId, result.data, () => { + removeConversation(convId, refreshToken) + .catch(err => console.error(err)); + }); +} + +function checkResult(result: AxiosResponse, refreshToken: string) { + if(result.status == 401) { + accessTokenMap.delete(refreshToken); + throw new APIException(EX.API_REQUEST_FAILED); + } + if (!result.data) return null; const { error_type, message } = result.data; if (!_.isString(error_type)) return result.data; - console.log(result.data); - throw new APIException(EX.API_REQUEST_FAILED, message); + if (error_type == 'auth.token.invalid') + accessTokenMap.delete(refreshToken); + throw new APIException(EX.API_REQUEST_FAILED, `[请求kimi失败]: ${message}`); } -function createTransStream(convId: string, stream: any) { +async function receiveStream(convId: string, stream: any) { + return new Promise((resolve, reject) => { + const data = { + id: convId, + model: 'kimi', + object: 'chat.completion', + choices: [ + { index: 0, message: { role: 'assistant', content: '' }, finish_reason: 'stop' } + ], + created: parseInt(performance.now() as any) + }; + const parser = createParser(event => { + try { + if (event.type !== "event") return; + const result = _.attempt(() => JSON.parse(event.data)); + if (_.isError(result)) + throw new Error(`stream response invalid: ${event.data}`); + if (result.event == 'cmpl') { + data.choices[0].message.content += result.text; + } + else if (result.event == 'all_done') + resolve(data); + } + catch (err) { + logger.error(err); + reject(err); + } + }); + stream.on("data", buffer => parser.feed(buffer.toString())); + stream.once("error", err => reject(err)); + stream.once("close", () => resolve(data)); + }); +} + +function createTransStream(convId: string, stream: any, endCallback?: Function) { const created = parseInt(performance.now() as any); const transStream = new PassThrough(); - !transStream.closed && transStream.write(`data: ${JSON.stringify({ id: convId, model: 'kimi', @@ -135,8 +198,8 @@ function createTransStream(convId: string, stream: any) { created })}\n\n`; !transStream.closed && transStream.write(data); - !transStream.closed && transStream.end('[DONE]'); - removeConversation(convId).catch(err => console.error(err)); + !transStream.closed && transStream.end('data: [DONE]\n\n'); + endCallback && endCallback(); } } catch (err) { @@ -145,14 +208,13 @@ function createTransStream(convId: string, stream: any) { } }); stream.on("data", buffer => parser.feed(buffer.toString())); - stream.once("error", () => !transStream.closed && transStream.end('[DONE]')); - stream.once("close", () => !transStream.closed && transStream.end('[DONE]')); + stream.once("error", () => !transStream.closed && transStream.end('data: [DONE]\n\n')); + stream.once("close", () => !transStream.closed && transStream.end('data: [DONE]\n\n')); return transStream; } export default { - setRefreshToken, - refreshToken, createConversation, + createCompletion, createCompletionStream }; diff --git a/src/api/routes/chat.ts b/src/api/routes/chat.ts index 1d51ad4..26e2531 100644 --- a/src/api/routes/chat.ts +++ b/src/api/routes/chat.ts @@ -3,9 +3,10 @@ import _ from 'lodash'; import Request from '@/lib/request/Request.ts'; import Response from '@/lib/response/Response.ts'; import chat from '@/api/controllers/chat.ts'; +import logger from '@/lib/logger.ts'; export default { - + prefix: '/v1/chat', post: { @@ -13,11 +14,19 @@ export default { '/completions': async (request: Request) => { request .validate('body.messages', _.isArray) - chat.setRefreshToken(request.body.refresh_token); - const stream = await chat.createCompletionStream(request.body.messages, request.body.use_search); - return new Response(stream, { - type: "text/event-stream" - }); + .validate('headers.authorization', _.isString) + const token = request.headers.authorization; + const refreshToken = token.replace('Bearer ', ''); + logger.info(`Refresh token: ${refreshToken}`); + const messages = request.body.messages; + if (request.body.stream) { + const stream = await chat.createCompletionStream(request.body.messages, refreshToken, request.body.use_search); + return new Response(stream, { + type: "text/event-stream" + }); + } + else + return await chat.createCompletion(messages, refreshToken, request.body.use_search); } }