[ARVADOS] created: 1.1.4-748-gb7c4aff85

Git user git at public.curoverse.com
Sat Aug 4 21:17:29 EDT 2018


        at  b7c4aff85a27c2de99e20cbe944d80e26fd4766b (commit)


commit b7c4aff85a27c2de99e20cbe944d80e26fd4766b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Sat Aug 4 21:16:55 2018 -0400

    13964: Initial design sketch (effectively pseudocode, doesn't compile)
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-dispatch-cloud/crunch-dispatch-cloud.go b/services/crunch-dispatch-cloud/crunch-dispatch-cloud.go
new file mode 100644
index 000000000..367d4f945
--- /dev/null
+++ b/services/crunch-dispatch-cloud/crunch-dispatch-cloud.go
@@ -0,0 +1,360 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+// Dispatcher service for Crunch that runs containers on elastic cloud VMs
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+)
+
+var version = "dev"
+
+func main() {
+	err := doMain()
+	if err != nil {
+		log.Fatalf("%q", err)
+	}
+}
+
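+// Bookkeeping for locally started crunch-run processes.  Nothing adds
+// to runningCmds or waitGroup yet; they are only referenced by the
+// shutdown path in doMain, and crunchRunCommand is parsed but not yet
+// used.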
+var (
+	runningCmds      map[string]*exec.Cmd
+	runningCmdsMutex sync.Mutex
+	waitGroup        sync.WaitGroup
+	crunchRunCommand *string
+)
+
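+// Node lifecycle states.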
+const (
+	Booting = iota
+	Idle
+	Busy
+	Shutdown
+)
+
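+// Node is the scheduler's record of a single cloud VM.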
+type Node struct {
+	uuid            string
+	instanceType    string
+	state           int
+	allocated       string
+	status          chan int
+	ipaddr          string
+	gone            bool
+	lastStateChange time.Time
+}
+
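+// NodeRequest represents a container waiting for a node of a
+// particular instance type.  ready receives the allocated node, or is
+// closed if the request is cancelled.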
+type NodeRequest struct {
+	Container    arvados.Container
+	instanceType string
+	ready        chan *Node
+}
+
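+// Scheduler tracks cloud nodes, pending node requests, and the mapping
+// from containers to nodes.  It also relies on a cloud provider driver
+// (sch.driver) and per-node SSH access that are not yet defined in
+// this sketch.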
+type Scheduler struct {
+	schedulerMutex sync.Mutex
+
+	// dispatcher for the Arvados container queue (set up in setup())
+	Dispatcher *dispatch.Dispatcher
+
+	// node id to node
+	nodes map[string]*Node
+
+	// container to node
+	containerToNode map[string]*Node
+
+	// instance type to node
+	typeToNodes map[string][]*Node
+
+	requests []*NodeRequest
+}
+
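+// setup initializes the scheduler's bookkeeping maps, wires the
+// scheduler into the dispatch library, and starts the scheduling loop.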
+func (sch *Scheduler) setup(arv *arvadosclient.ArvadosClient) {
+	sch.nodes = make(map[string]*Node)
+	sch.containerToNode = make(map[string]*Node)
+	sch.typeToNodes = make(map[string][]*Node)
+
+	sch.Dispatcher = &dispatch.Dispatcher{
+		Arv:          arv,
+		RunContainer: sch.runContainer,
+		//PollPeriod:     time.Duration(disp.PollPeriod),
+		//MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+	}
+
+	go sch.schedule()
+}
+
+func startFunc(container arvados.Container, cmd *exec.Cmd) error {
+	return cmd.Start()
+}
+
+var startCmd = startFunc
+
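+// allocateNode assigns an idle node of the requested instance type to
+// the request's container, returning nil if none is available.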
+func (sch *Scheduler) allocateNode(nr *NodeRequest) *Node {
+	for _, n := range sch.typeToNodes[nr.instanceType] {
+		if n.allocated == "" && n.state == Idle {
+			n.allocated = nr.Container.UUID
+			sch.containerToNode[nr.Container.UUID] = n
+			return n
+		}
+	}
+	return nil
+}
+
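+// removeNode drops a node from the scheduler's bookkeeping tables.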
+func (sch *Scheduler) removeNode(newnode *Node) {
+	delete(sch.nodes, newnode.uuid)
+	ns := sch.typeToNodes[newnode.instanceType]
+	for i, n := range ns {
+		if n == newnode {
+			ns[i] = ns[len(ns)-1]
+			sch.typeToNodes[newnode.instanceType] = ns[0 : len(ns)-1]
+			return
+		}
+	}
+}
+
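+// createCloudNode asks the cloud driver to create the instance backing
+// newnode, removing the node record again if creation fails.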
+func (sch *Scheduler) createCloudNode(newnode *Node) {
+	err := sch.driver.CreateNode()
+	if err != nil {
+		sch.removeNode(newnode)
+	}
+}
+
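+// deleteCloudNode asks the cloud driver to destroy the instance
+// backing node.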
+func (sch *Scheduler) deleteCloudNode(node *Node) {
+	node.ipaddr = ""
+	if err := sch.driver.DeleteNode(node); err != nil {
+		log.Printf("error deleting cloud node %s: %v", node.uuid, err)
+	}
+}
+
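+// nodeMonitor polls a node's status over SSH (the ssh helper and its
+// status structure are placeholders in this sketch) and shuts the node
+// down once it has been idle for more than five minutes.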
+func (sch *Scheduler) nodeMonitor(node *Node) {
+	for {
+		if node.gone {
+			sch.removeNode(node)
+			break
+		}
+
+		if node.ipaddr == "" {
+			// no address yet; wait for cloudNodeList to fill it in
+			time.Sleep(time.Second)
+			continue
+		}
+		session := ssh(node.ipaddr)
+		status := session.getStatus()
+		node.allocated = status.allocated
+		node.state = status.state
+		node.lastStateChange = status.lastStateChange
+
+		if node.state == Idle && time.Since(node.lastStateChange) > 5*time.Minute {
+			node.state = Shutdown
+			sch.deleteCloudNode(node)
+		}
+
+		time.Sleep(time.Second)
+	}
+}
+
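+// cloudNodeList reconciles the scheduler's node table with the cloud
+// provider's instance list: newly discovered instances get a node
+// record and a monitor goroutine, and nodes that have disappeared have
+// their containers cancelled and are flagged as gone.  (The cloud
+// driver and the container-cancel call are placeholders in this
+// sketch.)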
+func (sch *Scheduler) cloudNodeList() {
+	for {
+		cloudNodes := sch.driver.CloudNodeList()
+		seen := make(map[string]bool)
+		for _, cl := range cloudNodes {
+			uuid := cl.Tag["crunch-uuid"]
+			instanceType := cl.Tag["crunch-instancetype"]
+			noderecord, found := sch.nodes[uuid]
+			seen[uuid] = true
+			if !found {
+				noderecord = &Node{
+					uuid:         uuid,
+					state:        Booting,
+					allocated:    "",
+					status:       make(chan int),
+					instanceType: instanceType}
+				sch.nodes[uuid] = noderecord
+				sch.typeToNodes[instanceType] = append(sch.typeToNodes[instanceType], noderecord)
+			}
+			if noderecord.ipaddr == "" {
+				noderecord.ipaddr = cl.ipaddr
+			}
+
+			if !found {
+				go sch.nodeMonitor(noderecord)
+			}
+		}
+		for uuid, node := range sch.nodes {
+			if !seen[uuid] && node.state != Booting {
+				if node.allocated != "" {
+					arv.CancelContainer(node.allocated)
+					node.gone = true
+				}
+			}
+		}
+
+		// re-list cloud instances periodically
+		time.Sleep(time.Minute)
+	}
+}
+
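+// schedule is the scheduling loop: it hands idle nodes to pending
+// requests and boots new cloud nodes for requests that cannot be
+// satisfied yet.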
+func (sch *Scheduler) schedule() {
+	for {
+		unallocated := make([]*NodeRequest, 0, len(sch.requests))
+
+		bootingCounts := make(map[string]int)
+		for t := range sch.typeToNodes {
+			bootingCounts[t] = len(sch.typeToNodes[t])
+		}
+
+		for _, nr := range sch.requests {
+			node := sch.allocateNode(nr)
+			if node != nil {
+				nr.ready <- node
+				continue
+			}
+			unallocated = append(unallocated, nr)
+			if bootingCounts[nr.instanceType] > 0 {
+				bootingCounts[nr.instanceType] -= 1
+				continue
+			}
+
+			newnode := &Node{
+				uuid:         "random uuid goes here",
+				state:        Booting,
+				allocated:    "",
+				status:       make(chan int),
+				instanceType: nr.instanceType}
+			sch.nodes[newnode.uuid] = newnode
+			sch.typeToNodes[newnode.instanceType] = append(sch.typeToNodes[newnode.instanceType], newnode)
+
+			go sch.createCloudNode(newnode)
+		}
+		sch.requests = unallocated
+
+		// throttle the scheduling loop
+		time.Sleep(time.Second)
+	}
+}
+
+func (sch *Scheduler) cancelRequest(nr *NodeRequest) {
+	close(nr.ready)
+}
+
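+// requestNode queues a request for a node to run ctr and blocks until
+// one is allocated, returning nil if the request is cancelled first.
+// (Choosing the instance type from the container's runtime constraints
+// is left out of this sketch.)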
+func (sch *Scheduler) requestNode(ctr arvados.Container, status <-chan arvados.Container) *Node {
+	sch.schedulerMutex.Lock()
+
+	if n, ok := sch.containerToNode[ctr.UUID]; ok {
+		sch.schedulerMutex.Unlock()
+		return n
+	}
+
+	nr := &NodeRequest{Container: ctr, ready: make(chan *Node, 1)}
+
+	sch.requests = append(sch.requests, nr)
+
+	sch.schedulerMutex.Unlock()
+
+	go func() {
+		for {
+			st := <-status
+			if st.State != "Locked" || st.Priority == 0 {
+				sch.cancelRequest(nr)
+				return
+			}
+		}
+	}()
+
+	return <-nr.ready
+}
+
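+// releaseNode clears a node's container assignment so it can be
+// reused.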
+func (sch *Scheduler) releaseNode(n *Node) {
+	sch.schedulerMutex.Lock()
+	defer sch.schedulerMutex.Unlock()
+
+	delete(sch.containerToNode, n.allocated)
+	n.allocated = ""
+}
+
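+// runContainer is the dispatch callback for a single container: it
+// obtains a node, starts crunch-run on it over SSH (a placeholder in
+// this sketch), and follows the container's status until it completes
+// or is cancelled.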
+func (sch *Scheduler) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+	// (a cancellable context is created here but not used further in
+	// this sketch)
+	_, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	node := sch.requestNode(ctr, status)
+	if node == nil {
+		// request was cancelled before a node became available
+		return
+	}
+
+	// node is allocated to us, but in a recovery scenario it may
+	// already be running the container.
+	if node.state == Idle {
+		node.ssh.crunchRun(ctr)
+	}
+
+	// Assume node should now be busy
+	for {
+		st := <-status
+		if st.Priority == 0 {
+			node.ssh.Signal(os.Interrupt)
+		}
+		if st.State == arvados.ContainerStateComplete || st.State == arvados.ContainerStateCancelled {
+			sch.releaseNode(node)
+			return
+		}
+	}
+}
+
+func doMain() error {
+	flags := flag.NewFlagSet("crunch-dispatch-cloud", flag.ExitOnError)
+
+	pollInterval := flags.Int(
+		"poll-interval",
+		10,
+		"Interval in seconds to poll for queued containers")
+
+	crunchRunCommand = flags.String(
+		"crunch-run-command",
+		"/usr/bin/crunch-run",
+		"Crunch command to run container")
+
+	getVersion := flags.Bool(
+		"version",
+		false,
+		"Print version information and exit.")
+
+	// Parse args; omit the first arg which is the command name
+	flags.Parse(os.Args[1:])
+
+	// Print version information if requested
+	if *getVersion {
+		fmt.Printf("crunch-dispatch-cloud %s\n", version)
+		return nil
+	}
+
+	log.Printf("crunch-dispatch-cloud %s started", version)
+
+	runningCmds = make(map[string]*exec.Cmd)
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
+		return err
+	}
+	arv.Retries = 25
+
+	// Create the scheduler and the dispatcher it wraps.
+	sch := &Scheduler{}
+	sch.setup(arv)
+	sch.Dispatcher.PollPeriod = time.Duration(*pollInterval) * time.Second
+
+	dispatcher := sch.Dispatcher
+
+	ctx, cancel := context.WithCancel(context.Background())
+	err = dispatcher.Run(ctx)
+	if err != nil {
+		return err
+	}
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+	sig := <-c
+	log.Printf("Received %s, shutting down", sig)
+	signal.Stop(c)
+
+	cancel()
+
+	runningCmdsMutex.Lock()
+	// Finished dispatching; interrupt any crunch jobs that are still running
+	for _, cmd := range runningCmds {
+		cmd.Process.Signal(os.Interrupt)
+	}
+	runningCmdsMutex.Unlock()
+
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
+
+	return nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 



