]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/migrate.c
Update copyright
[bacula/bacula] / bacula / src / dird / migrate.c
1 /*
2  *
3  *   Bacula Director -- migrate.c -- responsible for doing
4  *     migration jobs.
5  *
6  *     Kern Sibbald, September MMIV
7  *
8  *  Basic tasks done here:
9  *     Open DB and create records for this job.
10  *     Open Message Channel with Storage daemon to tell him a job will be starting.
11  *     Open connection with Storage daemon and pass him commands
12  *       to do the backup.
13  *     When the Storage daemon finishes the job, update the DB.
14  *
15  *   Version $Id$
16  */
17 /*
18    Bacula® - The Network Backup Solution
19
20    Copyright (C) 2004-2006 Free Software Foundation Europe e.V.
21
22    The main author of Bacula is Kern Sibbald, with contributions from
23    many others, a complete list can be found in the file AUTHORS.
24    This program is Free Software; you can redistribute it and/or
25    modify it under the terms of version two of the GNU General Public
26    License as published by the Free Software Foundation plus additions
27    that are listed in the file LICENSE.
28
29    This program is distributed in the hope that it will be useful, but
30    WITHOUT ANY WARRANTY; without even the implied warranty of
31    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
32    General Public License for more details.
33
34    You should have received a copy of the GNU General Public License
35    along with this program; if not, write to the Free Software
36    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
37    02110-1301, USA.
38
39    Bacula® is a registered trademark of John Walker.
40    The licensor of Bacula is the Free Software Foundation Europe
41    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
42    Switzerland, email:ftf@fsfeurope.org.
43 */
44
45 #include "bacula.h"
46 #include "dird.h"
47 #include "ua.h"
48 #ifndef HAVE_REGEX_H
49 #include "lib/bregex.h"
50 #else
51 #include <regex.h>
52 #endif
53
54 static const int dbglevel = 10;
55
56 static char OKbootstrap[] = "3000 OK bootstrap\n";
57 static bool get_job_to_migrate(JCR *jcr);
58 struct idpkt;
59 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
60                  const char *query2, const char *type);
61 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
62                  const char *type);
63 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type);
64 static void start_migration_job(JCR *jcr);
65 static int get_next_dbid_from_list(char **p, DBId_t *DBId);
66
67 /* 
68  * Called here before the job is run to do the job
69  *   specific setup.
70  */
71 bool do_migration_init(JCR *jcr)
72 {
73    /* If we find a job or jobs to migrate it is previous_jr.JobId */
74    if (!get_job_to_migrate(jcr)) {
75       return false;
76    }
77    Dmsg1(dbglevel, "Back from get_job_to_migrate JobId=%d\n", (int)jcr->JobId);
78
79    if (jcr->previous_jr.JobId == 0) {
80       Dmsg1(dbglevel, "JobId=%d no previous JobId\n", (int)jcr->JobId);
81       Jmsg(jcr, M_INFO, 0, _("No previous Job found to migrate.\n"));
82       return true;                    /* no work */
83    }
84
85    if (!get_or_create_fileset_record(jcr)) {
86       Dmsg1(dbglevel, "JobId=%d no FileSet\n", (int)jcr->JobId);
87       Jmsg(jcr, M_FATAL, 0, _("Could not get or create the FileSet record.\n"));
88       return false;
89    }
90
91    apply_pool_overrides(jcr);
92
93    jcr->jr.PoolId = get_or_create_pool_record(jcr, jcr->pool->hdr.name);
94    if (jcr->jr.PoolId == 0) {
95       Dmsg1(dbglevel, "JobId=%d no PoolId\n", (int)jcr->JobId);
96       Jmsg(jcr, M_FATAL, 0, _("Could not get or create a Pool record.\n"));
97       return false;
98    }
99
100   /* If pool storage specified, use it instead of job storage */
101    copy_wstorage(jcr, jcr->pool->storage, _("Pool resource"));
102
103    if (jcr->wstorage->size() == 0) {
104       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Job or Pool.\n"));
105       return false;
106    }
107
108    create_restore_bootstrap_file(jcr);
109    return true;
110 }
111
112 /*
113  * Do a Migration of a previous job
114  *
115  *  Returns:  false on failure
116  *            true  on success
117  */
118 bool do_migration(JCR *jcr)
119 {
120    POOL_DBR pr;
121    POOL *pool;
122    char ed1[100];
123    BSOCK *sd;
124    JOB *job, *prev_job;
125    JCR *mig_jcr;                   /* newly migrated job */
126
127    /* 
128     *  previous_jr refers to the job DB record of the Job that is
129     *    going to be migrated.
130     *  prev_job refers to the job resource of the Job that is
131     *    going to be migrated.
132     *  jcr is the jcr for the current "migration" job.  It is a
133     *    control job that is put in the DB as a migration job, which
134     *    means that this job migrated a previous job to a new job.
135     *    No Volume or File data is associated with this control
136     *    job.
137     *  mig_jcr refers to the newly migrated job that is run by
138     *    the current jcr.  It is a backup job that moves (migrates) the
139     *    data written for the previous_jr into the new pool.  This
140     *    job (mig_jcr) becomes the new backup job that replaces
141     *    the original backup job.
142     */
143    if (jcr->previous_jr.JobId == 0 || jcr->ExpectedFiles == 0) {
144       set_jcr_job_status(jcr, JS_Terminated);
145       Dmsg1(dbglevel, "JobId=%d expected files == 0\n", (int)jcr->JobId);
146       if (jcr->previous_jr.JobId == 0) {
147          Jmsg(jcr, M_INFO, 0, _("No previous Job found to migrate.\n"));
148       } else {
149          Jmsg(jcr, M_INFO, 0, _("Previous Job has no data to migrate.\n"));
150       }
151       migration_cleanup(jcr, jcr->JobStatus);
152       return true;                    /* no work */
153    }
154
155    Dmsg5(dbglevel, "JobId=%d: Previous: Name=%s JobId=%d Type=%c Level=%c\n",
156       (int)jcr->JobId,
157       jcr->previous_jr.Name, (int)jcr->previous_jr.JobId, 
158       jcr->previous_jr.JobType, jcr->previous_jr.JobLevel);
159
160    Dmsg5(dbglevel, "JobId=%d: Current: Name=%s JobId=%d Type=%c Level=%c\n",
161       (int)jcr->JobId,
162       jcr->jr.Name, (int)jcr->jr.JobId, 
163       jcr->jr.JobType, jcr->jr.JobLevel);
164
165    LockRes();
166    job = (JOB *)GetResWithName(R_JOB, jcr->jr.Name);
167    prev_job = (JOB *)GetResWithName(R_JOB, jcr->previous_jr.Name);
168    UnlockRes();
169    if (!job || !prev_job) {
170       return false;
171    }
172
173    /* Create a migation jcr */
174    mig_jcr = jcr->mig_jcr = new_jcr(sizeof(JCR), dird_free_jcr);
175    memcpy(&mig_jcr->previous_jr, &jcr->previous_jr, sizeof(mig_jcr->previous_jr));
176
177    /*
178     * Turn the mig_jcr into a "real" job that takes on the aspects of
179     *   the previous backup job "prev_job".
180     */
181    set_jcr_defaults(mig_jcr, prev_job);
182    if (!setup_job(mig_jcr)) {
183       return false;
184    }
185
186    /* Now reset the job record from the previous job */
187    memcpy(&mig_jcr->jr, &jcr->previous_jr, sizeof(mig_jcr->jr));
188    /* Update the jr to reflect the new values of PoolId, FileSetId, and JobId. */
189    mig_jcr->jr.PoolId = jcr->jr.PoolId;
190    mig_jcr->jr.FileSetId = jcr->jr.FileSetId;
191    mig_jcr->jr.JobId = mig_jcr->JobId;
192
193    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
194       mig_jcr->jr.Name, (int)mig_jcr->jr.JobId, 
195       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
196
197    /*
198     * Get the PoolId used with the original job. Then
199     *  find the pool name from the database record.
200     */
201    memset(&pr, 0, sizeof(pr));
202    pr.PoolId = mig_jcr->previous_jr.PoolId;
203    if (!db_get_pool_record(jcr, jcr->db, &pr)) {
204       Jmsg(jcr, M_FATAL, 0, _("Pool for JobId %s not in database. ERR=%s\n"),
205             edit_int64(pr.PoolId, ed1), db_strerror(jcr->db));
206          return false;
207    }
208    /* Get the pool resource corresponding to the original job */
209    pool = (POOL *)GetResWithName(R_POOL, pr.Name);
210    if (!pool) {
211       Jmsg(jcr, M_FATAL, 0, _("Pool resource \"%s\" not found.\n"), pr.Name);
212       return false;
213    }
214
215    /* If pool storage specified, use it for restore */
216    copy_rstorage(mig_jcr, pool->storage, _("Pool resource"));
217    copy_rstorage(jcr, pool->storage, _("Pool resource"));
218
219    /*
220     * If the original backup pool has a NextPool, make sure a 
221     *  record exists in the database. Note, in this case, we
222     *  will be migrating from pool to pool->NextPool.
223     */
224    if (pool->NextPool) {
225       jcr->jr.PoolId = get_or_create_pool_record(jcr, pool->NextPool->hdr.name);
226       if (jcr->jr.PoolId == 0) {
227          return false;
228       }
229       /*
230        * put the "NextPool" resource pointer in our jcr so that we
231        * can pull the Storage reference from it.
232        */
233       mig_jcr->pool = jcr->pool = pool->NextPool;
234       mig_jcr->jr.PoolId = jcr->jr.PoolId;
235       pm_strcpy(jcr->pool_source, _("NextPool in Pool resource"));
236    } else {
237       Jmsg(jcr, M_FATAL, 0, _("No Next Pool specification found in Pool \"%s\".\n"),
238          pool->hdr.name);
239       return false;
240    }
241
242    if (!jcr->pool->storage) {
243       Jmsg(jcr, M_FATAL, 0, _("No Storage specification found in Next Pool \"%s\".\n"),
244          jcr->pool->hdr.name);
245       return false;
246    }
247
248    /* If pool storage specified, use it instead of job storage for backup */
249    copy_wstorage(jcr, jcr->pool->storage, _("Next pool resource"));
250
251    /* Print Job Start message */
252    Jmsg(jcr, M_INFO, 0, _("Start Migration JobId %s, Job=%s\n"),
253         edit_uint64(jcr->JobId, ed1), jcr->Job);
254
255    set_jcr_job_status(jcr, JS_Running);
256    set_jcr_job_status(mig_jcr, JS_Running);
257    Dmsg2(dbglevel, "JobId=%d JobLevel=%c\n", (int)jcr->jr.JobId, jcr->jr.JobLevel);
258
259    /* Update job start record for this migration control job */
260    if (!db_update_job_start_record(jcr, jcr->db, &jcr->jr)) {
261       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db));
262       return false;
263    }
264
265    Dmsg4(dbglevel, "mig_jcr: Name=%s JobId=%d Type=%c Level=%c\n",
266       mig_jcr->jr.Name, (int)mig_jcr->jr.JobId, 
267       mig_jcr->jr.JobType, mig_jcr->jr.JobLevel);
268
269    /* Update job start record for the real migration backup job */
270    if (!db_update_job_start_record(mig_jcr, mig_jcr->db, &mig_jcr->jr)) {
271       Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(mig_jcr->db));
272       return false;
273    }
274
275
276    /*
277     * Open a message channel connection with the Storage
278     * daemon. This is to let him know that our client
279     * will be contacting him for a backup  session.
280     *
281     */
282    Dmsg0(110, "Open connection with storage daemon\n");
283    set_jcr_job_status(jcr, JS_WaitSD);
284    set_jcr_job_status(mig_jcr, JS_WaitSD);
285    /*
286     * Start conversation with Storage daemon
287     */
288    if (!connect_to_storage_daemon(jcr, 10, SDConnectTimeout, 1)) {
289       return false;
290    }
291    sd = jcr->store_bsock;
292    /*
293     * Now start a job with the Storage daemon
294     */
295    Dmsg2(dbglevel, "Read store=%s, write store=%s\n", 
296       ((STORE *)jcr->rstorage->first())->name(),
297       ((STORE *)jcr->wstorage->first())->name());
298    if (((STORE *)jcr->rstorage->first())->name() == ((STORE *)jcr->wstorage->first())->name()) {
299       Jmsg(jcr, M_FATAL, 0, _("Read storage \"%s\" same as write storage.\n"),
300            ((STORE *)jcr->rstorage->first())->name());
301       return false;
302    }
303    if (!start_storage_daemon_job(jcr, jcr->rstorage, jcr->wstorage)) {
304       return false;
305    }
306    Dmsg0(150, "Storage daemon connection OK\n");
307
308    if (!send_bootstrap_file(jcr, sd) ||
309        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR)) {
310       return false;
311    }
312
313    if (!bnet_fsend(sd, "run")) {
314       return false;
315    }
316
317    /*
318     * Now start a Storage daemon message thread
319     */
320    if (!start_storage_daemon_message_thread(jcr)) {
321       return false;
322    }
323
324
325    set_jcr_job_status(jcr, JS_Running);
326    set_jcr_job_status(mig_jcr, JS_Running);
327
328    /* Pickup Job termination data */
329    /* Note, the SD stores in jcr->JobFiles/ReadBytes/JobBytes/Errors */
330    wait_for_storage_daemon_termination(jcr);
331
332    set_jcr_job_status(jcr, jcr->SDJobStatus);
333    if (jcr->JobStatus != JS_Terminated) {
334       return false;
335    }
336    migration_cleanup(jcr, jcr->JobStatus);
337    if (mig_jcr) {
338       UAContext *ua = new_ua_context(jcr);
339       purge_files_from_job(ua, jcr->previous_jr.JobId);
340       free_ua_context(ua);
341    }
342    return true;
343 }
344
345 struct idpkt {
346    POOLMEM *list;
347    uint32_t count;
348 };
349
350 /* Add an item to the list if it is unique */
351 static void add_unique_id(idpkt *ids, char *item) 
352 {
353    char id[30];
354    char *q = ids->list;
355
356    /* Walk through current list to see if each item is the same as item */
357    for ( ; *q; ) {
358        id[0] = 0;
359        for (int i=0; i<(int)sizeof(id); i++) {
360           if (*q == 0) {
361              break;
362           } else if (*q == ',') {
363              q++;
364              break;
365           }
366           id[i] = *q++;
367           id[i+1] = 0;
368        }
369        if (strcmp(item, id) == 0) {
370           return;
371        }
372    }
373    /* Did not find item, so add it to list */
374    if (ids->count == 0) {
375       ids->list[0] = 0;
376    } else {
377       pm_strcat(ids->list, ",");
378    }
379    pm_strcat(ids->list, item);
380    ids->count++;
381 // Dmsg3(0, "add_uniq count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
382    return;
383 }
384
385 /*
386  * Callback handler make list of DB Ids
387  */
388 static int unique_dbid_handler(void *ctx, int num_fields, char **row)
389 {
390    idpkt *ids = (idpkt *)ctx;
391
392    add_unique_id(ids, row[0]);
393    Dmsg3(dbglevel, "dbid_hdlr count=%d Ids=%p %s\n", ids->count, ids->list, ids->list);
394    return 0;
395 }
396
397
398 struct uitem {
399    dlink link;   
400    char *item;
401 };
402
403 static int item_compare(void *item1, void *item2)
404 {
405    uitem *i1 = (uitem *)item1;
406    uitem *i2 = (uitem *)item2;
407    return strcmp(i1->item, i2->item);
408 }
409
410 static int unique_name_handler(void *ctx, int num_fields, char **row)
411 {
412    dlist *list = (dlist *)ctx;
413
414    uitem *new_item = (uitem *)malloc(sizeof(uitem));
415    uitem *item;
416    
417    memset(new_item, 0, sizeof(uitem));
418    new_item->item = bstrdup(row[0]);
419    Dmsg1(dbglevel, "Unique_name_hdlr Item=%s\n", row[0]);
420    item = (uitem *)list->binary_insert((void *)new_item, item_compare);
421    if (item != new_item) {            /* already in list */
422       free(new_item->item);
423       free((char *)new_item);
424       return 0;
425    }
426    return 0;
427 }
428
429 /* Get Job names in Pool */
430 const char *sql_job =
431    "SELECT DISTINCT Job.Name from Job,Pool"
432    " WHERE Pool.Name='%s' AND Job.PoolId=Pool.PoolId";
433
434 /* Get JobIds from regex'ed Job names */
435 const char *sql_jobids_from_job =
436    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool"
437    " WHERE Job.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
438    " ORDER by Job.StartTime";
439
440 /* Get Client names in Pool */
441 const char *sql_client =
442    "SELECT DISTINCT Client.Name from Client,Pool,Job"
443    " WHERE Pool.Name='%s' AND Job.ClientId=Client.ClientId AND"
444    " Job.PoolId=Pool.PoolId";
445
446 /* Get JobIds from regex'ed Client names */
447 const char *sql_jobids_from_client =
448    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Job,Pool,Client"
449    " WHERE Client.Name='%s' AND Pool.Name='%s' AND Job.PoolId=Pool.PoolId"
450    " AND Job.ClientId=Client.ClientId "
451    " ORDER by Job.StartTime";
452
453 /* Get Volume names in Pool */
454 const char *sql_vol = 
455    "SELECT DISTINCT VolumeName FROM Media,Pool WHERE"
456    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
457    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
458
459 /* Get JobIds from regex'ed Volume names */
460 const char *sql_jobids_from_vol =
461    "SELECT DISTINCT Job.JobId,Job.StartTime FROM Media,JobMedia,Job"
462    " WHERE Media.VolumeName='%s' AND Media.MediaId=JobMedia.MediaId"
463    " AND JobMedia.JobId=Job.JobId" 
464    " ORDER by Job.StartTime";
465
466
467 const char *sql_smallest_vol = 
468    "SELECT MediaId FROM Media,Pool WHERE"
469    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
470    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
471    " ORDER BY VolBytes ASC LIMIT 1";
472
473 const char *sql_oldest_vol = 
474    "SELECT MediaId FROM Media,Pool WHERE"
475    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
476    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'"
477    " ORDER BY LastWritten ASC LIMIT 1";
478
479 /* Get JobIds when we have selected MediaId */
480 const char *sql_jobids_from_mediaid =
481    "SELECT DISTINCT Job.JobId,Job.StartTime FROM JobMedia,Job"
482    " WHERE JobMedia.JobId=Job.JobId AND JobMedia.MediaId=%s"
483    " ORDER by Job.StartTime";
484
485 /* Get tne number of bytes in the pool */
486 const char *sql_pool_bytes =
487    "SELECT SUM(VolBytes) FROM Media,Pool WHERE"
488    " VolStatus in ('Full','Used','Error','Append') AND Media.Enabled=1 AND"
489    " Media.PoolId=Pool.PoolId AND Pool.Name='%s'";
490
491 /* Get tne number of bytes in the Jobs */
492 const char *sql_job_bytes =
493    "SELECT SUM(JobBytes) FROM Job WHERE JobId IN (%s)";
494
495
496 /* Get Media Ids in Pool */
497 const char *sql_mediaids =
498    "SELECT MediaId FROM Media,Pool WHERE"
499    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
500    " Media.PoolId=Pool.PoolId AND Pool.Name='%s' ORDER BY LastWritten ASC";
501
502 /* Get JobIds in Pool longer than specified time */
503 const char *sql_pool_time = 
504    "SELECT DISTINCT Job.JobId from Pool,Job,Media,JobMedia WHERE"
505    " Pool.Name='%s' AND Media.PoolId=Pool.PoolId AND"
506    " VolStatus in ('Full','Used','Error') AND Media.Enabled=1 AND"
507    " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId"
508    " AND Job.RealEndTime<='%s'";
509
510 /*
511 * const char *sql_ujobid =
512 *   "SELECT DISTINCT Job.Job from Client,Pool,Media,Job,JobMedia "
513 *   " WHERE Media.PoolId=Pool.PoolId AND Pool.Name='%s' AND"
514 *   " JobMedia.JobId=Job.JobId AND Job.PoolId=Media.PoolId";
515 */
516
517
518
519 /*
520  *
521  * This is the central piece of code that finds a job or jobs 
522  *   actually JobIds to migrate.  It first looks to see if one
523  *   has been "manually" specified in jcr->MigrateJobId, and if
524  *   so, it returns that JobId to be run.  Otherwise, it
525  *   examines the Selection Type to see what kind of migration
526  *   we are doing (Volume, Job, Client, ...) and applies any
527  *   Selection Pattern if appropriate to obtain a list of JobIds.
528  *   Finally, it will loop over all the JobIds found, except the last
529  *   one starting a new job with MigrationJobId set to that JobId, and
530  *   finally, it returns the last JobId to the caller.
531  *
532  * Returns: false on error
533  *          true  if OK and jcr->previous_jr filled in
534  */
535 static bool get_job_to_migrate(JCR *jcr)
536 {
537    char ed1[30];
538    POOL_MEM query(PM_MESSAGE);
539    JobId_t JobId;
540    DBId_t  MediaId = 0;
541    int stat;
542    char *p;
543    idpkt ids, mid, jids;
544    db_int64_ctx ctx;
545    int64_t pool_bytes;
546    bool ok;
547    time_t ttime;
548    struct tm tm;
549    char dt[MAX_TIME_LENGTH];
550
551    ids.list = get_pool_memory(PM_MESSAGE);
552    ids.list[0] = 0;
553    ids.count = 0;
554    mid.list = get_pool_memory(PM_MESSAGE);
555    mid.list[0] = 0;
556    mid.count = 0;
557    jids.list = get_pool_memory(PM_MESSAGE);
558    jids.list[0] = 0;
559    jids.count = 0;
560
561
562    /*
563     * If MigrateJobId is set, then we migrate only that Job,
564     *  otherwise, we go through the full selection of jobs to
565     *  migrate.
566     */
567    if (jcr->MigrateJobId != 0) {
568       Dmsg1(dbglevel, "At Job start previous jobid=%u\n", jcr->MigrateJobId);
569       edit_uint64(jcr->MigrateJobId, ids.list);
570       ids.count = 1;
571    } else {
572       switch (jcr->job->selection_type) {
573       case MT_JOB:
574          if (!regex_find_jobids(jcr, &ids, sql_job, sql_jobids_from_job, "Job")) {
575             goto bail_out;
576          } 
577          break;
578       case MT_CLIENT:
579          if (!regex_find_jobids(jcr, &ids, sql_client, sql_jobids_from_client, "Client")) {
580             goto bail_out;
581          } 
582          break;
583       case MT_VOLUME:
584          if (!regex_find_jobids(jcr, &ids, sql_vol, sql_jobids_from_vol, "Volume")) {
585             goto bail_out;
586          } 
587          break;
588       case MT_SQLQUERY:
589          if (!jcr->job->selection_pattern) {
590             Jmsg(jcr, M_FATAL, 0, _("No Migration SQL selection pattern specified.\n"));
591             goto bail_out;
592          }
593          Dmsg1(dbglevel, "SQL=%s\n", jcr->job->selection_pattern);
594          if (!db_sql_query(jcr->db, jcr->job->selection_pattern,
595               unique_dbid_handler, (void *)&ids)) {
596             Jmsg(jcr, M_FATAL, 0,
597                  _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
598             goto bail_out;
599          }
600          break;
601       case MT_SMALLEST_VOL:
602          if (!find_mediaid_then_jobids(jcr, &ids, sql_smallest_vol, "Smallest Volume")) {
603             goto bail_out;
604          }
605          break;
606       case MT_OLDEST_VOL:
607          if (!find_mediaid_then_jobids(jcr, &ids, sql_oldest_vol, "Oldest Volume")) {
608             goto bail_out;
609          }
610          break;
611
612       case MT_POOL_OCCUPANCY:
613          ctx.count = 0;
614          /* Find count of bytes in pool */
615          Mmsg(query, sql_pool_bytes, jcr->pool->hdr.name);
616          if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
617             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
618             goto bail_out;
619          }
620          if (ctx.count == 0) {
621             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
622             goto ok_out;
623          }
624          pool_bytes = ctx.value;
625          Dmsg2(dbglevel, "highbytes=%d pool=%d\n", (int)jcr->pool->MigrationHighBytes,
626                (int)pool_bytes);
627          if (pool_bytes < (int64_t)jcr->pool->MigrationHighBytes) {
628             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
629             goto ok_out;
630          }
631          Dmsg0(dbglevel, "We should do Occupation migration.\n");
632
633          ids.count = 0;
634          /* Find a list of MediaIds that could be migrated */
635          Mmsg(query, sql_mediaids, jcr->pool->hdr.name);
636          Dmsg1(dbglevel, "query=%s\n", query.c_str());
637          if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)&ids)) {
638             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
639             goto bail_out;
640          }
641          if (ids.count == 0) {
642             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
643             goto ok_out;
644          }
645          Dmsg2(dbglevel, "Pool Occupancy ids=%d MediaIds=%s\n", ids.count, ids.list);
646
647          /*
648           * Now loop over MediaIds getting more JobIds to migrate until
649           *  we reduce the pool occupancy below the low water mark.
650           */
651          p = ids.list;
652          for (int i=0; i < (int)ids.count; i++) {
653             stat = get_next_dbid_from_list(&p, &MediaId);
654             Dmsg2(dbglevel, "get_next_dbid stat=%d MediaId=%u\n", stat, MediaId);
655             if (stat < 0) {
656                Jmsg(jcr, M_FATAL, 0, _("Invalid MediaId found.\n"));
657                goto bail_out;
658             } else if (stat == 0) {
659                break;
660             }
661             mid.count = 1;
662             Mmsg(mid.list, "%s", edit_int64(MediaId, ed1));
663             ok = find_jobids_from_mediaid_list(jcr, &mid, "Volumes");
664             if (!ok) {
665                continue;
666             }
667             if (i != 0) {
668                pm_strcat(jids.list, ",");
669             }
670             pm_strcat(jids.list, mid.list);
671             jids.count += mid.count;
672
673             /* Now get the count of bytes added */
674             ctx.count = 0;
675             /* Find count of bytes from Jobs */
676             Mmsg(query, sql_job_bytes, mid.list);
677             if (!db_sql_query(jcr->db, query.c_str(), db_int64_handler, (void *)&ctx)) {
678                Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
679                goto bail_out;
680             }
681             pool_bytes -= ctx.value;
682             Dmsg1(dbglevel, "Job bytes=%d\n", (int)ctx.value);
683             Dmsg2(dbglevel, "lowbytes=%d pool=%d\n", (int)jcr->pool->MigrationLowBytes,
684                   (int)pool_bytes);
685             if (pool_bytes <= (int64_t)jcr->pool->MigrationLowBytes) {
686                Dmsg0(dbglevel, "We should be done.\n");
687                break;
688             }
689
690          }
691          Dmsg2(dbglevel, "Pool Occupancy ids=%d JobIds=%s\n", jids.count, jids.list);
692
693          break;
694
695       case MT_POOL_TIME:
696          ttime = time(NULL) - (time_t)jcr->pool->MigrationTime;
697          (void)localtime_r(&ttime, &tm);
698          strftime(dt, sizeof(dt), "%Y-%m-%d %H:%M:%S", &tm);
699
700          ids.count = 0;
701          Mmsg(query, sql_pool_time, jcr->pool->hdr.name, dt);
702          Dmsg1(dbglevel, "query=%s\n", query.c_str());
703          if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)&ids)) {
704             Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
705             goto bail_out;
706          }
707          if (ids.count == 0) {
708             Jmsg(jcr, M_INFO, 0, _("No Volumes found to migrate.\n"));
709             goto ok_out;
710          }
711          Dmsg2(dbglevel, "PoolTime ids=%d JobIds=%s\n", ids.count, ids.list);
712          break;
713
714       default:
715          Jmsg(jcr, M_FATAL, 0, _("Unknown Migration Selection Type.\n"));
716          goto bail_out;
717       }
718    }
719
720    /*
721     * Loop over all jobids except the last one, sending
722     *  them to start_migration_job(), which will start a job
723     *  for each of them.  For the last JobId, we handle it below.
724     */
725    p = ids.list;
726    if (ids.count == 0) {
727       Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
728       goto ok_out;
729    }
730    Jmsg(jcr, M_INFO, 0, _("The following %u JobId%s will be migrated: %s\n"),
731       ids.count, ids.count==0?"":"s", ids.list);
732    Dmsg2(dbglevel, "Before loop count=%d ids=%s\n", ids.count, ids.list);
733    for (int i=1; i < (int)ids.count; i++) {
734       JobId = 0;
735       stat = get_next_jobid_from_list(&p, &JobId);
736       Dmsg3(dbglevel, "get_jobid_no=%d stat=%d JobId=%u\n", i, stat, JobId);
737       jcr->MigrateJobId = JobId;
738       start_migration_job(jcr);
739       Dmsg0(dbglevel, "Back from start_migration_job\n");
740       if (stat < 0) {
741          Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
742          goto bail_out;
743       } else if (stat == 0) {
744          Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
745          goto bail_out;
746       }
747    }
748    
749    /* Now get the last JobId and handle it in the current job */
750    JobId = 0;
751    stat = get_next_jobid_from_list(&p, &JobId);
752    Dmsg2(dbglevel, "Last get_next_jobid stat=%d JobId=%u\n", stat, (int)JobId);
753    if (stat < 0) {
754       Jmsg(jcr, M_FATAL, 0, _("Invalid JobId found.\n"));
755       goto bail_out;
756    } else if (stat == 0) {
757       Jmsg(jcr, M_INFO, 0, _("No JobIds found to migrate.\n"));
758       goto bail_out;
759    }
760
761    jcr->previous_jr.JobId = JobId;
762    Dmsg1(dbglevel, "Previous jobid=%d\n", (int)jcr->previous_jr.JobId);
763
764    if (!db_get_job_record(jcr, jcr->db, &jcr->previous_jr)) {
765       Jmsg(jcr, M_FATAL, 0, _("Could not get job record for JobId %s to migrate. ERR=%s"),
766            edit_int64(jcr->previous_jr.JobId, ed1),
767            db_strerror(jcr->db));
768       goto bail_out;
769    }
770    Jmsg(jcr, M_INFO, 0, _("Migration using JobId=%s Job=%s\n"),
771       edit_int64(jcr->previous_jr.JobId, ed1), jcr->previous_jr.Job);
772    Dmsg3(dbglevel, "Migration JobId=%d  using JobId=%s Job=%s\n",
773       jcr->JobId,
774       edit_int64(jcr->previous_jr.JobId, ed1), jcr->previous_jr.Job);
775
776 ok_out:
777    ok = true;
778    goto out;
779
780 bail_out:
781    jcr->MigrateJobId = 0;
782    ok = false;
783            
784 out:
785    free_pool_memory(ids.list);
786    free_pool_memory(mid.list);
787    free_pool_memory(jids.list);
788    return ok;
789 }
790
791 static void start_migration_job(JCR *jcr)
792 {
793    UAContext *ua = new_ua_context(jcr);
794    char ed1[50];
795    ua->batch = true;
796    Mmsg(ua->cmd, "run %s jobid=%s", jcr->job->hdr.name, 
797         edit_uint64(jcr->MigrateJobId, ed1));
798    Dmsg1(dbglevel, "=============== Migration cmd=%s\n", ua->cmd);
799    parse_ua_args(ua);                 /* parse command */
800    int stat = run_cmd(ua, ua->cmd);
801    if (stat == 0) {
802       Jmsg(jcr, M_ERROR, 0, _("Could not start migration job.\n"));
803    } else {
804       Jmsg(jcr, M_INFO, 0, _("Migration JobId %d started.\n"), stat);
805    }
806    free_ua_context(ua);
807 }
808
809 static bool find_mediaid_then_jobids(JCR *jcr, idpkt *ids, const char *query1,
810                  const char *type) 
811 {
812    bool ok = false;
813    POOL_MEM query(PM_MESSAGE);
814
815    ids->count = 0;
816    /* Basic query for MediaId */
817    Mmsg(query, query1, jcr->pool->hdr.name);
818    if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
819       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
820       goto bail_out;
821    }
822    if (ids->count == 0) {
823       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
824    }
825    if (ids->count != 1) {
826       Jmsg(jcr, M_FATAL, 0, _("SQL logic error. Count should be 1 but is %d\n"), 
827          ids->count);
828       goto bail_out;
829    }
830    Dmsg1(dbglevel, "Smallest Vol Jobids=%s\n", ids->list);
831
832    ok = find_jobids_from_mediaid_list(jcr, ids, type);
833
834 bail_out:
835    return ok;
836 }
837
838 static bool find_jobids_from_mediaid_list(JCR *jcr, idpkt *ids, const char *type) 
839 {
840    bool ok = false;
841    POOL_MEM query(PM_MESSAGE);
842
843    Mmsg(query, sql_jobids_from_mediaid, ids->list);
844    ids->count = 0;
845    if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
846       Jmsg(jcr, M_FATAL, 0, _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
847       goto bail_out;
848    }
849    if (ids->count == 0) {
850       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
851    }
852    ok = true;
853 bail_out:
854    return ok;
855 }
856
857 static bool regex_find_jobids(JCR *jcr, idpkt *ids, const char *query1,
858                  const char *query2, const char *type) 
859 {
860    dlist *item_chain;
861    uitem *item = NULL;
862    uitem *last_item = NULL;
863    regex_t preg;
864    char prbuf[500];
865    int rc;
866    bool ok = false;
867    POOL_MEM query(PM_MESSAGE);
868
869    item_chain = New(dlist(item, &item->link));
870    if (!jcr->job->selection_pattern) {
871       Jmsg(jcr, M_FATAL, 0, _("No Migration %s selection pattern specified.\n"),
872          type);
873       goto bail_out;
874    }
875    Dmsg1(dbglevel, "regex=%s\n", jcr->job->selection_pattern);
876    /* Compile regex expression */
877    rc = regcomp(&preg, jcr->job->selection_pattern, REG_EXTENDED);
878    if (rc != 0) {
879       regerror(rc, &preg, prbuf, sizeof(prbuf));
880       Jmsg(jcr, M_FATAL, 0, _("Could not compile regex pattern \"%s\" ERR=%s\n"),
881            jcr->job->selection_pattern, prbuf);
882       goto bail_out;
883    }
884    /* Basic query for names */
885    Mmsg(query, query1, jcr->pool->hdr.name);
886    Dmsg1(dbglevel, "get name query1=%s\n", query.c_str());
887    if (!db_sql_query(jcr->db, query.c_str(), unique_name_handler, 
888         (void *)item_chain)) {
889       Jmsg(jcr, M_FATAL, 0,
890            _("SQL to get %s failed. ERR=%s\n"), type, db_strerror(jcr->db));
891       goto bail_out;
892    }
893    /* Now apply the regex to the names and remove any item not matched */
894    foreach_dlist(item, item_chain) {
895       const int nmatch = 30;
896       regmatch_t pmatch[nmatch];
897       if (last_item) {
898          Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
899          free(last_item->item);
900          item_chain->remove(last_item);
901       }
902       Dmsg1(dbglevel, "get name Item=%s\n", item->item);
903       rc = regexec(&preg, item->item, nmatch, pmatch,  0);
904       if (rc == 0) {
905          last_item = NULL;   /* keep this one */
906       } else {   
907          last_item = item;
908       }
909    }
910    if (last_item) {
911       free(last_item->item);
912       Dmsg1(dbglevel, "Remove item %s\n", last_item->item);
913       item_chain->remove(last_item);
914    }
915    regfree(&preg);
916    /* 
917     * At this point, we have a list of items in item_chain
918     *  that have been matched by the regex, so now we need
919     *  to look up their jobids.
920     */
921    ids->count = 0;
922    foreach_dlist(item, item_chain) {
923       Dmsg2(dbglevel, "Got %s: %s\n", type, item->item);
924       Mmsg(query, query2, item->item, jcr->pool->hdr.name);
925       Dmsg1(dbglevel, "get id from name query2=%s\n", query.c_str());
926       if (!db_sql_query(jcr->db, query.c_str(), unique_dbid_handler, (void *)ids)) {
927          Jmsg(jcr, M_FATAL, 0,
928               _("SQL failed. ERR=%s\n"), db_strerror(jcr->db));
929          goto bail_out;
930       }
931    }
932    if (ids->count == 0) {
933       Jmsg(jcr, M_INFO, 0, _("No %ss found to migrate.\n"), type);
934    }
935    ok = true;
936 bail_out:
937    Dmsg2(dbglevel, "Count=%d Jobids=%s\n", ids->count, ids->list);
938    delete item_chain;
939    Dmsg0(dbglevel, "After delete item_chain\n");
940    return ok;
941 }
942
943
944 /*
945  * Release resources allocated during backup.
946  */
947 void migration_cleanup(JCR *jcr, int TermCode)
948 {
949    char sdt[MAX_TIME_LENGTH], edt[MAX_TIME_LENGTH];
950    char ec1[30], ec2[30], ec3[30], ec4[30], ec5[30], elapsed[50];
951    char ec6[50], ec7[50], ec8[50];
952    char term_code[100], sd_term_msg[100];
953    const char *term_msg;
954    int msg_type;
955    MEDIA_DBR mr;
956    double kbps;
957    utime_t RunTime;
958    JCR *mig_jcr = jcr->mig_jcr;
959    POOL_MEM query(PM_MESSAGE);
960
961    Dmsg2(100, "Enter migrate_cleanup %d %c\n", TermCode, TermCode);
962    dequeue_messages(jcr);             /* display any queued messages */
963    memset(&mr, 0, sizeof(mr));
964    set_jcr_job_status(jcr, TermCode);
965    update_job_end_record(jcr);        /* update database */
966
967    /* 
968     * Check if we actually did something.  
969     *  mig_jcr is jcr of the newly migrated job.
970     */
971    if (mig_jcr) {
972       mig_jcr->JobFiles = jcr->JobFiles = jcr->SDJobFiles;
973       mig_jcr->JobBytes = jcr->JobBytes = jcr->SDJobBytes;
974       mig_jcr->VolSessionId = jcr->VolSessionId;
975       mig_jcr->VolSessionTime = jcr->VolSessionTime;
976       mig_jcr->jr.RealEndTime = 0; 
977       mig_jcr->jr.PriorJobId = jcr->previous_jr.JobId;
978
979       set_jcr_job_status(mig_jcr, TermCode);
980
981   
982       update_job_end_record(mig_jcr);
983      
984       /* Update final items to set them to the previous job's values */
985       Mmsg(query, "UPDATE Job SET StartTime='%s',EndTime='%s',"
986                   "JobTDate=%s WHERE JobId=%s", 
987          jcr->previous_jr.cStartTime, jcr->previous_jr.cEndTime, 
988          edit_uint64(jcr->previous_jr.JobTDate, ec1),
989          edit_uint64(mig_jcr->jr.JobId, ec2));
990       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
991
992       /* Now marke the previous job as migrated */
993       Mmsg(query, "UPDATE Job SET Type='%c' WHERE JobId=%s",
994            (char)JT_MIGRATED_JOB, edit_uint64(jcr->previous_jr.JobId, ec1));
995       db_sql_query(mig_jcr->db, query.c_str(), NULL, NULL);
996
997       if (!db_get_job_record(jcr, jcr->db, &jcr->jr)) {
998          Jmsg(jcr, M_WARNING, 0, _("Error getting job record for stats: %s"),
999             db_strerror(jcr->db));
1000          set_jcr_job_status(jcr, JS_ErrorTerminated);
1001       }
1002
1003       bstrncpy(mr.VolumeName, jcr->VolumeName, sizeof(mr.VolumeName));
1004       if (!db_get_media_record(jcr, jcr->db, &mr)) {
1005          Jmsg(jcr, M_WARNING, 0, _("Error getting Media record for Volume \"%s\": ERR=%s"),
1006             mr.VolumeName, db_strerror(jcr->db));
1007          set_jcr_job_status(jcr, JS_ErrorTerminated);
1008       }
1009
1010       update_bootstrap_file(mig_jcr);
1011
1012       if (!db_get_job_volume_names(mig_jcr, mig_jcr->db, mig_jcr->jr.JobId, &mig_jcr->VolumeName)) {
1013          /*
1014           * Note, if the job has erred, most likely it did not write any
1015           *  tape, so suppress this "error" message since in that case
1016           *  it is normal.  Or look at it the other way, only for a
1017           *  normal exit should we complain about this error.
1018           */
1019          if (jcr->JobStatus == JS_Terminated && jcr->jr.JobBytes) {
1020             Jmsg(jcr, M_ERROR, 0, "%s", db_strerror(mig_jcr->db));
1021          }
1022          mig_jcr->VolumeName[0] = 0;         /* none */
1023       }
1024   }
1025
1026    msg_type = M_INFO;                 /* by default INFO message */
1027    switch (jcr->JobStatus) {
1028    case JS_Terminated:
1029       if (jcr->Errors || jcr->SDErrors) {
1030          term_msg = _("%s OK -- with warnings");
1031       } else {
1032          term_msg = _("%s OK");
1033       }
1034       break;
1035    case JS_FatalError:
1036    case JS_ErrorTerminated:
1037       term_msg = _("*** %s Error ***");
1038       msg_type = M_ERROR;          /* Generate error message */
1039       if (jcr->store_bsock) {
1040          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
1041          if (jcr->SD_msg_chan) {
1042             pthread_cancel(jcr->SD_msg_chan);
1043          }
1044       }
1045       break;
1046    case JS_Canceled:
1047       term_msg = _("%s Canceled");
1048       if (jcr->store_bsock) {
1049          bnet_sig(jcr->store_bsock, BNET_TERMINATE);
1050          if (jcr->SD_msg_chan) {
1051             pthread_cancel(jcr->SD_msg_chan);
1052          }
1053       }
1054       break;
1055    default:
1056       term_msg = _("Inappropriate %s term code");
1057       break;
1058    }
1059    bsnprintf(term_code, sizeof(term_code), term_msg, "Migration");
1060    bstrftimes(sdt, sizeof(sdt), jcr->jr.StartTime);
1061    bstrftimes(edt, sizeof(edt), jcr->jr.EndTime);
1062    RunTime = jcr->jr.EndTime - jcr->jr.StartTime;
1063    if (RunTime <= 0) {
1064       kbps = 0;
1065    } else {
1066       kbps = (double)jcr->SDJobBytes / (1000 * RunTime);
1067    }
1068
1069
1070    jobstatus_to_ascii(jcr->SDJobStatus, sd_term_msg, sizeof(sd_term_msg));
1071
1072    Jmsg(jcr, msg_type, 0, _("Bacula %s (%s): %s\n"
1073 "  Prev Backup JobId:      %s\n"
1074 "  New Backup JobId:       %s\n"
1075 "  Migration JobId:        %s\n"
1076 "  Migration Job:          %s\n"
1077 "  Backup Level:           %s%s\n"
1078 "  Client:                 %s\n"
1079 "  FileSet:                \"%s\" %s\n"
1080 "  Pool:                   \"%s\" (From %s)\n"
1081 "  Storage:                \"%s\" (From %s)\n"
1082 "  Start time:             %s\n"
1083 "  End time:               %s\n"
1084 "  Elapsed time:           %s\n"
1085 "  Priority:               %d\n"
1086 "  SD Files Written:       %s\n"
1087 "  SD Bytes Written:       %s (%sB)\n"
1088 "  Rate:                   %.1f KB/s\n"
1089 "  Volume name(s):         %s\n"
1090 "  Volume Session Id:      %d\n"
1091 "  Volume Session Time:    %d\n"
1092 "  Last Volume Bytes:      %s (%sB)\n"
1093 "  SD Errors:              %d\n"
1094 "  SD termination status:  %s\n"
1095 "  Termination:            %s\n\n"),
1096    VERSION,
1097    LSMDATE,
1098         edt, 
1099         mig_jcr ? edit_uint64(jcr->previous_jr.JobId, ec6) : "0", 
1100         mig_jcr ? edit_uint64(mig_jcr->jr.JobId, ec7) : "0",
1101         edit_uint64(jcr->jr.JobId, ec8),
1102         jcr->jr.Job,
1103         level_to_str(jcr->JobLevel), jcr->since,
1104         jcr->client->name(),
1105         jcr->fileset->name(), jcr->FSCreateTime,
1106         jcr->pool->name(), jcr->pool_source,
1107         jcr->wstore->name(), jcr->storage_source,
1108         sdt,
1109         edt,
1110         edit_utime(RunTime, elapsed, sizeof(elapsed)),
1111         jcr->JobPriority,
1112         edit_uint64_with_commas(jcr->SDJobFiles, ec1),
1113         edit_uint64_with_commas(jcr->SDJobBytes, ec2),
1114         edit_uint64_with_suffix(jcr->SDJobBytes, ec3),
1115         (float)kbps,
1116         mig_jcr ? mig_jcr->VolumeName : "",
1117         jcr->VolSessionId,
1118         jcr->VolSessionTime,
1119         edit_uint64_with_commas(mr.VolBytes, ec4),
1120         edit_uint64_with_suffix(mr.VolBytes, ec5),
1121         jcr->SDErrors,
1122         sd_term_msg,
1123         term_code);
1124
1125    Dmsg1(100, "migrate_cleanup() mig_jcr=0x%x\n", jcr->mig_jcr);
1126    if (jcr->mig_jcr) {
1127       free_jcr(jcr->mig_jcr);
1128       jcr->mig_jcr = NULL;
1129    }
1130    Dmsg0(100, "Leave migrate_cleanup()\n");
1131 }
1132
1133 /* 
1134  * Return next DBId from comma separated list   
1135  *
1136  * Returns:
1137  *   1 if next DBId returned
1138  *   0 if no more DBIds are in list
1139  *  -1 there is an error
1140  */
1141 static int get_next_dbid_from_list(char **p, DBId_t *DBId)
1142 {
1143    char id[30];
1144    char *q = *p;
1145
1146    id[0] = 0;
1147    for (int i=0; i<(int)sizeof(id); i++) {
1148       if (*q == 0) {
1149          break;
1150       } else if (*q == ',') {
1151          q++;
1152          break;
1153       }
1154       id[i] = *q++;
1155       id[i+1] = 0;
1156    }
1157    if (id[0] == 0) {
1158       return 0;
1159    } else if (!is_a_number(id)) {
1160       return -1;                      /* error */
1161    }
1162    *p = q;
1163    *DBId = str_to_int64(id);
1164    return 1;
1165 }