From 9dec6b4c67d4d5af7ba8ee4359ac43ef94a5a7df Mon Sep 17 00:00:00 2001 From: Alexandr Kozlenkov Date: Mon, 29 Oct 2018 12:56:33 +0300 Subject: [PATCH] Added loading CPU and Mem for containers in pod --- config/param.go | 5 ++ connector/collector/kubernetes.go | 79 ++++++++++++++----------------- connector/kubernetes.go | 11 +++-- main.go | 2 + 4 files changed, 49 insertions(+), 48 deletions(-) diff --git a/config/param.go b/config/param.go index 30dd036..d954bdc 100644 --- a/config/param.go +++ b/config/param.go @@ -12,6 +12,11 @@ var params = []*Param{ Val: "state", Label: "Container Sort Field", }, + &Param{ + Key: "namespace", + Val: "state", + Label: "Kubernetes namespace for monitoring", + }, } type Param struct { diff --git a/connector/collector/kubernetes.go b/connector/collector/kubernetes.go index 2222df5..af40472 100644 --- a/connector/collector/kubernetes.go +++ b/connector/collector/kubernetes.go @@ -1,13 +1,16 @@ package collector import ( + "time" + + "k8s.io/metrics/pkg/apis/metrics/v1alpha1" "k8s.io/metrics/pkg/client/clientset_generated/clientset" "github.com/bcicen/ctop/config" "github.com/bcicen/ctop/models" - "k8s.io/client-go/kubernetes" + "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" ) // Kubernetes collector @@ -42,30 +45,20 @@ func (k *Kubernetes) Start() { k.running = false for { - cm, err := k.client.Metrics().PodMetricses("akozlenkov").List(metav1.ListOptions{}) + result := &v1alpha1.PodMetrics{} + err := k.clientset.RESTClient().Get().AbsPath("/api/v1/namespaces/kube-system/services/http:heapster:/proxy/apis/metrics/v1alpha1/namespaces/" + config.GetVal("namespace") + "/pods/" + k.name).Do().Into(result) + if err != nil { - log.Errorf(">>>>>> %s here %s", k.name, err.Error()) + log.Errorf("has error %s here %s", k.name, err.Error()) + time.Sleep(1 * time.Second) continue } - log.Debugf(">>>> %+v", cm) - //for _, m := range cm.Containers { - // log.Debugf(">>>> %+v", m) - //} + k.ReadCPU(result) + k.ReadMem(result) + k.stream <- k.Metrics } }() - //go func() { - // defer close(c.stream) - // for s := range stats { - // c.ReadCPU(s) - // c.ReadMem(s) - // c.ReadNet(s) - // c.ReadIO(s) - // c.stream <- c.Metrics - // } - // log.Infof("collector stopped for container: %s", c.id) - //}() - k.running = true log.Infof("collector started for container: %s", k.name) } @@ -87,30 +80,30 @@ func (c *Kubernetes) Stop() { c.done <- true } -// -//func (c *Kubernetes) ReadCPU(stats *api.Stats) { -// ncpus := float64(len(stats.CPUStats.CPUUsage.PercpuUsage)) -// total := float64(stats.CPUStats.CPUUsage.TotalUsage) -// system := float64(stats.CPUStats.SystemCPUUsage) -// -// cpudiff := total - c.lastCpu -// syscpudiff := system - c.lastSysCpu -// -// if c.scaleCpu { -// c.CPUUtil = round((cpudiff / syscpudiff * 100)) -// } else { -// c.CPUUtil = round((cpudiff / syscpudiff * 100) * ncpus) -// } -// c.lastCpu = total -// c.lastSysCpu = system -// c.Pids = int(stats.PidsStats.Current) -//} +func (k *Kubernetes) ReadCPU(metrics *v1alpha1.PodMetrics) { + all := int64(0) + for _, c := range metrics.Containers { + v := c.Usage[v1.ResourceCPU] + all += v.Value() + } + if all != 0 { + k.CPUUtil = round(float64(all)) + } +} -//func (c *Kubernetes) ReadMem(stats *api.Stats) { -// c.MemUsage = int64(stats.MemoryStats.Usage - stats.MemoryStats.Stats.Cache) -// c.MemLimit = int64(stats.MemoryStats.Limit) -// c.MemPercent = percent(float64(c.MemUsage), float64(c.MemLimit)) -//} +func (k *Kubernetes) ReadMem(metrics *v1alpha1.PodMetrics) { + all := int64(0) + for _, c := range metrics.Containers { + v := c.Usage[v1.ResourceMemory] + a, ok := v.AsInt64() + if ok { + all += a + } + } + k.MemUsage = all + k.MemLimit = int64(0) + //k.MemPercent = percent(float64(k.MemUsage), float64(k.MemLimit)) +} //func (c *Kubernetes) ReadNet(stats *api.Stats) { // var rx, tx int64 diff --git a/connector/kubernetes.go b/connector/kubernetes.go index 4e7db32..6ca41c0 100644 --- a/connector/kubernetes.go +++ b/connector/kubernetes.go @@ -8,6 +8,7 @@ import ( "sync" "time" + bcfg "github.com/bcicen/ctop/config" "github.com/bcicen/ctop/connector/collector" "github.com/bcicen/ctop/connector/manager" "github.com/bcicen/ctop/container" @@ -20,9 +21,8 @@ import ( func init() { enabled["kubernetes"] = NewKubernetes } -var namespace = "akozlenkov" - type Kubernetes struct { + namespace string clientset *kubernetes.Clientset containers map[string]*container.Container needsRefresh chan string // container IDs requiring refresh @@ -59,6 +59,7 @@ func NewKubernetes() Connector { containers: make(map[string]*container.Container), needsRefresh: make(chan string, 60), lock: sync.RWMutex{}, + namespace: bcfg.GetVal("namespace"), } go k.Loop() k.refreshAll() @@ -69,7 +70,7 @@ func NewKubernetes() Connector { func (k *Kubernetes) watchEvents() { for { log.Info("kubernetes event listener starting") - allEvents, err := k.clientset.CoreV1().Events(namespace).List(metav1.ListOptions{}) + allEvents, err := k.clientset.CoreV1().Events(k.namespace).List(metav1.ListOptions{}) if err != nil { log.Error(err.Error()) return @@ -152,7 +153,7 @@ func k8sPort(ports []v1.ContainerPort) string { } func (k *Kubernetes) inspect(id string) *v1.Pod { - p, err := k.clientset.CoreV1().Pods(namespace).Get(id, metav1.GetOptions{}) + p, err := k.clientset.CoreV1().Pods(k.namespace).Get(id, metav1.GetOptions{}) if err != nil { if _, ok := err.(*api.NoSuchContainer); !ok { log.Errorf(err.Error()) @@ -178,7 +179,7 @@ func (k *Kubernetes) Get(name string) (c *container.Container, ok bool) { // Mark all container IDs for refresh func (k *Kubernetes) refreshAll() { - allPods, err := k.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + allPods, err := k.clientset.CoreV1().Pods(k.namespace).List(metav1.ListOptions{}) if err != nil { log.Error(err.Error()) return diff --git a/main.go b/main.go index 87df383..d0e0fbf 100644 --- a/main.go +++ b/main.go @@ -45,6 +45,7 @@ func main() { invertFlag = flag.Bool("i", false, "invert default colors") scaleCpu = flag.Bool("scale-cpu", false, "show cpu as % of system total") connectorFlag = flag.String("connector", "docker", "container connector to use") + namespace = flag.String("n", "default", "container connector to use") ) flag.Parse() @@ -91,6 +92,7 @@ func main() { if *invertFlag { InvertColorMap() } + config.Update("namespace", *namespace) ui.ColorMap = ColorMap // override default colormap if err := ui.Init(); err != nil { panic(err)