152 lines
3.6 KiB
Go
152 lines
3.6 KiB
Go
package module
|
|
|
|
import "time"
|
|
|
|
//MessageHandler define handler function for message
|
|
type MessageHandler func(msgPtr *Message) (handled bool)
|
|
|
|
//MessageCallback callback func for message
|
|
type MessageCallback func(msgPtr *Message)
|
|
|
|
//Message - Micro service message for CEH
|
|
type Message struct {
|
|
Source string
|
|
Name string
|
|
Args []interface{}
|
|
Callback MessageCallback
|
|
ResultChan chan interface{}
|
|
Results []interface{}
|
|
Err error
|
|
Broadcast bool
|
|
}
|
|
|
|
//NewMessage create new message
|
|
func NewMessage(source string, name string, callback MessageCallback, args ...interface{}) *Message {
|
|
return &Message{
|
|
Source: source,
|
|
Name: name,
|
|
Args: args,
|
|
Callback: callback,
|
|
}
|
|
}
|
|
|
|
//AddMessageHandler add message handler by message name
|
|
func (p *Module) AddMessageHandler(messageName string, h MessageHandler) {
|
|
if status := p.GetStatus(); status == StatusStarting || status == StatusRunning || status == StatusStopping {
|
|
//Cannot modify when module is running
|
|
return
|
|
}
|
|
var handlers []MessageHandler
|
|
var ok bool
|
|
if handlers, ok = p.MessageHandlers[messageName]; ok {
|
|
handlers = append(handlers, h)
|
|
} else {
|
|
handlers = []MessageHandler{h}
|
|
}
|
|
p.MessageHandlers[messageName] = handlers
|
|
}
|
|
|
|
func (p *Module) startMessageLoop() {
|
|
ticker := time.NewTicker(p.WatchDogTick)
|
|
for {
|
|
select {
|
|
case msgPtr, ok := <-p.MessageBus:
|
|
if !ok {
|
|
p.SetStatus(StatusStopped)
|
|
break
|
|
}
|
|
p.wg.Add(1)
|
|
go p.processMessage(msgPtr)
|
|
case <-ticker.C:
|
|
//log.Printf("[Watch Dog] %s:\r\n", p.String())
|
|
if p.OnTick != nil {
|
|
go p.OnTick(p)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//ReceiveMessage into MessageBus
|
|
func (p *Module) ReceiveMessage(msgPtr *Message) {
|
|
p.MessageBus <- msgPtr
|
|
}
|
|
|
|
func (p *Module) processMessage(msgPtr *Message) {
|
|
defer p.wg.Done()
|
|
var handled bool
|
|
if handlers, ok := p.MessageHandlers[msgPtr.Name]; ok {
|
|
//if multi-handlers registered for a message, the message considered to be handled when any one of handler returns true
|
|
for _, handler := range handlers {
|
|
//log.Printf("Handler %d\n", i+1)
|
|
handled = handler(msgPtr) || handled
|
|
}
|
|
}
|
|
if handled && !msgPtr.Broadcast {
|
|
//log.Printf("Handled\n")
|
|
if msgPtr.Callback != nil {
|
|
msgPtr.Callback(msgPtr)
|
|
}
|
|
return
|
|
}
|
|
for _, c := range p.Children {
|
|
if messageReceiver, ok := c.(MessageReceiver); ok {
|
|
//log.Printf("Message [%s] delieved to %s\r\n", msgPtr.Name, c)
|
|
messageReceiver.ReceiveMessage(msgPtr)
|
|
}
|
|
}
|
|
}
|
|
|
|
//Send message to current micro service module with a callback
|
|
func (p *Module) Send(name string, callback MessageCallback, args ...interface{}) {
|
|
target := p
|
|
if p.Root != nil {
|
|
target = p.Root
|
|
}
|
|
target.MessageBus <- &Message{
|
|
Source: target.String(),
|
|
Name: name,
|
|
Args: args,
|
|
Callback: callback,
|
|
}
|
|
}
|
|
|
|
//SendWithChan send message to current micro service module with a result chan
|
|
func (p *Module) SendWithChan(name string, resultChan chan interface{}, args ...interface{}) {
|
|
target := p
|
|
if p.Root != nil {
|
|
target = p.Root
|
|
}
|
|
target.MessageBus <- &Message{
|
|
Source: target.String(),
|
|
Name: name,
|
|
Args: args,
|
|
ResultChan: resultChan,
|
|
Callback: nil,
|
|
}
|
|
}
|
|
|
|
//SendTo message to micro service module
|
|
func (p *Module) SendTo(target *Module, name string, callback MessageCallback, args ...interface{}) {
|
|
target.MessageBus <- &Message{
|
|
Source: p.String(),
|
|
Name: name,
|
|
Args: args,
|
|
Callback: callback,
|
|
}
|
|
}
|
|
|
|
//Broadcast message
|
|
func (p *Module) Broadcast(name string, callback MessageCallback, args ...interface{}) {
|
|
target := p
|
|
if p.Root != nil {
|
|
target = p.Root
|
|
}
|
|
target.MessageBus <- &Message{
|
|
Source: target.String(),
|
|
Name: name,
|
|
Args: args,
|
|
Callback: callback,
|
|
Broadcast: true,
|
|
}
|
|
}
|