From a90fe84654adf521279c9c85d214f935a4eac9e2 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Sun, 9 Jul 2006 13:21:16 +0000 Subject: [PATCH] Modify read_record and friends to properly deal with migration needs. - Update debug code in read_record. git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@3132 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/kes-1.39 | 4 ++ bacula/src/stored/acquire.c | 1 + bacula/src/stored/block.c | 2 +- bacula/src/stored/dev.c | 2 + bacula/src/stored/mac.c | 42 +++++++++++++------ bacula/src/stored/match_bsr.c | 9 +++-- bacula/src/stored/protos.h | 2 +- bacula/src/stored/read_record.c | 72 ++++++++++++++++++++++----------- bacula/src/stored/record.h | 1 + bacula/src/version.h | 4 +- 10 files changed, 94 insertions(+), 45 deletions(-) diff --git a/bacula/kes-1.39 b/bacula/kes-1.39 index bb290973b8..33357cac44 100644 --- a/bacula/kes-1.39 +++ b/bacula/kes-1.39 @@ -2,6 +2,10 @@ Kern Sibbald General: +09Jul06 +- Modify read_record and friends to properly deal with migration + needs. +- Update debug code in read_record. 08Jul06 ======================= Warning ========================== Separate read and write storage in Jobs in the Director. This diff --git a/bacula/src/stored/acquire.c b/bacula/src/stored/acquire.c index 530ada2421..50c79ebca2 100644 --- a/bacula/src/stored/acquire.c +++ b/bacula/src/stored/acquire.c @@ -385,6 +385,7 @@ DCR *acquire_device_for_append(DCR *dcr) } goto get_out; } + Dmsg2(190, "Output pos=%u:%u\n", dcr->dev->file, dcr->dev->block_num); } dev->num_writers++; /* we are now a writer */ diff --git a/bacula/src/stored/block.c b/bacula/src/stored/block.c index a3d354da1e..6ac2bfdc2c 100644 --- a/bacula/src/stored/block.c +++ b/bacula/src/stored/block.c @@ -564,13 +564,13 @@ bool write_block_to_dev(DCR *dcr) dev->VolCatInfo.VolCatBlocks++; dev->EndBlock = dev->block_num; dev->EndFile = dev->file; - dev->block_num++; block->BlockNumber++; /* Update dcr values */ if (dev->is_tape()) { dcr->EndBlock = dev->EndBlock; dcr->EndFile = dev->EndFile; + dev->block_num++; } else { /* Save address of block just written */ uint64_t addr = dev->file_addr + wlen - 1; diff --git a/bacula/src/stored/dev.c b/bacula/src/stored/dev.c index 6373a8e39f..ffda26336a 100644 --- a/bacula/src/stored/dev.c +++ b/bacula/src/stored/dev.c @@ -937,6 +937,8 @@ bool update_pos_dev(DEVICE *dev) ok = false; } else { dev->file_addr = pos; + dev->block_num = (uint32_t)pos; + dev->file = (uint32_t)(pos >> 32); } } return ok; diff --git a/bacula/src/stored/mac.c b/bacula/src/stored/mac.c index 035df19854..4148806010 100644 --- a/bacula/src/stored/mac.c +++ b/bacula/src/stored/mac.c @@ -85,6 +85,12 @@ bool do_mac(JCR *jcr) goto bail_out; } + Dmsg2(200, "===== After acquire pos %u:%u\n", jcr->dcr->dev->file, jcr->dcr->dev->block_num); + + + set_jcr_job_status(jcr, JS_Running); + dir_send_job_status(jcr); + jcr->dcr->VolFirstIndex = jcr->dcr->VolLastIndex = 0; jcr->run_time = time(NULL); @@ -105,6 +111,7 @@ ok_out: Dmsg0(100, _("Set ok=FALSE after write_block_to_device.\n")); ok = false; } + Dmsg2(200, "Flush block to device pos %u:%u\n", dev->file, dev->block_num); } @@ -142,7 +149,7 @@ ok_out: generate_daemon_event(jcr, "JobEnd"); bnet_fsend(dir, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, edit_uint64(jcr->JobBytes, ec1)); - Dmsg4(400, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, ec1); + Dmsg4(200, Job_end, jcr->Job, jcr->JobStatus, jcr->JobFiles, ec1); bnet_sig(dir, BNET_EOD); /* send EOD to Director daemon */ @@ -156,12 +163,16 @@ ok_out: */ static bool record_cb(DCR *dcr, DEV_RECORD *rec) { - bool ok = true; JCR *jcr = dcr->jcr; + DEVICE *dev = jcr->dcr->dev; char buf1[100], buf2[100]; int32_t stream; - /* We want to write SOS_LABEL and EOS_LABEL */ + /* If label and not for us, discard it */ + if (rec->FileIndex < 0 && rec->match_stat <= 0) { + return true; + } + /* We want to write SOS_LABEL and EOS_LABEL discard all others */ switch (rec->FileIndex) { case PRE_LABEL: case VOL_LABEL: @@ -169,32 +180,37 @@ static bool record_cb(DCR *dcr, DEV_RECORD *rec) case EOM_LABEL: return true; /* don't write vol labels */ } + /* + * Modify record SessionId and SessionTime to correspond to + * output. + */ rec->VolSessionId = jcr->VolSessionId; rec->VolSessionTime = jcr->VolSessionTime; - Dmsg4(850, "before writ_rec FI=%d SessId=%d Strm=%s len=%d\n", - rec->FileIndex, rec->VolSessionId, - stream_to_ascii(buf1, rec->Stream,rec->FileIndex), - rec->data_len); + Dmsg5(200, "before write_rec JobId=%d FI=%s SessId=%d Strm=%s len=%d\n", + jcr->JobId, + FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId, + stream_to_ascii(buf1, rec->Stream,rec->FileIndex), rec->data_len); while (!write_record_to_block(jcr->dcr->block, rec)) { - Dmsg2(850, "!write_record_to_block data_len=%d rem=%d\n", rec->data_len, - rec->remainder); + Dmsg4(200, "!write_record_to_block blkpos=%u:%u len=%d rem=%d\n", + dev->file, dev->block_num, rec->data_len, rec->remainder); if (!write_block_to_device(jcr->dcr)) { - DEVICE *dev = jcr->dcr->dev; Dmsg2(90, "Got write_block_to_dev error on device %s. %s\n", dev->print_name(), dev->bstrerror()); Jmsg2(jcr, M_FATAL, 0, _("Fatal append error on device %s: ERR=%s\n"), dev->print_name(), dev->bstrerror()); return false; } + Dmsg2(200, "===== Wrote block new pos %u:%u\n", dev->file, dev->block_num); } jcr->JobBytes += rec->data_len; /* increment bytes this job */ if (rec->FileIndex > 0) { jcr->JobFiles = rec->FileIndex; } else { - return ok; /* don't send LABELs to Dir */ + return true; /* don't send LABELs to Dir */ } - Dmsg4(850, "write_record FI=%s SessId=%d Strm=%s len=%d\n", + Dmsg5(500, "wrote_record JobId=%d FI=%s SessId=%d Strm=%s len=%d\n", + jcr->JobId, FI_to_ascii(buf1, rec->FileIndex), rec->VolSessionId, stream_to_ascii(buf2, rec->Stream, rec->FileIndex), rec->data_len); @@ -217,5 +233,5 @@ static bool record_cb(DCR *dcr, DEV_RECORD *rec) } } - return ok; + return true; } diff --git a/bacula/src/stored/match_bsr.c b/bacula/src/stored/match_bsr.c index 0b38996036..d02d085c93 100755 --- a/bacula/src/stored/match_bsr.c +++ b/bacula/src/stored/match_bsr.c @@ -241,15 +241,16 @@ static BSR *find_smallest_volfile(BSR *found_bsr, BSR *bsr) } /* - * Called to tell the matcher that the end of - * the current file has been reached. + * Called after the signature record so that + * we can see if the current bsr has been + * fully processed (i.e. is done). * The bsr argument is not used, but is included * for consistency with the other match calls. * * Returns: true if we should reposition * : false otherwise. */ -bool match_set_eof(BSR *bsr, DEV_RECORD *rec) +bool is_this_bsr_done(BSR *bsr, DEV_RECORD *rec) { BSR *rbsr = rec->bsr; Dmsg1(300, "match_set %d\n", rbsr != NULL); @@ -261,7 +262,7 @@ bool match_set_eof(BSR *bsr, DEV_RECORD *rec) if (rbsr->count && rbsr->found >= rbsr->count) { rbsr->done = true; rbsr->root->reposition = true; - Dmsg2(500, "match_set_eof reposition count=%d found=%d\n", + Dmsg2(500, "is_end_this_bsr set reposition=1 count=%d found=%d\n", rbsr->count, rbsr->found); return true; } diff --git a/bacula/src/stored/protos.h b/bacula/src/stored/protos.h index 31624d617b..421afd0871 100644 --- a/bacula/src/stored/protos.h +++ b/bacula/src/stored/protos.h @@ -174,7 +174,7 @@ int match_bsr(BSR *bsr, DEV_RECORD *rec, VOLUME_LABEL *volrec, int match_bsr_block(BSR *bsr, DEV_BLOCK *block); void position_bsr_block(BSR *bsr, DEV_BLOCK *block); BSR *find_next_bsr(BSR *root_bsr, DEVICE *dev); -bool match_set_eof(BSR *bsr, DEV_RECORD *rec); +bool is_this_bsr_done(BSR *bsr, DEV_RECORD *rec); /* From mount.c */ bool mount_next_write_volume(DCR *dcr, bool release); diff --git a/bacula/src/stored/read_record.c b/bacula/src/stored/read_record.c index 36064d06fb..eeef37547c 100644 --- a/bacula/src/stored/read_record.c +++ b/bacula/src/stored/read_record.c @@ -39,6 +39,8 @@ static bool try_repositioning(JCR *jcr, DEV_RECORD *rec, DEVICE *dev); static char *rec_state_to_str(DEV_RECORD *rec); #endif +static const int dbglvl = 1000; + bool read_records(DCR *dcr, bool record_cb(DCR *dcr, DEV_RECORD *rec), bool mount_cb(DCR *dcr)) @@ -128,7 +130,7 @@ bool read_records(DCR *dcr, break; } } - Dmsg2(300, "New block at position=(file:block) %u:%u\n", dev->file, dev->block_num); + Dmsg2(dbglvl, "Read new block at pos=%u:%u\n", dev->file, dev->block_num); #ifdef if_and_when_FAST_BLOCK_REJECTION_is_working /* this does not stop when file/block are too big */ if (!match_bsr_block(jcr->bsr, block)) { @@ -154,21 +156,22 @@ bool read_records(DCR *dcr, if (!found) { rec = new_record(); recs->prepend(rec); - Dmsg2(300, "New record for SI=%d ST=%d\n", + Dmsg3(dbglvl, "New record for state=%s SI=%d ST=%d\n", + rec_state_to_str(rec), block->VolSessionId, block->VolSessionTime); } - Dmsg3(300, "After mount next vol. stat=%s blk=%d rem=%d\n", rec_state_to_str(rec), + Dmsg3(dbglvl, "Before read rec loop. stat=%s blk=%d rem=%d\n", rec_state_to_str(rec), block->BlockNumber, rec->remainder); record = 0; rec->state = 0; - Dmsg1(300, "Block empty %d\n", is_block_empty(rec)); + Dmsg1(dbglvl, "Block %s empty\n", is_block_empty(rec)?"is":"NOT"); for (rec->state=0; ok && !is_block_empty(rec); ) { if (!read_record_from_block(block, rec)) { Dmsg3(400, "!read-break. state=%s blk=%d rem=%d\n", rec_state_to_str(rec), block->BlockNumber, rec->remainder); break; } - Dmsg5(300, "read-OK. state=%s blk=%d rem=%d file:block=%u:%u\n", + Dmsg5(dbglvl, "read-OK. state=%s blk=%d rem=%d file:block=%u:%u\n", rec_state_to_str(rec), block->BlockNumber, rec->remainder, dev->file, dev->block_num); /* @@ -178,7 +181,7 @@ bool read_records(DCR *dcr, * get all the data. */ record++; - Dmsg6(300, "recno=%d state=%s blk=%d SI=%d ST=%d FI=%d\n", record, + Dmsg6(dbglvl, "recno=%d state=%s blk=%d SI=%d ST=%d FI=%d\n", record, rec_state_to_str(rec), block->BlockNumber, rec->VolSessionId, rec->VolSessionTime, rec->FileIndex); @@ -190,9 +193,25 @@ bool read_records(DCR *dcr, /* Some sort of label? */ if (rec->FileIndex < 0) { handle_session_record(dev, rec, &sessrec); + if (jcr->bsr) { + /* We just check block FI and FT not FileIndex */ + rec->match_stat = match_bsr_block(jcr->bsr, block); + } else { + rec->match_stat = 0; + } + /* + * Note, we pass *all* labels to the callback routine. If + * he wants to know if they matched the bsr, then he must + * check the match_stat in the record */ ok = record_cb(dcr, rec); + /* + * If this is the end of the Session (EOS) for this record + * we can remove the record. Note, there is a separate + * record to read each session. If a new session is seen + * a new record will be created at approx line 157 above. + */ if (rec->FileIndex == EOS_LABEL) { - Dmsg2(300, "Remove rec. SI=%d ST=%d\n", rec->VolSessionId, + Dmsg2(dbglvl, "Remove EOS rec. SI=%d ST=%d\n", rec->VolSessionId, rec->VolSessionTime); recs->remove(rec); free_record(rec); @@ -204,13 +223,13 @@ bool read_records(DCR *dcr, * Apply BSR filter */ if (jcr->bsr) { - int stat = match_bsr(jcr->bsr, rec, &dev->VolHdr, &sessrec); - if (stat == -1) { /* no more possible matches */ + rec->match_stat = match_bsr(jcr->bsr, rec, &dev->VolHdr, &sessrec); + if (rec->match_stat == -1) { /* no more possible matches */ done = true; /* all items found, stop */ - Dmsg2(300, "All done=(file:block) %u:%u\n", dev->file, dev->block_num); + Dmsg2(dbglvl, "All done=(file:block) %u:%u\n", dev->file, dev->block_num); break; - } else if (stat == 0) { /* no match */ - Dmsg4(300, "BSR no match: clear rem=%d FI=%d before set_eof pos %u:%u\n", + } else if (rec->match_stat == 0) { /* no match */ + Dmsg4(dbglvl, "BSR no match: clear rem=%d FI=%d before set_eof pos %u:%u\n", rec->remainder, rec->FileIndex, dev->file, dev->block_num); rec->remainder = 0; rec->state &= ~REC_PARTIAL_RECORD; @@ -222,26 +241,31 @@ bool read_records(DCR *dcr, } dcr->VolLastIndex = rec->FileIndex; /* let caller know where we are */ if (is_partial_record(rec)) { - Dmsg6(300, "Partial, break. recno=%d state=%s blk=%d SI=%d ST=%d FI=%d\n", record, + Dmsg6(dbglvl, "Partial, break. recno=%d state=%s blk=%d SI=%d ST=%d FI=%d\n", record, rec_state_to_str(rec), block->BlockNumber, rec->VolSessionId, rec->VolSessionTime, rec->FileIndex); break; /* read second part of record */ } ok = record_cb(dcr, rec); + /* + * If we have a digest stream, we check to see if we have + * finished the current bsr, and if so, repositioning will + * be truned on. + */ if (crypto_digest_stream_type(rec->Stream) != CRYPTO_DIGEST_NONE) { - Dmsg3(300, "Done FI=%u before set_eof pos %u:%u\n", rec->FileIndex, + Dmsg3(dbglvl, "Have digest FI=%u before bsr check pos %u:%u\n", rec->FileIndex, dev->file, dev->block_num); - if (match_set_eof(jcr->bsr, rec) && try_repositioning(jcr, rec, dev)) { - Dmsg2(300, "Break after match_set_eof pos %u:%u\n", + if (is_this_bsr_done(jcr->bsr, rec) && try_repositioning(jcr, rec, dev)) { + Dmsg2(dbglvl, "This bsr done, break pos %u:%u\n", dev->file, dev->block_num); break; } - Dmsg2(300, "After set_eof pos %u:%u\n", dev->file, dev->block_num); + Dmsg2(900, "After is_bsr_done pos %u:%u\n", dev->file, dev->block_num); } } /* end for loop over records */ - Dmsg2(300, "After end records position=(file:block) %u:%u\n", dev->file, dev->block_num); + Dmsg2(dbglvl, "After end recs in block. pos=%u:%u\n", dev->file, dev->block_num); } /* end for loop over blocks */ -// Dmsg2(300, "Position=(file:block) %u:%u\n", dev->file, dev->block_num); +// Dmsg2(dbglvl, "Position=(file:block) %u:%u\n", dev->file, dev->block_num); /* Walk down list and free all remaining allocated recs */ while (!recs->empty()) { @@ -264,8 +288,8 @@ static bool try_repositioning(JCR *jcr, DEV_RECORD *rec, DEVICE *dev) BSR *bsr; bsr = find_next_bsr(jcr->bsr, dev); if (bsr == NULL && jcr->bsr->mount_next_volume) { - Dmsg0(300, "Would mount next volume here\n"); - Dmsg2(300, "Current postion (file:block) %u:%u\n", + Dmsg0(dbglvl, "Would mount next volume here\n"); + Dmsg2(dbglvl, "Current postion (file:block) %u:%u\n", dev->file, dev->block_num); jcr->bsr->mount_next_volume = false; if (!dev->at_eot()) { @@ -282,7 +306,7 @@ static bool try_repositioning(JCR *jcr, DEV_RECORD *rec, DEVICE *dev) dev->file, dev->block_num, bsr->volfile->sfile, bsr->volblock->sblock); } - Dmsg4(300, "Try_Reposition from (file:block) %u:%u to %u:%u\n", + Dmsg4(dbglvl, "Try_Reposition from (file:block) %u:%u to %u:%u\n", dev->file, dev->block_num, bsr->volfile->sfile, bsr->volblock->sblock); dev->reposition(bsr->volfile->sfile, bsr->volblock->sblock); @@ -307,7 +331,7 @@ static BSR *position_to_first_file(JCR *jcr, DEVICE *dev) if (bsr && (bsr->volfile->sfile != 0 || bsr->volblock->sblock != 0)) { Jmsg(jcr, M_INFO, 0, _("Forward spacing to file:block %u:%u.\n"), bsr->volfile->sfile, bsr->volblock->sblock); - Dmsg2(300, "Forward spacing to file:block %u:%u.\n", + Dmsg2(dbglvl, "Forward spacing to file:block %u:%u.\n", bsr->volfile->sfile, bsr->volblock->sblock); dev->reposition(bsr->volfile->sfile, bsr->volblock->sblock); } @@ -345,7 +369,7 @@ static void handle_session_record(DEVICE *dev, DEV_RECORD *rec, SESSION_LABEL *s rtype = buf; break; } - Dmsg5(300, _("%s Record: VolSessionId=%d VolSessionTime=%d JobId=%d DataLen=%d\n"), + Dmsg5(dbglvl, _("%s Record: VolSessionId=%d VolSessionTime=%d JobId=%d DataLen=%d\n"), rtype, rec->VolSessionId, rec->VolSessionTime, rec->Stream, rec->data_len); } diff --git a/bacula/src/stored/record.h b/bacula/src/stored/record.h index 1d50c54387..eaf1f029b1 100644 --- a/bacula/src/stored/record.h +++ b/bacula/src/stored/record.h @@ -94,6 +94,7 @@ struct DEV_RECORD { BSR *bsr; /* pointer to bsr that matched */ uint8_t ser_buf[WRITE_RECHDR_LENGTH]; /* serialized record header goes here */ POOLMEM *data; /* Record data. This MUST be a memory pool item */ + int32_t match_stat; /* bsr match status */ }; diff --git a/bacula/src/version.h b/bacula/src/version.h index ee05851925..98f0106858 100644 --- a/bacula/src/version.h +++ b/bacula/src/version.h @@ -4,8 +4,8 @@ #undef VERSION #define VERSION "1.39.16" -#define BDATE "8 July 2006" -#define LSMDATE "08Jul06" +#define BDATE "9 July 2006" +#define LSMDATE "09Jul06" /* Debug flags */ #undef DEBUG -- 2.39.5