[ARVADOS] created: 1.1.4-750-g0fe48142d

Git user git at public.curoverse.com
Mon Aug 6 13:04:30 EDT 2018


        at  0fe48142de81e20967eaac3972e35d115aaaa497 (commit)


commit 0fe48142de81e20967eaac3972e35d115aaaa497
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Aug 6 12:59:30 2018 -0400

    13399: Adjust locking strategy to avoid starvation.
    
    Don't acquire the slurm lock to call scancel or sbatch.
    
    Don't acquire the slurm lock again immediately after updating the
    queue and broadcasting. Instead, give readers a chance to reacquire
    the lock after Wait().
    
    Acquire a shared lock instead of an exclusive lock in HasUUID() or
    SetPriority() when the protected operation is a read.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index d1f19dd7b..b4103cc62 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -252,9 +252,6 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
 	crArgs = append(crArgs, container.UUID)
 	crScript := strings.NewReader(execScript(crArgs))
 
-	disp.sqCheck.L.Lock()
-	defer disp.sqCheck.L.Unlock()
-
 	sbArgs, err := disp.sbatchArgs(container)
 	if err != nil {
 		return err
@@ -355,10 +352,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 	}
 }
 func (disp *Dispatcher) scancel(ctr arvados.Container) {
-	disp.sqCheck.L.Lock()
 	err := disp.slurm.Cancel(ctr.UUID)
-	disp.sqCheck.L.Unlock()
-
 	if err != nil {
 		log.Printf("scancel: %s", err)
 		time.Sleep(time.Second)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 719ec98d2..4ef4ba1d5 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -116,7 +116,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
 	var containers arvados.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
-	c.Check(len(containers.Items), Equals, 1)
+	c.Assert(len(containers.Items), Equals, 1)
 
 	s.disp.CrunchRunCommand = []string{"echo"}
 
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 24a056264..ccbe44487 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -33,7 +33,8 @@ type SqueueChecker struct {
 	queue          map[string]*slurmJob
 	startOnce      sync.Once
 	done           chan struct{}
-	sync.Cond
+	lock           sync.RWMutex
+	notify         sync.Cond
 }
 
 // HasUUID checks if a given container UUID is in the slurm queue.
@@ -42,11 +43,11 @@ type SqueueChecker struct {
 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 	sqc.startOnce.Do(sqc.start)
 
-	sqc.L.Lock()
-	defer sqc.L.Unlock()
+	sqc.lock.RLock()
+	defer sqc.lock.RUnlock()
 
 	// block until next squeue broadcast signaling an update.
-	sqc.Wait()
+	sqc.notify.Wait()
 	_, exists := sqc.queue[uuid]
 	return exists
 }
@@ -55,25 +56,30 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 // container.
 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
 	sqc.startOnce.Do(sqc.start)
-	sqc.L.Lock()
-	defer sqc.L.Unlock()
-	job, ok := sqc.queue[uuid]
-	if !ok {
+
+	sqc.lock.RLock()
+	job := sqc.queue[uuid]
+	if job == nil {
 		// Wait in case the slurm job was just submitted and
 		// will appear in the next squeue update.
-		sqc.Wait()
-		if job, ok = sqc.queue[uuid]; !ok {
-			return
-		}
+		sqc.notify.Wait()
+		job = sqc.queue[uuid]
+	}
+	needUpdate := job != nil && job.wantPriority != want
+	sqc.lock.RUnlock()
+
+	if needUpdate {
+		sqc.lock.Lock()
+		job.wantPriority = want
+		sqc.lock.Unlock()
 	}
-	job.wantPriority = want
 }
 
 // adjust slurm job nice values as needed to ensure slurm priority
 // order matches Arvados priority order.
 func (sqc *SqueueChecker) reniceAll() {
-	sqc.L.Lock()
-	defer sqc.L.Unlock()
+	sqc.lock.RLock()
+	defer sqc.lock.RUnlock()
 
 	jobs := make([]*slurmJob, 0, len(sqc.queue))
 	for _, j := range sqc.queue {
@@ -133,12 +139,8 @@ func (sqc *SqueueChecker) Stop() {
 // queued). If it succeeds, it updates sqc.queue and wakes up any
 // goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
-	// Mutex between squeue sync and running sbatch or scancel.  This
-	// establishes a sequence so that squeue doesn't run concurrently with
-	// sbatch or scancel; the next update of squeue will occur only after
-	// sbatch or scancel has completed.
-	sqc.L.Lock()
-	defer sqc.L.Unlock()
+	sqc.lock.Lock()
+	defer sqc.lock.Unlock()
 
 	cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
@@ -196,13 +198,13 @@ func (sqc *SqueueChecker) check() {
 		}
 	}
 	sqc.queue = newq
-	sqc.Broadcast()
+	sqc.notify.Broadcast()
 }
 
 // Initialize, and start a goroutine to call check() once per
 // squeue.Period until terminated by calling Stop().
 func (sqc *SqueueChecker) start() {
-	sqc.L = &sync.Mutex{}
+	sqc.notify.L = sqc.lock.RLocker()
 	sqc.done = make(chan struct{})
 	go func() {
 		ticker := time.NewTicker(sqc.Period)
@@ -214,6 +216,15 @@ func (sqc *SqueueChecker) start() {
 			case <-ticker.C:
 				sqc.check()
 				sqc.reniceAll()
+				select {
+				case <-ticker.C:
+					// If this iteration took
+					// longer than sqc.Period,
+					// consume the next tick and
+					// wait. Otherwise we would
+					// starve other goroutines.
+				default:
+				}
 			}
 		}
 	}()
@@ -223,9 +234,9 @@ func (sqc *SqueueChecker) start() {
 // names reported by squeue.
 func (sqc *SqueueChecker) All() []string {
 	sqc.startOnce.Do(sqc.start)
-	sqc.L.Lock()
-	defer sqc.L.Unlock()
-	sqc.Wait()
+	sqc.lock.RLock()
+	defer sqc.lock.RUnlock()
+	sqc.notify.Wait()
 	var uuids []string
 	for u := range sqc.queue {
 		uuids = append(uuids, u)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list