[ARVADOS] created: 292856a6f1d26dcfd7a652a9a83cbe4a375588d4
git at public.curoverse.com
git at public.curoverse.com
Tue Oct 7 16:31:18 EDT 2014
at 292856a6f1d26dcfd7a652a9a83cbe4a375588d4 (commit)
commit 292856a6f1d26dcfd7a652a9a83cbe4a375588d4
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Oct 7 16:24:15 2014 -0400
4044: Clean up channel and pipe usage.
* Remove code for using one goroutine to copy stdout and stderr from
channels to pipes. Instead, copy stderr from channel to pipe in a
simple goroutine, and give the child our own Stdout to use (we
don't use it ourselves anyway).
* Rename functions (OutputChannel -> CopyChanToPipe, ReadLineByLine ->
CopyPipeToChan).
* Add "stop" channel to shut down polling loop when child process
exits.
* Check for errors when opening cgroup stats files. Report errors
instead of displaying bogus stats.
* Split main() into run() and a short main() with os.Exit logic, so
deferred functions run regardless of exit path.
* Close dangling filehandle when opening cgroup_cidfile succeeds.
* Fix a potential divide-by-zero when elapsed==0.
* Fix condition triggering the "could not read cid file" error
message.
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 7adfaaa..79583b2 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -15,30 +15,17 @@ import (
"time"
)
-func ReadLineByLine(inp io.ReadCloser, out chan string, finish chan bool) {
- s := bufio.NewScanner(inp)
+func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
+ s := bufio.NewScanner(in)
for s.Scan() {
out <- s.Text()
}
- finish <- true
+ done <- true
}
-func OutputChannel(stdout chan string, stderr chan string) {
- for {
- select {
- case s, ok := <-stdout:
- if ok {
- fmt.Fprintln(os.Stdout, s)
- } else {
- return
- }
- case s, ok := <-stderr:
- if ok {
- fmt.Fprintln(os.Stderr, s)
- } else {
- return
- }
- }
+func CopyChanToPipe(in <-chan string, out io.Writer) {
+ for s := range in {
+ fmt.Fprintln(out, s)
}
}
@@ -63,10 +50,10 @@ func FindStat(cgroup_root string, cgroup_parent string, container_id string, sta
return ""
}
-func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id string, stderr chan string, poll int64) {
+func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id string, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
//var last_usage int64 = 0
- var last_user int64 = 0
- var last_sys int64 = 0
+ var last_user int64 = -1
+ var last_sys int64 = -1
var last_cpucount int64 = 0
type Disk struct {
@@ -97,9 +84,25 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
stderr <- fmt.Sprintf("crunchstat: reading stats from %s", memory_stat)
}
- var elapsed int64 = poll
-
+ poll_chan := make(chan bool, 1)
+ go func() {
+ // Send periodic poll events.
+ poll_chan <- true
+ for {
+ time.Sleep(time.Duration(poll) * time.Millisecond)
+ poll_chan <- true
+ }
+ }()
for {
+ bedtime := time.Now()
+ select {
+ case <-stop_poll_chan:
+ return
+ case <-poll_chan:
+ // Emit stats, then select again.
+ }
+ morning := time.Now()
+ elapsed := morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
/*{
c, _ := os.Open(cpuacct_usage)
b, _ := ioutil.ReadAll(c)
@@ -114,7 +117,11 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
}*/
var cpus int64 = 0
if cpuset_cpus != "" {
- c, _ := os.Open(cpuset_cpus)
+ c, err := os.Open(cpuset_cpus)
+ if err != nil {
+ stderr <- fmt.Sprintf("open %s: %s", cpuset_cpus, err)
+ continue
+ }
b, _ := ioutil.ReadAll(c)
sp := strings.Split(string(b), ",")
for _, v := range sp {
@@ -138,14 +145,22 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
cpus = 1
}
if cpuacct_stat != "" {
- c, _ := os.Open(cpuacct_stat)
+ c, err := os.Open(cpuacct_stat)
+ if err != nil {
+ stderr <- fmt.Sprintf("open %s: %s", cpuacct_stat, err)
+ // Next time around, last_user would
+ // be >1 interval old, so stats will
+ // be incorrect. Start over instead.
+ last_user = -1
+ continue
+ }
b, _ := ioutil.ReadAll(c)
var next_user int64
var next_sys int64
fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
c.Close()
- if last_user != 0 {
+ if elapsed > 0 && last_user != -1 {
user_diff := next_user - last_user
sys_diff := next_sys - last_sys
// Assume we're reading stats based on 100
@@ -170,7 +185,11 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
last_sys = next_sys
}
if blkio_io_service_bytes != "" {
- c, _ := os.Open(blkio_io_service_bytes)
+ c, err := os.Open(blkio_io_service_bytes)
+ if err != nil {
+ stderr <- fmt.Sprintf("open %s: %s", blkio_io_service_bytes, err)
+ continue
+ }
b := bufio.NewScanner(c)
var device, op string
var next int64
@@ -199,7 +218,11 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
}
if memory_stat != "" {
- c, _ := os.Open(memory_stat)
+ c, err := os.Open(memory_stat)
+ if err != nil {
+ stderr <- fmt.Sprintf("open %s: %s", memory_stat, err)
+ continue
+ }
b := bufio.NewScanner(c)
var stat string
var val int64
@@ -212,15 +235,10 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
}
c.Close()
}
-
- bedtime := time.Now()
- time.Sleep(time.Duration(poll) * time.Millisecond)
- morning := time.Now()
- elapsed = morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
}
}
-func main() {
+func run(logger *log.Logger) error {
var (
cgroup_root string
@@ -238,21 +256,16 @@ func main() {
flag.Parse()
- logger := log.New(os.Stderr, "crunchstat: ", 0)
-
if cgroup_root == "" {
logger.Fatal("Must provide -cgroup-root")
}
- // Make output channel
- stdout_chan := make(chan string)
- stderr_chan := make(chan string)
- finish_chan := make(chan bool)
- defer close(stdout_chan)
+ stderr_chan := make(chan string, 1)
defer close(stderr_chan)
+ finish_chan := make(chan bool)
defer close(finish_chan)
- go OutputChannel(stdout_chan, stderr_chan)
+ go CopyChanToPipe(stderr_chan, os.Stderr)
var cmd *exec.Cmd
@@ -262,9 +275,10 @@ func main() {
logger.Print("Running ", flag.Args())
- // Child process will read from our stdin pipe (we
- // close our copy below)
+ // Child process will use our stdin and stdout pipes
+ // (we close our copies below)
cmd.Stdin = os.Stdin
+ cmd.Stdout = os.Stdout
// Forward SIGINT and SIGTERM to inner process
term := make(chan os.Signal, 1)
@@ -273,69 +287,76 @@ func main() {
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Print("caught signal:", catch)
+ logger.Print("caught signal: ", catch)
}(term)
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- // Funnel stdout and stderr from subprocess to output channels
- stdout_pipe, err := cmd.StdoutPipe()
- if err != nil {
- logger.Fatal(err)
- }
- go ReadLineByLine(stdout_pipe, stdout_chan, finish_chan)
-
+ // Funnel stderr through our channel
stderr_pipe, err := cmd.StderrPipe()
if err != nil {
logger.Fatal(err)
}
- go ReadLineByLine(stderr_pipe, stderr_chan, finish_chan)
+ go CopyPipeToChan(stderr_pipe, stderr_chan, finish_chan)
// Run subprocess
if err := cmd.Start(); err != nil {
logger.Fatal(err)
}
- }
- // Close standard input in this (parent) process
- os.Stdin.Close()
+ // Close stdin/stdout in this (parent) process
+ os.Stdin.Close()
+ os.Stdout.Close()
+ }
// Read the cid file
var container_id string
if cgroup_cidfile != "" {
// wait up to 'wait' seconds for the cid file to appear
+ ok := false
var i time.Duration
for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
f, err := os.Open(cgroup_cidfile)
if err == nil {
+ defer f.Close()
cid, err2 := ioutil.ReadAll(f)
if err2 == nil && len(cid) > 0 {
+ ok = true
container_id = string(cid)
- f.Close()
break
}
}
time.Sleep(100 * time.Millisecond)
}
- if cgroup_root == "" {
+ if !ok {
logger.Printf("Could not read cid file %s", cgroup_cidfile)
}
}
- go PollCgroupStats(cgroup_root, cgroup_parent, container_id, stderr_chan, poll)
+ stop_poll_chan := make(chan bool, 1)
+ go PollCgroupStats(cgroup_root, cgroup_parent, container_id, stderr_chan, poll, stop_poll_chan)
- // Wait for each of stdout and stderr to drain
- <-finish_chan
+ // When the child exits, tell the polling goroutine to stop.
+ defer func() { stop_poll_chan <- true }()
+
+ // Wait for CopyPipeToChan to consume child's stderr pipe
<-finish_chan
- if err := cmd.Wait(); err != nil {
+ return cmd.Wait()
+}
+
+func main() {
+ logger := log.New(os.Stderr, "crunchstat: ", 0)
+ if err := run(logger); err != nil {
if exiterr, ok := err.(*exec.ExitError); ok {
// The program has exited with an exit code != 0
- // This works on both Unix and Windows. Although package
- // syscall is generally platform dependent, WaitStatus is
- // defined for both Unix and Windows and in both cases has
- // an ExitStatus() method with the same signature.
+ // This works on both Unix and
+ // Windows. Although package syscall is
+ // generally platform dependent, WaitStatus is
+ // defined for both Unix and Windows and in
+ // both cases has an ExitStatus() method with
+ // the same signature.
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
os.Exit(status.ExitStatus())
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list