From b009a260a4c2f0798d65f9e0aaba53ad2daef1ef Mon Sep 17 00:00:00 2001 From: Bradley Cicenas Date: Thu, 8 Jun 2017 18:33:34 +0000 Subject: [PATCH] initial runc connector implementation --- connector/docker.go | 1 + connector/runc.go | 202 ++++++++++++++++++++++++++++++++++++++++++++ cursor.go | 3 +- metrics/runc.go | 70 +++++++++++++++ 4 files changed, 275 insertions(+), 1 deletion(-) create mode 100644 connector/runc.go create mode 100644 metrics/runc.go diff --git a/connector/docker.go b/connector/docker.go index 045545a..1979a0e 100644 --- a/connector/docker.go +++ b/connector/docker.go @@ -1,3 +1,4 @@ +// +build ignore package connector import ( diff --git a/connector/runc.go b/connector/runc.go new file mode 100644 index 0000000..c38a2ce --- /dev/null +++ b/connector/runc.go @@ -0,0 +1,202 @@ +package connector + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/bcicen/ctop/container" + "github.com/bcicen/ctop/metrics" + "github.com/opencontainers/runc/libcontainer" + "github.com/opencontainers/runc/libcontainer/cgroups/systemd" +) + +type RuncOpts struct { + root string // runc root path + systemdCgroups bool // use systemd cgroups +} + +type Runc struct { + opts RuncOpts + factory libcontainer.Factory + containers map[string]*container.Container + needsRefresh chan string // container IDs requiring refresh + lock sync.RWMutex +} + +func readRuncOpts() (RuncOpts, error) { + var opts RuncOpts + // read runc root path + root := os.Getenv("RUNC_ROOT") + if root == "" { + return opts, fmt.Errorf("RUNC_ROOT not set") + } + abs, err := filepath.Abs(root) + if err != nil { + return opts, err + } + opts.root = abs + + if os.Getenv("RUNC_SYSTEMD_CGROUP") == "1" { + opts.systemdCgroups = true + } + return opts, nil +} + +func getFactory(opts RuncOpts) (libcontainer.Factory, error) { + cgroupManager := libcontainer.Cgroupfs + if opts.systemdCgroups { + if systemd.UseSystemd() { + cgroupManager = libcontainer.SystemdCgroups + } else { + return nil, fmt.Errorf("systemd cgroup enabled, but systemd support for managing cgroups is not available") + } + } + return libcontainer.New(opts.root, cgroupManager, libcontainer.CriuPath("criu")) +} + +func NewRunc() *Runc { + opts, err := readRuncOpts() + runcFailOnErr(err) + + factory, err := getFactory(opts) + runcFailOnErr(err) + + cm := &Runc{ + opts: opts, + factory: factory, + containers: make(map[string]*container.Container), + needsRefresh: make(chan string, 60), + lock: sync.RWMutex{}, + } + + go cm.Loop() + go func() { + time.Sleep(1 * time.Second) + for _, c := range cm.containers { + cm.needsRefresh <- c.Id + } + }() + + return cm +} + +func (cm *Runc) inspect(id string) libcontainer.Container { + libc, err := cm.factory.Load(id) + if err != nil { + // remove container if no longer exists + if lerr, ok := err.(libcontainer.Error); ok && lerr.Code() == libcontainer.ContainerNotExists { + cm.delByID(id) + } else { + log.Warningf("failed to read container: %s\n", err) + } + return nil + } + return libc +} + +func (cm *Runc) refresh(c *container.Container) { + libc := cm.inspect(c.Id) + if libc == nil { + return + } + + status, err := libc.Status() + if err != nil { + log.Warningf("failed to read status for container: %s\n", err) + } else { + c.SetState(status.String()) + } + + state, err := libc.State() + if err != nil { + log.Warningf("failed to read state for container: %s\n", err) + } else { + c.SetMeta("created", state.BaseState.Created.Format("Mon Jan 2 15:04:05 2006")) + } + + conf := libc.Config() + c.SetMeta("rootfs", conf.Rootfs) +} + +func (cm *Runc) refreshAll() { + list, err := ioutil.ReadDir(cm.opts.root) + runcFailOnErr(err) + + for _, i := range list { + if i.IsDir() { + // attempt to load + libc, err := cm.factory.Load(i.Name()) + if err != nil { + log.Warningf("failed to read container: %s\n", err) + continue + } + + c := cm.MustGet(libc.ID()) + c.SetMeta("name", i.Name()) + + cm.needsRefresh <- c.Id + } + } +} + +func (cm *Runc) Loop() { + for id := range cm.needsRefresh { + c := cm.MustGet(id) + cm.refresh(c) + } +} + +// Get a single container in the map, creating one anew if not existing +func (cm *Runc) MustGet(id string) *container.Container { + c, ok := cm.Get(id) + // append container struct for new containers + if !ok { + // create collector + collector := metrics.NewRunc(2) + // create container + c = container.New(id, collector) + cm.lock.Lock() + cm.containers[id] = c + cm.lock.Unlock() + } + return c +} + +// Get a single container, by ID +func (cm *Runc) Get(id string) (*container.Container, bool) { + cm.lock.Lock() + c, ok := cm.containers[id] + cm.lock.Unlock() + return c, ok +} + +// Remove containers by ID +func (cm *Runc) delByID(id string) { + cm.lock.Lock() + delete(cm.containers, id) + cm.lock.Unlock() + log.Infof("removed dead container: %s", id) +} + +// Return array of all containers, sorted by field +func (cm *Runc) All() (containers container.Containers) { + cm.lock.Lock() + for _, c := range cm.containers { + containers = append(containers, c) + } + cm.lock.Unlock() + sort.Sort(containers) + containers.Filter() + return containers +} + +func runcFailOnErr(err error) { + if err != nil { + panic(fmt.Errorf("fatal runc error: %s", err)) + } +} diff --git a/cursor.go b/cursor.go index 3174362..71d194f 100644 --- a/cursor.go +++ b/cursor.go @@ -16,7 +16,8 @@ type GridCursor struct { func NewGridCursor() *GridCursor { return &GridCursor{ - cSource: connector.NewDocker(), + cSource: connector.NewRunc(), + //cSource: connector.NewDocker(), } } diff --git a/metrics/runc.go b/metrics/runc.go new file mode 100644 index 0000000..fbb3c01 --- /dev/null +++ b/metrics/runc.go @@ -0,0 +1,70 @@ +package metrics + +import ( + "math/rand" + "time" +) + +// Runc collector +type Runc struct { + Metrics + stream chan Metrics + done bool + running bool + aggression int64 +} + +func NewRunc(a int64) *Runc { + c := &Runc{ + Metrics: Metrics{}, + aggression: a, + } + c.MemLimit = 2147483648 + return c +} + +func (c *Runc) Running() bool { + return c.running +} + +func (c *Runc) Start() { + c.done = false + c.stream = make(chan Metrics) + go c.run() +} + +func (c *Runc) Stop() { + c.done = true +} + +func (c *Runc) Stream() chan Metrics { + return c.stream +} + +func (c *Runc) run() { + c.running = true + rand.Seed(int64(time.Now().Nanosecond())) + defer close(c.stream) + + for { + c.CPUUtil += rand.Intn(2) * int(c.aggression) + if c.CPUUtil >= 100 { + c.CPUUtil = 0 + } + + c.NetTx += rand.Int63n(60) * c.aggression + c.NetRx += rand.Int63n(60) * c.aggression + c.MemUsage += rand.Int63n(c.MemLimit/512) * c.aggression + if c.MemUsage > c.MemLimit { + c.MemUsage = 0 + } + c.MemPercent = round((float64(c.MemUsage) / float64(c.MemLimit)) * 100) + c.stream <- c.Metrics + if c.done { + break + } + time.Sleep(1 * time.Second) + } + + c.running = false +}