From 6f112bf903d8c583b2ec4ef132d14bfeb0854e48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AF=BB=E6=AC=A2?= Date: Thu, 26 Aug 2021 14:40:55 +0800 Subject: [PATCH] :tida: first --- .gitignore | 1 + client.go | 144 +++++++++++++++++++++++++++++++++++++++++++++++++++++ common.go | 66 ++++++++++++++++++++++++ go.mod | 3 ++ 4 files changed, 214 insertions(+) create mode 100644 .gitignore create mode 100644 client.go create mode 100644 common.go create mode 100644 go.mod diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/client.go b/client.go new file mode 100644 index 0000000..ebdb3ff --- /dev/null +++ b/client.go @@ -0,0 +1,144 @@ +package loki_client + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" +) + +type jsonLogEntry struct { + Ts time.Time `json:"ts"` + Line string `json:"line"` + level LogLevel // not used in JSON +} + +type promtailStream struct { + Labels string `json:"labels"` + Entries []*jsonLogEntry `json:"entries"` +} + +type promtailMsg struct { + Streams []promtailStream `json:"streams"` +} + +type clientJson struct { + config *ClientConfig + quit chan struct{} + entries chan *jsonLogEntry + waitGroup sync.WaitGroup + client httpClient +} + +func NewClient(conf ClientConfig) (Client, error) { + client := clientJson{ + config: &conf, + quit: make(chan struct{}), + entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE), + client: httpClient{}, + } + + client.waitGroup.Add(1) + go client.run() + + return &client, nil +} + +func (c *clientJson) Debugf(format string, args ...interface{}) { + c.log(format, DEBUG, args...) +} + +func (c *clientJson) Infof(format string, args ...interface{}) { + c.log(format, INFO, args...) +} + +func (c *clientJson) Warnf(format string, args ...interface{}) { + c.log(format, WARN, args...) +} + +func (c *clientJson) Errorf(format string, args ...interface{}) { + c.log(format, ERROR, args...) +} + +func (c *clientJson) Panicf(format string, args ...interface{}) { + c.log(format, ERROR, args...) + panic(fmt.Sprintf(format, args)) +} + +func (c *clientJson) log(format string, level LogLevel, args ...interface{}) { + c.entries <- &jsonLogEntry{ + Ts: time.Now(), + Line: fmt.Sprintf(format, args...), + level: level, + } +} + +func (c *clientJson) Shutdown() { + close(c.quit) + c.waitGroup.Wait() +} + +func (c *clientJson) run() { + var batch []*jsonLogEntry + batchSize := 0 + maxWait := time.NewTimer(c.config.BatchWait) + + defer func() { + if batchSize > 0 { + c.send(batch) + } + + c.waitGroup.Done() + }() + + for { + select { + case <-c.quit: + return + case entry := <-c.entries: + + batch = append(batch, entry) + batchSize++ + if batchSize >= c.config.BatchEntriesNumber { + c.send(batch) + batch = []*jsonLogEntry{} + batchSize = 0 + maxWait.Reset(c.config.BatchWait) + } + case <-maxWait.C: + if batchSize > 0 { + c.send(batch) + batch = []*jsonLogEntry{} + batchSize = 0 + } + maxWait.Reset(c.config.BatchWait) + } + } +} + +func (c *clientJson) send(entries []*jsonLogEntry) { + var streams []promtailStream + streams = append(streams, promtailStream{ + Labels: c.config.Labels, + Entries: entries, + }) + + msg := promtailMsg{Streams: streams} + jsonMsg, err := json.Marshal(msg) + if err != nil { + log.Printf("unable to marshal a JSON document: %s\n", err) + return + } + + resp, body, err := c.client.sendJsonReq(c.config.PushURL, jsonMsg) + if err != nil { + log.Printf("unable to send an HTTP request: %s\n", err) + return + } + + if resp.StatusCode != 204 { + log.Printf("Unexpected HTTP status code: %d, message: %s\n", resp.StatusCode, body) + return + } +} diff --git a/common.go b/common.go new file mode 100644 index 0000000..be12501 --- /dev/null +++ b/common.go @@ -0,0 +1,66 @@ +package loki_client + +import ( + "bytes" + "io/ioutil" + "net/http" + "time" +) + +const LOG_ENTRIES_CHAN_SIZE = 5000 + +type LogLevel int + +const ( + DEBUG LogLevel = iota + INFO LogLevel = iota + WARN LogLevel = iota + ERROR LogLevel = iota + DISABLE LogLevel = iota +) + +type ClientConfig struct { + // E.g. http://localhost:3100/api/prom/push + PushURL string + // E.g. "{job=\"somejob\"}" + Labels string + BatchWait time.Duration + BatchEntriesNumber int +} + +type Client interface { + Debugf(format string, args ...interface{}) + Infof(format string, args ...interface{}) + Warnf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) + Panicf(format string, args ...interface{}) + Shutdown() +} + +// http.Client wrapper for adding new methods, particularly sendJsonReq +type httpClient struct { + parent http.Client +} + +// 推送日志到服务器 +func (client *httpClient) sendJsonReq(url string, reqBody []byte) (resp *http.Response, resBody []byte, err error) { + req, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody)) + if err != nil { + return nil, nil, err + } + + req.Header.Set("Content-Type", "application/json") + + resp, err = client.parent.Do(req) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + resBody, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, nil, err + } + + return resp, resBody, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..dc6444a --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module loki-client + +go 1.16