loki-client/client.go

145 lines
2.8 KiB
Go

package loki_client
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
)
type jsonLogEntry struct {
Ts time.Time `json:"ts"`
Line string `json:"line"`
level LogLevel // not used in JSON
}
type promtailStream struct {
Labels string `json:"labels"`
Entries []*jsonLogEntry `json:"entries"`
}
type promtailMsg struct {
Streams []promtailStream `json:"streams"`
}
type clientJson struct {
config *ClientConfig
quit chan struct{}
entries chan *jsonLogEntry
waitGroup sync.WaitGroup
client httpClient
}
func NewClient(conf ClientConfig) (Client, error) {
client := clientJson{
config: &conf,
quit: make(chan struct{}),
entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE),
client: httpClient{},
}
client.waitGroup.Add(1)
go client.run()
return &client, nil
}
func (c *clientJson) Debugf(format string, args ...interface{}) {
c.log(format, DEBUG, args...)
}
func (c *clientJson) Infof(format string, args ...interface{}) {
c.log(format, INFO, args...)
}
func (c *clientJson) Warnf(format string, args ...interface{}) {
c.log(format, WARN, args...)
}
func (c *clientJson) Errorf(format string, args ...interface{}) {
c.log(format, ERROR, args...)
}
func (c *clientJson) Panicf(format string, args ...interface{}) {
c.log(format, ERROR, args...)
panic(fmt.Sprintf(format, args))
}
func (c *clientJson) log(format string, level LogLevel, args ...interface{}) {
c.entries <- &jsonLogEntry{
Ts: time.Now(),
Line: fmt.Sprintf(format, args...),
level: level,
}
}
func (c *clientJson) Shutdown() {
close(c.quit)
c.waitGroup.Wait()
}
func (c *clientJson) run() {
var batch []*jsonLogEntry
batchSize := 0
maxWait := time.NewTimer(c.config.BatchWait)
defer func() {
if batchSize > 0 {
c.send(batch)
}
c.waitGroup.Done()
}()
for {
select {
case <-c.quit:
return
case entry := <-c.entries:
batch = append(batch, entry)
batchSize++
if batchSize >= c.config.BatchEntriesNumber {
c.send(batch)
batch = []*jsonLogEntry{}
batchSize = 0
maxWait.Reset(c.config.BatchWait)
}
case <-maxWait.C:
if batchSize > 0 {
c.send(batch)
batch = []*jsonLogEntry{}
batchSize = 0
}
maxWait.Reset(c.config.BatchWait)
}
}
}
func (c *clientJson) send(entries []*jsonLogEntry) {
var streams []promtailStream
streams = append(streams, promtailStream{
Labels: c.config.Labels,
Entries: entries,
})
msg := promtailMsg{Streams: streams}
jsonMsg, err := json.Marshal(msg)
if err != nil {
log.Printf("unable to marshal a JSON document: %s\n", err)
return
}
resp, body, err := c.client.sendJsonReq(c.config.PushURL, jsonMsg)
if err != nil {
log.Printf("unable to send an HTTP request: %s\n", err)
return
}
if resp.StatusCode != 204 {
log.Printf("Unexpected HTTP status code: %d, message: %s\n", resp.StatusCode, body)
return
}
}