diff --git a/connector/docker.go b/connector/docker.go index 6595059..4a39bf9 100644 --- a/connector/docker.go +++ b/connector/docker.go @@ -2,6 +2,7 @@ package connector import ( "fmt" + "github.com/op/go-logging" "strings" "sync" @@ -13,10 +14,26 @@ import ( func init() { enabled["docker"] = NewDocker } +var actionToStatus = map[string]string{ + "create": "created", + "start": "running", + "die": "exited", + "stop": "exited", + "pause": "paused", + "unpause": "running", +} + +type StatusUpdate struct { + Cid string + Field string // "status" or "health" + Status string +} + type Docker struct { client *api.Client containers map[string]*container.Container needsRefresh chan string // container IDs requiring refresh + statuses chan StatusUpdate closed chan struct{} lock sync.RWMutex } @@ -31,6 +48,7 @@ func NewDocker() (Connector, error) { client: client, containers: make(map[string]*container.Container), needsRefresh: make(chan string, 60), + statuses: make(chan StatusUpdate, 60), closed: make(chan struct{}), lock: sync.RWMutex{}, } @@ -48,6 +66,7 @@ func NewDocker() (Connector, error) { log.Debugf("docker-connector ServerVersion: %s", info.ServerVersion) go cm.Loop() + go cm.LoopStatuses() cm.refreshAll() go cm.watchEvents() return cm, nil @@ -67,15 +86,45 @@ func (cm *Docker) watchEvents() { continue } - actionName := strings.Split(e.Action, ":")[0] + actionName := e.Action + // fast skip all exec_* events: exec_create, exec_start, exec_die + if strings.HasPrefix(actionName, "exec_") { + continue + } + // Action may have additional param i.e. "health_status: healthy" + // We need to strip to have only action name + sepIdx := strings.Index(actionName, ": ") + if sepIdx != -1 { + actionName = actionName[:sepIdx] + } switch actionName { - case "start", "die", "pause", "unpause", "health_status": - log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID) + // most frequent event is a health checks + case "health_status": + healthStatus := e.Action[sepIdx+2:] + if log.IsEnabledFor(logging.DEBUG) { + log.Debugf("handling docker event: action=health_status id=%s %s", e.ID, healthStatus) + } + cm.statuses <- StatusUpdate{e.ID, "health", healthStatus} + case "create": + if log.IsEnabledFor(logging.DEBUG) { + log.Debugf("handling docker event: action=create id=%s", e.ID) + } cm.needsRefresh <- e.ID case "destroy": - log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID) + if log.IsEnabledFor(logging.DEBUG) { + log.Debugf("handling docker event: action=destroy id=%s", e.ID) + } cm.delByID(e.ID) + default: + // check if this action changes status e.g. start -> running + status := actionToStatus[actionName] + if status != "" { + if log.IsEnabledFor(logging.DEBUG) { + log.Debugf("handling docker event: action=%s id=%s %s", actionName, e.ID, status) + } + cm.statuses <- StatusUpdate{e.ID, "status", status} + } } } log.Info("docker event listener exited") @@ -169,6 +218,24 @@ func (cm *Docker) Loop() { } } +func (cm *Docker) LoopStatuses() { + for { + select { + case statusUpdate := <-cm.statuses: + c, _ := cm.Get(statusUpdate.Cid) + if c != nil { + if statusUpdate.Field == "health" { + c.SetMeta("health", statusUpdate.Status) + } else { + c.SetState(statusUpdate.Status) + } + } + case <-cm.closed: + return + } + } +} + // MustGet gets a single container, creating one anew if not existing func (cm *Docker) MustGet(id string) *container.Container { c, ok := cm.Get(id)