From 5bb7279e09c1d586a1de010eaaa46ed8db6a9189 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Fri, 23 Aug 2002 09:53:16 +0000 Subject: [PATCH] Implement new_read_record ... git-svn-id: https://bacula.svn.sourceforge.net/svnroot/bacula/trunk@118 91ce42f0-d328-0410-95d8-f526ca767f89 --- bacula/src/stored/bls.c | 19 +-- bacula/src/stored/protos.h | 1 + bacula/src/stored/record.c | 268 +++++++++++++++++++++++++++---------- bacula/src/stored/record.h | 4 + 4 files changed, 208 insertions(+), 84 deletions(-) diff --git a/bacula/src/stored/bls.c b/bacula/src/stored/bls.c index 53fbcf690c..6b59140e75 100644 --- a/bacula/src/stored/bls.c +++ b/bacula/src/stored/bls.c @@ -80,7 +80,7 @@ static char *rec_state_to_str(DEV_RECORD *rec) if (rec->state & REC_NO_HEADER) { strcat(buf, "Nohdr,"); } - if (rec->state & REC_PARTIAL_RECORD) { + if (is_partial_record(rec)) { strcat(buf, "partial,"); } if (rec->state & REC_BLOCK_EMPTY) { @@ -504,18 +504,11 @@ Warning, this Volume is a continuation of Volume %s\n", } record = 0; - for (rec->state=0;!(rec->state & REC_BLOCK_EMPTY); ) { - if (!read_record_from_block(block, rec)) { - if (rec->state & REC_NO_HEADER) { - Dmsg2(30, "!read,nohdr-break. stat=%s blk=%d\n", rec_state_to_str(rec), + for (rec->state=0; !is_block_empty(rec); ) { + if (!new_read_record_from_block(block, rec)) { + Dmsg2(30, "!read-break. stat=%s blk=%d\n", rec_state_to_str(rec), block->BlockNumber); - break; - } - if (rec->state & REC_NO_MATCH) { - Dmsg2(30, "!read,nomatch-break. stat=%s blk=%d\n", rec_state_to_str(rec), - block->BlockNumber); - break; - } + break; } /* * At this point, we have at least a record header. @@ -599,7 +592,7 @@ Warning, this Volume is a continuation of Volume %s\n", rec->remainder = 0; continue; /* we don't want record, read next one */ } - if (rec->state & REC_PARTIAL_RECORD) { + if (is_partial_record(rec)) { Dmsg6(10, "Partial, break. recno=%d state=%s blk=%d SI=%d ST=%d FI=%d\n", record, rec_state_to_str(rec), block->BlockNumber, rec->VolSessionId, rec->VolSessionTime, rec->FileIndex); diff --git a/bacula/src/stored/protos.h b/bacula/src/stored/protos.h index d550a2a437..31c4415116 100644 --- a/bacula/src/stored/protos.h +++ b/bacula/src/stored/protos.h @@ -145,6 +145,7 @@ char *FI_to_ascii(int fi); char *stream_to_ascii(int stream); int write_record_to_block(DEV_BLOCK *block, DEV_RECORD *rec); int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); +int new_read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec); DEV_RECORD *new_record(); void free_record(DEV_RECORD *rec); int read_record(DEVICE *dev, DEV_BLOCK *block, DEV_RECORD *record); diff --git a/bacula/src/stored/record.c b/bacula/src/stored/record.c index fb2dae35f7..25fc1ad3d8 100644 --- a/bacula/src/stored/record.c +++ b/bacula/src/stored/record.c @@ -2,7 +2,7 @@ * * record.c -- tape record handling functions * - * Kern Sibbald, April MMI + * Kern Sibbald, April MMI * * Version $Id$ * @@ -119,7 +119,7 @@ void free_record(DEV_RECORD *rec) * if necessary, handle getting a new Volume * * Returns: 0 on failure - * 1 on success + * 1 on success */ int read_record(DEVICE *dev, DEV_BLOCK *block, DEV_RECORD *record) { @@ -127,15 +127,15 @@ int read_record(DEVICE *dev, DEV_BLOCK *block, DEV_RECORD *record) while (!read_record_from_block(block, record)) { Dmsg2(90, "!read_record_from_block data_len=%d rem=%d\n", record->data_len, - record->remainder); + record->remainder); if (!read_block_from_dev(dev, block)) { Dmsg0(200, "===== Got read block I/O error ======\n"); - return 0; + return 0; } } Dmsg4(90, "read_record FI=%s SessId=%d Strm=%s len=%d\n", - FI_to_ascii(record->FileIndex), record->VolSessionId, - stream_to_ascii(record->Stream), record->data_len); + FI_to_ascii(record->FileIndex), record->VolSessionId, + stream_to_ascii(record->Stream), record->data_len); record->File = dev->file; record->Block = dev->block_num; return 1; @@ -146,7 +146,7 @@ int read_record(DEVICE *dev, DEV_BLOCK *block, DEV_RECORD *record) * Write a Record to the block * * Returns: 0 on failure (none or partially written) - * 1 on success (all bytes written) + * 1 on success (all bytes written) * * and remainder returned in packet. * @@ -181,22 +181,22 @@ rem=%d remainder=%d\n", if (rec->remainder == 0) { /* Require enough room to write a full header */ if (remlen >= RECHDR_LENGTH) { - ser_begin(block->bufp, RECHDR_LENGTH); - ser_uint32(rec->VolSessionId); - ser_uint32(rec->VolSessionTime); - ser_int32(rec->FileIndex); - ser_int32(rec->Stream); - ser_uint32(rec->data_len); - ASSERT(ser_length(block->bufp) == RECHDR_LENGTH); - - block->bufp += RECHDR_LENGTH; - block->binbuf += RECHDR_LENGTH; - remlen -= RECHDR_LENGTH; - rec->remainder = rec->data_len; + ser_begin(block->bufp, RECHDR_LENGTH); + ser_uint32(rec->VolSessionId); + ser_uint32(rec->VolSessionTime); + ser_int32(rec->FileIndex); + ser_int32(rec->Stream); + ser_uint32(rec->data_len); + ASSERT(ser_length(block->bufp) == RECHDR_LENGTH); + + block->bufp += RECHDR_LENGTH; + block->binbuf += RECHDR_LENGTH; + remlen -= RECHDR_LENGTH; + rec->remainder = rec->data_len; } else { - rec->remainder = rec->data_len + RECHDR_LENGTH; - sm_check(__FILE__, __LINE__, False); - return 0; + rec->remainder = rec->data_len + RECHDR_LENGTH; + sm_check(__FILE__, __LINE__, False); + return 0; } } else { /* @@ -219,12 +219,12 @@ rem=%d remainder=%d\n", ser_uint32(rec->VolSessionTime); ser_int32(rec->FileIndex); if (rec->remainder > rec->data_len) { - ser_int32(rec->Stream); /* normal full header */ - ser_uint32(rec->data_len); - rec->remainder = rec->data_len; /* must still do data record */ + ser_int32(rec->Stream); /* normal full header */ + ser_uint32(rec->data_len); + rec->remainder = rec->data_len; /* must still do data record */ } else { - ser_int32(-rec->Stream); /* mark this as a continuation record */ - ser_uint32(rec->remainder); /* bytes to do */ + ser_int32(-rec->Stream); /* mark this as a continuation record */ + ser_uint32(rec->remainder); /* bytes to do */ } ASSERT(ser_length(block->bufp) == RECHDR_LENGTH); @@ -237,7 +237,7 @@ rem=%d remainder=%d\n", } if (remlen == 0) { sm_check(__FILE__, __LINE__, False); - return 0; /* partial transfer */ + return 0; /* partial transfer */ } /* @@ -248,36 +248,36 @@ rem=%d remainder=%d\n", if (rec->remainder > 0) { /* Write as much of data as possible */ if (remlen >= rec->remainder) { - memcpy(block->bufp, rec->data+rec->data_len-rec->remainder, - rec->remainder); - block->bufp += rec->remainder; - block->binbuf += rec->remainder; + memcpy(block->bufp, rec->data+rec->data_len-rec->remainder, + rec->remainder); + block->bufp += rec->remainder; + block->binbuf += rec->remainder; } else { - memcpy(block->bufp, rec->data+rec->data_len-rec->remainder, - remlen); - if (!sm_check_rtn(__FILE__, __LINE__, False)) { - /* We damaged a buffer */ + memcpy(block->bufp, rec->data+rec->data_len-rec->remainder, + remlen); + if (!sm_check_rtn(__FILE__, __LINE__, False)) { + /* We damaged a buffer */ Dmsg6(0, "Damaged block FI=%s SessId=%d Strm=%s len=%d\n\ rem=%d remainder=%d\n", - FI_to_ascii(rec->FileIndex), rec->VolSessionId, - stream_to_ascii(rec->Stream), rec->data_len, - remlen, rec->remainder); + FI_to_ascii(rec->FileIndex), rec->VolSessionId, + stream_to_ascii(rec->Stream), rec->data_len, + remlen, rec->remainder); Dmsg5(0, "Damaged block: bufp=%x binbuf=%d buf_len=%d rem=%d moved=%d\n", - block->bufp, block->binbuf, block->buf_len, block->buf_len-block->binbuf, - remlen); + block->bufp, block->binbuf, block->buf_len, block->buf_len-block->binbuf, + remlen); Dmsg2(0, "Damaged block: buf=%x binbuffrombuf=%d \n", - block->buf, block->bufp-block->buf); + block->buf, block->bufp-block->buf); Emsg0(M_ABORT, 0, "Damaged buffer\n"); - } + } - block->bufp += remlen; - block->binbuf += remlen; - rec->remainder -= remlen; - return 0; /* did partial transfer */ + block->bufp += remlen; + block->binbuf += remlen; + rec->remainder -= remlen; + return 0; /* did partial transfer */ } } - rec->remainder = 0; /* did whole transfer */ + rec->remainder = 0; /* did whole transfer */ sm_check(__FILE__, __LINE__, False); return 1; } @@ -286,7 +286,7 @@ rem=%d remainder=%d\n", /* * Read a Record from the block * Returns: 0 on failure - * 1 on success + * 1 on success */ int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec) { @@ -309,7 +309,7 @@ int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec) */ if (remlen >= RECHDR_LENGTH) { Dmsg3(90, "read_record_block: remlen=%d data_len=%d rem=%d\n", - remlen, rec->data_len, rec->remainder); + remlen, rec->data_len, rec->remainder); unser_begin(block->bufp, RECHDR_LENGTH); unser_uint32(VolSessionId); @@ -327,22 +327,22 @@ int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec) * if Stream is negative, it means that this is a continuation * of a previous partially written record. */ - if (Stream < 0) { /* continuation record? */ + if (Stream < 0) { /* continuation record? */ Dmsg1(500, "Got negative Stream => continuation. remainder=%d\n", - rec->remainder); - rec->state |= REC_CONTINUATION; + rec->remainder); + rec->state |= REC_CONTINUATION; if (!rec->remainder) { /* if we didn't read previously */ - rec->data_len = 0; /* return data as if no continuation */ - } else if (rec->VolSessionId != VolSessionId || - rec->VolSessionTime != VolSessionTime || - rec->Stream != -Stream) { - rec->state |= REC_NO_MATCH; - return 0; /* This is from some other Session */ - } - rec->Stream = -Stream; /* set correct Stream */ - } else { /* Regular record */ - rec->Stream = Stream; - rec->data_len = 0; /* transfer to beginning of data */ + rec->data_len = 0; /* return data as if no continuation */ + } else if (rec->VolSessionId != VolSessionId || + rec->VolSessionTime != VolSessionTime || + rec->Stream != -Stream) { + rec->state |= REC_NO_MATCH; + return 0; /* This is from some other Session */ + } + rec->Stream = -Stream; /* set correct Stream */ + } else { /* Regular record */ + rec->Stream = Stream; + rec->data_len = 0; /* transfer to beginning of data */ } rec->VolSessionId = VolSessionId; rec->VolSessionTime = VolSessionTime; @@ -350,8 +350,8 @@ int read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec) Dmsg6(90, "rd_rec_blk() got FI=%s SessId=%d Strm=%s len=%d\n\ remlen=%d data_len=%d\n", - FI_to_ascii(rec->FileIndex), rec->VolSessionId, - stream_to_ascii(rec->Stream), data_bytes, remlen, rec->data_len); + FI_to_ascii(rec->FileIndex), rec->VolSessionId, + stream_to_ascii(rec->Stream), data_bytes, remlen, rec->data_len); } else { /* * No more records in this block because the number @@ -363,14 +363,14 @@ remlen=%d data_len=%d\n", */ Dmsg0(90, "read_record_block: nothing\n"); if (!rec->remainder) { - rec->remainder = 1; /* set to expect continuation */ - rec->data_len = 0; /* no data transferred */ + rec->remainder = 1; /* set to expect continuation */ + rec->data_len = 0; /* no data transferred */ } rec->state |= (REC_NO_HEADER | REC_BLOCK_EMPTY); return 0; } - ASSERT(data_bytes < MAX_BLOCK_LENGTH); /* temp sanity check */ + ASSERT(data_bytes < MAX_BLOCK_LENGTH); /* temp sanity check */ rec->data = check_pool_memory_size(rec->data, rec->data_len+data_bytes); @@ -394,7 +394,7 @@ remlen=%d data_len=%d\n", block->bufp += remlen; block->binbuf -= remlen; rec->data_len += remlen; - rec->remainder = 1; /* partial record transferred */ + rec->remainder = 1; /* partial record transferred */ Dmsg1(90, "read_record_block: partial xfered=%d\n", rec->data_len); rec->state |= (REC_PARTIAL_RECORD | REC_BLOCK_EMPTY); /********FIXME********* this should return 1 */ @@ -404,5 +404,131 @@ remlen=%d data_len=%d\n", Dmsg4(90, "Rtn full rd_rec_blk FI=%s SessId=%d Strm=%s len=%d\n", FI_to_ascii(rec->FileIndex), rec->VolSessionId, stream_to_ascii(rec->Stream), rec->data_len); - return 1; /* transferred full record */ + return 1; /* transferred full record */ +} + +/* + * Read a Record from the block + * Returns: 0 if nothing read or if the continuation record does not match. + * In both of these cases, a block read must be done. + * 1 if at least the record header was read, this + * routine may have to be called again with a new + * block if the entire record was not read. + */ +int new_read_record_from_block(DEV_BLOCK *block, DEV_RECORD *rec) +{ + ser_declare; + uint32_t remlen; + uint32_t VolSessionId; + uint32_t VolSessionTime; + int32_t FileIndex; + int32_t Stream; + uint32_t data_bytes; + + remlen = block->binbuf; + + /* Clear state flags */ + rec->state = 0; + + /* + * Get the header. There is always a full header, + * otherwise we find it in the next block. + */ + if (remlen >= RECHDR_LENGTH) { + Dmsg3(90, "read_record_block: remlen=%d data_len=%d rem=%d\n", + remlen, rec->data_len, rec->remainder); + + unser_begin(block->bufp, RECHDR_LENGTH); + unser_uint32(VolSessionId); + unser_uint32(VolSessionTime); + unser_int32(FileIndex); + unser_int32(Stream); + unser_uint32(data_bytes); + + ASSERT(unser_length(block->bufp) == RECHDR_LENGTH); + block->bufp += RECHDR_LENGTH; + block->binbuf -= RECHDR_LENGTH; + remlen -= RECHDR_LENGTH; + + /* + * if Stream is negative, it means that this is a continuation + * of a previous partially written record. + */ + if (Stream < 0) { /* continuation record? */ + Dmsg1(500, "Got negative Stream => continuation. remainder=%d\n", + rec->remainder); + rec->state |= REC_CONTINUATION; + if (!rec->remainder) { /* if we didn't read previously */ + rec->data_len = 0; /* return data as if no continuation */ + } else if (rec->VolSessionId != VolSessionId || + rec->VolSessionTime != VolSessionTime || + rec->Stream != -Stream) { + rec->state |= REC_NO_MATCH; + return 0; /* This is from some other Session */ + } + rec->Stream = -Stream; /* set correct Stream */ + } else { /* Regular record */ + rec->Stream = Stream; + rec->data_len = 0; /* transfer to beginning of data */ + } + rec->VolSessionId = VolSessionId; + rec->VolSessionTime = VolSessionTime; + rec->FileIndex = FileIndex; + + Dmsg6(90, "rd_rec_blk() got FI=%s SessId=%d Strm=%s len=%d\n\ +remlen=%d data_len=%d\n", + FI_to_ascii(rec->FileIndex), rec->VolSessionId, + stream_to_ascii(rec->Stream), data_bytes, remlen, rec->data_len); + } else { + /* + * No more records in this block because the number + * of remaining bytes are less than a record header + * length, so return empty handed, but indicate that + * he must read again. By returning, we allow the + * higher level routine to fetch the next block and + * then reread. + */ + Dmsg0(90, "read_record_block: nothing\n"); + if (!rec->remainder) { + rec->remainder = 1; /* set to expect continuation */ + rec->data_len = 0; /* no data transferred */ + } + rec->state |= (REC_NO_HEADER | REC_BLOCK_EMPTY); + return 0; + } + + ASSERT(data_bytes < MAX_BLOCK_LENGTH); /* temp sanity check */ + + rec->data = check_pool_memory_size(rec->data, rec->data_len+data_bytes); + + /* + * At this point, we have read the header, now we + * must transfer as much of the data record as + * possible taking into account: 1. A partial + * data record may have previously been transferred, + * 2. The current block may not contain the whole data + * record. + */ + if (remlen >= data_bytes) { + /* Got whole record */ + memcpy(rec->data+rec->data_len, block->bufp, data_bytes); + block->bufp += data_bytes; + block->binbuf -= data_bytes; + rec->data_len += data_bytes; + } else { + /* Partial record */ + memcpy(rec->data+rec->data_len, block->bufp, remlen); + block->bufp += remlen; + block->binbuf -= remlen; + rec->data_len += remlen; + rec->remainder = 1; /* partial record transferred */ + Dmsg1(90, "read_record_block: partial xfered=%d\n", rec->data_len); + rec->state |= (REC_PARTIAL_RECORD | REC_BLOCK_EMPTY); + return 1; + } + rec->remainder = 0; + Dmsg4(90, "Rtn full rd_rec_blk FI=%s SessId=%d Strm=%s len=%d\n", + FI_to_ascii(rec->FileIndex), rec->VolSessionId, + stream_to_ascii(rec->Stream), rec->data_len); + return 1; /* transferred full record */ } diff --git a/bacula/src/stored/record.h b/bacula/src/stored/record.h index fe6f41cea1..601e9ee590 100644 --- a/bacula/src/stored/record.h +++ b/bacula/src/stored/record.h @@ -55,12 +55,16 @@ typedef struct s_record_hdr { uint32_t data_len; } RECORD_HDR; +/* Record state bit definitions */ #define REC_NO_HEADER 0x01 /* No header read */ #define REC_PARTIAL_RECORD 0x02 /* returning partial record */ #define REC_BLOCK_EMPTY 0x04 /* not enough data in block */ #define REC_NO_MATCH 0x08 /* No match on continuation data */ #define REC_CONTINUATION 0x10 /* Continuation record found */ +#define is_partial_record(r) ((r)->state & REC_PARTIAL_RECORD) +#define is_block_empty(r) ((r)->state & REC_BLOCK_EMPTY) + /* * DEV_RECORD for reading and writing records. * It consists of a Record Header, and the Record Data -- 2.39.5