package loki_client import ( "encoding/json" "fmt" "log" "sync" "time" ) type jsonLogEntry struct { Ts time.Time `json:"ts"` Line string `json:"line"` level LogLevel // not used in JSON } type promtailStream struct { Labels string `json:"labels"` Entries []*jsonLogEntry `json:"entries"` } type promtailMsg struct { Streams []promtailStream `json:"streams"` } type clientJson struct { config *ClientConfig quit chan struct{} entries chan *jsonLogEntry waitGroup sync.WaitGroup client httpClient } func NewClient(conf ClientConfig) (Client, error) { client := clientJson{ config: &conf, quit: make(chan struct{}), entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE), client: httpClient{}, } client.waitGroup.Add(1) go client.run() return &client, nil } func (c *clientJson) Debugf(format string, args ...interface{}) { c.log(format, DEBUG, args...) } func (c *clientJson) Infof(format string, args ...interface{}) { c.log(format, INFO, args...) } func (c *clientJson) Warnf(format string, args ...interface{}) { c.log(format, WARN, args...) } func (c *clientJson) Errorf(format string, args ...interface{}) { c.log(format, ERROR, args...) } func (c *clientJson) Panicf(format string, args ...interface{}) { c.log(format, ERROR, args...) panic(fmt.Sprintf(format, args)) } func (c *clientJson) log(format string, level LogLevel, args ...interface{}) { c.entries <- &jsonLogEntry{ Ts: time.Now(), Line: fmt.Sprintf(format, args...), level: level, } } func (c *clientJson) Shutdown() { close(c.quit) c.waitGroup.Wait() } func (c *clientJson) run() { var batch []*jsonLogEntry batchSize := 0 maxWait := time.NewTimer(c.config.BatchWait) defer func() { if batchSize > 0 { c.send(batch) } c.waitGroup.Done() }() for { select { case <-c.quit: return case entry := <-c.entries: batch = append(batch, entry) batchSize++ if batchSize >= c.config.BatchEntriesNumber { c.send(batch) batch = []*jsonLogEntry{} batchSize = 0 maxWait.Reset(c.config.BatchWait) } case <-maxWait.C: if batchSize > 0 { c.send(batch) batch = []*jsonLogEntry{} batchSize = 0 } maxWait.Reset(c.config.BatchWait) } } } func (c *clientJson) send(entries []*jsonLogEntry) { var streams []promtailStream streams = append(streams, promtailStream{ Labels: c.config.Labels, Entries: entries, }) msg := promtailMsg{Streams: streams} jsonMsg, err := json.Marshal(msg) if err != nil { log.Printf("unable to marshal a JSON document: %s\n", err) return } resp, body, err := c.client.sendJsonReq(c.config.PushURL, jsonMsg) if err != nil { log.Printf("unable to send an HTTP request: %s\n", err) return } if resp.StatusCode != 204 { log.Printf("Unexpected HTTP status code: %d, message: %s\n", resp.StatusCode, body) return } }