]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/migrate.c
9Nov06
[bacula/bacula] / bacula / src / dird / migrate.c
1 /*
2  *
3  *   Bacula Director -- migrate.c -- responsible for doing
4  *     migration jobs.
5  *
6  *     Kern Sibbald, September MMIV
7  *
8  *  Basic tasks done here:
9  *     Open DB and create records for this job.
10  *     Open Message Channel with Storage daemon to tell him a job will be starting.
11  *     Open connection with Storage daemon and pass him commands
12  *       to do the backup.
13  *     When the Storage daemon finishes the job, update the DB.
14  *
15  *   Version $Id$
16  */
17 /*
18    Copyright (C) 2004-2006 Kern Sibbald
19
20    This program is free software; you can redistribute it and/or
21    modify it under the terms of the GNU General Public License
22    version 2 as amended with additional clauses defined in the
23    file LICENSE in the main source directory.
24
25    This program is distributed in the hope that it will be useful,
26    but WITHOUT ANY WARRANTY; without even the implied warranty of
27    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
28    the file LICENSE for additional details.
29
30  */
31
32 #include "bacula.h"
33 #include "dird.h"
34 #include "ua.h"
35 #ifndef HAVE_REGEX_H
36 #include "lib/bregex.h"
37 #else
38 #include <regex.h>
39 #endif
40
41 static const int dbglevel = 10;
42
43 static char OKbootstrap[] = "3000 OK bootstrap\n";
44 static bool get_job_to_migrate(JCR *jcr);
45 struct idpkt;
46 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
47                  const char *query2, const char *type);
48 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
49                  const char *type);
50 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type);
51 static void start_migration_job(JCR *jcr);
52 static int get_next_dbid_from_list(char **p, DBId_t *DBId);
53
54 /* 
55  * Called here before the job is run to do the job
56  *   specific setup.
57  */
58 bool do_migration_init(JCR *jcr)
59 {
60    /* If we find a job or jobs to migrate it is previous_jr.JobId */
61    if (!get_job_to_migrate(jcr)) {
62       return false;
63    }
64    Dmsg1(dbglevel, "Back from get_job_to_migrate JobId=%d\n", (int)jcr->JobId);
65
66    if (jcr->previous_jr.JobId == 0) {
67       Dmsg1(dbglevel, "JobId=%d no previous JobId\n", (int)jcr->JobId);
68       return true;                    /* no work */
69    }
70
71    if (!get_or_create_fileset_record(jcr)) {
72       Dmsg1(dbglevel, "JobId=%d no FileSet\n", (int)jcr->JobId);
73       return false;
74    }
75
76    apply_pool_overrides(jcr);
77
78    jcr->jr.PoolId = get_or_create_pool_record(jcr, jcr->pool->hdr.name);
79    if (jcr->jr.PoolId == 0) {
80       Dmsg1(dbglevel, "JobId=%d no PoolId\n", (int)jcr->JobId);
81       return false;
82    }
83
84   /* If pool storage specified, use it instead of job storage */
85    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
86
87    if (jcr->wstorage->size() == 0) {
88       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Job or Pool.\n"));
89       return false;
90    }
91
92    create_restore_bootstrap_file(jcr);
93    return true;
94 }
95
96 /*
97  * Do a Migration of a previous job
98  *
99  *  Returns:  false on failure
100  *            true  on success
101  */
102 bool do_migration(JCR *jcr)
103 {
104    POOL_DBR pr;
105    POOL *pool;
106    char ed1[100];
107    BSOCK *sd;
108    JOB *job, *prev_job;
109    JCR *mig_jcr;                   /* newly migrated job */
110
111    /* 
112     *  previous_jr refers to the job DB record of the Job that is
113     *    going to be migrated.
114     *  prev_job refers to the job resource of the Job that is
115     *    going to be migrated.
116     *  jcr is the jcr for the current "migration" job.  It is a
117     *    control job that is put in the DB as a migration job, which
118     *    means that this job migrated a previous job to a new job.
119     *    No Volume or File data is associated with this control
120     *    job.
121     *  mig_jcr refers to the newly migrated job that is run by
122     *    the current jcr.  It is a backup job that moves (migrates) the
123     *    data written for the previous_jr into the new pool.  This
124     *    job (mig_jcr) becomes the new backup job that replaces
125     *    the original backup job.
126     */
127    if (jcr->previous_jr.JobId == 0 || jcr->ExpectedFiles == 0) {
128       set_jcr_job_status(jcr, JS_Terminated);
129       Dmsg1(dbglevel, "JobId=%d expected files == 0\n", (int)jcr->JobId);
130       migration_cleanup(jcr, jcr->JobStatus);
131       return true;                    /* no work */
132    }
133
134    Dmsg5(dbglevel, "JobId=%d: Previous: Name=%s JobId=%d Type=%c Level=%c\n",
135       (int)jcr->JobId,
136       jcr->previous_jr.Name, (int)jcr->previous_jr.JobId, 
137       jcr->previous_jr.JobType, jcr->previous_jr.JobLevel);
138
139    Dmsg5(dbglevel, "JobId=%d: Current: Name=%s JobId=%d Type=%c Level=%c\n",
140       (int)jcr->JobId,
141       jcr->jr.Name, (int)jcr->jr.JobId, 
142       jcr->jr.JobType, jcr->jr.JobLevel);
143
144    LockRes();
145    job = (JOB *)GetResWithName(R_JOB, jcr->jr.Name);
146    prev_job = (JOB *)GetResWithName(R_JOB, jcr->previous_jr.Name);
147    UnlockRes();
148    if (!job || !prev_job) {
149       return false;
150    }
151
152    /* Create a migation jcr */
153    mig_jcr = jcr->mig_jcr = new_jcr(sizeof(JCR), dird_free_jcr);
154    memcpy(&mig_jcr->previous_jr, &jcr->previous_jr, sizeof(mig_jcr->previous_jr));
155
156    /*
157     * Turn the mig_jcr into a "real" job that takes on the aspects of
158     *   the previous backup job "prev_job".
159     */
160    set_jcr_defaults(mig_jcr, prev_job);
161    if (!setup_job(mig_jcr)) {
162       return false;
163    }
164
165    /* Now reset the job record from the previous job */
166    memcpy(&mig_jcr->jr, &jcr->previous_jr, sizeof(mig_jcr->jr));
167    /* Update the jr to reflect the new values of PoolId, FileSetId, and JobId. */
168    mig_jcr->jr.PoolId = jcr->jr.PoolId;
169    mig_jcr->jr.FileSetId = jcr->jr.FileSetId;
170    mig_jcr->jr.JobId = mig_jcr->JobId;
171
172    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
173       mig_jcr->jr.Name, (int)mig_jcr->jr.JobId, 
174       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
175
176    /*
177     * Get the PoolId used with the original job. Then
178     *  find the pool name from the database record.
179     */
180    memset(&pr, 0, sizeof(pr));
181    pr.PoolId = mig_jcr->previous_jr.PoolId;
182    if (!db_get_pool_record(jcr, jcr->db, &pr)) {
183       Jmsg(jcr, M_FATAL, 0, _("Pool for JobId %s not in database. ERR=%s\n"),
184             edit_int64(pr.PoolId, ed1), db_strerror(jcr->db));
185          return false;
186    }
187    /* Get the pool resource corresponding to the original job */
188    pool = (POOL *)GetResWithName(R_POOL, pr.Name);
189    if (!pool) {
190       Jmsg(jcr, M_FATAL, 0, _("Pool resource \"%s\" not found.\n"), pr.Name);
191       return false;
192    }
193
194    /* If pool storage specified, use it for restore */
195    copy_rstorage(mig_jcr, pool->storage, _("Pool resource"));
196    copy_rstorage(jcr, pool->storage, _("Pool resource"));
197
198    /*
199     * If the original backup pool has a NextPool, make sure a 
200     *  record exists in the database. Note, in this case, we
201     *  will be migrating from pool to pool->NextPool.
202     */
203    if (pool->NextPool) {
204       jcr->jr.PoolId = get_or_create_pool_record(jcr, pool->NextPool->hdr.name);
205       if (jcr->jr.PoolId == 0) {
206          return false;
207       }
208       /*
209        * put the "NextPool" resource pointer in our jcr so that we
210        * can pull the Storage reference from it.
211        */
212       mig_jcr->pool = jcr->pool = pool->NextPool;
213       mig_jcr->jr.PoolId = jcr->jr.PoolId;
214       pm_strcpy(jcr->pool_source, _("NextPool in Pool resource"));
215    } else {
216       Jmsg(jcr, M_FATAL, 0, _("No Next Pool specification found in Pool \"%s\".\n"),
217          pool->hdr.name);
218       return false;
219    }
220
221    if (!jcr->pool->storage) {
222       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Next Pool \"%s\".\n"),
223          jcr->pool->hdr.name);
224       return false;
225    }
226
227    /* If pool storage specified, use it instead of job storage for backup */
228    copy_wstorage(jcr, jcr->pool->storage, _("Next pool resource"));
229
230    /* Print Job Start message */
231    Jmsg(jcr, M_INFO, 0, _("Start Migration JobId %s, Job=%s\n"),
232         edit_uint64(jcr->JobId, ed1), jcr->Job);
233
234    set_jcr_job_status(jcr, JS_Running);
235    set_jcr_job_status(mig_jcr, JS_Running);
236    Dmsg2(dbglevel, "JobId=%d JobLevel=%c\n", (int)jcr->jr.JobId, jcr->jr.JobLevel);
237
238    /* Update job start record for this migration control job */
239    if (!db_update_job_start_record(jcr, jcr->db, &jcr->jr)) {
240       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
241       return false;
242    }
243
244    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
245       mig_jcr->jr.Name, (int)mig_jcr->jr.JobId, 
246       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
247
248    /* Update job start record for the real migration backup job */
249    if (!db_update_job_start_record(mig_jcr, mig_jcr->db, &mig_jcr->jr)) {
250       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(mig_jcr->db));
251       return false;
252    }
253
254
255    /*
256     * Open a message channel connection with the Storage
257     * daemon. This is to let him know that our client
258     * will be contacting him for a backup  session.
259     *
260     */
261    Dmsg0(110, "Open connection with storage daemon\n");
262    set_jcr_job_status(jcr, JS_WaitSD);
263    set_jcr_job_status(mig_jcr, JS_WaitSD);
264    /*
265     * Start conversation with Storage daemon
266     */
267    if (!connect_to_storage_daemon(jcr, 10, SDConnectTimeout, 1)) {
268       return false;
269    }
270    sd = jcr->store_bsock;
271    /*
272     * Now start a job with the Storage daemon
273     */
274    Dmsg2(dbglevel, "Read store=%s, write store=%s\n", 
275       ((STORE *)jcr->rstorage->first())->name(),
276       ((STORE *)jcr->wstorage->first())->name());
277    if (((STORE *)jcr->rstorage->first())->name() == ((STORE *)jcr->wstorage->first())->name()) {
278       Jmsg(jcr, M_FATAL, 0, _("Read storage \"%s\" same as write storage.\n"),
279            ((STORE *)jcr->rstorage->first())->name());
280    }
281    if (!start_storage_daemon_job(jcr, jcr->rstorage, jcr->wstorage)) {
282       return false;
283    }
284    Dmsg0(150, "Storage daemon connection OK\n");
285
286    if (!send_bootstrap_file(jcr, sd) ||
287        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR)) {
288       return false;
289    }
290
291    if (!bnet_fsend(sd, "run")) {
292       return false;
293    }
294
295    /*
296     * Now start a Storage daemon message thread
297     */
298    if (!start_storage_daemon_message_thread(jcr)) {
299       return false;
300    }
301
302
303    set_jcr_job_status(jcr, JS_Running);
304    set_jcr_job_status(mig_jcr, JS_Running);
305
306    /* Pickup Job termination data */
307    /* Note, the SD stores in jcr->JobFiles/ReadBytes/JobBytes/Errors */
308    wait_for_storage_daemon_termination(jcr);
309
310    set_jcr_job_status(jcr, jcr->SDJobStatus);
311    if (jcr->JobStatus != JS_Terminated) {
312       return false;
313    }
314    migration_cleanup(jcr, jcr->JobStatus);
315    if (mig_jcr) {
316       UAContext *ua = new_ua_context(jcr);
317       purge_files_from_job(ua, jcr->previous_jr.JobId);
318       free_ua_context(ua);
319    }
320    return true;
321 }
322
323 struct idpkt {
324    POOLMEM *list;
325    uint32_t count;
326 };
327
328 /* Add an item to the list if it is unique */
329 static void add_unique_id(idpkt *ids, char *item) 
330 {
331    char id[30];
332    char *q = ids->list;
333
334    /* Walk through current list to see if each item is the same as item */
335    for ( ; *q; ) {
336        id[0] = 0;
337        for (int i=0; i<(int)sizeof(id); i++) {
338           if (*q == 0) {
339              break;
340           } else if (*q == ',') {
341              q++;
342              break;
343           }
344           id[i] = *q++;
345           id[i+1] = 0;
346        }
347        if (strcmp(item, id) == 0) {
348           return;
349        }
350    }
351    /* Did not find item, so add it to list */
352    if (ids->count == 0) {
353       ids->list[0] = 0;
354    } else {
355       pm_strcat(ids->list, ",");
356    }
357    pm_strcat(ids->list, item);
358    ids->count++;
359 // Dmsg3(0, "add_uniq count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
360    return;
361 }
362
363 /*
364  * Callback handler make list of DB Ids
365  */
366 static int unique_dbid_handler(void *ctx, int num_fields, char **row)
367 {
368    idpkt *ids = (idpkt *)ctx;
369
370    add_unique_id(ids, row[0]);
371    Dmsg3(dbglevel, "dbid_hdlr count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
372    return 0;
373 }
374
375
376 struct uitem {
377    dlink link;   
378    char *item;
379 };
380
381 static int item_compare(void *item1, void *item2)
382 {
383    uitem *i1 = (uitem *)item1;
384    uitem *i2 = (uitem *)item2;
385    return strcmp(i1->item, i2->item);
386 }
387
388 static int unique_name_handler(void *ctx, int num_fields, char **row)
389 {
390    dlist *list = (dlist *)ctx;
391
392    uitem *new_item = (uitem *)malloc(sizeof(uitem));
393    uitem *item;
394    
395    memset(new_item, 0, sizeof(uitem));
396    new_item->item = bstrdup(row[0]);
397    Dmsg1(dbglevel, "Unique_name_hdlr Item=%s\n", row[0]);
398    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
399    if (item != new_item) {            /* already in list */
400       free(new_item->item);
401       free((char *)new_item);
402       return 0;
403    }
404    return 0;
405 }
406
407 /* Get Job names in Pool */
408 const char *sql_job =
409    "SELECT DISTINCT Job.Name from Job,Pool"
410    " WHERE Pool.Name='%s' AND Job.PoolId=Pool.PoolId";
411
412 /* Get JobIds from regex'ed Job names */
413 const char *sql_jobids_from_job =
414    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool"
415    " WHERE Job.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
416    " ORDER by Job.StartTime";
417
418 /* Get Client names in Pool */
419 const char *sql_client =
420    "SELECT DISTINCT Client.Name from Client,Pool,Job"
421    " WHERE Pool.Name='%s' AND Job.ClientId=Client.ClientId AND"
422    " Job.PoolId=Pool.PoolId";
423
424 /* Get JobIds from regex'ed Client names */
425 const char *sql_jobids_from_client =
426    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool,Client"
427    " WHERE Client.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
428    " AND Job.ClientId=Client.ClientId "
429    " ORDER by Job.StartTime";
430
431 /* Get Volume names in Pool */
432 const char *sql_vol = 
433    "SELECT DISTINCT VolumeName FROM Media,Pool WHERE"
434    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
435    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
436
437 /* Get JobIds from regex'ed Volume names */
438 const char *sql_jobids_from_vol =
439    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Media,JobMedia,Job"
440    " WHERE Media.VolumeName='%s' AND Media.MediaId=JobMedia.MediaId"
441    " AND JobMedia.JobId=Job.JobId" 
442    " ORDER by Job.StartTime";
443
444
445 const char *sql_smallest_vol = 
446    "SELECT MediaId FROM Media,Pool WHERE"
447    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
448    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
449    " ORDER BY VolBytes ASC LIMIT 1";
450
451 const char *sql_oldest_vol = 
452    "SELECT MediaId FROM Media,Pool WHERE"
453    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
454    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
455    " ORDER BY LastWritten ASC LIMIT 1";
456
457 /* Get JobIds when we have selected MediaId */
458 const char *sql_jobids_from_mediaid =
459    "SELECT DISTINCT Job.JobId,Job.StartTime FROM JobMedia,Job"
460    " WHERE JobMedia.JobId=Job.JobId AND JobMedia.MediaId=%s"
461    " ORDER by Job.StartTime";
462
463 /* Get tne number of bytes in the pool */
464 const char *sql_pool_bytes =
465    "SELECT SUM(VolBytes) FROM Media,Pool WHERE"
466    " VolStatus in ('Full','Used','Error','Append') AND Media.Enabled=1 AND"
467    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
468
469 /* Get tne number of bytes in the Jobs */
470 const char *sql_job_bytes =
471    "SELECT SUM(JobBytes) FROM Job WHERE JobId IN (%s)";
472
473
474 /* Get Media Ids in Pool */
475 const char *sql_mediaids =
476    "SELECT MediaId FROM Media,Pool WHERE"
477    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
478    " Media.PoolId=Pool.PoolId AND Pool.Name='%s' ORDER BY LastWritten ASC";
479
480 /* Get JobIds in Pool longer than specified time */
481 const char *sql_pool_time = 
482    "SELECT DISTINCT Job.JobId from Pool,Job,Media,JobMedia WHERE"
483    " Pool.Name='%s' AND Media.PoolId=Pool.PoolId AND"
484    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
485    " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId"
486    " AND Job.RealEndTime<='%s'";
487
488 /*
489 * const char *sql_ujobid =
490 *   "SELECT DISTINCT Job.Job from Client,Pool,Media,Job,JobMedia "
491 *   " WHERE Media.PoolId=Pool.PoolId AND Pool.Name='%s' AND"
492 *   " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId";
493 */
494
495
496
497 /*
498  *
499  * This is the central piece of code that finds a job or jobs 
500  *   actually JobIds to migrate.  It first looks to see if one
501  *   has been "manually" specified in jcr->MigrateJobId, and if
502  *   so, it returns that JobId to be run.  Otherwise, it
503  *   examines the Selection Type to see what kind of migration
504  *   we are doing (Volume, Job, Client, ...) and applies any
505  *   Selection Pattern if appropriate to obtain a list of JobIds.
506  *   Finally, it will loop over all the JobIds found, except the last
507  *   one starting a new job with MigrationJobId set to that JobId, and
508  *   finally, it returns the last JobId to the caller.
509  *
510  * Returns: false on error
511  *          true  if OK and jcr->previous_jr filled in
512  */
513 static bool get_job_to_migrate(JCR *jcr)
514 {
515    char ed1[30];
516    POOL_MEM query(PM_MESSAGE);
517    JobId_t JobId;
518    DBId_t  MediaId = 0;
519    int stat;
520    char *p;
521    idpkt ids, mid, jids;
522    db_int64_ctx ctx;
523    int64_t pool_bytes;
524    bool ok;
525    time_t ttime;
526    struct tm tm;
527    char dt[MAX_TIME_LENGTH];
528
529    ids.list = get_pool_memory(PM_MESSAGE);
530    ids.list[0] = 0;
531    ids.count = 0;
532    mid.list = get_pool_memory(PM_MESSAGE);
533    mid.list[0] = 0;
534    mid.count = 0;
535    jids.list = get_pool_memory(PM_MESSAGE);
536    jids.list[0] = 0;
537    jids.count = 0;
538
539
540    /*
541     * If MigrateJobId is set, then we migrate only that Job,
542     *  otherwise, we go through the full selection of jobs to
543     *  migrate.
544     */
545    if (jcr->MigrateJobId != 0) {
546       Dmsg1(dbglevel, "At Job start previous jobid=%u\n", jcr->MigrateJobId);
547       edit_uint64(jcr->MigrateJobId, ids.list);
548       ids.count = 1;
549    } else {
550       switch (jcr->job->selection_type) {
551       case MT_JOB:
552          if (!regex_find_jobids(jcr, &ids, sql_job, sql_jobids_from_job, "Job")) {
553             goto bail_out;
554          } 
555          break;
556       case MT_CLIENT:
557          if (!regex_find_jobids(jcr, &ids, sql_client, sql_jobids_from_client, "Client")) {
558             goto bail_out;
559          } 
560          break;
561       case MT_VOLUME:
562          if (!regex_find_jobids(jcr, &ids, sql_vol, sql_jobids_from_vol, "Volume")) {
563             goto bail_out;
564          } 
565          break;
566       case MT_SQLQUERY:
567          if (!jcr->job->selection_pattern) {
568             Jmsg(jcr, M_FATAL, 0, _("No Migration SQL selection pattern specified.\n"));
569             goto bail_out;
570          }
571          Dmsg1(dbglevel, "SQL=%s\n", jcr->job->selection_pattern);
572          if (!db_sql_query(jcr->db, jcr->job->selection_pattern,
573               unique_dbid_handler, (void *)&ids)) {
574             Jmsg(jcr, M_FATAL, 0,
575                  _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
576             goto bail_out;
577          }
578          break;
579       case MT_SMALLEST_VOL:
580          if (!find_mediaid_then_jobids(jcr, &ids, sql_smallest_vol, "Smallest Volume")) {
581             goto bail_out;
582          }
583          break;
584       case MT_OLDEST_VOL:
585          if (!find_mediaid_then_jobids(jcr, &ids, sql_oldest_vol, "Oldest Volume")) {
586             goto bail_out;
587          }
588          break;
589
590       case MT_POOL_OCCUPANCY:
591          ctx.count = 0;
592          /* Find count of bytes in pool */
593          Mmsg(query, sql_pool_bytes, jcr->pool->hdr.name);
594          if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
595             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
596             goto bail_out;
597          }
598          if (ctx.count == 0) {
599             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
600             goto ok_out;
601          }
602          pool_bytes = ctx.value;
603          Dmsg2(dbglevel, "highbytes=%d pool=%d\n", (int)jcr->pool->MigrationHighBytes,
604                (int)pool_bytes);
605          if (pool_bytes < (int64_t)jcr->pool->MigrationHighBytes) {
606             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
607             goto ok_out;
608          }
609          Dmsg0(dbglevel, "We should do Occupation migration.\n");
610
611          ids.count = 0;
612          /* Find a list of MediaIds that could be migrated */
613          Mmsg(query, sql_mediaids, jcr->pool->hdr.name);
614          Dmsg1(dbglevel, "query=%s\n", query.c_str());
615          if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)&ids)) {
616             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
617             goto bail_out;
618          }
619          if (ids.count == 0) {
620             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
621             goto ok_out;
622          }
623          Dmsg2(dbglevel, "Pool Occupancy ids=%d MediaIds=%s\n", ids.count, ids.list);
624
625          /*
626           * Now loop over MediaIds getting more JobIds to migrate until
627           *  we reduce the pool occupancy below the low water mark.
628           */
629          p = ids.list;
630          for (int i=0; i < (int)ids.count; i++) {
631             stat = get_next_dbid_from_list(&p, &MediaId);
632             Dmsg2(dbglevel, "get_next_dbid stat=%d MediaId=%u\n", stat, MediaId);
633             if (stat < 0) {
634                Jmsg(jcr, M_FATAL, 0, _("Invalid MediaId found.\n"));
635                goto bail_out;
636             } else if (stat == 0) {
637                break;
638             }
639             mid.count = 1;
640             Mmsg(mid.list, "%s", edit_int64(MediaId, ed1));
641             ok = find_jobids_from_mediaid_list(jcr, &mid, "Volumes");
642             if (!ok) {
643                continue;
644             }
645             if (i != 0) {
646                pm_strcat(jids.list, ",");
647             }
648             pm_strcat(jids.list, mid.list);
649             jids.count += mid.count;
650
651             /* Now get the count of bytes added */
652             ctx.count = 0;
653             /* Find count of bytes from Jobs */
654             Mmsg(query, sql_job_bytes, mid.list);
655             if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
656                Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
657                goto bail_out;
658             }
659             pool_bytes -= ctx.value;
660             Dmsg1(dbglevel, "Job bytes=%d\n", (int)ctx.value);
661             Dmsg2(dbglevel, "lowbytes=%d pool=%d\n", (int)jcr->pool->MigrationLowBytes,
662                   (int)pool_bytes);
663             if (pool_bytes <= (int64_t)jcr->pool->MigrationLowBytes) {
664                Dmsg0(dbglevel, "We should be done.\n");
665                break;
666             }
667
668          }
669          Dmsg2(dbglevel, "Pool Occupancy ids=%d JobIds=%s\n", jids.count, jids.list);
670
671          break;
672
673       case MT_POOL_TIME:
674          ttime = time(NULL) - (time_t)jcr->pool->MigrationTime;
675          (void)localtime_r(&ttime, &tm);
676          strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
677
678          ids.count = 0;
679          Mmsg(query, sql_pool_time, jcr->pool->hdr.name, dt);
680          Dmsg1(dbglevel, "query=%s\n", query.c_str());
681          if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)&ids)) {
682             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
683             goto bail_out;
684          }
685          if (ids.count == 0) {
686             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
687             goto ok_out;
688          }
689          Dmsg2(dbglevel, "PoolTime ids=%d JobIds=%s\n", ids.count, ids.list);
690          break;
691
692       default:
693          Jmsg(jcr, M_FATAL, 0, _("Unknown Migration Selection Type.\n"));
694          goto bail_out;
695       }
696    }
697
698    /*
699     * Loop over all jobids except the last one, sending
700     *  them to start_migration_job(), which will start a job
701     *  for each of them.  For the last JobId, we handle it below.
702     */
703    p = ids.list;
704    Jmsg(jcr, M_INFO, 0, _("The following %u JobId%s will be migrated: %s\n"),
705       ids.count, ids.count==0?"":"s", ids.list);
706    Dmsg2(dbglevel, "Before loop count=%d ids=%s\n", ids.count, ids.list);
707    for (int i=1; i < (int)ids.count; i++) {
708       JobId = 0;
709       stat = get_next_jobid_from_list(&p, &JobId);
710       Dmsg3(dbglevel, "get_jobid_no=%d stat=%d JobId=%u\n", i, stat, JobId);
711       jcr->MigrateJobId = JobId;
712       start_migration_job(jcr);
713       Dmsg0(dbglevel, "Back from start_migration_job\n");
714       if (stat < 0) {
715          Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
716          goto bail_out;
717       } else if (stat == 0) {
718          Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
719          goto ok_out;
720       }
721    }
722    
723    /* Now get the last JobId and handle it in the current job */
724    JobId = 0;
725    stat = get_next_jobid_from_list(&p, &JobId);
726    Dmsg2(dbglevel, "Last get_next_jobid stat=%d JobId=%u\n", stat, (int)JobId);
727    if (stat < 0) {
728       Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
729       goto bail_out;
730    } else if (stat == 0) {
731       Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
732       goto ok_out;
733    }
734
735    jcr->previous_jr.JobId = JobId;
736    Dmsg1(dbglevel, "Previous jobid=%d\n", (int)jcr->previous_jr.JobId);
737
738    if (!db_get_job_record(jcr, jcr->db, &jcr->previous_jr)) {
739       Jmsg(jcr, M_FATAL, 0, _("Could not get job record for JobId %s to migrate. ERR=%s"),
740            edit_int64(jcr->previous_jr.JobId, ed1),
741            db_strerror(jcr->db));
742       goto bail_out;
743    }
744    Jmsg(jcr, M_INFO, 0, _("Migration using JobId=%s Job=%s\n"),
745       edit_int64(jcr->previous_jr.JobId, ed1), jcr->previous_jr.Job);
746    Dmsg3(dbglevel, "Migration JobId=%d  using JobId=%s Job=%s\n",
747       jcr->JobId,
748       edit_int64(jcr->previous_jr.JobId, ed1), jcr->previous_jr.Job);
749
750 ok_out:
751    ok = true;
752    goto out;
753
754 bail_out:
755    jcr->MigrateJobId = 0;
756    ok = false;
757            
758 out:
759    free_pool_memory(ids.list);
760    free_pool_memory(mid.list);
761    free_pool_memory(jids.list);
762    return ok;
763 }
764
765 static void start_migration_job(JCR *jcr)
766 {
767    UAContext *ua = new_ua_context(jcr);
768    char ed1[50];
769    ua->batch = true;
770    Mmsg(ua->cmd, "run %s jobid=%s", jcr->job->hdr.name, 
771         edit_uint64(jcr->MigrateJobId, ed1));
772    Dmsg1(dbglevel, "=============== Migration cmd=%s\n", ua->cmd);
773    parse_ua_args(ua);                 /* parse command */
774    int stat = run_cmd(ua, ua->cmd);
775    if (stat == 0) {
776       Jmsg(jcr, M_ERROR, 0, _("Could not start migration job.\n"));
777    } else {
778       Jmsg(jcr, M_INFO, 0, _("Migration JobId %d started.\n"), stat);
779    }
780    free_ua_context(ua);
781 }
782
783 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
784                  const char *type) 
785 {
786    bool ok = false;
787    POOL_MEM query(PM_MESSAGE);
788
789    ids->count = 0;
790    /* Basic query for MediaId */
791    Mmsg(query, query1, jcr->pool->hdr.name);
792    if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
793       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
794       goto bail_out;
795    }
796    if (ids->count == 0) {
797       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
798    }
799    if (ids->count != 1) {
800       Jmsg(jcr, M_FATAL, 0, _("SQL logic error. Count should be 1 but is %d\n"), 
801          ids->count);
802       goto bail_out;
803    }
804    Dmsg1(dbglevel, "Smallest Vol Jobids=%s\n", ids->list);
805
806    ok = find_jobids_from_mediaid_list(jcr, ids, type);
807
808 bail_out:
809    return ok;
810 }
811
812 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type) 
813 {
814    bool ok = false;
815    POOL_MEM query(PM_MESSAGE);
816
817    Mmsg(query, sql_jobids_from_mediaid, ids->list);
818    ids->count = 0;
819    if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
820       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
821       goto bail_out;
822    }
823    if (ids->count == 0) {
824       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
825    }
826    ok = true;
827 bail_out:
828    return ok;
829 }
830
831 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
832                  const char *query2, const char *type) 
833 {
834    dlist *item_chain;
835    uitem *item = NULL;
836    uitem *last_item = NULL;
837    regex_t preg;
838    char prbuf[500];
839    int rc;
840    bool ok = false;
841    POOL_MEM query(PM_MESSAGE);
842
843    item_chain = New(dlist(item, &item->link));
844    if (!jcr->job->selection_pattern) {
845       Jmsg(jcr, M_FATAL, 0, _("No Migration %s selection pattern specified.\n"),
846          type);
847       goto bail_out;
848    }
849    Dmsg1(dbglevel, "regex=%s\n", jcr->job->selection_pattern);
850    /* Compile regex expression */
851    rc = regcomp(&preg, jcr->job->selection_pattern, REG_EXTENDED);
852    if (rc != 0) {
853       regerror(rc, &preg, prbuf, sizeof(prbuf));
854       Jmsg(jcr, M_FATAL, 0, _("Could not compile regex pattern \"%s\" ERR=%s\n"),
855            jcr->job->selection_pattern, prbuf);
856       goto bail_out;
857    }
858    /* Basic query for names */
859    Mmsg(query, query1, jcr->pool->hdr.name);
860    Dmsg1(dbglevel, "get name query1=%s\n", query.c_str());
861    if (!db_sql_query(jcr->db, query.c_str(), unique_name_handler, 
862         (void *)item_chain)) {
863       Jmsg(jcr, M_FATAL, 0,
864            _("SQL to get %s failed. ERR=%s\n"), type, db_strerror(jcr->db));
865       goto bail_out;
866    }
867    /* Now apply the regex to the names and remove any item not matched */
868    foreach_dlist(item, item_chain) {
869       const int nmatch = 30;
870       regmatch_t pmatch[nmatch];
871       if (last_item) {
872          Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
873          free(last_item->item);
874          item_chain->remove(last_item);
875       }
876       Dmsg1(dbglevel, "get name Item=%s\n", item->item);
877       rc = regexec(&preg, item->item, nmatch, pmatch,  0);
878       if (rc == 0) {
879          last_item = NULL;   /* keep this one */
880       } else {   
881          last_item = item;
882       }
883    }
884    if (last_item) {
885       free(last_item->item);
886       Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
887       item_chain->remove(last_item);
888    }
889    regfree(&preg);
890    /* 
891     * At this point, we have a list of items in item_chain
892     *  that have been matched by the regex, so now we need
893     *  to look up their jobids.
894     */
895    ids->count = 0;
896    foreach_dlist(item, item_chain) {
897       Dmsg2(dbglevel, "Got %s: %s\n", type, item->item);
898       Mmsg(query, query2, item->item, jcr->pool->hdr.name);
899       Dmsg1(dbglevel, "get id from name query2=%s\n", query.c_str());
900       if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
901          Jmsg(jcr, M_FATAL, 0,
902               _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
903          goto bail_out;
904       }
905    }
906    if (ids->count == 0) {
907       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
908    }
909    ok = true;
910 bail_out:
911    Dmsg2(dbglevel, "Count=%d Jobids=%s\n", ids->count, ids->list);
912    delete item_chain;
913    Dmsg0(dbglevel, "After delete item_chain\n");
914    return ok;
915 }
916
917
918 /*
919  * Release resources allocated during backup.
920  */
921 void migration_cleanup(JCR *jcr, int TermCode)
922 {
923    char sdt[MAX_TIME_LENGTH], edt[MAX_TIME_LENGTH];
924    char ec1[30], ec2[30], ec3[30], ec4[30], ec5[30], elapsed[50];
925    char ec6[50], ec7[50], ec8[50];
926    char term_code[100], sd_term_msg[100];
927    const char *term_msg;
928    int msg_type;
929    MEDIA_DBR mr;
930    double kbps;
931    utime_t RunTime;
932    JCR *mig_jcr = jcr->mig_jcr;
933    POOL_MEM query(PM_MESSAGE);
934
935    Dmsg2(100, "Enter migrate_cleanup %d %c\n", TermCode, TermCode);
936    dequeue_messages(jcr);             /* display any queued messages */
937    memset(&mr, 0, sizeof(mr));
938    set_jcr_job_status(jcr, TermCode);
939    update_job_end_record(jcr);        /* update database */
940
941    /* 
942     * Check if we actually did something.  
943     *  mig_jcr is jcr of the newly migrated job.
944     */
945    if (mig_jcr) {
946       mig_jcr->JobFiles = jcr->JobFiles = jcr->SDJobFiles;
947       mig_jcr->JobBytes = jcr->JobBytes = jcr->SDJobBytes;
948       mig_jcr->VolSessionId = jcr->VolSessionId;
949       mig_jcr->VolSessionTime = jcr->VolSessionTime;
950       mig_jcr->jr.RealEndTime = 0; 
951       mig_jcr->jr.PriorJobId = jcr->previous_jr.JobId;
952
953       set_jcr_job_status(mig_jcr, TermCode);
954
955   
956       update_job_end_record(mig_jcr);
957      
958       /* Update final items to set them to the previous job's values */
959       Mmsg(query, "UPDATE Job SET StartTime='%s',EndTime='%s',"
960                   "JobTDate=%s WHERE JobId=%s", 
961          jcr->previous_jr.cStartTime, jcr->previous_jr.cEndTime, 
962          edit_uint64(jcr->previous_jr.JobTDate, ec1),
963          edit_uint64(mig_jcr->jr.JobId, ec2));
964       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
965
966       /* Now marke the previous job as migrated */
967       Mmsg(query, "UPDATE Job SET Type='%c' WHERE JobId=%s",
968            (char)JT_MIGRATED_JOB, edit_uint64(jcr->previous_jr.JobId, ec1));
969       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
970
971       if (!db_get_job_record(jcr, jcr->db, &jcr->jr)) {
972          Jmsg(jcr, M_WARNING, 0, _("Error getting job record for stats: %s"),
973             db_strerror(jcr->db));
974          set_jcr_job_status(jcr, JS_ErrorTerminated);
975       }
976
977       bstrncpy(mr.VolumeName, jcr->VolumeName, sizeof(mr.VolumeName));
978       if (!db_get_media_record(jcr, jcr->db, &mr)) {
979          Jmsg(jcr, M_WARNING, 0, _("Error getting Media record for Volume \"%s\": ERR=%s"),
980             mr.VolumeName, db_strerror(jcr->db));
981          set_jcr_job_status(jcr, JS_ErrorTerminated);
982       }
983
984       update_bootstrap_file(mig_jcr);
985
986       if (!db_get_job_volume_names(mig_jcr, mig_jcr->db, mig_jcr->jr.JobId, &mig_jcr->VolumeName)) {
987          /*
988           * Note, if the job has erred, most likely it did not write any
989           *  tape, so suppress this "error" message since in that case
990           *  it is normal.  Or look at it the other way, only for a
991           *  normal exit should we complain about this error.
992           */
993          if (jcr->JobStatus == JS_Terminated && jcr->jr.JobBytes) {
994             Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(mig_jcr->db));
995          }
996          mig_jcr->VolumeName[0] = 0;         /* none */
997       }
998   }
999
1000    msg_type = M_INFO;                 /* by default INFO message */
1001    switch (jcr->JobStatus) {
1002    case JS_Terminated:
1003       if (jcr->Errors || jcr->SDErrors) {
1004          term_msg = _("%s OK -- with warnings");
1005       } else {
1006          term_msg = _("%s OK");
1007       }
1008       break;
1009    case JS_FatalError:
1010    case JS_ErrorTerminated:
1011       term_msg = _("*** %s Error ***");
1012       msg_type = M_ERROR;          /* Generate error message */
1013       if (jcr->store_bsock) {
1014          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
1015          if (jcr->SD_msg_chan) {
1016             pthread_cancel(jcr->SD_msg_chan);
1017          }
1018       }
1019       break;
1020    case JS_Canceled:
1021       term_msg = _("%s Canceled");
1022       if (jcr->store_bsock) {
1023          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
1024          if (jcr->SD_msg_chan) {
1025             pthread_cancel(jcr->SD_msg_chan);
1026          }
1027       }
1028       break;
1029    default:
1030       term_msg = _("Inappropriate %s term code");
1031       break;
1032    }
1033    bsnprintf(term_code, sizeof(term_code), term_msg, "Migration");
1034    bstrftimes(sdt, sizeof(sdt), jcr->jr.StartTime);
1035    bstrftimes(edt, sizeof(edt), jcr->jr.EndTime);
1036    RunTime = jcr->jr.EndTime - jcr->jr.StartTime;
1037    if (RunTime <= 0) {
1038       kbps = 0;
1039    } else {
1040       kbps = (double)jcr->SDJobBytes / (1000 * RunTime);
1041    }
1042
1043
1044    jobstatus_to_ascii(jcr->SDJobStatus, sd_term_msg, sizeof(sd_term_msg));
1045
1046    Jmsg(jcr, msg_type, 0, _("Bacula %s (%s): %s\n"
1047 "  Prev Backup JobId:      %s\n"
1048 "  New Backup JobId:       %s\n"
1049 "  Migration JobId:        %s\n"
1050 "  Migration Job:          %s\n"
1051 "  Backup Level:           %s%s\n"
1052 "  Client:                 %s\n"
1053 "  FileSet:                \"%s\" %s\n"
1054 "  Pool:                   \"%s\" (From %s)\n"
1055 "  Storage:                \"%s\" (From %s)\n"
1056 "  Start time:             %s\n"
1057 "  End time:               %s\n"
1058 "  Elapsed time:           %s\n"
1059 "  Priority:               %d\n"
1060 "  SD Files Written:       %s\n"
1061 "  SD Bytes Written:       %s (%sB)\n"
1062 "  Rate:                   %.1f KB/s\n"
1063 "  Volume name(s):         %s\n"
1064 "  Volume Session Id:      %d\n"
1065 "  Volume Session Time:    %d\n"
1066 "  Last Volume Bytes:      %s (%sB)\n"
1067 "  SD Errors:              %d\n"
1068 "  SD termination status:  %s\n"
1069 "  Termination:            %s\n\n"),
1070    VERSION,
1071    LSMDATE,
1072         edt, 
1073         mig_jcr ? edit_uint64(jcr->previous_jr.JobId, ec6) : "0", 
1074         mig_jcr ? edit_uint64(mig_jcr->jr.JobId, ec7) : "0",
1075         edit_uint64(jcr->jr.JobId, ec8),
1076         jcr->jr.Job,
1077         level_to_str(jcr->JobLevel), jcr->since,
1078         jcr->client->name(),
1079         jcr->fileset->name(), jcr->FSCreateTime,
1080         jcr->pool->name(), jcr->pool_source,
1081         jcr->wstore->name(), jcr->storage_source,
1082         sdt,
1083         edt,
1084         edit_utime(RunTime, elapsed, sizeof(elapsed)),
1085         jcr->JobPriority,
1086         edit_uint64_with_commas(jcr->SDJobFiles, ec1),
1087         edit_uint64_with_commas(jcr->SDJobBytes, ec2),
1088         edit_uint64_with_suffix(jcr->SDJobBytes, ec3),
1089         (float)kbps,
1090         mig_jcr ? mig_jcr->VolumeName : "",
1091         jcr->VolSessionId,
1092         jcr->VolSessionTime,
1093         edit_uint64_with_commas(mr.VolBytes, ec4),
1094         edit_uint64_with_suffix(mr.VolBytes, ec5),
1095         jcr->SDErrors,
1096         sd_term_msg,
1097         term_code);
1098
1099    Dmsg1(100, "migrate_cleanup() mig_jcr=0x%x\n", jcr->mig_jcr);
1100    if (jcr->mig_jcr) {
1101       free_jcr(jcr->mig_jcr);
1102       jcr->mig_jcr = NULL;
1103    }
1104    Dmsg0(100, "Leave migrate_cleanup()\n");
1105 }
1106
1107 /* 
1108  * Return next DBId from comma separated list   
1109  *
1110  * Returns:
1111  *   1 if next DBId returned
1112  *   0 if no more DBIds are in list
1113  *  -1 there is an error
1114  */
1115 static int get_next_dbid_from_list(char **p, DBId_t *DBId)
1116 {
1117    char id[30];
1118    char *q = *p;
1119
1120    id[0] = 0;
1121    for (int i=0; i<(int)sizeof(id); i++) {
1122       if (*q == 0) {
1123          break;
1124       } else if (*q == ',') {
1125          q++;
1126          break;
1127       }
1128       id[i] = *q++;
1129       id[i+1] = 0;
1130    }
1131    if (id[0] == 0) {
1132       return 0;
1133    } else if (!is_a_number(id)) {
1134       return -1;                      /* error */
1135    }
1136    *p = q;
1137    *DBId = str_to_int64(id);
1138    return 1;
1139 }