]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/migrate.c
4c6b4c706f5fc34c2faf5c9902200295c4f3a4bb
[bacula/bacula] / bacula / src / dird / migrate.c
1 /*
2  *
3  *   Bacula Director -- migrate.c -- responsible for doing
4  *     migration jobs.
5  *
6  *     Kern Sibbald, September MMIV
7  *
8  *  Basic tasks done here:
9  *     Open DB and create records for this job.
10  *     Open Message Channel with Storage daemon to tell him a job will be starting.
11  *     Open connection with Storage daemon and pass him commands
12  *       to do the backup.
13  *     When the Storage daemon finishes the job, update the DB.
14  *
15  *   Version $Id$
16  */
17 /*
18    Copyright (C) 2004-2006 Kern Sibbald
19
20    This program is free software; you can redistribute it and/or
21    modify it under the terms of the GNU General Public License
22    version 2 as amended with additional clauses defined in the
23    file LICENSE in the main source directory.
24
25    This program is distributed in the hope that it will be useful,
26    but WITHOUT ANY WARRANTY; without even the implied warranty of
27    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
28    the file LICENSE for additional details.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34 #include "ua.h"
35 #ifndef HAVE_REGEX_H
36 #include "lib/bregex.h"
37 #else
38 #include <regex.h>
39 #endif
40
41 static const int dbglevel = 100;
42
43 static char OKbootstrap[] = "3000 OK bootstrap\n";
44 static bool get_job_to_migrate(JCR *jcr);
45 struct idpkt;
46 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
47                  const char *query2, const char *type);
48 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
49                  const char *type);
50 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type);
51 static void start_migration_job(JCR *jcr);
52 static int get_next_dbid_from_list(char **p, DBId_t *DBId);
53
54 /* 
55  * Called here before the job is run to do the job
56  *   specific setup.
57  */
58 bool do_migration_init(JCR *jcr)
59 {
60    /* If we find a job or jobs to migrate it is previous_jr.JobId */
61    if (!get_job_to_migrate(jcr)) {
62       return false;
63    }
64
65    if (jcr->previous_jr.JobId == 0) {
66       return true;                    /* no work */
67    }
68
69    if (!get_or_create_fileset_record(jcr)) {
70       return false;
71    }
72
73    apply_pool_overrides(jcr);
74
75    jcr->jr.PoolId = get_or_create_pool_record(jcr, jcr->pool->hdr.name);
76    if (jcr->jr.PoolId == 0) {
77       return false;
78    }
79
80    /* If pool storage specified, use it instead of job storage */
81    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
82
83    if (!jcr->wstorage) {
84       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Job or Pool.\n"));
85       return false;
86    }
87
88    create_restore_bootstrap_file(jcr);
89    return true;
90 }
91
92 /*
93  * Do a Migration of a previous job
94  *
95  *  Returns:  false on failure
96  *            true  on success
97  */
98 bool do_migration(JCR *jcr)
99 {
100    POOL_DBR pr;
101    POOL *pool;
102    char ed1[100];
103    BSOCK *sd;
104    JOB *job, *prev_job;
105    JCR *mig_jcr;                   /* newly migrated job */
106
107    /* 
108     *  previous_jr refers to the job DB record of the Job that is
109     *    going to be migrated.
110     *  prev_job refers to the job resource of the Job that is
111     *    going to be migrated.
112     *  jcr is the jcr for the current "migration" job.  It is a
113     *    control job that is put in the DB as a migration job, which
114     *    means that this job migrated a previous job to a new job.
115     *    No Volume or File data is associated with this control
116     *    job.
117     *  mig_jcr refers to the newly migrated job that is run by
118     *    the current jcr.  It is a backup job that moves (migrates) the
119     *    data written for the previous_jr into the new pool.  This
120     *    job (mig_jcr) becomes the new backup job that replaces
121     *    the original backup job.
122     */
123    if (jcr->previous_jr.JobId == 0 || jcr->ExpectedFiles == 0) {
124       set_jcr_job_status(jcr, JS_Terminated);
125       migration_cleanup(jcr, jcr->JobStatus);
126       return true;                    /* no work */
127    }
128
129    Dmsg4(dbglevel, "Previous: Name=%s JobId=%d Type=%c Level=%c\n",
130       jcr->previous_jr.Name, jcr->previous_jr.JobId, 
131       jcr->previous_jr.JobType, jcr->previous_jr.JobLevel);
132
133    Dmsg4(dbglevel, "Current: Name=%s JobId=%d Type=%c Level=%c\n",
134       jcr->jr.Name, jcr->jr.JobId, 
135       jcr->jr.JobType, jcr->jr.JobLevel);
136
137    LockRes();
138    job = (JOB *)GetResWithName(R_JOB, jcr->jr.Name);
139    prev_job = (JOB *)GetResWithName(R_JOB, jcr->previous_jr.Name);
140    UnlockRes();
141    if (!job || !prev_job) {
142       return false;
143    }
144
145    /* Create a migation jcr */
146    mig_jcr = jcr->mig_jcr = new_jcr(sizeof(JCR), dird_free_jcr);
147    memcpy(&mig_jcr->previous_jr, &jcr->previous_jr, sizeof(mig_jcr->previous_jr));
148
149    /*
150     * Turn the mig_jcr into a "real" job that takes on the aspects of
151     *   the previous backup job "prev_job".
152     */
153    set_jcr_defaults(mig_jcr, prev_job);
154    if (!setup_job(mig_jcr)) {
155       return false;
156    }
157
158    /* Now reset the job record from the previous job */
159    memcpy(&mig_jcr->jr, &jcr->previous_jr, sizeof(mig_jcr->jr));
160    /* Update the jr to reflect the new values of PoolId, FileSetId, and JobId. */
161    mig_jcr->jr.PoolId = jcr->jr.PoolId;
162    mig_jcr->jr.FileSetId = jcr->jr.FileSetId;
163    mig_jcr->jr.JobId = mig_jcr->JobId;
164
165    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
166       mig_jcr->jr.Name, mig_jcr->jr.JobId, 
167       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
168
169    /*
170     * Get the PoolId used with the original job. Then
171     *  find the pool name from the database record.
172     */
173    memset(&pr, 0, sizeof(pr));
174    pr.PoolId = mig_jcr->previous_jr.PoolId;
175    if (!db_get_pool_record(jcr, jcr->db, &pr)) {
176       Jmsg(jcr, M_FATAL, 0, _("Pool for JobId %s not in database. ERR=%s\n"),
177             edit_int64(pr.PoolId, ed1), db_strerror(jcr->db));
178          return false;
179    }
180    /* Get the pool resource corresponding to the original job */
181    pool = (POOL *)GetResWithName(R_POOL, pr.Name);
182    if (!pool) {
183       Jmsg(jcr, M_FATAL, 0, _("Pool resource \"%s\" not found.\n"), pr.Name);
184       return false;
185    }
186
187    /* If pool storage specified, use it for restore */
188    copy_rstorage(mig_jcr, pool->storage, _("Pool resource"));
189    copy_rstorage(jcr, pool->storage, _("Pool resource"));
190
191    /*
192     * If the original backup pool has a NextPool, make sure a 
193     *  record exists in the database. Note, in this case, we
194     *  will be migrating from pool to pool->NextPool.
195     */
196    if (pool->NextPool) {
197       jcr->jr.PoolId = get_or_create_pool_record(jcr, pool->NextPool->hdr.name);
198       if (jcr->jr.PoolId == 0) {
199          return false;
200       }
201       /*
202        * put the "NextPool" resource pointer in our jcr so that we
203        * can pull the Storage reference from it.
204        */
205       mig_jcr->pool = jcr->pool = pool->NextPool;
206       mig_jcr->jr.PoolId = jcr->jr.PoolId;
207       pm_strcpy(jcr->pool_source, _("NextPool in Pool resource"));
208    }
209
210    /* If pool storage specified, use it instead of job storage for backup */
211    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
212
213    /* Print Job Start message */
214    Jmsg(jcr, M_INFO, 0, _("Start Migration JobId %s, Job=%s\n"),
215         edit_uint64(jcr->JobId, ed1), jcr->Job);
216
217    set_jcr_job_status(jcr, JS_Running);
218    set_jcr_job_status(mig_jcr, JS_Running);
219    Dmsg2(dbglevel, "JobId=%d JobLevel=%c\n", jcr->jr.JobId, jcr->jr.JobLevel);
220
221    /* Update job start record for this migration control job */
222    if (!db_update_job_start_record(jcr, jcr->db, &jcr->jr)) {
223       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
224       return false;
225    }
226
227    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
228       mig_jcr->jr.Name, mig_jcr->jr.JobId, 
229       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
230
231    /* Update job start record for the real migration backup job */
232    if (!db_update_job_start_record(mig_jcr, mig_jcr->db, &mig_jcr->jr)) {
233       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(mig_jcr->db));
234       return false;
235    }
236
237
238    /*
239     * Open a message channel connection with the Storage
240     * daemon. This is to let him know that our client
241     * will be contacting him for a backup  session.
242     *
243     */
244    Dmsg0(110, "Open connection with storage daemon\n");
245    set_jcr_job_status(jcr, JS_WaitSD);
246    set_jcr_job_status(mig_jcr, JS_WaitSD);
247    /*
248     * Start conversation with Storage daemon
249     */
250    if (!connect_to_storage_daemon(jcr, 10, SDConnectTimeout, 1)) {
251       return false;
252    }
253    sd = jcr->store_bsock;
254    /*
255     * Now start a job with the Storage daemon
256     */
257    Dmsg2(dbglevel, "Read store=%s, write store=%s\n", 
258       ((STORE *)jcr->rstorage->first())->name(),
259       ((STORE *)jcr->wstorage->first())->name());
260    if (!start_storage_daemon_job(jcr, jcr->rstorage, jcr->wstorage)) {
261       return false;
262    }
263    Dmsg0(150, "Storage daemon connection OK\n");
264
265    if (!send_bootstrap_file(jcr, sd) ||
266        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR)) {
267       return false;
268    }
269
270    if (!bnet_fsend(sd, "run")) {
271       return false;
272    }
273
274    /*
275     * Now start a Storage daemon message thread
276     */
277    if (!start_storage_daemon_message_thread(jcr)) {
278       return false;
279    }
280
281
282    set_jcr_job_status(jcr, JS_Running);
283    set_jcr_job_status(mig_jcr, JS_Running);
284
285    /* Pickup Job termination data */
286    /* Note, the SD stores in jcr->JobFiles/ReadBytes/JobBytes/Errors */
287    wait_for_storage_daemon_termination(jcr);
288
289    set_jcr_job_status(jcr, jcr->SDJobStatus);
290    if (jcr->JobStatus != JS_Terminated) {
291       return false;
292    }
293    migration_cleanup(jcr, jcr->JobStatus);
294    if (mig_jcr) {
295       UAContext *ua = new_ua_context(jcr);
296       purge_files_from_job(ua, jcr->previous_jr.JobId);
297       free_ua_context(ua);
298    }
299    return true;
300 }
301
302 struct idpkt {
303    POOLMEM *list;
304    uint32_t count;
305 };
306
307 /*
308  * Callback handler make list of DB Ids
309  */
310 static int dbid_handler(void *ctx, int num_fields, char **row)
311 {
312    idpkt *ids = (idpkt *)ctx;
313
314    Dmsg3(dbglevel, "count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
315    if (ids->count == 0) {
316       ids->list[0] = 0;
317    } else {
318       pm_strcat(ids->list, ",");
319    }
320    pm_strcat(ids->list, row[0]);
321    ids->count++;
322    return 0;
323 }
324
325
326 struct uitem {
327    dlink link;   
328    char *item;
329 };
330
331 static int item_compare(void *item1, void *item2)
332 {
333    uitem *i1 = (uitem *)item1;
334    uitem *i2 = (uitem *)item2;
335    return strcmp(i1->item, i2->item);
336 }
337
338 static int unique_name_handler(void *ctx, int num_fields, char **row)
339 {
340    dlist *list = (dlist *)ctx;
341
342    uitem *new_item = (uitem *)malloc(sizeof(uitem));
343    uitem *item;
344    
345    memset(new_item, 0, sizeof(uitem));
346    new_item->item = bstrdup(row[0]);
347    Dmsg1(dbglevel, "Item=%s\n", row[0]);
348    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
349    if (item != new_item) {            /* already in list */
350       free(new_item->item);
351       free((char *)new_item);
352       return 0;
353    }
354    return 0;
355 }
356
357 #ifdef xxx  /* in development */
358 static int unique_dbid_handler(void *ctx, int num_fields, char **row)
359 {
360    dlist *list = (dlist *)ctx;
361
362    uitem *new_item = (uitem *)malloc(sizeof(uitem));
363    uitem *item;
364    
365    memset(new_item, 0, sizeof(uitem));
366    new_item->item = bstrdup(row[0]);
367    Dmsg1(dbglevel, "Item=%s\n", row[0]);
368    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
369    if (item != new_item) {            /* already in list */
370       free(new_item->item);
371       free((char *)new_item);
372       return 0;
373    }
374    return 0;
375 }
376 #endif
377
378
379
380 /* Get Job names in Pool */
381 const char *sql_job =
382    "SELECT DISTINCT Job.Name from Job,Pool"
383    " WHERE Pool.Name='%s' AND Job.PoolId=Pool.PoolId";
384
385 /* Get JobIds from regex'ed Job names */
386 const char *sql_jobids_from_job =
387    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool"
388    " WHERE Job.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
389    " ORDER by Job.StartTime";
390
391 /* Get Client names in Pool */
392 const char *sql_client =
393    "SELECT DISTINCT Client.Name from Client,Pool,Job"
394    " WHERE Pool.Name='%s' AND Job.ClientId=Client.ClientId AND"
395    " Job.PoolId=Pool.PoolId";
396
397 /* Get JobIds from regex'ed Client names */
398 const char *sql_jobids_from_client =
399    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool,Client"
400    " WHERE Client.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
401    " AND Job.ClientId=Client.ClientId "
402    " ORDER by Job.StartTime";
403
404 /* Get Volume names in Pool */
405 const char *sql_vol = 
406    "SELECT DISTINCT VolumeName FROM Media,Pool WHERE"
407    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
408    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
409
410 /* Get JobIds from regex'ed Volume names */
411 const char *sql_jobids_from_vol =
412    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Media,JobMedia,Job"
413    " WHERE Media.VolumeName='%s' AND Media.MediaId=JobMedia.MediaId"
414    " AND JobMedia.JobId=Job.JobId" 
415    " ORDER by Job.StartTime";
416
417
418 const char *sql_smallest_vol = 
419    "SELECT MediaId FROM Media,Pool WHERE"
420    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
421    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
422    " ORDER BY VolBytes ASC LIMIT 1";
423
424 const char *sql_oldest_vol = 
425    "SELECT MediaId FROM Media,Pool WHERE"
426    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
427    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
428    " ORDER BY LastWritten ASC LIMIT 1";
429
430 /* Get JobIds when we have selected MediaId */
431 const char *sql_jobids_from_mediaid =
432    "SELECT DISTINCT Job.JobId,Job.StartTime FROM JobMedia,Job"
433    " WHERE JobMedia.JobId=Job.JobId AND JobMedia.MediaId=%s"
434    " ORDER by Job.StartTime";
435
436 /* Get tne number of bytes in the pool */
437 const char *sql_pool_bytes =
438    "SELECT SUM(VolBytes) FROM Media,Pool WHERE"
439    " VolStatus in ('Full','Used','Error','Append') AND Media.Enabled=1 AND"
440    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
441
442 /* Get tne number of bytes in the Jobs */
443 const char *sql_job_bytes =
444    "SELECT SUM(JobBytes) FROM Job WHERE JobId IN (%s)";
445
446
447 /* Get Media Ids in Pool */
448 const char *sql_mediaids =
449    "SELECT MediaId FROM Media,Pool WHERE"
450    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
451    " Media.PoolId=Pool.PoolId AND Pool.Name='%s' ORDER BY LastWritten ASC";
452
453 /* Get JobIds in Pool longer than specified time */
454 const char *sql_pool_time = 
455    "SELECT DISTINCT Job.JobId from Pool,Job,Media,JobMedia WHERE"
456    " Pool.Name='%s' AND Media.PoolId=Pool.PoolId AND"
457    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
458    " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId"
459    " AND Job.RealEndTime<='%s'";
460
461 /*
462 * const char *sql_ujobid =
463 *   "SELECT DISTINCT Job.Job from Client,Pool,Media,Job,JobMedia "
464 *   " WHERE Media.PoolId=Pool.PoolId AND Pool.Name='%s' AND"
465 *   " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId";
466 */
467
468
469
470 /*
471  *
472  * This is the central piece of code that finds a job or jobs 
473  *   actually JobIds to migrate.  It first looks to see if one
474  *   has been "manually" specified in jcr->MigrateJobId, and if
475  *   so, it returns that JobId to be run.  Otherwise, it
476  *   examines the Selection Type to see what kind of migration
477  *   we are doing (Volume, Job, Client, ...) and applies any
478  *   Selection Pattern if appropriate to obtain a list of JobIds.
479  *   Finally, it will loop over all the JobIds found, except the last
480  *   one starting a new job with MigrationJobId set to that JobId, and
481  *   finally, it returns the last JobId to the caller.
482  *
483  * Returns: false on error
484  *          true  if OK and jcr->previous_jr filled in
485  */
486 static bool get_job_to_migrate(JCR *jcr)
487 {
488    char ed1[30];
489    POOL_MEM query(PM_MESSAGE);
490    JobId_t JobId;
491    DBId_t  MediaId = 0;
492    int stat;
493    char *p;
494    idpkt ids, mid, jids;
495    db_int64_ctx ctx;
496    int64_t pool_bytes;
497    bool ok;
498    time_t ttime;
499    struct tm tm;
500    char dt[MAX_TIME_LENGTH];
501
502    ids.list = get_pool_memory(PM_MESSAGE);
503    ids.list[0] = 0;
504    ids.count = 0;
505    mid.list = get_pool_memory(PM_MESSAGE);
506    mid.list[0] = 0;
507    mid.count = 0;
508    jids.list = get_pool_memory(PM_MESSAGE);
509    jids.list[0] = 0;
510    jids.count = 0;
511
512
513    /*
514     * If MigrateJobId is set, then we migrate only that Job,
515     *  otherwise, we go through the full selection of jobs to
516     *  migrate.
517     */
518    if (jcr->MigrateJobId != 0) {
519       Dmsg1(dbglevel, "At Job start previous jobid=%u\n", jcr->MigrateJobId);
520       edit_uint64(jcr->MigrateJobId, ids.list);
521       ids.count = 1;
522    } else {
523       switch (jcr->job->selection_type) {
524       case MT_JOB:
525          if (!regex_find_jobids(jcr, &ids, sql_job, sql_jobids_from_job, "Job")) {
526             goto bail_out;
527          } 
528          break;
529       case MT_CLIENT:
530          if (!regex_find_jobids(jcr, &ids, sql_client, sql_jobids_from_client, "Client")) {
531             goto bail_out;
532          } 
533          break;
534       case MT_VOLUME:
535          if (!regex_find_jobids(jcr, &ids, sql_vol, sql_jobids_from_vol, "Volume")) {
536             goto bail_out;
537          } 
538          break;
539       case MT_SQLQUERY:
540          if (!jcr->job->selection_pattern) {
541             Jmsg(jcr, M_FATAL, 0, _("No Migration SQL selection pattern specified.\n"));
542             goto bail_out;
543          }
544          Dmsg1(dbglevel, "SQL=%s\n", jcr->job->selection_pattern);
545          if (!db_sql_query(jcr->db, jcr->job->selection_pattern,
546               dbid_handler, (void *)&ids)) {
547             Jmsg(jcr, M_FATAL, 0,
548                  _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
549             goto bail_out;
550          }
551          break;
552       case MT_SMALLEST_VOL:
553          if (!find_mediaid_then_jobids(jcr, &ids, sql_smallest_vol, "Smallest Volume")) {
554             goto bail_out;
555          }
556          break;
557       case MT_OLDEST_VOL:
558          if (!find_mediaid_then_jobids(jcr, &ids, sql_oldest_vol, "Oldest Volume")) {
559             goto bail_out;
560          }
561          break;
562
563       case MT_POOL_OCCUPANCY:
564          ctx.count = 0;
565          /* Find count of bytes in pool */
566          Mmsg(query, sql_pool_bytes, jcr->pool->hdr.name);
567          if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
568             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
569             goto bail_out;
570          }
571          if (ctx.count == 0) {
572             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
573             goto ok_out;
574          }
575          pool_bytes = ctx.value;
576          Dmsg2(dbglevel, "highbytes=%d pool=%d\n", (int)jcr->pool->MigrationHighBytes,
577                (int)pool_bytes);
578          if (pool_bytes < (int64_t)jcr->pool->MigrationHighBytes) {
579             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
580             goto ok_out;
581          }
582          Dmsg0(dbglevel, "We should do Occupation migration.\n");
583
584          ids.count = 0;
585          /* Find a list of MediaIds that could be migrated */
586          Mmsg(query, sql_mediaids, jcr->pool->hdr.name);
587 //       Dmsg1(dbglevel, "query=%s\n", query.c_str());
588          if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)&ids)) {
589             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
590             goto bail_out;
591          }
592          if (ids.count == 0) {
593             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
594             goto ok_out;
595          }
596          Dmsg2(dbglevel, "Pool Occupancy ids=%d MediaIds=%s\n", ids.count, ids.list);
597
598          /*
599           * Now loop over MediaIds getting more JobIds to migrate until
600           *  we reduce the pool occupancy below the low water mark.
601           */
602          p = ids.list;
603          for (int i=0; i < (int)ids.count; i++) {
604             stat = get_next_dbid_from_list(&p, &MediaId);
605             Dmsg2(dbglevel, "get_next_dbid stat=%d MediaId=%u\n", stat, MediaId);
606             if (stat < 0) {
607                Jmsg(jcr, M_FATAL, 0, _("Invalid MediaId found.\n"));
608                goto bail_out;
609             } else if (stat == 0) {
610                break;
611             }
612             mid.count = 1;
613             Mmsg(mid.list, "%s", edit_int64(MediaId, ed1));
614             ok = find_jobids_from_mediaid_list(jcr, &mid, "Volumes");
615             if (!ok) {
616                continue;
617             }
618             if (i != 0) {
619                pm_strcat(jids.list, ",");
620             }
621             pm_strcat(jids.list, mid.list);
622             jids.count += mid.count;
623
624             /* Now get the count of bytes added */
625             ctx.count = 0;
626             /* Find count of bytes from Jobs */
627             Mmsg(query, sql_job_bytes, mid.list);
628             if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
629                Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
630                goto bail_out;
631             }
632             pool_bytes -= ctx.value;
633             Dmsg1(dbglevel, "Job bytes=%d\n", (int)ctx.value);
634             Dmsg2(dbglevel, "lowbytes=%d pool=%d\n", (int)jcr->pool->MigrationLowBytes,
635                   (int)pool_bytes);
636             if (pool_bytes <= (int64_t)jcr->pool->MigrationLowBytes) {
637                Dmsg0(dbglevel, "We should be done.\n");
638                break;
639             }
640
641          }
642          Dmsg2(dbglevel, "Pool Occupancy ids=%d JobIds=%s\n", jids.count, jids.list);
643
644          break;
645
646       case MT_POOL_TIME:
647          ttime = time(NULL) - (time_t)jcr->pool->MigrationTime;
648          (void)localtime_r(&ttime, &tm);
649          strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
650
651          ids.count = 0;
652          Mmsg(query, sql_pool_time, jcr->pool->hdr.name, dt);
653 //       Dmsg1(000, "query=%s\n", query.c_str());
654          if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)&ids)) {
655             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
656             goto bail_out;
657          }
658          if (ids.count == 0) {
659             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
660             goto ok_out;
661          }
662          Dmsg2(dbglevel, "PoolTime ids=%d JobIds=%s\n", ids.count, ids.list);
663          break;
664
665       default:
666          Jmsg(jcr, M_FATAL, 0, _("Unknown Migration Selection Type.\n"));
667          goto bail_out;
668       }
669    }
670
671    /*
672     * Loop over all jobids except the last one, sending
673     *  them to start_migration_job(), which will start a job
674     *  for each of them.  For the last JobId, we handle it below.
675     */
676    p = ids.list;
677    Jmsg(jcr, M_INFO, 0, _("The following %u JobIds will be migrated: %s\n"),
678       ids.count, ids.list);
679    for (int i=1; i < (int)ids.count; i++) {
680       JobId = 0;
681       stat = get_next_jobid_from_list(&p, &JobId);
682       Dmsg2(dbglevel, "get_next_jobid stat=%d JobId=%u\n", stat, JobId);
683       jcr->MigrateJobId = JobId;
684       start_migration_job(jcr);
685       if (stat < 0) {
686          Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
687          goto bail_out;
688       } else if (stat == 0) {
689          Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
690          goto ok_out;
691       }
692    }
693    
694    /* Now get the last JobId and handle it in the current job */
695    JobId = 0;
696    stat = get_next_jobid_from_list(&p, &JobId);
697    Dmsg2(dbglevel, "Last get_next_jobid stat=%d JobId=%u\n", stat, JobId);
698    if (stat < 0) {
699       Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
700       goto bail_out;
701    } else if (stat == 0) {
702       Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
703       goto ok_out;
704    }
705
706    jcr->previous_jr.JobId = JobId;
707    Dmsg1(100, "Previous jobid=%d\n", jcr->previous_jr.JobId);
708
709    if (!db_get_job_record(jcr, jcr->db, &jcr->previous_jr)) {
710       Jmsg(jcr, M_FATAL, 0, _("Could not get job record for JobId %s to migrate. ERR=%s"),
711            edit_int64(jcr->previous_jr.JobId, ed1),
712            db_strerror(jcr->db));
713       goto bail_out;
714    }
715    Jmsg(jcr, M_INFO, 0, _("Migration using JobId=%d Job=%s\n"),
716       jcr->previous_jr.JobId, jcr->previous_jr.Job);
717
718 ok_out:
719    free_pool_memory(ids.list);
720    free_pool_memory(mid.list);
721    free_pool_memory(jids.list);
722    return true;
723
724 bail_out:
725    free_pool_memory(ids.list);
726    free_pool_memory(mid.list);
727    free_pool_memory(jids.list);
728    return false;
729 }
730
731 static void start_migration_job(JCR *jcr)
732 {
733    UAContext *ua = new_ua_context(jcr);
734    char ed1[50];
735    ua->batch = true;
736    Mmsg(ua->cmd, "run %s jobid=%s", jcr->job->hdr.name, 
737         edit_uint64(jcr->MigrateJobId, ed1));
738    Dmsg1(dbglevel, "=============== Migration cmd=%s\n", ua->cmd);
739    parse_ua_args(ua);                 /* parse command */
740    int stat = run_cmd(ua, ua->cmd);
741    if (stat == 0) {
742       Jmsg(jcr, M_ERROR, 0, _("Could not start migration job.\n"));
743    } else {
744       Jmsg(jcr, M_INFO, 0, _("Migration JobId %d started.\n"), stat);
745    }
746    free_ua_context(ua);
747 }
748
749 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
750                  const char *type) 
751 {
752    bool ok = false;
753    POOL_MEM query(PM_MESSAGE);
754
755    ids->count = 0;
756    /* Basic query for MediaId */
757    Mmsg(query, query1, jcr->pool->hdr.name);
758    if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
759       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
760       goto bail_out;
761    }
762    if (ids->count == 0) {
763       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
764    }
765    if (ids->count != 1) {
766       Jmsg(jcr, M_FATAL, 0, _("SQL logic error. Count should be 1 but is %d\n"), 
767          ids->count);
768       goto bail_out;
769    }
770    Dmsg1(dbglevel, "Smallest Vol Jobids=%s\n", ids->list);
771
772    ok = find_jobids_from_mediaid_list(jcr, ids, type);
773
774 bail_out:
775    return ok;
776 }
777
778 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type) 
779 {
780    bool ok = false;
781    POOL_MEM query(PM_MESSAGE);
782
783    Mmsg(query, sql_jobids_from_mediaid, ids->list);
784    ids->count = 0;
785    if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
786       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
787       goto bail_out;
788    }
789    if (ids->count == 0) {
790       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
791    }
792    ok = true;
793 bail_out:
794    return ok;
795 }
796
797 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
798                  const char *query2, const char *type) 
799 {
800    dlist *item_chain;
801    uitem *item = NULL;
802    uitem *last_item = NULL;
803    regex_t preg;
804    char prbuf[500];
805    int rc;
806    bool ok = false;
807    POOL_MEM query(PM_MESSAGE);
808
809    item_chain = New(dlist(item, &item->link));
810    if (!jcr->job->selection_pattern) {
811       Jmsg(jcr, M_FATAL, 0, _("No Migration %s selection pattern specified.\n"),
812          type);
813       goto bail_out;
814    }
815    Dmsg1(dbglevel, "regex=%s\n", jcr->job->selection_pattern);
816    /* Compile regex expression */
817    rc = regcomp(&preg, jcr->job->selection_pattern, REG_EXTENDED);
818    if (rc != 0) {
819       regerror(rc, &preg, prbuf, sizeof(prbuf));
820       Jmsg(jcr, M_FATAL, 0, _("Could not compile regex pattern \"%s\" ERR=%s\n"),
821            jcr->job->selection_pattern, prbuf);
822       goto bail_out;
823    }
824    /* Basic query for names */
825    Mmsg(query, query1, jcr->pool->hdr.name);
826    Dmsg1(dbglevel, "query1=%s\n", query.c_str());
827    if (!db_sql_query(jcr->db, query.c_str(), unique_name_handler, 
828         (void *)item_chain)) {
829       Jmsg(jcr, M_FATAL, 0,
830            _("SQL to get %s failed. ERR=%s\n"), type, db_strerror(jcr->db));
831       goto bail_out;
832    }
833    /* Now apply the regex to the names and remove any item not matched */
834    foreach_dlist(item, item_chain) {
835       const int nmatch = 30;
836       regmatch_t pmatch[nmatch];
837       if (last_item) {
838          Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
839          free(last_item->item);
840          item_chain->remove(last_item);
841       }
842       Dmsg1(dbglevel, "Item=%s\n", item->item);
843       rc = regexec(&preg, item->item, nmatch, pmatch,  0);
844       if (rc == 0) {
845          last_item = NULL;   /* keep this one */
846       } else {   
847          last_item = item;
848       }
849    }
850    if (last_item) {
851       free(last_item->item);
852       Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
853       item_chain->remove(last_item);
854    }
855    regfree(&preg);
856    /* 
857     * At this point, we have a list of items in item_chain
858     *  that have been matched by the regex, so now we need
859     *  to look up their jobids.
860     */
861    ids->count = 0;
862    foreach_dlist(item, item_chain) {
863       Dmsg2(dbglevel, "Got %s: %s\n", type, item->item);
864       Mmsg(query, query2, item->item, jcr->pool->hdr.name);
865       Dmsg1(dbglevel, "query2=%s\n", query.c_str());
866       if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
867          Jmsg(jcr, M_FATAL, 0,
868               _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
869          goto bail_out;
870       }
871    }
872    if (ids->count == 0) {
873       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
874    }
875    ok = true;
876 bail_out:
877    Dmsg2(dbglevel, "Count=%d Jobids=%s\n", ids->count, ids->list);
878    delete item_chain;
879    Dmsg0(dbglevel, "After delete item_chain\n");
880    return ok;
881 }
882
883
884 /*
885  * Release resources allocated during backup.
886  */
887 void migration_cleanup(JCR *jcr, int TermCode)
888 {
889    char sdt[MAX_TIME_LENGTH], edt[MAX_TIME_LENGTH];
890    char ec1[30], ec2[30], ec3[30], ec4[30], ec5[30], elapsed[50];
891    char ec6[50], ec7[50], ec8[50];
892    char term_code[100], sd_term_msg[100];
893    const char *term_msg;
894    int msg_type;
895    MEDIA_DBR mr;
896    double kbps;
897    utime_t RunTime;
898    JCR *mig_jcr = jcr->mig_jcr;
899    POOL_MEM query(PM_MESSAGE);
900
901    Dmsg2(100, "Enter migrate_cleanup %d %c\n", TermCode, TermCode);
902    dequeue_messages(jcr);             /* display any queued messages */
903    memset(&mr, 0, sizeof(mr));
904    set_jcr_job_status(jcr, TermCode);
905    update_job_end_record(jcr);        /* update database */
906
907    /* 
908     * Check if we actually did something.  
909     *  mig_jcr is jcr of the newly migrated job.
910     */
911    if (mig_jcr) {
912       mig_jcr->JobFiles = jcr->JobFiles = jcr->SDJobFiles;
913       mig_jcr->JobBytes = jcr->JobBytes = jcr->SDJobBytes;
914       mig_jcr->VolSessionId = jcr->VolSessionId;
915       mig_jcr->VolSessionTime = jcr->VolSessionTime;
916       mig_jcr->jr.RealEndTime = 0; 
917       mig_jcr->jr.PriorJobId = jcr->previous_jr.JobId;
918
919       set_jcr_job_status(mig_jcr, TermCode);
920
921   
922       update_job_end_record(mig_jcr);
923      
924       /* Update final items to set them to the previous job's values */
925       Mmsg(query, "UPDATE Job SET StartTime='%s',EndTime='%s',"
926                   "JobTDate=%s WHERE JobId=%s", 
927          jcr->previous_jr.cStartTime, jcr->previous_jr.cEndTime, 
928          edit_uint64(jcr->previous_jr.JobTDate, ec1),
929          edit_uint64(mig_jcr->jr.JobId, ec2));
930       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
931
932       /* Now marke the previous job as migrated */
933       Mmsg(query, "UPDATE Job SET Type='%c' WHERE JobId=%s",
934            (char)JT_MIGRATED_JOB, edit_uint64(jcr->previous_jr.JobId, ec1));
935       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
936
937       if (!db_get_job_record(jcr, jcr->db, &jcr->jr)) {
938          Jmsg(jcr, M_WARNING, 0, _("Error getting job record for stats: %s"),
939             db_strerror(jcr->db));
940          set_jcr_job_status(jcr, JS_ErrorTerminated);
941       }
942
943       bstrncpy(mr.VolumeName, jcr->VolumeName, sizeof(mr.VolumeName));
944       if (!db_get_media_record(jcr, jcr->db, &mr)) {
945          Jmsg(jcr, M_WARNING, 0, _("Error getting Media record for Volume \"%s\": ERR=%s"),
946             mr.VolumeName, db_strerror(jcr->db));
947          set_jcr_job_status(jcr, JS_ErrorTerminated);
948       }
949
950       update_bootstrap_file(mig_jcr);
951
952       if (!db_get_job_volume_names(mig_jcr, mig_jcr->db, mig_jcr->jr.JobId, &mig_jcr->VolumeName)) {
953          /*
954           * Note, if the job has erred, most likely it did not write any
955           *  tape, so suppress this "error" message since in that case
956           *  it is normal.  Or look at it the other way, only for a
957           *  normal exit should we complain about this error.
958           */
959          if (jcr->JobStatus == JS_Terminated && jcr->jr.JobBytes) {
960             Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(mig_jcr->db));
961          }
962          mig_jcr->VolumeName[0] = 0;         /* none */
963       }
964   }
965
966    msg_type = M_INFO;                 /* by default INFO message */
967    switch (jcr->JobStatus) {
968    case JS_Terminated:
969       if (jcr->Errors || jcr->SDErrors) {
970          term_msg = _("%s OK -- with warnings");
971       } else {
972          term_msg = _("%s OK");
973       }
974       break;
975    case JS_FatalError:
976    case JS_ErrorTerminated:
977       term_msg = _("*** %s Error ***");
978       msg_type = M_ERROR;          /* Generate error message */
979       if (jcr->store_bsock) {
980          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
981          if (jcr->SD_msg_chan) {
982             pthread_cancel(jcr->SD_msg_chan);
983          }
984       }
985       break;
986    case JS_Canceled:
987       term_msg = _("%s Canceled");
988       if (jcr->store_bsock) {
989          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
990          if (jcr->SD_msg_chan) {
991             pthread_cancel(jcr->SD_msg_chan);
992          }
993       }
994       break;
995    default:
996       term_msg = _("Inappropriate %s term code");
997       break;
998    }
999    bsnprintf(term_code, sizeof(term_code), term_msg, "Migration");
1000    bstrftimes(sdt, sizeof(sdt), jcr->jr.StartTime);
1001    bstrftimes(edt, sizeof(edt), jcr->jr.EndTime);
1002    RunTime = jcr->jr.EndTime - jcr->jr.StartTime;
1003    if (RunTime <= 0) {
1004       kbps = 0;
1005    } else {
1006       kbps = (double)jcr->SDJobBytes / (1000 * RunTime);
1007    }
1008
1009
1010    jobstatus_to_ascii(jcr->SDJobStatus, sd_term_msg, sizeof(sd_term_msg));
1011
1012    Jmsg(jcr, msg_type, 0, _("Bacula %s (%s): %s\n"
1013 "  Prev Backup JobId:      %s\n"
1014 "  New Backup JobId:       %s\n"
1015 "  Migration JobId:        %s\n"
1016 "  Migration Job:          %s\n"
1017 "  Backup Level:           %s%s\n"
1018 "  Client:                 %s\n"
1019 "  FileSet:                \"%s\" %s\n"
1020 "  Pool:                   \"%s\" (From %s)\n"
1021 "  Storage:                \"%s\" (From %s)\n"
1022 "  Start time:             %s\n"
1023 "  End time:               %s\n"
1024 "  Elapsed time:           %s\n"
1025 "  Priority:               %d\n"
1026 "  SD Files Written:       %s\n"
1027 "  SD Bytes Written:       %s (%sB)\n"
1028 "  Rate:                   %.1f KB/s\n"
1029 "  Volume name(s):         %s\n"
1030 "  Volume Session Id:      %d\n"
1031 "  Volume Session Time:    %d\n"
1032 "  Last Volume Bytes:      %s (%sB)\n"
1033 "  SD Errors:              %d\n"
1034 "  SD termination status:  %s\n"
1035 "  Termination:            %s\n\n"),
1036    VERSION,
1037    LSMDATE,
1038         edt, 
1039         mig_jcr ? edit_uint64(jcr->previous_jr.JobId, ec6) : "0", 
1040         mig_jcr ? edit_uint64(mig_jcr->jr.JobId, ec7) : "0",
1041         edit_uint64(jcr->jr.JobId, ec8),
1042         jcr->jr.Job,
1043         level_to_str(jcr->JobLevel), jcr->since,
1044         jcr->client->name(),
1045         jcr->fileset->name(), jcr->FSCreateTime,
1046         jcr->pool->name(), jcr->pool_source,
1047         jcr->wstore->name(), jcr->storage_source,
1048         sdt,
1049         edt,
1050         edit_utime(RunTime, elapsed, sizeof(elapsed)),
1051         jcr->JobPriority,
1052         edit_uint64_with_commas(jcr->SDJobFiles, ec1),
1053         edit_uint64_with_commas(jcr->SDJobBytes, ec2),
1054         edit_uint64_with_suffix(jcr->SDJobBytes, ec3),
1055         (float)kbps,
1056         mig_jcr ? mig_jcr->VolumeName : "",
1057         jcr->VolSessionId,
1058         jcr->VolSessionTime,
1059         edit_uint64_with_commas(mr.VolBytes, ec4),
1060         edit_uint64_with_suffix(mr.VolBytes, ec5),
1061         jcr->SDErrors,
1062         sd_term_msg,
1063         term_code);
1064
1065    Dmsg1(100, "migrate_cleanup() mig_jcr=0x%x\n", jcr->mig_jcr);
1066    if (jcr->mig_jcr) {
1067       free_jcr(jcr->mig_jcr);
1068       jcr->mig_jcr = NULL;
1069    }
1070    Dmsg0(100, "Leave migrate_cleanup()\n");
1071 }
1072
1073 /* 
1074  * Return next DBId from comma separated list   
1075  *
1076  * Returns:
1077  *   1 if next DBId returned
1078  *   0 if no more DBIds are in list
1079  *  -1 there is an error
1080  */
1081 static int get_next_dbid_from_list(char **p, DBId_t *DBId)
1082 {
1083    char id[30];
1084    char *q = *p;
1085
1086    id[0] = 0;
1087    for (int i=0; i<(int)sizeof(id); i++) {
1088       if (*q == 0) {
1089          break;
1090       } else if (*q == ',') {
1091          q++;
1092          break;
1093       }
1094       id[i] = *q++;
1095       id[i+1] = 0;
1096    }
1097    if (id[0] == 0) {
1098       return 0;
1099    } else if (!is_a_number(id)) {
1100       return -1;                      /* error */
1101    }
1102    *p = q;
1103    *DBId = str_to_int64(id);
1104    return 1;
1105 }