:tida: first
This commit is contained in:
commit
6f112bf903
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
.idea
|
144
client.go
Normal file
144
client.go
Normal file
@ -0,0 +1,144 @@
|
||||
package loki_client
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type jsonLogEntry struct {
|
||||
Ts time.Time `json:"ts"`
|
||||
Line string `json:"line"`
|
||||
level LogLevel // not used in JSON
|
||||
}
|
||||
|
||||
type promtailStream struct {
|
||||
Labels string `json:"labels"`
|
||||
Entries []*jsonLogEntry `json:"entries"`
|
||||
}
|
||||
|
||||
type promtailMsg struct {
|
||||
Streams []promtailStream `json:"streams"`
|
||||
}
|
||||
|
||||
type clientJson struct {
|
||||
config *ClientConfig
|
||||
quit chan struct{}
|
||||
entries chan *jsonLogEntry
|
||||
waitGroup sync.WaitGroup
|
||||
client httpClient
|
||||
}
|
||||
|
||||
func NewClient(conf ClientConfig) (Client, error) {
|
||||
client := clientJson{
|
||||
config: &conf,
|
||||
quit: make(chan struct{}),
|
||||
entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE),
|
||||
client: httpClient{},
|
||||
}
|
||||
|
||||
client.waitGroup.Add(1)
|
||||
go client.run()
|
||||
|
||||
return &client, nil
|
||||
}
|
||||
|
||||
func (c *clientJson) Debugf(format string, args ...interface{}) {
|
||||
c.log(format, DEBUG, args...)
|
||||
}
|
||||
|
||||
func (c *clientJson) Infof(format string, args ...interface{}) {
|
||||
c.log(format, INFO, args...)
|
||||
}
|
||||
|
||||
func (c *clientJson) Warnf(format string, args ...interface{}) {
|
||||
c.log(format, WARN, args...)
|
||||
}
|
||||
|
||||
func (c *clientJson) Errorf(format string, args ...interface{}) {
|
||||
c.log(format, ERROR, args...)
|
||||
}
|
||||
|
||||
func (c *clientJson) Panicf(format string, args ...interface{}) {
|
||||
c.log(format, ERROR, args...)
|
||||
panic(fmt.Sprintf(format, args))
|
||||
}
|
||||
|
||||
func (c *clientJson) log(format string, level LogLevel, args ...interface{}) {
|
||||
c.entries <- &jsonLogEntry{
|
||||
Ts: time.Now(),
|
||||
Line: fmt.Sprintf(format, args...),
|
||||
level: level,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientJson) Shutdown() {
|
||||
close(c.quit)
|
||||
c.waitGroup.Wait()
|
||||
}
|
||||
|
||||
func (c *clientJson) run() {
|
||||
var batch []*jsonLogEntry
|
||||
batchSize := 0
|
||||
maxWait := time.NewTimer(c.config.BatchWait)
|
||||
|
||||
defer func() {
|
||||
if batchSize > 0 {
|
||||
c.send(batch)
|
||||
}
|
||||
|
||||
c.waitGroup.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.quit:
|
||||
return
|
||||
case entry := <-c.entries:
|
||||
|
||||
batch = append(batch, entry)
|
||||
batchSize++
|
||||
if batchSize >= c.config.BatchEntriesNumber {
|
||||
c.send(batch)
|
||||
batch = []*jsonLogEntry{}
|
||||
batchSize = 0
|
||||
maxWait.Reset(c.config.BatchWait)
|
||||
}
|
||||
case <-maxWait.C:
|
||||
if batchSize > 0 {
|
||||
c.send(batch)
|
||||
batch = []*jsonLogEntry{}
|
||||
batchSize = 0
|
||||
}
|
||||
maxWait.Reset(c.config.BatchWait)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientJson) send(entries []*jsonLogEntry) {
|
||||
var streams []promtailStream
|
||||
streams = append(streams, promtailStream{
|
||||
Labels: c.config.Labels,
|
||||
Entries: entries,
|
||||
})
|
||||
|
||||
msg := promtailMsg{Streams: streams}
|
||||
jsonMsg, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("unable to marshal a JSON document: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
resp, body, err := c.client.sendJsonReq(c.config.PushURL, jsonMsg)
|
||||
if err != nil {
|
||||
log.Printf("unable to send an HTTP request: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode != 204 {
|
||||
log.Printf("Unexpected HTTP status code: %d, message: %s\n", resp.StatusCode, body)
|
||||
return
|
||||
}
|
||||
}
|
66
common.go
Normal file
66
common.go
Normal file
@ -0,0 +1,66 @@
|
||||
package loki_client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const LOG_ENTRIES_CHAN_SIZE = 5000
|
||||
|
||||
type LogLevel int
|
||||
|
||||
const (
|
||||
DEBUG LogLevel = iota
|
||||
INFO LogLevel = iota
|
||||
WARN LogLevel = iota
|
||||
ERROR LogLevel = iota
|
||||
DISABLE LogLevel = iota
|
||||
)
|
||||
|
||||
type ClientConfig struct {
|
||||
// E.g. http://localhost:3100/api/prom/push
|
||||
PushURL string
|
||||
// E.g. "{job=\"somejob\"}"
|
||||
Labels string
|
||||
BatchWait time.Duration
|
||||
BatchEntriesNumber int
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
Debugf(format string, args ...interface{})
|
||||
Infof(format string, args ...interface{})
|
||||
Warnf(format string, args ...interface{})
|
||||
Errorf(format string, args ...interface{})
|
||||
Panicf(format string, args ...interface{})
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// http.Client wrapper for adding new methods, particularly sendJsonReq
|
||||
type httpClient struct {
|
||||
parent http.Client
|
||||
}
|
||||
|
||||
// 推送日志到服务器
|
||||
func (client *httpClient) sendJsonReq(url string, reqBody []byte) (resp *http.Response, resBody []byte, err error) {
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err = client.parent.Do(req)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
resBody, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return resp, resBody, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user