Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controllers/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (jc *jobCache) deleteJob(job *apis.JobInfo) {
}

func (jc *jobCache) retryDeleteJob(job *apis.JobInfo) {
klog.V(3).Infof("Retry to delete Job <%v/%v>",
klog.V(4).Infof("Retry to delete Job <%v/%v>",
job.Namespace, job.Name)

jc.deletedJobs.AddRateLimited(job)
Expand Down
16 changes: 9 additions & 7 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {

alloc.session = ssn
alloc.pickUpQueuesAndJobs(queues, jobsMap)
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
klog.V(4).Infof("Try to allocate resource to %d Queues", len(jobsMap))
alloc.allocateResources(queues, jobsMap)
}

Expand Down Expand Up @@ -152,6 +152,7 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
}

job := jobs.Pop().(*api.JobInfo)
klog.V(3).Infof("[poolside] Scheduling job %s", job.Name)
if _, found = pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
Expand Down Expand Up @@ -363,17 +364,17 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)
if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
klog.V(3).Infof("Queue <%s> is overused when considering task <%s/%s>, ignore it.", queue.Name, task.Job, task.Name)
continue
}

// check if the task with its spec has already predicates failed
if job.TaskHasFitErrors(task) {
klog.V(5).Infof("Task %s with role spec %s has already predicated failed, skip", task.Name, task.TaskRole)
klog.V(4).Infof("Task %s with role spec %s has already predicated failed, skip", task.Name, task.TaskRole)
continue
}

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)
klog.V(4).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
Expand Down Expand Up @@ -440,11 +441,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}

if ssn.JobReady(job) {
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
klog.V(3).InfoS("[poolside] Job ready, return statement", "jobName", job.UID)
updateJobAllocatedHyperNode(job, jobNewAllocatedHyperNode)
return stmt
} else {
if !ssn.JobPipelined(job) {
klog.V(3).InfoS("[poolside] cannot find free capacity for job", "job", string(job.Queue)+"/"+string(job.UID))
stmt.Discard()
}
return nil
Expand Down Expand Up @@ -548,7 +550,7 @@ func (alloc *Action) prioritizeNodes(ssn *framework.Session, task *api.TaskInfo,
func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *api.TaskInfo, node *api.NodeInfo, job *api.JobInfo) (err error) {
// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
klog.V(3).Infof("Binding Task <%s/%s/%s> to node <%v>", job.Queue, task.Job, task.Name, node.Name)
if err = stmt.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, alloc.session.UID, err)
Expand All @@ -563,7 +565,7 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
return
}

klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
klog.V(4).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
Expand Down
124 changes: 66 additions & 58 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

// If not preemptor tasks, next job.
if preemptorTasks[preemptorJob.UID].Empty() {
klog.V(3).Infof("No preemptor task in job <%s/%s>.",
klog.V(4).Infof("No preemptor task in job <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
break
}

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
klog.V(3).Infof("Preemptor task <%s/%s/%s>", preemptorJob.Queue, preemptorJob.Name, preemptor.Name)

assigned, err = pmpt.preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
Expand Down Expand Up @@ -204,6 +205,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

// Commit changes only if job is pipelined, otherwise try next job.
if ssn.JobPipelined(preemptorJob) {
for _, op := range stmt.Operations() {
klog.V(3).Infof("[poolside] Preemptor <%s/%s> committed operation: %s", preemptorJob.Queue, preemptorJob.Name, op.String())
}
stmt.Commit()
} else {
stmt.Discard()
Expand All @@ -216,56 +220,56 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
}

// Preemption between Task within Job.
for _, job := range underRequest {
// Fix: preemptor numbers lose when in same job
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
// Again, skip scheduling gated tasks
if task.SchGated {
continue
}
preemptorTasks[job.UID].Push(task)
}
for {
if _, found := preemptorTasks[job.UID]; !found {
break
}

if preemptorTasks[job.UID].Empty() {
break
}

preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

stmt := framework.NewStatement(ssn)
assigned, err := pmpt.preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
}
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
// should skip not preemptable pod
if !task.Preemptable {
return false
}

// Preempt tasks within job.
return preemptor.Job == task.Job
}, ph)
if err != nil {
klog.V(3).Infof("Preemptor <%s/%s> failed to preempt Task , err: %s", preemptor.Namespace, preemptor.Name, err)
}
stmt.Commit()

