diff --git a/connector/collector/kubernetes.go b/connector/collector/kubernetes.go index 2c78179..2222df5 100644 --- a/connector/collector/kubernetes.go +++ b/connector/collector/kubernetes.go @@ -1,16 +1,21 @@ package collector import ( + "k8s.io/metrics/pkg/client/clientset_generated/clientset" + "github.com/bcicen/ctop/config" "github.com/bcicen/ctop/models" "k8s.io/client-go/kubernetes" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Kubernetes collector type Kubernetes struct { models.Metrics name string - client *kubernetes.Clientset + client clientset.Interface + clientset *kubernetes.Clientset running bool stream chan models.Metrics done chan bool @@ -21,28 +26,33 @@ type Kubernetes struct { func NewKubernetes(client *kubernetes.Clientset, name string) *Kubernetes { return &Kubernetes{ - Metrics: models.Metrics{}, - name: name, - client: client, - scaleCpu: config.GetSwitchVal("scaleCpu"), + Metrics: models.Metrics{}, + name: name, + client: clientset.New(client.RESTClient()), + clientset: client, + scaleCpu: config.GetSwitchVal("scaleCpu"), } } -func (c *Kubernetes) Start() { - //c.done = make(chan bool) - //c.stream = make(chan models.Metrics) - //stats := make(chan *api.Stats) +func (k *Kubernetes) Start() { + k.done = make(chan bool) + k.stream = make(chan models.Metrics) - //go func() { - // opts := api.StatsOptions{ - // ID: c.id, - // Stats: stats, - // Stream: true, - // Done: c.done, - // } - // c.client.Stats(opts) - // c.running = false - //}() + go func() { + k.running = false + for { + + cm, err := k.client.Metrics().PodMetricses("akozlenkov").List(metav1.ListOptions{}) + if err != nil { + log.Errorf(">>>>>> %s here %s", k.name, err.Error()) + continue + } + log.Debugf(">>>> %+v", cm) + //for _, m := range cm.Containers { + // log.Debugf(">>>> %+v", m) + //} + } + }() //go func() { // defer close(c.stream) @@ -56,8 +66,8 @@ func (c *Kubernetes) Start() { // log.Infof("collector stopped for container: %s", c.id) //}() - //c.running = true - //log.Infof("collector started for container: %s", c.id) + k.running = true + log.Infof("collector started for container: %s", k.name) } func (c *Kubernetes) Running() bool { @@ -69,7 +79,7 @@ func (c *Kubernetes) Stream() chan models.Metrics { } func (c *Kubernetes) Logs() LogCollector { - return NewKubernetesLogs(c.name, c.client) + return NewKubernetesLogs(c.name, c.clientset) } // Stop collector diff --git a/connector/kubernetes.go b/connector/kubernetes.go index 02f4576..4e7db32 100644 --- a/connector/kubernetes.go +++ b/connector/kubernetes.go @@ -1,9 +1,12 @@ package connector import ( + "fmt" "os" "path/filepath" + "strings" "sync" + "time" "github.com/bcicen/ctop/connector/collector" "github.com/bcicen/ctop/connector/manager" @@ -59,42 +62,46 @@ func NewKubernetes() Connector { } go k.Loop() k.refreshAll() + go k.watchEvents() return k } +func (k *Kubernetes) watchEvents() { + for { + log.Info("kubernetes event listener starting") + allEvents, err := k.clientset.CoreV1().Events(namespace).List(metav1.ListOptions{}) + if err != nil { + log.Error(err.Error()) + return + } + + for _, e := range allEvents.Items { + if e.Kind != "pod" { + continue + } + + actionName := strings.Split(e.Action, ":")[0] + + switch actionName { + case "start", "die", "pause", "unpause", "health_status": + log.Debugf("handling docker event: action=%s id=%s", e.Action, e.UID) + k.needsRefresh <- e.Name + case "destroy": + log.Debugf("handling docker event: action=%s id=%s", e.Action, e.UID) + k.delByID(e.Name) + default: + log.Debugf("handling docker event: %v", e) + k.needsRefresh <- e.Name + } + } + time.Sleep(1 * time.Second) + } +} func (k *Kubernetes) Loop() { for id := range k.needsRefresh { c := k.MustGet(id) k.refresh(c) } - //log.Debug(">>>>>>1") - //for { - // log.Debug(">>>>>>2") - // pods, err := k.clientset.CoreV1().Pods("").List(metav1.ListOptions{}) - // if err != nil { - // panic(err.Error()) - // } - // log.Debugf("There are %d pods in the cluster\n", len(pods.Items)) - - // // Examples for error handling: - // // - Use helper functions like e.g. errors.IsNotFound() - // // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message - // namespace := "akozlenkov" - // pod := "example-xxxxx" - // _, err = k.clientset.CoreV1().Pods(namespace).Get(pod, metav1.GetOptions{}) - // if errors.IsNotFound(err) { - // log.Debugf("Pod %s in namespace %s not found\n", pod, namespace) - // } else if statusError, isStatus := err.(*errors.StatusError); isStatus { - // log.Debugf("Error getting pod %s in namespace %s: %v\n", - // pod, namespace, statusError.ErrStatus.Message) - // } else if err != nil { - // panic(err.Error()) - // } else { - // log.Debugf("Found pod %s in namespace %s\n", pod, namespace) - // } - - // time.Sleep(10 * time.Second) - //} } // Get a single container, creating one anew if not existing @@ -123,13 +130,25 @@ func (k *Kubernetes) refresh(c *container.Container) { return } c.SetMeta("name", insp.Name) - c.SetMeta("image", "stub") - c.SetMeta("IPs", "stub") - c.SetMeta("ports", "stub") - c.SetMeta("created", "stub") - c.SetMeta("health", "stub") - c.SetMeta("[ENV-VAR]", "stub") - c.SetState("stub") + if len(insp.Spec.Containers) >= 1 { + c.SetMeta("image", insp.Spec.Containers[0].Image) + c.SetMeta("ports", k8sPort(insp.Spec.Containers[0].Ports)) + for _, env := range insp.Spec.Containers[0].Env { + c.SetMeta("[ENV-VAR]", env.Name+"="+env.Value) + } + } + c.SetMeta("IPs", insp.Status.PodIP) + c.SetMeta("created", insp.CreationTimestamp.Format("Mon Jan 2 15:04:05 2006")) + c.SetMeta("health", string(insp.Status.Phase)) + c.SetState("running") +} + +func k8sPort(ports []v1.ContainerPort) string { + str := []string{} + for _, p := range ports { + str = append(str, fmt.Sprintf("%s:%d -> %d", p.HostIP, p.HostPort, p.ContainerPort)) + } + return strings.Join(str, "\n") } func (k *Kubernetes) inspect(id string) *v1.Pod { @@ -167,9 +186,12 @@ func (k *Kubernetes) refreshAll() { for _, pod := range allPods.Items { c := k.MustGet(pod.Name) + c.SetMeta("uid", string(pod.UID)) c.SetMeta("name", pod.Name) if pod.Initializers != nil && pod.Initializers.Result != nil { c.SetState(pod.Initializers.Result.Status) + } else { + c.SetState(string(pod.Status.Phase)) } k.needsRefresh <- c.Id } diff --git a/cwidgets/compact/status.go b/cwidgets/compact/status.go index eec4c96..07fb272 100644 --- a/cwidgets/compact/status.go +++ b/cwidgets/compact/status.go @@ -67,11 +67,11 @@ func (s *Status) SetHealth(val string) { color := ui.ColorDefault switch val { - case "healthy": + case "healthy", "Succeeded": color = ui.ThemeAttr("status.ok") - case "unhealthy": + case "unhealthy", "Failed", "Unknown": color = ui.ThemeAttr("status.danger") - case "starting": + case "starting", "Pending", "Running": color = ui.ThemeAttr("status.warn") } diff --git a/main.go b/main.go index ea88f49..87df383 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,7 @@ func main() { // parse command line arguments var ( - versionFlag = flag.Bool("V", false, "output version information and exit") + versionFlag = flag.Bool("version", false, "output version information and exit") helpFlag = flag.Bool("h", false, "display this help dialog") filterFlag = flag.String("f", "", "filter containers") activeOnlyFlag = flag.Bool("a", false, "show active containers only")