wait_pkt *sched_pkt;
- Dmsg0(100, "jobq_add\n");
+ Dmsg1(100, "jobq_add jobid=%d\n", jcr->JobId);
if (jq->valid != JOBQ_VALID) {
return EINVAL;
}
}
item->jcr = jcr;
- Dmsg1(100, "add 0x%x to queue\n", (unsigned)item);
if (job_canceled(jcr)) {
/* Add job to ready queue so that it is canceled quickly */
jq->ready_jobs->prepend(item);
+ Dmsg1(100, "Prepended job=%d to ready queue\n", jcr->JobId);
} else {
/* Add this job to the wait queue in priority sorted order */
for (li=NULL; (li=(jobq_item_t *)jq->waiting_jobs->next(li)); ) {
- if (li->jcr->JobPriority < jcr->JobPriority) {
+ Dmsg2(100, "waiting item jobid=%d priority=%d\n",
+ li->jcr->JobId, li->jcr->JobPriority);
+ if (li->jcr->JobPriority > jcr->JobPriority) {
jq->waiting_jobs->insert_before(item, li);
- Dmsg1(100, "insert_before 0x%x\n", (unsigned)li);
+ Dmsg2(100, "insert_before jobid=%d before %d\n",
+ li->jcr->JobId, jcr->JobId);
inserted = true;
+ break;
}
}
/* If not jobs in wait queue, append it */
if (!inserted) {
jq->waiting_jobs->append(item);
- Dmsg0(100, "Appended item.\n");
+ Dmsg1(100, "Appended item jobid=%d\n", jcr->JobId);
}
- Dmsg1(100, "Next=0x%x\n", (unsigned)jq->waiting_jobs->next(item));
}
stat = start_server(jq);
{
int stat;
bool found = false;
- pthread_t id;
jobq_item_t *item;
- Dmsg0(100, "jobq_remove\n");
+ Dmsg1(100, "jobq_remove jobid=%d\n", jcr->JobId);
if (jq->valid != JOBQ_VALID) {
return EINVAL;
}
jq->waiting_jobs->remove(item);
jq->ready_jobs->prepend(item);
- /* if any threads are idle, wake one */
- if (jq->idle_workers > 0) {
- Dmsg0(100, "Signal worker\n");
- if ((stat = pthread_cond_signal(&jq->work)) != 0) {
- pthread_mutex_unlock(&jq->mutex);
- return stat;
- }
- } else {
- Dmsg0(100, "Create worker thread\n");
- /* No idle threads so create a new one */
- set_thread_concurrency(jq->max_workers + 1);
- if ((stat = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
- pthread_mutex_unlock(&jq->mutex);
- return stat;
- }
- jq->num_workers++;
+ stat = start_server(jq);
+ if (stat != 0) {
+ return stat;
}
pthread_mutex_unlock(&jq->mutex);
Dmsg0(100, "Return jobq_remove\n");
/* if any threads are idle, wake one */
if (jq->idle_workers > 0) {
- Dmsg0(100, "Signal worker\n");
+ Dmsg0(100, "Signal worker to wake up\n");
if ((stat = pthread_cond_signal(&jq->work)) != 0) {
pthread_mutex_unlock(&jq->mutex);
return stat;
jobq_item_t *je; /* job entry in queue */
int stat;
bool timedout;
+ bool work = true;
Dmsg0(100, "Start jobq_server\n");
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
timeout.tv_nsec = 0;
timeout.tv_sec = tv.tv_sec + 4;
- while (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && !jq->quit) {
+ while (!work && !jq->quit) {
/*
* Wait 4 seconds, then if no more work, exit
*/
- Dmsg0(100, "pthread_cond_timedwait()\n");
+ Dmsg0(200, "pthread_cond_timedwait()\n");
stat = pthread_cond_timedwait(&jq->work, &jq->mutex, &timeout);
Dmsg1(100, "timedwait=%d\n", stat);
if (stat == ETIMEDOUT) {
je = (jobq_item_t *)jq->ready_jobs->first();
jq->ready_jobs->remove(je);
if (!jq->ready_jobs->empty()) {
+ Dmsg0(100, "ready queue not empty start server\n");
if (start_server(jq) != 0) {
return NULL;
}
}
jq->running_jobs->append(je);
+ Dmsg1(100, "Took jobid=%d from ready and appended to run\n", je->jcr->JobId);
if ((stat = pthread_mutex_unlock(&jq->mutex)) != 0) {
return NULL;
}
/* Call user's routine here */
- Dmsg0(100, "Calling user engine.\n");
+ Dmsg1(100, "Calling user engine for jobid=%d\n", je->jcr->JobId);
jq->engine(je->jcr);
- Dmsg0(100, "Back from user engine.\n");
+ Dmsg1(100, "Back from user engine jobid=%d.\n", je->jcr->JobId);
if ((stat = pthread_mutex_lock(&jq->mutex)) != 0) {
free(je); /* release job entry */
return NULL;
}
- Dmsg0(100, "Done lock mutex\n");
+ Dmsg0(200, "Done lock mutex\n");
jq->running_jobs->remove(je);
/*
* Release locks if acquired. Note, they will not have
for ( ; je; ) {
JCR *jcr = je->jcr;
jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);
- Dmsg2(100, "je=0x%x jn=0x%x\n", (unsigned)je, (unsigned)jn);
Dmsg3(100, "Examining Job=%d JobPri=%d want Pri=%d\n",
jcr->JobId, jcr->JobPriority, Priority);
/* Take only jobs of correct Priority */
jcr->acquired_resource_locks = true;
jq->waiting_jobs->remove(je);
jq->ready_jobs->append(je);
- Dmsg1(100, "moved JobId=%d from wait to ready queue\n",
- je->jcr->JobId);
+ Dmsg1(100, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
je = jn;
} /* end for loop */
+ break;
} /* end while loop */
Dmsg0(100, "Done checking wait queue.\n");
/*
- * If no more work request, and we are asked to quit, then do it
+ * If no more ready work and we are asked to quit, then do it
*/
- if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && jq->quit) {
+ if (jq->ready_jobs->empty() && jq->quit) {
jq->num_workers--;
if (jq->num_workers == 0) {
Dmsg0(100, "Wake up destroy routine\n");
/*
* If no more work requests, and we waited long enough, quit
*/
- Dmsg1(100, "jq empty = %d\n", jq->waiting_jobs->empty());
- Dmsg1(100, "timedout=%d\n", timedout);
- if (jq->waiting_jobs->empty() && jq->ready_jobs->empty() && timedout) {
+ Dmsg2(100, "timedout=%d read empty=%d\n", timedout,
+ jq->ready_jobs->empty());
+ if (jq->ready_jobs->empty() && timedout) {
Dmsg0(100, "break big loop\n");
jq->num_workers--;
break;
}
Dmsg0(100, "Loop again\n");
+ work = false;
} /* end of big for loop */
- Dmsg0(100, "unlock mutex\n");
+ Dmsg0(200, "unlock mutex\n");
pthread_mutex_unlock(&jq->mutex);
Dmsg0(100, "End jobq_server\n");
return NULL;