]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/migrate.c
kes Implement code to pass the MediaId to the SD. The SD then uses
[bacula/bacula] / bacula / src / dird / migrate.c
1 /*
2  *
3  *   Bacula Director -- migrate.c -- responsible for doing
4  *     migration jobs.
5  *
6  *     Kern Sibbald, September MMIV
7  *
8  *  Basic tasks done here:
9  *     Open DB and create records for this job.
10  *     Open Message Channel with Storage daemon to tell him a job will be starting.
11  *     Open connection with Storage daemon and pass him commands
12  *       to do the backup.
13  *     When the Storage daemon finishes the job, update the DB.
14  *
15  *   Version $Id$
16  */
17 /*
18    Copyright (C) 2004-2006 Kern Sibbald
19
20    This program is free software; you can redistribute it and/or
21    modify it under the terms of the GNU General Public License
22    version 2 as amended with additional clauses defined in the
23    file LICENSE in the main source directory.
24
25    This program is distributed in the hope that it will be useful,
26    but WITHOUT ANY WARRANTY; without even the implied warranty of
27    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
28    the file LICENSE for additional details.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34 #include "ua.h"
35 #ifndef HAVE_REGEX_H
36 #include "lib/bregex.h"
37 #else
38 #include <regex.h>
39 #endif
40
41 static const int dbglevel = 100;
42
43 static char OKbootstrap[] = "3000 OK bootstrap\n";
44 static bool get_job_to_migrate(JCR *jcr);
45 struct idpkt;
46 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
47                  const char *query2, const char *type);
48 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
49                  const char *type);
50 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type);
51 static void start_migration_job(JCR *jcr);
52 static int get_next_dbid_from_list(char **p, DBId_t *DBId);
53
54 /* 
55  * Called here before the job is run to do the job
56  *   specific setup.
57  */
58 bool do_migration_init(JCR *jcr)
59 {
60    /* If we find a job or jobs to migrate it is previous_jr.JobId */
61    if (!get_job_to_migrate(jcr)) {
62       return false;
63    }
64
65    if (jcr->previous_jr.JobId == 0) {
66       return true;                    /* no work */
67    }
68
69    if (!get_or_create_fileset_record(jcr)) {
70       return false;
71    }
72
73    apply_pool_overrides(jcr);
74
75    jcr->jr.PoolId = get_or_create_pool_record(jcr, jcr->pool->hdr.name);
76    if (jcr->jr.PoolId == 0) {
77       return false;
78    }
79
80    /* If pool storage specified, use it instead of job storage */
81    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
82
83    if (!jcr->wstorage) {
84       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Job or Pool.\n"));
85       return false;
86    }
87
88    create_restore_bootstrap_file(jcr);
89    return true;
90 }
91
92 /*
93  * Do a Migration of a previous job
94  *
95  *  Returns:  false on failure
96  *            true  on success
97  */
98 bool do_migration(JCR *jcr)
99 {
100    POOL_DBR pr;
101    POOL *pool;
102    char ed1[100];
103    BSOCK *sd;
104    JOB *job, *prev_job;
105    JCR *mig_jcr;                   /* newly migrated job */
106
107    /* 
108     *  previous_jr refers to the job DB record of the Job that is
109     *    going to be migrated.
110     *  prev_job refers to the job resource of the Job that is
111     *    going to be migrated.
112     *  jcr is the jcr for the current "migration" job.  It is a
113     *    control job that is put in the DB as a migration job, which
114     *    means that this job migrated a previous job to a new job.
115     *    No Volume or File data is associated with this control
116     *    job.
117     *  mig_jcr refers to the newly migrated job that is run by
118     *    the current jcr.  It is a backup job that moves (migrates) the
119     *    data written for the previous_jr into the new pool.  This
120     *    job (mig_jcr) becomes the new backup job that replaces
121     *    the original backup job.
122     */
123    if (jcr->previous_jr.JobId == 0 || jcr->ExpectedFiles == 0) {
124       set_jcr_job_status(jcr, JS_Terminated);
125       migration_cleanup(jcr, jcr->JobStatus);
126       return true;                    /* no work */
127    }
128
129    Dmsg4(dbglevel, "Previous: Name=%s JobId=%d Type=%c Level=%c\n",
130       jcr->previous_jr.Name, jcr->previous_jr.JobId, 
131       jcr->previous_jr.JobType, jcr->previous_jr.JobLevel);
132
133    Dmsg4(dbglevel, "Current: Name=%s JobId=%d Type=%c Level=%c\n",
134       jcr->jr.Name, jcr->jr.JobId, 
135       jcr->jr.JobType, jcr->jr.JobLevel);
136
137    LockRes();
138    job = (JOB *)GetResWithName(R_JOB, jcr->jr.Name);
139    prev_job = (JOB *)GetResWithName(R_JOB, jcr->previous_jr.Name);
140    UnlockRes();
141    if (!job || !prev_job) {
142       return false;
143    }
144
145    /* Create a migation jcr */
146    mig_jcr = jcr->mig_jcr = new_jcr(sizeof(JCR), dird_free_jcr);
147    memcpy(&mig_jcr->previous_jr, &jcr->previous_jr, sizeof(mig_jcr->previous_jr));
148
149    /*
150     * Turn the mig_jcr into a "real" job that takes on the aspects of
151     *   the previous backup job "prev_job".
152     */
153    set_jcr_defaults(mig_jcr, prev_job);
154    if (!setup_job(mig_jcr)) {
155       return false;
156    }
157
158    /* Now reset the job record from the previous job */
159    memcpy(&mig_jcr->jr, &jcr->previous_jr, sizeof(mig_jcr->jr));
160    /* Update the jr to reflect the new values of PoolId, FileSetId, and JobId. */
161    mig_jcr->jr.PoolId = jcr->jr.PoolId;
162    mig_jcr->jr.FileSetId = jcr->jr.FileSetId;
163    mig_jcr->jr.JobId = mig_jcr->JobId;
164
165    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
166       mig_jcr->jr.Name, mig_jcr->jr.JobId, 
167       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
168
169    /*
170     * Get the PoolId used with the original job. Then
171     *  find the pool name from the database record.
172     */
173    memset(&pr, 0, sizeof(pr));
174    pr.PoolId = mig_jcr->previous_jr.PoolId;
175    if (!db_get_pool_record(jcr, jcr->db, &pr)) {
176       Jmsg(jcr, M_FATAL, 0, _("Pool for JobId %s not in database. ERR=%s\n"),
177             edit_int64(pr.PoolId, ed1), db_strerror(jcr->db));
178          return false;
179    }
180    /* Get the pool resource corresponding to the original job */
181    pool = (POOL *)GetResWithName(R_POOL, pr.Name);
182    if (!pool) {
183       Jmsg(jcr, M_FATAL, 0, _("Pool resource \"%s\" not found.\n"), pr.Name);
184       return false;
185    }
186
187    /* If pool storage specified, use it for restore */
188    copy_rstorage(mig_jcr, pool->storage, _("Pool resource"));
189    copy_rstorage(jcr, pool->storage, _("Pool resource"));
190
191    /*
192     * If the original backup pool has a NextPool, make sure a 
193     *  record exists in the database. Note, in this case, we
194     *  will be migrating from pool to pool->NextPool.
195     */
196    if (pool->NextPool) {
197       jcr->jr.PoolId = get_or_create_pool_record(jcr, pool->NextPool->hdr.name);
198       if (jcr->jr.PoolId == 0) {
199          return false;
200       }
201       /*
202        * put the "NextPool" resource pointer in our jcr so that we
203        * can pull the Storage reference from it.
204        */
205       mig_jcr->pool = jcr->pool = pool->NextPool;
206       mig_jcr->jr.PoolId = jcr->jr.PoolId;
207       pm_strcpy(jcr->pool_source, _("NextPool in Pool resource"));
208    }
209
210    /* If pool storage specified, use it instead of job storage for backup */
211    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
212
213    /* Print Job Start message */
214    Jmsg(jcr, M_INFO, 0, _("Start Migration JobId %s, Job=%s\n"),
215         edit_uint64(jcr->JobId, ed1), jcr->Job);
216
217    set_jcr_job_status(jcr, JS_Running);
218    set_jcr_job_status(mig_jcr, JS_Running);
219    Dmsg2(dbglevel, "JobId=%d JobLevel=%c\n", jcr->jr.JobId, jcr->jr.JobLevel);
220
221    /* Update job start record for this migration control job */
222    if (!db_update_job_start_record(jcr, jcr->db, &jcr->jr)) {
223       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
224       return false;
225    }
226
227    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
228       mig_jcr->jr.Name, mig_jcr->jr.JobId, 
229       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
230
231    /* Update job start record for the real migration backup job */
232    if (!db_update_job_start_record(mig_jcr, mig_jcr->db, &mig_jcr->jr)) {
233       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(mig_jcr->db));
234       return false;
235    }
236
237
238    /*
239     * Open a message channel connection with the Storage
240     * daemon. This is to let him know that our client
241     * will be contacting him for a backup  session.
242     *
243     */
244    Dmsg0(110, "Open connection with storage daemon\n");
245    set_jcr_job_status(jcr, JS_WaitSD);
246    set_jcr_job_status(mig_jcr, JS_WaitSD);
247    /*
248     * Start conversation with Storage daemon
249     */
250    if (!connect_to_storage_daemon(jcr, 10, SDConnectTimeout, 1)) {
251       return false;
252    }
253    sd = jcr->store_bsock;
254    /*
255     * Now start a job with the Storage daemon
256     */
257    Dmsg2(dbglevel, "Read store=%s, write store=%s\n", 
258       ((STORE *)jcr->rstorage->first())->name(),
259       ((STORE *)jcr->wstorage->first())->name());
260    if (!start_storage_daemon_job(jcr, jcr->rstorage, jcr->wstorage)) {
261       return false;
262    }
263    Dmsg0(150, "Storage daemon connection OK\n");
264
265    if (!send_bootstrap_file(jcr, sd) ||
266        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR)) {
267       return false;
268    }
269
270    if (!bnet_fsend(sd, "run")) {
271       return false;
272    }
273
274    /*
275     * Now start a Storage daemon message thread
276     */
277    if (!start_storage_daemon_message_thread(jcr)) {
278       return false;
279    }
280
281
282    set_jcr_job_status(jcr, JS_Running);
283    set_jcr_job_status(mig_jcr, JS_Running);
284
285    /* Pickup Job termination data */
286    /* Note, the SD stores in jcr->JobFiles/ReadBytes/JobBytes/Errors */
287    wait_for_storage_daemon_termination(jcr);
288
289    set_jcr_job_status(jcr, jcr->SDJobStatus);
290    if (jcr->JobStatus != JS_Terminated) {
291       return false;
292    }
293    migration_cleanup(jcr, jcr->JobStatus);
294    if (mig_jcr) {
295       UAContext *ua = new_ua_context(jcr);
296       purge_files_from_job(ua, jcr->previous_jr.JobId);
297       free_ua_context(ua);
298    }
299    return true;
300 }
301
302 struct idpkt {
303    POOLMEM *list;
304    uint32_t count;
305 };
306
307 /*
308  * Callback handler make list of DB Ids
309  */
310 static int dbid_handler(void *ctx, int num_fields, char **row)
311 {
312    idpkt *ids = (idpkt *)ctx;
313
314    if (ids->count == 0) {
315       ids->list[0] = 0;
316    } else {
317       pm_strcat(ids->list, ",");
318    }
319    pm_strcat(ids->list, row[0]);
320    ids->count++;
321    Dmsg3(dbglevel, "dbid_hdlr count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
322    return 0;
323 }
324
325
326 struct uitem {
327    dlink link;   
328    char *item;
329 };
330
331 static int item_compare(void *item1, void *item2)
332 {
333    uitem *i1 = (uitem *)item1;
334    uitem *i2 = (uitem *)item2;
335    return strcmp(i1->item, i2->item);
336 }
337
338 static int unique_name_handler(void *ctx, int num_fields, char **row)
339 {
340    dlist *list = (dlist *)ctx;
341
342    uitem *new_item = (uitem *)malloc(sizeof(uitem));
343    uitem *item;
344    
345    memset(new_item, 0, sizeof(uitem));
346    new_item->item = bstrdup(row[0]);
347    Dmsg1(dbglevel, "Unique_name_hdlr Item=%s\n", row[0]);
348    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
349    if (item != new_item) {            /* already in list */
350       free(new_item->item);
351       free((char *)new_item);
352       return 0;
353    }
354    return 0;
355 }
356
357 #ifdef xxx  /* in development */
358 static int unique_dbid_handler(void *ctx, int num_fields, char **row)
359 {
360    dlist *list = (dlist *)ctx;
361
362    uitem *new_item = (uitem *)malloc(sizeof(uitem));
363    uitem *item;
364    
365    memset(new_item, 0, sizeof(uitem));
366    new_item->item = bstrdup(row[0]);
367    Dmsg1(dbglevel, "Item=%s\n", row[0]);
368    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
369    if (item != new_item) {            /* already in list */
370       free(new_item->item);
371       free((char *)new_item);
372       return 0;
373    }
374    return 0;
375 }
376 #endif
377
378
379
380 /* Get Job names in Pool */
381 const char *sql_job =
382    "SELECT DISTINCT Job.Name from Job,Pool"
383    " WHERE Pool.Name='%s' AND Job.PoolId=Pool.PoolId";
384
385 /* Get JobIds from regex'ed Job names */
386 const char *sql_jobids_from_job =
387    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool"
388    " WHERE Job.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
389    " ORDER by Job.StartTime";
390
391 /* Get Client names in Pool */
392 const char *sql_client =
393    "SELECT DISTINCT Client.Name from Client,Pool,Job"
394    " WHERE Pool.Name='%s' AND Job.ClientId=Client.ClientId AND"
395    " Job.PoolId=Pool.PoolId";
396
397 /* Get JobIds from regex'ed Client names */
398 const char *sql_jobids_from_client =
399    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool,Client"
400    " WHERE Client.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
401    " AND Job.ClientId=Client.ClientId "
402    " ORDER by Job.StartTime";
403
404 /* Get Volume names in Pool */
405 const char *sql_vol = 
406    "SELECT DISTINCT VolumeName FROM Media,Pool WHERE"
407    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
408    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
409
410 /* Get JobIds from regex'ed Volume names */
411 const char *sql_jobids_from_vol =
412    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Media,JobMedia,Job"
413    " WHERE Media.VolumeName='%s' AND Media.MediaId=JobMedia.MediaId"
414    " AND JobMedia.JobId=Job.JobId" 
415    " ORDER by Job.StartTime";
416
417
418 const char *sql_smallest_vol = 
419    "SELECT MediaId FROM Media,Pool WHERE"
420    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
421    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
422    " ORDER BY VolBytes ASC LIMIT 1";
423
424 const char *sql_oldest_vol = 
425    "SELECT MediaId FROM Media,Pool WHERE"
426    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
427    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
428    " ORDER BY LastWritten ASC LIMIT 1";
429
430 /* Get JobIds when we have selected MediaId */
431 const char *sql_jobids_from_mediaid =
432    "SELECT DISTINCT Job.JobId,Job.StartTime FROM JobMedia,Job"
433    " WHERE JobMedia.JobId=Job.JobId AND JobMedia.MediaId=%s"
434    " ORDER by Job.StartTime";
435
436 /* Get tne number of bytes in the pool */
437 const char *sql_pool_bytes =
438    "SELECT SUM(VolBytes) FROM Media,Pool WHERE"
439    " VolStatus in ('Full','Used','Error','Append') AND Media.Enabled=1 AND"
440    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
441
442 /* Get tne number of bytes in the Jobs */
443 const char *sql_job_bytes =
444    "SELECT SUM(JobBytes) FROM Job WHERE JobId IN (%s)";
445
446
447 /* Get Media Ids in Pool */
448 const char *sql_mediaids =
449    "SELECT MediaId FROM Media,Pool WHERE"
450    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
451    " Media.PoolId=Pool.PoolId AND Pool.Name='%s' ORDER BY LastWritten ASC";
452
453 /* Get JobIds in Pool longer than specified time */
454 const char *sql_pool_time = 
455    "SELECT DISTINCT Job.JobId from Pool,Job,Media,JobMedia WHERE"
456    " Pool.Name='%s' AND Media.PoolId=Pool.PoolId AND"
457    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
458    " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId"
459    " AND Job.RealEndTime<='%s'";
460
461 /*
462 * const char *sql_ujobid =
463 *   "SELECT DISTINCT Job.Job from Client,Pool,Media,Job,JobMedia "
464 *   " WHERE Media.PoolId=Pool.PoolId AND Pool.Name='%s' AND"
465 *   " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId";
466 */
467
468
469
470 /*
471  *
472  * This is the central piece of code that finds a job or jobs 
473  *   actually JobIds to migrate.  It first looks to see if one
474  *   has been "manually" specified in jcr->MigrateJobId, and if
475  *   so, it returns that JobId to be run.  Otherwise, it
476  *   examines the Selection Type to see what kind of migration
477  *   we are doing (Volume, Job, Client, ...) and applies any
478  *   Selection Pattern if appropriate to obtain a list of JobIds.
479  *   Finally, it will loop over all the JobIds found, except the last
480  *   one starting a new job with MigrationJobId set to that JobId, and
481  *   finally, it returns the last JobId to the caller.
482  *
483  * Returns: false on error
484  *          true  if OK and jcr->previous_jr filled in
485  */
486 static bool get_job_to_migrate(JCR *jcr)
487 {
488    char ed1[30];
489    POOL_MEM query(PM_MESSAGE);
490    JobId_t JobId;
491    DBId_t  MediaId = 0;
492    int stat;
493    char *p;
494    idpkt ids, mid, jids;
495    db_int64_ctx ctx;
496    int64_t pool_bytes;
497    bool ok;
498    time_t ttime;
499    struct tm tm;
500    char dt[MAX_TIME_LENGTH];
501
502    ids.list = get_pool_memory(PM_MESSAGE);
503    ids.list[0] = 0;
504    ids.count = 0;
505    mid.list = get_pool_memory(PM_MESSAGE);
506    mid.list[0] = 0;
507    mid.count = 0;
508    jids.list = get_pool_memory(PM_MESSAGE);
509    jids.list[0] = 0;
510    jids.count = 0;
511
512
513    /*
514     * If MigrateJobId is set, then we migrate only that Job,
515     *  otherwise, we go through the full selection of jobs to
516     *  migrate.
517     */
518    if (jcr->MigrateJobId != 0) {
519       Dmsg1(dbglevel, "At Job start previous jobid=%u\n", jcr->MigrateJobId);
520       edit_uint64(jcr->MigrateJobId, ids.list);
521       ids.count = 1;
522    } else {
523       switch (jcr->job->selection_type) {
524       case MT_JOB:
525          if (!regex_find_jobids(jcr, &ids, sql_job, sql_jobids_from_job, "Job")) {
526             goto bail_out;
527          } 
528          break;
529       case MT_CLIENT:
530          if (!regex_find_jobids(jcr, &ids, sql_client, sql_jobids_from_client, "Client")) {
531             goto bail_out;
532          } 
533          break;
534       case MT_VOLUME:
535          if (!regex_find_jobids(jcr, &ids, sql_vol, sql_jobids_from_vol, "Volume")) {
536             goto bail_out;
537          } 
538          break;
539       case MT_SQLQUERY:
540          if (!jcr->job->selection_pattern) {
541             Jmsg(jcr, M_FATAL, 0, _("No Migration SQL selection pattern specified.\n"));
542             goto bail_out;
543          }
544          Dmsg1(dbglevel, "SQL=%s\n", jcr->job->selection_pattern);
545          if (!db_sql_query(jcr->db, jcr->job->selection_pattern,
546               dbid_handler, (void *)&ids)) {
547             Jmsg(jcr, M_FATAL, 0,
548                  _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
549             goto bail_out;
550          }
551          break;
552       case MT_SMALLEST_VOL:
553          if (!find_mediaid_then_jobids(jcr, &ids, sql_smallest_vol, "Smallest Volume")) {
554             goto bail_out;
555          }
556          break;
557       case MT_OLDEST_VOL:
558          if (!find_mediaid_then_jobids(jcr, &ids, sql_oldest_vol, "Oldest Volume")) {
559             goto bail_out;
560          }
561          break;
562
563       case MT_POOL_OCCUPANCY:
564          ctx.count = 0;
565          /* Find count of bytes in pool */
566          Mmsg(query, sql_pool_bytes, jcr->pool->hdr.name);
567          if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
568             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
569             goto bail_out;
570          }
571          if (ctx.count == 0) {
572             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
573             goto ok_out;
574          }
575          pool_bytes = ctx.value;
576          Dmsg2(dbglevel, "highbytes=%d pool=%d\n", (int)jcr->pool->MigrationHighBytes,
577                (int)pool_bytes);
578          if (pool_bytes < (int64_t)jcr->pool->MigrationHighBytes) {
579             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
580             goto ok_out;
581          }
582          Dmsg0(dbglevel, "We should do Occupation migration.\n");
583
584          ids.count = 0;
585          /* Find a list of MediaIds that could be migrated */
586          Mmsg(query, sql_mediaids, jcr->pool->hdr.name);
587 //       Dmsg1(dbglevel, "query=%s\n", query.c_str());
588          if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)&ids)) {
589             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
590             goto bail_out;
591          }
592          if (ids.count == 0) {
593             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
594             goto ok_out;
595          }
596          Dmsg2(dbglevel, "Pool Occupancy ids=%d MediaIds=%s\n", ids.count, ids.list);
597
598          /*
599           * Now loop over MediaIds getting more JobIds to migrate until
600           *  we reduce the pool occupancy below the low water mark.
601           */
602          p = ids.list;
603          for (int i=0; i < (int)ids.count; i++) {
604             stat = get_next_dbid_from_list(&p, &MediaId);
605             Dmsg2(dbglevel, "get_next_dbid stat=%d MediaId=%u\n", stat, MediaId);
606             if (stat < 0) {
607                Jmsg(jcr, M_FATAL, 0, _("Invalid MediaId found.\n"));
608                goto bail_out;
609             } else if (stat == 0) {
610                break;
611             }
612             mid.count = 1;
613             Mmsg(mid.list, "%s", edit_int64(MediaId, ed1));
614             ok = find_jobids_from_mediaid_list(jcr, &mid, "Volumes");
615             if (!ok) {
616                continue;
617             }
618             if (i != 0) {
619                pm_strcat(jids.list, ",");
620             }
621             pm_strcat(jids.list, mid.list);
622             jids.count += mid.count;
623
624             /* Now get the count of bytes added */
625             ctx.count = 0;
626             /* Find count of bytes from Jobs */
627             Mmsg(query, sql_job_bytes, mid.list);
628             if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
629                Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
630                goto bail_out;
631             }
632             pool_bytes -= ctx.value;
633             Dmsg1(dbglevel, "Job bytes=%d\n", (int)ctx.value);
634             Dmsg2(dbglevel, "lowbytes=%d pool=%d\n", (int)jcr->pool->MigrationLowBytes,
635                   (int)pool_bytes);
636             if (pool_bytes <= (int64_t)jcr->pool->MigrationLowBytes) {
637                Dmsg0(dbglevel, "We should be done.\n");
638                break;
639             }
640
641          }
642          Dmsg2(dbglevel, "Pool Occupancy ids=%d JobIds=%s\n", jids.count, jids.list);
643
644          break;
645
646       case MT_POOL_TIME:
647          ttime = time(NULL) - (time_t)jcr->pool->MigrationTime;
648          (void)localtime_r(&ttime, &tm);
649          strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
650
651          ids.count = 0;
652          Mmsg(query, sql_pool_time, jcr->pool->hdr.name, dt);
653 //       Dmsg1(000, "query=%s\n", query.c_str());
654          if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)&ids)) {
655             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
656             goto bail_out;
657          }
658          if (ids.count == 0) {
659             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
660             goto ok_out;
661          }
662          Dmsg2(dbglevel, "PoolTime ids=%d JobIds=%s\n", ids.count, ids.list);
663          break;
664
665       default:
666          Jmsg(jcr, M_FATAL, 0, _("Unknown Migration Selection Type.\n"));
667          goto bail_out;
668       }
669    }
670
671    /*
672     * Loop over all jobids except the last one, sending
673     *  them to start_migration_job(), which will start a job
674     *  for each of them.  For the last JobId, we handle it below.
675     */
676    p = ids.list;
677    Jmsg(jcr, M_INFO, 0, _("The following %u JobIds will be migrated: %s\n"),
678       ids.count, ids.list);
679    Dmsg2(dbglevel, "Before loop count=%d ids=%s\n", ids.count, ids.list);
680    for (int i=1; i < (int)ids.count; i++) {
681       JobId = 0;
682       stat = get_next_jobid_from_list(&p, &JobId);
683       Dmsg3(dbglevel, "get_jobid_no=%d stat=%d JobId=%u\n", i, stat, JobId);
684       jcr->MigrateJobId = JobId;
685       start_migration_job(jcr);
686       Dmsg0(dbglevel, "Back from start_migration_job\n");
687       if (stat < 0) {
688          Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
689          goto bail_out;
690       } else if (stat == 0) {
691          Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
692          goto ok_out;
693       }
694    }
695    
696    /* Now get the last JobId and handle it in the current job */
697    JobId = 0;
698    stat = get_next_jobid_from_list(&p, &JobId);
699    Dmsg2(dbglevel, "Last get_next_jobid stat=%d JobId=%u\n", stat, JobId);
700    if (stat < 0) {
701       Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
702       goto bail_out;
703    } else if (stat == 0) {
704       Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
705       goto ok_out;
706    }
707
708    jcr->previous_jr.JobId = JobId;
709    Dmsg1(100, "Previous jobid=%d\n", jcr->previous_jr.JobId);
710
711    if (!db_get_job_record(jcr, jcr->db, &jcr->previous_jr)) {
712       Jmsg(jcr, M_FATAL, 0, _("Could not get job record for JobId %s to migrate. ERR=%s"),
713            edit_int64(jcr->previous_jr.JobId, ed1),
714            db_strerror(jcr->db));
715       goto bail_out;
716    }
717    Jmsg(jcr, M_INFO, 0, _("Migration using JobId=%d Job=%s\n"),
718       jcr->previous_jr.JobId, jcr->previous_jr.Job);
719
720 ok_out:
721    free_pool_memory(ids.list);
722    free_pool_memory(mid.list);
723    free_pool_memory(jids.list);
724    return true;
725
726 bail_out:
727    free_pool_memory(ids.list);
728    free_pool_memory(mid.list);
729    free_pool_memory(jids.list);
730    return false;
731 }
732
733 static void start_migration_job(JCR *jcr)
734 {
735    UAContext *ua = new_ua_context(jcr);
736    char ed1[50];
737    ua->batch = true;
738    Mmsg(ua->cmd, "run %s jobid=%s", jcr->job->hdr.name, 
739         edit_uint64(jcr->MigrateJobId, ed1));
740    Dmsg1(dbglevel, "=============== Migration cmd=%s\n", ua->cmd);
741    parse_ua_args(ua);                 /* parse command */
742    int stat = run_cmd(ua, ua->cmd);
743    if (stat == 0) {
744       Jmsg(jcr, M_ERROR, 0, _("Could not start migration job.\n"));
745    } else {
746       Jmsg(jcr, M_INFO, 0, _("Migration JobId %d started.\n"), stat);
747    }
748    free_ua_context(ua);
749 }
750
751 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
752                  const char *type) 
753 {
754    bool ok = false;
755    POOL_MEM query(PM_MESSAGE);
756
757    ids->count = 0;
758    /* Basic query for MediaId */
759    Mmsg(query, query1, jcr->pool->hdr.name);
760    if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
761       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
762       goto bail_out;
763    }
764    if (ids->count == 0) {
765       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
766    }
767    if (ids->count != 1) {
768       Jmsg(jcr, M_FATAL, 0, _("SQL logic error. Count should be 1 but is %d\n"), 
769          ids->count);
770       goto bail_out;
771    }
772    Dmsg1(dbglevel, "Smallest Vol Jobids=%s\n", ids->list);
773
774    ok = find_jobids_from_mediaid_list(jcr, ids, type);
775
776 bail_out:
777    return ok;
778 }
779
780 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type) 
781 {
782    bool ok = false;
783    POOL_MEM query(PM_MESSAGE);
784
785    Mmsg(query, sql_jobids_from_mediaid, ids->list);
786    ids->count = 0;
787    if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
788       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
789       goto bail_out;
790    }
791    if (ids->count == 0) {
792       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
793    }
794    ok = true;
795 bail_out:
796    return ok;
797 }
798
799 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
800                  const char *query2, const char *type) 
801 {
802    dlist *item_chain;
803    uitem *item = NULL;
804    uitem *last_item = NULL;
805    regex_t preg;
806    char prbuf[500];
807    int rc;
808    bool ok = false;
809    POOL_MEM query(PM_MESSAGE);
810
811    item_chain = New(dlist(item, &item->link));
812    if (!jcr->job->selection_pattern) {
813       Jmsg(jcr, M_FATAL, 0, _("No Migration %s selection pattern specified.\n"),
814          type);
815       goto bail_out;
816    }
817    Dmsg1(dbglevel, "regex=%s\n", jcr->job->selection_pattern);
818    /* Compile regex expression */
819    rc = regcomp(&preg, jcr->job->selection_pattern, REG_EXTENDED);
820    if (rc != 0) {
821       regerror(rc, &preg, prbuf, sizeof(prbuf));
822       Jmsg(jcr, M_FATAL, 0, _("Could not compile regex pattern \"%s\" ERR=%s\n"),
823            jcr->job->selection_pattern, prbuf);
824       goto bail_out;
825    }
826    /* Basic query for names */
827    Mmsg(query, query1, jcr->pool->hdr.name);
828    Dmsg1(dbglevel, "get name query1=%s\n", query.c_str());
829    if (!db_sql_query(jcr->db, query.c_str(), unique_name_handler, 
830         (void *)item_chain)) {
831       Jmsg(jcr, M_FATAL, 0,
832            _("SQL to get %s failed. ERR=%s\n"), type, db_strerror(jcr->db));
833       goto bail_out;
834    }
835    /* Now apply the regex to the names and remove any item not matched */
836    foreach_dlist(item, item_chain) {
837       const int nmatch = 30;
838       regmatch_t pmatch[nmatch];
839       if (last_item) {
840          Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
841          free(last_item->item);
842          item_chain->remove(last_item);
843       }
844       Dmsg1(dbglevel, "get name Item=%s\n", item->item);
845       rc = regexec(&preg, item->item, nmatch, pmatch,  0);
846       if (rc == 0) {
847          last_item = NULL;   /* keep this one */
848       } else {   
849          last_item = item;
850       }
851    }
852    if (last_item) {
853       free(last_item->item);
854       Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
855       item_chain->remove(last_item);
856    }
857    regfree(&preg);
858    /* 
859     * At this point, we have a list of items in item_chain
860     *  that have been matched by the regex, so now we need
861     *  to look up their jobids.
862     */
863    ids->count = 0;
864    foreach_dlist(item, item_chain) {
865       Dmsg2(dbglevel, "Got %s: %s\n", type, item->item);
866       Mmsg(query, query2, item->item, jcr->pool->hdr.name);
867       Dmsg1(dbglevel, "get id from name query2=%s\n", query.c_str());
868       if (!db_sql_query(jcr->db, query.c_str(), dbid_handler, (void *)ids)) {
869          Jmsg(jcr, M_FATAL, 0,
870               _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
871          goto bail_out;
872       }
873    }
874    if (ids->count == 0) {
875       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
876    }
877    ok = true;
878 bail_out:
879    Dmsg2(dbglevel, "Count=%d Jobids=%s\n", ids->count, ids->list);
880    delete item_chain;
881    Dmsg0(dbglevel, "After delete item_chain\n");
882    return ok;
883 }
884
885
886 /*
887  * Release resources allocated during backup.
888  */
889 void migration_cleanup(JCR *jcr, int TermCode)
890 {
891    char sdt[MAX_TIME_LENGTH], edt[MAX_TIME_LENGTH];
892    char ec1[30], ec2[30], ec3[30], ec4[30], ec5[30], elapsed[50];
893    char ec6[50], ec7[50], ec8[50];
894    char term_code[100], sd_term_msg[100];
895    const char *term_msg;
896    int msg_type;
897    MEDIA_DBR mr;
898    double kbps;
899    utime_t RunTime;
900    JCR *mig_jcr = jcr->mig_jcr;
901    POOL_MEM query(PM_MESSAGE);
902
903    Dmsg2(100, "Enter migrate_cleanup %d %c\n", TermCode, TermCode);
904    dequeue_messages(jcr);             /* display any queued messages */
905    memset(&mr, 0, sizeof(mr));
906    set_jcr_job_status(jcr, TermCode);
907    update_job_end_record(jcr);        /* update database */
908
909    /* 
910     * Check if we actually did something.  
911     *  mig_jcr is jcr of the newly migrated job.
912     */
913    if (mig_jcr) {
914       mig_jcr->JobFiles = jcr->JobFiles = jcr->SDJobFiles;
915       mig_jcr->JobBytes = jcr->JobBytes = jcr->SDJobBytes;
916       mig_jcr->VolSessionId = jcr->VolSessionId;
917       mig_jcr->VolSessionTime = jcr->VolSessionTime;
918       mig_jcr->jr.RealEndTime = 0; 
919       mig_jcr->jr.PriorJobId = jcr->previous_jr.JobId;
920
921       set_jcr_job_status(mig_jcr, TermCode);
922
923   
924       update_job_end_record(mig_jcr);
925      
926       /* Update final items to set them to the previous job's values */
927       Mmsg(query, "UPDATE Job SET StartTime='%s',EndTime='%s',"
928                   "JobTDate=%s WHERE JobId=%s", 
929          jcr->previous_jr.cStartTime, jcr->previous_jr.cEndTime, 
930          edit_uint64(jcr->previous_jr.JobTDate, ec1),
931          edit_uint64(mig_jcr->jr.JobId, ec2));
932       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
933
934       /* Now marke the previous job as migrated */
935       Mmsg(query, "UPDATE Job SET Type='%c' WHERE JobId=%s",
936            (char)JT_MIGRATED_JOB, edit_uint64(jcr->previous_jr.JobId, ec1));
937       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
938
939       if (!db_get_job_record(jcr, jcr->db, &jcr->jr)) {
940          Jmsg(jcr, M_WARNING, 0, _("Error getting job record for stats: %s"),
941             db_strerror(jcr->db));
942          set_jcr_job_status(jcr, JS_ErrorTerminated);
943       }
944
945       bstrncpy(mr.VolumeName, jcr->VolumeName, sizeof(mr.VolumeName));
946       if (!db_get_media_record(jcr, jcr->db, &mr)) {
947          Jmsg(jcr, M_WARNING, 0, _("Error getting Media record for Volume \"%s\": ERR=%s"),
948             mr.VolumeName, db_strerror(jcr->db));
949          set_jcr_job_status(jcr, JS_ErrorTerminated);
950       }
951
952       update_bootstrap_file(mig_jcr);
953
954       if (!db_get_job_volume_names(mig_jcr, mig_jcr->db, mig_jcr->jr.JobId, &mig_jcr->VolumeName)) {
955          /*
956           * Note, if the job has erred, most likely it did not write any
957           *  tape, so suppress this "error" message since in that case
958           *  it is normal.  Or look at it the other way, only for a
959           *  normal exit should we complain about this error.
960           */
961          if (jcr->JobStatus == JS_Terminated && jcr->jr.JobBytes) {
962             Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(mig_jcr->db));
963          }
964          mig_jcr->VolumeName[0] = 0;         /* none */
965       }
966   }
967
968    msg_type = M_INFO;                 /* by default INFO message */
969    switch (jcr->JobStatus) {
970    case JS_Terminated:
971       if (jcr->Errors || jcr->SDErrors) {
972          term_msg = _("%s OK -- with warnings");
973       } else {
974          term_msg = _("%s OK");
975       }
976       break;
977    case JS_FatalError:
978    case JS_ErrorTerminated:
979       term_msg = _("*** %s Error ***");
980       msg_type = M_ERROR;          /* Generate error message */
981       if (jcr->store_bsock) {
982          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
983          if (jcr->SD_msg_chan) {
984             pthread_cancel(jcr->SD_msg_chan);
985          }
986       }
987       break;
988    case JS_Canceled:
989       term_msg = _("%s Canceled");
990       if (jcr->store_bsock) {
991          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
992          if (jcr->SD_msg_chan) {
993             pthread_cancel(jcr->SD_msg_chan);
994          }
995       }
996       break;
997    default:
998       term_msg = _("Inappropriate %s term code");
999       break;
1000    }
1001    bsnprintf(term_code, sizeof(term_code), term_msg, "Migration");
1002    bstrftimes(sdt, sizeof(sdt), jcr->jr.StartTime);
1003    bstrftimes(edt, sizeof(edt), jcr->jr.EndTime);
1004    RunTime = jcr->jr.EndTime - jcr->jr.StartTime;
1005    if (RunTime <= 0) {
1006       kbps = 0;
1007    } else {
1008       kbps = (double)jcr->SDJobBytes / (1000 * RunTime);
1009    }
1010
1011
1012    jobstatus_to_ascii(jcr->SDJobStatus, sd_term_msg, sizeof(sd_term_msg));
1013
1014    Jmsg(jcr, msg_type, 0, _("Bacula %s (%s): %s\n"
1015 "  Prev Backup JobId:      %s\n"
1016 "  New Backup JobId:       %s\n"
1017 "  Migration JobId:        %s\n"
1018 "  Migration Job:          %s\n"
1019 "  Backup Level:           %s%s\n"
1020 "  Client:                 %s\n"
1021 "  FileSet:                \"%s\" %s\n"
1022 "  Pool:                   \"%s\" (From %s)\n"
1023 "  Storage:                \"%s\" (From %s)\n"
1024 "  Start time:             %s\n"
1025 "  End time:               %s\n"
1026 "  Elapsed time:           %s\n"
1027 "  Priority:               %d\n"
1028 "  SD Files Written:       %s\n"
1029 "  SD Bytes Written:       %s (%sB)\n"
1030 "  Rate:                   %.1f KB/s\n"
1031 "  Volume name(s):         %s\n"
1032 "  Volume Session Id:      %d\n"
1033 "  Volume Session Time:    %d\n"
1034 "  Last Volume Bytes:      %s (%sB)\n"
1035 "  SD Errors:              %d\n"
1036 "  SD termination status:  %s\n"
1037 "  Termination:            %s\n\n"),
1038    VERSION,
1039    LSMDATE,
1040         edt, 
1041         mig_jcr ? edit_uint64(jcr->previous_jr.JobId, ec6) : "0", 
1042         mig_jcr ? edit_uint64(mig_jcr->jr.JobId, ec7) : "0",
1043         edit_uint64(jcr->jr.JobId, ec8),
1044         jcr->jr.Job,
1045         level_to_str(jcr->JobLevel), jcr->since,
1046         jcr->client->name(),
1047         jcr->fileset->name(), jcr->FSCreateTime,
1048         jcr->pool->name(), jcr->pool_source,
1049         jcr->wstore->name(), jcr->storage_source,
1050         sdt,
1051         edt,
1052         edit_utime(RunTime, elapsed, sizeof(elapsed)),
1053         jcr->JobPriority,
1054         edit_uint64_with_commas(jcr->SDJobFiles, ec1),
1055         edit_uint64_with_commas(jcr->SDJobBytes, ec2),
1056         edit_uint64_with_suffix(jcr->SDJobBytes, ec3),
1057         (float)kbps,
1058         mig_jcr ? mig_jcr->VolumeName : "",
1059         jcr->VolSessionId,
1060         jcr->VolSessionTime,
1061         edit_uint64_with_commas(mr.VolBytes, ec4),
1062         edit_uint64_with_suffix(mr.VolBytes, ec5),
1063         jcr->SDErrors,
1064         sd_term_msg,
1065         term_code);
1066
1067    Dmsg1(100, "migrate_cleanup() mig_jcr=0x%x\n", jcr->mig_jcr);
1068    if (jcr->mig_jcr) {
1069       free_jcr(jcr->mig_jcr);
1070       jcr->mig_jcr = NULL;
1071    }
1072    Dmsg0(100, "Leave migrate_cleanup()\n");
1073 }
1074
1075 /* 
1076  * Return next DBId from comma separated list   
1077  *
1078  * Returns:
1079  *   1 if next DBId returned
1080  *   0 if no more DBIds are in list
1081  *  -1 there is an error
1082  */
1083 static int get_next_dbid_from_list(char **p, DBId_t *DBId)
1084 {
1085    char id[30];
1086    char *q = *p;
1087
1088    id[0] = 0;
1089    for (int i=0; i<(int)sizeof(id); i++) {
1090       if (*q == 0) {
1091          break;
1092       } else if (*q == ',') {
1093          q++;
1094          break;
1095       }
1096       id[i] = *q++;
1097       id[i+1] = 0;
1098    }
1099    if (id[0] == 0) {
1100       return 0;
1101    } else if (!is_a_number(id)) {
1102       return -1;                      /* error */
1103    }
1104    *p = q;
1105    *DBId = str_to_int64(id);
1106    return 1;
1107 }