From 187adf0540d3515ebdd118db4315cfa897e6b5cb Mon Sep 17 00:00:00 2001 From: Alexandr Kozlenkov Date: Sun, 28 Oct 2018 23:04:51 +0300 Subject: [PATCH] A draft of connector to kubernetse --- connector/collector/kubernetes.go | 125 ++++++++++++++++ connector/collector/kubernetes_logs.go | 78 ++++++++++ connector/kubernetes.go | 195 +++++++++++++++++++++++++ connector/manager/kubernetes.go | 64 ++++++++ main.go | 2 +- 5 files changed, 463 insertions(+), 1 deletion(-) create mode 100644 connector/collector/kubernetes.go create mode 100644 connector/collector/kubernetes_logs.go create mode 100644 connector/kubernetes.go create mode 100644 connector/manager/kubernetes.go diff --git a/connector/collector/kubernetes.go b/connector/collector/kubernetes.go new file mode 100644 index 0000000..2c78179 --- /dev/null +++ b/connector/collector/kubernetes.go @@ -0,0 +1,125 @@ +package collector + +import ( + "github.com/bcicen/ctop/config" + "github.com/bcicen/ctop/models" + "k8s.io/client-go/kubernetes" +) + +// Kubernetes collector +type Kubernetes struct { + models.Metrics + name string + client *kubernetes.Clientset + running bool + stream chan models.Metrics + done chan bool + lastCpu float64 + lastSysCpu float64 + scaleCpu bool +} + +func NewKubernetes(client *kubernetes.Clientset, name string) *Kubernetes { + return &Kubernetes{ + Metrics: models.Metrics{}, + name: name, + client: client, + scaleCpu: config.GetSwitchVal("scaleCpu"), + } +} + +func (c *Kubernetes) Start() { + //c.done = make(chan bool) + //c.stream = make(chan models.Metrics) + //stats := make(chan *api.Stats) + + //go func() { + // opts := api.StatsOptions{ + // ID: c.id, + // Stats: stats, + // Stream: true, + // Done: c.done, + // } + // c.client.Stats(opts) + // c.running = false + //}() + + //go func() { + // defer close(c.stream) + // for s := range stats { + // c.ReadCPU(s) + // c.ReadMem(s) + // c.ReadNet(s) + // c.ReadIO(s) + // c.stream <- c.Metrics + // } + // log.Infof("collector stopped for container: %s", c.id) + //}() + + //c.running = true + //log.Infof("collector started for container: %s", c.id) +} + +func (c *Kubernetes) Running() bool { + return c.running +} + +func (c *Kubernetes) Stream() chan models.Metrics { + return c.stream +} + +func (c *Kubernetes) Logs() LogCollector { + return NewKubernetesLogs(c.name, c.client) +} + +// Stop collector +func (c *Kubernetes) Stop() { + c.done <- true +} + +// +//func (c *Kubernetes) ReadCPU(stats *api.Stats) { +// ncpus := float64(len(stats.CPUStats.CPUUsage.PercpuUsage)) +// total := float64(stats.CPUStats.CPUUsage.TotalUsage) +// system := float64(stats.CPUStats.SystemCPUUsage) +// +// cpudiff := total - c.lastCpu +// syscpudiff := system - c.lastSysCpu +// +// if c.scaleCpu { +// c.CPUUtil = round((cpudiff / syscpudiff * 100)) +// } else { +// c.CPUUtil = round((cpudiff / syscpudiff * 100) * ncpus) +// } +// c.lastCpu = total +// c.lastSysCpu = system +// c.Pids = int(stats.PidsStats.Current) +//} + +//func (c *Kubernetes) ReadMem(stats *api.Stats) { +// c.MemUsage = int64(stats.MemoryStats.Usage - stats.MemoryStats.Stats.Cache) +// c.MemLimit = int64(stats.MemoryStats.Limit) +// c.MemPercent = percent(float64(c.MemUsage), float64(c.MemLimit)) +//} + +//func (c *Kubernetes) ReadNet(stats *api.Stats) { +// var rx, tx int64 +// for _, network := range stats.Networks { +// rx += int64(network.RxBytes) +// tx += int64(network.TxBytes) +// } +// c.NetRx, c.NetTx = rx, tx +//} +// +//func (c *Kubernetes) ReadIO(stats *api.Stats) { +// var read, write int64 +// for _, blk := range stats.BlkioStats.IOServiceBytesRecursive { +// if blk.Op == "Read" { +// read = int64(blk.Value) +// } +// if blk.Op == "Write" { +// write = int64(blk.Value) +// } +// } +// c.IOBytesRead, c.IOBytesWrite = read, write +//} diff --git a/connector/collector/kubernetes_logs.go b/connector/collector/kubernetes_logs.go new file mode 100644 index 0000000..e118ce9 --- /dev/null +++ b/connector/collector/kubernetes_logs.go @@ -0,0 +1,78 @@ +package collector + +import ( + "time" + + "github.com/bcicen/ctop/models" + "k8s.io/client-go/kubernetes" +) + +type KubernetesLogs struct { + id string + client *kubernetes.Clientset + done chan bool +} + +func NewKubernetesLogs(id string, client *kubernetes.Clientset) *KubernetesLogs { + return &KubernetesLogs{ + id: id, + client: client, + done: make(chan bool), + } +} + +func (l *KubernetesLogs) Stream() chan models.Log { + //r, w := io.Pipe() + logCh := make(chan models.Log) + //ctx, cancel := context.WithCancel(context.Background()) + + //opts := api.LogsOptions{ + // Context: ctx, + // Container: l.id, + // OutputStream: w, + // ErrorStream: w, + // Stdout: true, + // Stderr: true, + // Tail: "10", + // Follow: true, + // Timestamps: true, + //} + + //// read io pipe into channel + //go func() { + // scanner := bufio.NewScanner(r) + // for scanner.Scan() { + // parts := strings.Split(scanner.Text(), " ") + // ts := l.parseTime(parts[0]) + // logCh <- models.Log{Timestamp: ts, Message: strings.Join(parts[1:], " ")} + // } + //}() + + //// connect to container log stream + //go func() { + // err := l.client.Logs(opts) + // if err != nil { + // log.Errorf("error reading container logs: %s", err) + // } + // log.Infof("log reader stopped for container: %s", l.id) + //}() + + //go func() { + // <-l.done + // cancel() + //}() + + log.Infof("log reader started for container: %s", l.id) + return logCh +} + +func (l *KubernetesLogs) Stop() { l.done <- true } + +func (l *KubernetesLogs) parseTime(s string) time.Time { + ts, err := time.Parse("2006-01-02T15:04:05.000000000Z", s) + if err != nil { + log.Errorf("failed to parse container log: %s", err) + ts = time.Now() + } + return ts +} diff --git a/connector/kubernetes.go b/connector/kubernetes.go new file mode 100644 index 0000000..02f4576 --- /dev/null +++ b/connector/kubernetes.go @@ -0,0 +1,195 @@ +package connector + +import ( + "os" + "path/filepath" + "sync" + + "github.com/bcicen/ctop/connector/collector" + "github.com/bcicen/ctop/connector/manager" + "github.com/bcicen/ctop/container" + api "github.com/fsouza/go-dockerclient" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func init() { enabled["kubernetes"] = NewKubernetes } + +var namespace = "akozlenkov" + +type Kubernetes struct { + clientset *kubernetes.Clientset + containers map[string]*container.Container + needsRefresh chan string // container IDs requiring refresh + lock sync.RWMutex +} + +func NewKubernetes() Connector { + var kubeconfig string + //if home := homeDir(); home != "" { + // kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") + //} else { + // kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") + //} + //flag.Parse() + kubeconfig = filepath.Join(homeDir(), ".kube", "config") + + // use the current context in kubeconfig + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Error(err.Error()) + return nil + } + + // create the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error(err.Error()) + return nil + } + + // init docker client + k := &Kubernetes{ + clientset: clientset, + containers: make(map[string]*container.Container), + needsRefresh: make(chan string, 60), + lock: sync.RWMutex{}, + } + go k.Loop() + k.refreshAll() + return k +} + +func (k *Kubernetes) Loop() { + for id := range k.needsRefresh { + c := k.MustGet(id) + k.refresh(c) + } + //log.Debug(">>>>>>1") + //for { + // log.Debug(">>>>>>2") + // pods, err := k.clientset.CoreV1().Pods("").List(metav1.ListOptions{}) + // if err != nil { + // panic(err.Error()) + // } + // log.Debugf("There are %d pods in the cluster\n", len(pods.Items)) + + // // Examples for error handling: + // // - Use helper functions like e.g. errors.IsNotFound() + // // - And/or cast to StatusError and use its properties like e.g. ErrStatus.Message + // namespace := "akozlenkov" + // pod := "example-xxxxx" + // _, err = k.clientset.CoreV1().Pods(namespace).Get(pod, metav1.GetOptions{}) + // if errors.IsNotFound(err) { + // log.Debugf("Pod %s in namespace %s not found\n", pod, namespace) + // } else if statusError, isStatus := err.(*errors.StatusError); isStatus { + // log.Debugf("Error getting pod %s in namespace %s: %v\n", + // pod, namespace, statusError.ErrStatus.Message) + // } else if err != nil { + // panic(err.Error()) + // } else { + // log.Debugf("Found pod %s in namespace %s\n", pod, namespace) + // } + + // time.Sleep(10 * time.Second) + //} +} + +// Get a single container, creating one anew if not existing +func (k *Kubernetes) MustGet(name string) *container.Container { + c, ok := k.Get(name) + // append container struct for new containers + if !ok { + // create collector + collector := collector.NewKubernetes(k.clientset, name) + // create manager + manager := manager.NewKubernetes(k.clientset, name) + // create container + c = container.New(name, collector, manager) + k.lock.Lock() + k.containers[name] = c + k.lock.Unlock() + } + return c +} + +func (k *Kubernetes) refresh(c *container.Container) { + insp := k.inspect(c.Id) + // remove container if no longer exists + if insp == nil { + k.delByID(c.Id) + return + } + c.SetMeta("name", insp.Name) + c.SetMeta("image", "stub") + c.SetMeta("IPs", "stub") + c.SetMeta("ports", "stub") + c.SetMeta("created", "stub") + c.SetMeta("health", "stub") + c.SetMeta("[ENV-VAR]", "stub") + c.SetState("stub") +} + +func (k *Kubernetes) inspect(id string) *v1.Pod { + p, err := k.clientset.CoreV1().Pods(namespace).Get(id, metav1.GetOptions{}) + if err != nil { + if _, ok := err.(*api.NoSuchContainer); !ok { + log.Errorf(err.Error()) + } + } + return p +} + +// Remove containers by ID +func (k *Kubernetes) delByID(name string) { + k.lock.Lock() + delete(k.containers, name) + k.lock.Unlock() + log.Infof("removed dead container: %s", name) +} + +func (k *Kubernetes) Get(name string) (c *container.Container, ok bool) { + k.lock.Lock() + c, ok = k.containers[name] + k.lock.Unlock() + return +} + +// Mark all container IDs for refresh +func (k *Kubernetes) refreshAll() { + allPods, err := k.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + if err != nil { + log.Error(err.Error()) + return + } + + for _, pod := range allPods.Items { + c := k.MustGet(pod.Name) + c.SetMeta("name", pod.Name) + if pod.Initializers != nil && pod.Initializers.Result != nil { + c.SetState(pod.Initializers.Result.Status) + } + k.needsRefresh <- c.Id + } +} + +func (k *Kubernetes) All() (containers container.Containers) { + k.lock.Lock() + for _, c := range k.containers { + containers = append(containers, c) + } + + containers.Sort() + containers.Filter() + k.lock.Unlock() + return containers +} + +func homeDir() string { + if h := os.Getenv("HOME"); h != "" { + return h + } + return os.Getenv("USERPROFILE") // windows +} diff --git a/connector/manager/kubernetes.go b/connector/manager/kubernetes.go new file mode 100644 index 0000000..6a766fa --- /dev/null +++ b/connector/manager/kubernetes.go @@ -0,0 +1,64 @@ +package manager + +import ( + "k8s.io/client-go/kubernetes" +) + +type Kubernetes struct { + id string + client *kubernetes.Clientset +} + +func NewKubernetes(client *kubernetes.Clientset, id string) *Kubernetes { + return &Kubernetes{ + id: id, + client: client, + } +} + +func (dc *Kubernetes) Start() error { + //c, err := dc.client.InspectContainer(dc.id) + //if err != nil { + // return fmt.Errorf("cannot inspect container: %v", err) + //} + + //if err := dc.client.StartContainer(c.ID, c.HostConfig); err != nil { + // return fmt.Errorf("cannot start container: %v", err) + //} + return nil +} + +func (dc *Kubernetes) Stop() error { + //if err := dc.client.StopContainer(dc.id, 3); err != nil { + // return fmt.Errorf("cannot stop container: %v", err) + //} + return nil +} + +func (dc *Kubernetes) Remove() error { + //if err := dc.client.RemoveContainer(api.RemoveContainerOptions{ID: dc.id}); err != nil { + // return fmt.Errorf("cannot remove container: %v", err) + //} + return nil +} + +func (dc *Kubernetes) Pause() error { + //if err := dc.client.PauseContainer(dc.id); err != nil { + // return fmt.Errorf("cannot pause container: %v", err) + //} + return nil +} + +func (dc *Kubernetes) Unpause() error { + //if err := dc.client.UnpauseContainer(dc.id); err != nil { + // return fmt.Errorf("cannot unpause container: %v", err) + //} + return nil +} + +func (dc *Kubernetes) Restart() error { + //if err := dc.client.RestartContainer(dc.id, 3); err != nil { + // return fmt.Errorf("cannot restart container: %v", err) + //} + return nil +} diff --git a/main.go b/main.go index 3a93a4c..ea88f49 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,7 @@ func main() { // parse command line arguments var ( - versionFlag = flag.Bool("v", false, "output version information and exit") + versionFlag = flag.Bool("V", false, "output version information and exit") helpFlag = flag.Bool("h", false, "display this help dialog") filterFlag = flag.String("f", "", "filter containers") activeOnlyFlag = flag.Bool("a", false, "show active containers only")