// If no preemption, next job.
if !assigned {
break
}
}
}
// for _, job := range underRequest {
// // Fix: preemptor numbers lose when in same job
// preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
// for _, task := range job.TaskStatusIndex[api.Pending] {
// // Again, skip scheduling gated tasks
// if task.SchGated {
// continue
// }
// preemptorTasks[job.UID].Push(task)
// }
// for {
// if _, found := preemptorTasks[job.UID]; !found {
// break
// }

// if preemptorTasks[job.UID].Empty() {
// break
// }

// preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

// stmt := framework.NewStatement(ssn)
// assigned, err := pmpt.preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// // Ignore non running task.
// if !api.PreemptableStatus(task.Status) {
// return false
// }
// // BestEffort pod is not supported to preempt unBestEffort pod.
// if preemptor.BestEffort && !task.BestEffort {
// return false
// }
// // should skip not preemptable pod
// if !task.Preemptable {
// return false
// }

// // Preempt tasks within job.
// return preemptor.Job == task.Job
// }, ph)
// if err != nil {
// klog.V(3).Infof("Preemptor <%s/%s> failed to preempt Task , err: %s", preemptor.Namespace, preemptor.Name, err)
// }
// stmt.Commit()

// // If no preemption, next job.
// if !assigned {
// break
// }
// }
// }
}
}

Expand Down Expand Up @@ -319,7 +323,7 @@ func (pmpt *Action) normalPreempt(
assigned := false

for _, node := range selectedNodes {
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
klog.V(4).Infof("Considering Task <%s/%s> on Node <%s>.",
preemptor.Namespace, preemptor.Name, node.Name)

var preemptees []*api.TaskInfo
Expand All @@ -334,7 +338,7 @@ func (pmpt *Action) normalPreempt(
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := util.ValidateVictims(preemptor, node, victims); err != nil {
klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
klog.V(4).Infof("No validated victims on Node <%s>: %v", node.Name, err)
continue
}

Expand All @@ -357,9 +361,13 @@ func (pmpt *Action) normalPreempt(
break
}
preemptee := victimsQueue.Pop().(*api.TaskInfo)
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
if err := stmt.Evict(preemptee, "preempt"); err != nil {
preempteeQueue := preemptee.Namespace
preempteeJob := ssn.Jobs[preemptee.Job]
if preempteeJob != nil && preempteeJob.Queue != "" {
preempteeQueue = string(preempteeJob.Queue)
}
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>", preempteeQueue, preemptee.Name, currentQueue.Name, preemptor.Name)
if err := stmt.Evict(preemptee, fmt.Sprintf("preempt for task <%s/%s>", currentQueue.Name, preemptor.Name)); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
continue
Expand Down Expand Up @@ -411,7 +419,7 @@ func (pmpt *Action) taskEligibleToPreempt(preemptor *api.TaskInfo) error {

err := pmpt.ssn.PredicateFn(preemptor, nodeInfo)
if err == nil {
return fmt.Errorf("not eligible due to the pod's nominated node is already schedulable, which should not happen as preemption means no node is schedulable")
return fmt.Errorf("not eligible due to the pod's nominated node is already schedulable, which should not happen as preemption means no node is schedulable. %v", err)
}

fitError, ok := err.(*api.FitError)
Expand Down Expand Up @@ -720,7 +728,7 @@ func SelectVictimsOnNode(
metrics.UpdatePreemptionVictimsCount(len(allVictims))

if err := util.ValidateVictims(preemptor, nodeInfo, allVictims); err != nil {
klog.V(3).Infof("No validated victims on Node <%s>: %v", nodeInfo.Name, err)
klog.V(4).Infof("No validated victims on Node <%s>: %v", nodeInfo.Name, err)
return nil, api.AsStatus(fmt.Errorf("no validated victims on Node <%s>: %v", nodeInfo.Name, err))
}

Expand Down
Loading
Loading