From 5503b7802e4c8714d7d4273a212f229ff7524735 Mon Sep 17 00:00:00 2001 From: Kern Sibbald Date: Fri, 23 Apr 2010 22:31:35 +0200 Subject: [PATCH] Add compress/decompress of Object Record data --- bacula/src/cats/cats.h | 1 + bacula/src/cats/make_mysql_tables.in | 3 +- bacula/src/cats/make_postgresql_tables.in | 3 +- bacula/src/cats/make_sqlite3_tables.in | 1 + bacula/src/cats/sql_create.c | 7 +- bacula/src/dird/catreq.c | 27 +++++--- bacula/src/dird/fd_cmds.c | 16 +++-- bacula/src/filed/backup.c | 33 +++++++-- bacula/src/filed/fd_plugins.h | 2 + bacula/src/filed/job.c | 48 +++++++++---- bacula/src/findlib/find.c | 2 +- bacula/src/lib/bsock.c | 5 +- bacula/src/lib/bsys.c | 82 ++++++++++++++++++++++- bacula/src/lib/protos.h | 5 +- 14 files changed, 185 insertions(+), 50 deletions(-) diff --git a/bacula/src/cats/cats.h b/bacula/src/cats/cats.h index a340b79400..79581201d6 100644 --- a/bacula/src/cats/cats.h +++ b/bacula/src/cats/cats.h @@ -944,6 +944,7 @@ struct ROBJECT_DBR { char *object; char *plugin_name; uint32_t object_len; + uint32_t object_full_len; uint32_t object_index; int32_t object_compression; uint32_t FileIndex; diff --git a/bacula/src/cats/make_mysql_tables.in b/bacula/src/cats/make_mysql_tables.in index 1d3f8790a7..c030341893 100644 --- a/bacula/src/cats/make_mysql_tables.in +++ b/bacula/src/cats/make_mysql_tables.in @@ -36,7 +36,7 @@ CREATE TABLE Path ( -- In general, these will cause very significant performance -- problems in other areas. A better approch is to carefully check -- that all your memory configuation parameters are --- suitable for the size of your installation. If you backup +-- suitable for the size of your installation. If you backup -- millions of files, you need to adapt the database memory -- configuration parameters concerning sorting, joining and global -- memory. By default, sort and join parameters are very small @@ -66,6 +66,7 @@ CREATE TABLE RestoreObject ( RestoreObject LONGBLOB NOT NULL, PluginName TINYBLOB NOT NULL, ObjectLength INTEGER DEFAULT 0, + ObjectFullLength INTEGER DEFAULT 0, ObjectIndex INTEGER DEFAULT 0, ObjectType INTEGER DEFAULT 0, FileIndex INTEGER UNSIGNED DEFAULT 0, diff --git a/bacula/src/cats/make_postgresql_tables.in b/bacula/src/cats/make_postgresql_tables.in index 18335d9c68..4fb6745247 100644 --- a/bacula/src/cats/make_postgresql_tables.in +++ b/bacula/src/cats/make_postgresql_tables.in @@ -36,7 +36,7 @@ CREATE UNIQUE INDEX path_name_idx on Path (Path); -- In general, these will cause very significant performance -- problems in other areas. A better approch is to carefully check -- that all your memory configuation parameters are --- suitable for the size of your installation. If you backup +-- suitable for the size of your installation. If you backup -- millions of files, you need to adapt the database memory -- configuration parameters concerning sorting, joining and global -- memory. By default, sort and join parameters are very small @@ -81,6 +81,7 @@ CREATE TABLE RestoreObject ( RestoreObject TEXT NOT NULL, PluginName TEXT NOT NULL, ObjectLength INTEGER DEFAULT 0, + ObjectFullLength INTEGER DEFAULT 0, ObjectIndex INTEGER DEFAULT 0, ObjectType INTEGER DEFAULT 0, FileIndex INTEGER UNSIGNED DEFAULT 0, diff --git a/bacula/src/cats/make_sqlite3_tables.in b/bacula/src/cats/make_sqlite3_tables.in index 3c72458824..3205b6a013 100644 --- a/bacula/src/cats/make_sqlite3_tables.in +++ b/bacula/src/cats/make_sqlite3_tables.in @@ -55,6 +55,7 @@ CREATE TABLE RestoreObject ( RestoreObject TEXT DEFAULT '', PluginName TEXT DEFAULT '', ObjectLength INTEGER DEFAULT 0, + ObjectFullLength INTEGER DEFAULT 0, ObjectIndex INTEGER DEFAULT 0, ObjectType INTEGER DEFAULT 0, FileIndex INTEGER UNSIGNED DEFAULT 0, diff --git a/bacula/src/cats/sql_create.c b/bacula/src/cats/sql_create.c index 7d46aebba9..30ed060564 100644 --- a/bacula/src/cats/sql_create.c +++ b/bacula/src/cats/sql_create.c @@ -1243,9 +1243,10 @@ bool db_create_restore_object_record(JCR *jcr, B_DB *mdb, ROBJECT_DBR *ro) Mmsg(mdb->cmd, "INSERT INTO RestoreObject (ObjectName,RestoreObject," - "ObjectLength,ObjectIndex,ObjectType,ObjectCompression,FileIndex,JobId) " - "VALUES ('%s','%s',%d,%d,%d,%d,%d,%u)", - mdb->esc_name, esc_obj, ro->object_len, ro->object_index, + "ObjectLength,ObjectFullLength,ObjectIndex,ObjectType," + "ObjectCompression,FileIndex,JobId) " + "VALUES ('%s','%s',%d,%d,%d,%d,%d,%d,%u)", + mdb->esc_name, esc_obj, ro->object_len, ro->object_full_len, ro->object_index, FT_RESTORE_FIRST, ro->object_compression, ro->FileIndex, ro->JobId); ro->RestoreObjectId = sql_insert_autokey_record(mdb, mdb->cmd, NT_("RestoreObject")); diff --git a/bacula/src/dird/catreq.c b/bacula/src/dird/catreq.c index 47e0b47902..7e9422e5b7 100644 --- a/bacula/src/dird/catreq.c +++ b/bacula/src/dird/catreq.c @@ -423,7 +423,8 @@ static void update_attribute(JCR *jcr, char *msg, int32_t msglen) * File_index * File_type * Object_index - * Object_len + * Object_len (possibly compressed) + * Object_full_len (not compressed) * Object_compression * Plugin_name * Object_name @@ -492,17 +493,23 @@ static void update_attribute(JCR *jcr, char *msg, int32_t msglen) Dmsg1(100, "Robj=%s\n", p); - skip_nonspaces(&p); /* skip FileIndex */ + skip_nonspaces(&p); /* skip FileIndex */ + skip_spaces(&p); + ro.FileType = str_to_int32(p); /* FileType */ + skip_nonspaces(&p); + skip_spaces(&p); + ro.object_index = str_to_int32(p); /* Object Index */ + skip_nonspaces(&p); + skip_spaces(&p); + ro.object_len = str_to_int32(p); /* object length possibly compressed */ + skip_nonspaces(&p); + skip_spaces(&p); + ro.object_full_len = str_to_int32(p); /* uncompressed object length */ + skip_nonspaces(&p); skip_spaces(&p); - ro.FileType = str_to_int32(p); - skip_nonspaces(&p); /* move past FileType */ + ro.object_compression = str_to_int32(p); /* compression */ + skip_nonspaces(&p); skip_spaces(&p); - ro.object_index = str_to_int32(p); - skip_nonspaces(&p); /* move past object_index */ - ro.object_len = str_to_int32(p); - skip_nonspaces(&p); /* move past object_length */ - ro.object_compression = str_to_int32(p); - skip_nonspaces(&p); /* move past object_compression */ ro.plugin_name = p; /* point to plugin name */ len = strlen(ro.plugin_name); diff --git a/bacula/src/dird/fd_cmds.c b/bacula/src/dird/fd_cmds.c index 437e095bc3..56085ccd29 100644 --- a/bacula/src/dird/fd_cmds.c +++ b/bacula/src/dird/fd_cmds.c @@ -678,16 +678,18 @@ static int restore_object_handler(void *ctx, int num_fields, char **row) if (jcr->is_job_canceled()) { return 1; } - fd->fsend("restoreobject JobId=%s ObjLen=%s ObjInx=%s ObjType=%s FI=%s\n", - row[0], row[1], row[2], row[3], row[4]); + fd->fsend("restoreobject JobId=%s %s,%s,%s,%s,%s,%s\n", + row[0], row[1], row[2], row[3], row[4], row[5], row[6]); + + Dmsg1(000, "Send obj hdr=%s", fd->msg); msg_save = fd->msg; - fd->msg = row[5] ? row[5] : (char *)""; + fd->msg = row[7] ? row[7] : (char *)""; fd->msglen = strlen(fd->msg); fd->send(); /* send Object name */ -// Dmsg1(000, "Send obj: %s\n", fd->msg); + Dmsg1(000, "Send obj: %s\n", fd->msg); - fd->msg = row[6] ? row[6] : (char *)""; /* object */ + fd->msg = row[8] ? row[8] : (char *)""; /* object */ fd->msglen = str_to_uint64(row[1]); /* object length */ fd->send(); /* send object */ // Dmsg1(000, "Send obj: %s\n", fd->msg); @@ -705,8 +707,8 @@ bool send_restore_objects(JCR *jcr) if (!jcr->JobIds || !jcr->JobIds[0]) { return true; } - Mmsg(query, "SELECT JobId,ObjectLength,ObjectIndex,ObjectType," - "FileIndex,ObjectName,RestoreObject FROM RestoreObject " + Mmsg(query, "SELECT JobId,ObjectLength,ObjectFullLength,ObjectIndex,ObjectType," + "ObjectCompression,FileIndex,ObjectName,RestoreObject FROM RestoreObject " "WHERE JobId IN (%s) ORDER BY ObjectIndex ASC", jcr->JobIds); /* restore_object_handler is called for each file found */ diff --git a/bacula/src/filed/backup.c b/bacula/src/filed/backup.c index 96e0be2db7..7fd9bf70e3 100644 --- a/bacula/src/filed/backup.c +++ b/bacula/src/filed/backup.c @@ -1108,6 +1108,7 @@ bool encode_and_send_attributes(JCR *jcr, FF_PKT *ff_pkt, int &data_stream) char attribsExBuf[MAXSTRING]; char *attribsEx; int attr_stream; + int comp_len; bool stat; #ifdef FD_NO_SEND_TEST return true; @@ -1164,7 +1165,8 @@ bool encode_and_send_attributes(JCR *jcr, FF_PKT *ff_pkt, int &data_stream) * File_index * File_type * Object_index - * Object_len + * Object_len (possibly compressed) + * Object_full_len (not compressed) * Object_compression * Plugin_name * Object_name @@ -1191,15 +1193,34 @@ bool encode_and_send_attributes(JCR *jcr, FF_PKT *ff_pkt, int &data_stream) ff_pkt->type, ff_pkt->link, 0, attribs, 0, 0, attribsEx, 0); break; case FT_RESTORE_FIRST: - sd->msglen = Mmsg(sd->msg, "%d %d %d %d %d %s%c%s%c", + comp_len = ff_pkt->object_len; + if (ff_pkt->object_len > 1000) { + /* Big object, compress it */ + int stat; + comp_len = ff_pkt->object_len + 1000; + POOLMEM *comp_obj = get_memory(comp_len); + stat = Zdeflate(ff_pkt->object, ff_pkt->object_len, comp_obj, comp_len); + if (comp_len < ff_pkt->object_len) { + ff_pkt->object = comp_obj; + ff_pkt->object_compression = 1; /* zlib level 9 compression */ + } else { + /* Uncompressed object smaller, use it */ + comp_len = ff_pkt->object_len; + } + Dmsg2(100, "Object compressed from %d to %d bytes\n", ff_pkt->object_len, comp_len); + } + sd->msglen = Mmsg(sd->msg, "%d %d %d %d %d %d %s%c%s%c", jcr->JobFiles, ff_pkt->type, ff_pkt->object_index, - ff_pkt->object_len, ff_pkt->object_compression, + comp_len, ff_pkt->object_len, ff_pkt->object_compression, ff_pkt->fname, 0, ff_pkt->object_name, 0); - sd->msg = check_pool_memory_size(sd->msg, sd->msglen + ff_pkt->object_len + 2); - memcpy(sd->msg + sd->msglen, ff_pkt->object, ff_pkt->object_len); + sd->msg = check_pool_memory_size(sd->msg, sd->msglen + comp_len + 2); + memcpy(sd->msg + sd->msglen, ff_pkt->object, comp_len); /* Note we send one extra byte so Dir can store zero after object */ - sd->msglen += ff_pkt->object_len + 1; + sd->msglen += comp_len + 1; stat = sd->send(); + if (ff_pkt->object_compression) { + free_and_null_pool_memory(ff_pkt->object); + } break; default: stat = sd->fsend("%ld %d %s%c%s%c%c%s%c", jcr->JobFiles, diff --git a/bacula/src/filed/fd_plugins.h b/bacula/src/filed/fd_plugins.h index 875936c963..9745db1988 100644 --- a/bacula/src/filed/fd_plugins.h +++ b/bacula/src/filed/fd_plugins.h @@ -81,7 +81,9 @@ struct restore_object_pkt { char *object; /* restore object data to save */ int32_t object_type; /* FT_xx for this file */ int32_t object_len; /* restore object length */ + int32_t object_full_len; /* restore object uncompressed length */ int32_t object_index; /* restore object index */ + int32_t object_compression; /* set to compression type */ int32_t stream; /* attribute stream id */ uint32_t JobId; /* JobId object came from */ int32_t pkt_end; /* end packet sentinel */ diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index b130670e78..f1421ceafd 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -137,7 +137,7 @@ static char sessioncmd[] = "session %127s %ld %ld %ld %ld %ld %ld\n"; static char restorecmd[] = "restore replace=%c prelinks=%d where=%s\n"; static char restorecmd1[] = "restore replace=%c prelinks=%d where=\n"; static char restorecmdR[] = "restore replace=%c prelinks=%d regexwhere=%s\n"; -static char restoreobjcmd[] = "restoreobject JobId=%u ObjLen=%d ObjInx=%d ObjType=%d FI=%d\n"; +static char restoreobjcmd[] = "restoreobject JobId=%u %d,%d,%d,%d,%d,%d\n"; static char endrestoreobjectcmd[] = "restoreobject end\n"; static char verifycmd[] = "verify level=%30s"; static char estimatecmd[] = "estimate listing=%d"; @@ -629,7 +629,6 @@ static int runscript_cmd(JCR *jcr) static int restore_object_cmd(JCR *jcr) { BSOCK *dir = jcr->dir_bsock; - POOLMEM *msg = get_memory(dir->msglen+1); int32_t FileIndex; restore_object_pkt rop; @@ -639,20 +638,21 @@ static int restore_object_cmd(JCR *jcr) Dmsg1(100, "Enter restoreobject_cmd: %s", dir->msg); if (strcmp(dir->msg, endrestoreobjectcmd) == 0) { generate_plugin_event(jcr, bEventRestoreObject, NULL); - free_memory(msg); return dir->fsend(OKRestoreObject); } if (sscanf(dir->msg, restoreobjcmd, &rop.JobId, &rop.object_len, - &rop.object_index, &rop.object_type, &FileIndex) != 5) { + &rop.object_full_len, &rop.object_index, + &rop.object_type, &rop.object_compression, &FileIndex) != 7) { Dmsg0(5, "Bad restore object command\n"); pm_strcpy(jcr->errmsg, dir->msg); Jmsg1(jcr, M_FATAL, 0, _("Bad RestoreObject command: %s\n"), jcr->errmsg); goto bail_out; } - Dmsg5(100, "Recv object: JobId=%u objlen=%d objinx=%d objtype=%d FI=%d\n", - rop.JobId, rop.object_len, rop.object_index, rop.object_type, FileIndex); + Dmsg6(100, "Recv object: JobId=%u objlen=%d full_len=%d objinx=%d objtype=%d FI=%d\n", + rop.JobId, rop.object_len, rop.object_full_len, + rop.object_index, rop.object_type, FileIndex); /* Read Object name */ if (dir->recv() < 0) { goto bail_out; @@ -664,9 +664,28 @@ static int restore_object_cmd(JCR *jcr) if (dir->recv() < 0) { goto bail_out; } + /* Transfer object from message buffer, and get new message buffer */ rop.object = dir->msg; - Dmsg2(100, "Recv Object: len=%d Object=%s\n", dir->msglen, dir->msg); - + dir->msg = get_pool_memory(PM_MESSAGE); + + /* If object is compressed, uncompress it */ + if (rop.object_compression == 1) { /* zlib level 9 */ + int stat; + int out_len = rop.object_full_len + 100; + POOLMEM *obj = get_memory(out_len); + Dmsg2(100, "Inflating from %d to %d\n", rop.object_len, rop.object_full_len); + stat = Zinflate(rop.object, rop.object_len, obj, out_len); + Dmsg1(100, "Zinflate stat=%d\n", stat); + if (out_len != rop.object_full_len) { + Jmsg3(jcr, M_ERROR, 0, ("Decompression failed. Len wanted=%d got=%d. Object=%s\n"), + rop.object_full_len, out_len, rop.object_name); + } + free_pool_memory(rop.object); /* release compressed object */ + rop.object = obj; /* new uncompressed object */ + rop.object_len = out_len; + } + Dmsg2(100, "Recv Object: len=%d Object=%s\n", rop.object_len, rop.object); + /* Special Job meta data */ if (strcmp(rop.object_name, "job_metadata.xml") == 0) { Dmsg0(100, "got job metadata\n"); free_and_null_pool_memory(jcr->job_metadata); @@ -680,17 +699,15 @@ static int restore_object_cmd(JCR *jcr) if (rop.object_name) { free(rop.object_name); } - if (!rop.object) { - dir->msg = get_pool_memory(PM_MESSAGE); + if (rop.object) { + free_pool_memory(rop.object); } - free_memory(msg); Dmsg1(100, "Send: %s", OKRestoreObject); return 1; bail_out: dir->fsend(_("2909 Bad RestoreObject command.\n")); - free_memory(msg); return 0; } @@ -1986,8 +2003,11 @@ static int restore_cmd(JCR *jcr) Dmsg0(100, "restore command\n"); #if defined(WIN32_VSS) - /* TODO: this should be given from the director */ - enable_vss = 1; + /** + * No need to enable VSS for restore if we do not have plugin + * data to restore + */ + enable_vss = jcr->job_metadata != NULL; Dmsg2(50, "g_pVSSClient = %p, enable_vss = %d\n", g_pVSSClient, enable_vss); // capture state here, if client is backed up by multiple directors diff --git a/bacula/src/findlib/find.c b/bacula/src/findlib/find.c index d2da062c86..6347655f91 100644 --- a/bacula/src/findlib/find.c +++ b/bacula/src/findlib/find.c @@ -40,7 +40,7 @@ #include "bacula.h" #include "find.h" -static const int dbglvl = 150; +static const int dbglvl = 450; int32_t name_max; /* filename max length */ int32_t path_max; /* path name max length */ diff --git a/bacula/src/lib/bsock.c b/bacula/src/lib/bsock.c index 6447fcf5eb..5e4fff647e 100644 --- a/bacula/src/lib/bsock.c +++ b/bacula/src/lib/bsock.c @@ -1,7 +1,7 @@ /* Bacula® - The Network Backup Solution - Copyright (C) 2007-2008 Free Software Foundation Europe e.V. + Copyright (C) 2007-2010 Free Software Foundation Europe e.V. The main author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -30,7 +30,6 @@ * * by Kern Sibbald * - * Version $Id: $ */ @@ -348,7 +347,7 @@ bool BSOCK::send() } return false; } - if (msglen > 1000000) { + if (msglen > 4000000) { if (!m_suppress_error_msgs) { Qmsg4(m_jcr, M_ERROR, 0, _("Socket has insane msglen=%d on call to %s:%s:%d\n"), diff --git a/bacula/src/lib/bsys.c b/bacula/src/lib/bsys.c index f5d7bc6a7a..989958438f 100644 --- a/bacula/src/lib/bsys.c +++ b/bacula/src/lib/bsys.c @@ -1,7 +1,7 @@ /* Bacula® - The Network Backup Solution - Copyright (C) 2000-2008 Free Software Foundation Europe e.V. + Copyright (C) 2000-2010 Free Software Foundation Europe e.V. The main author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -32,10 +32,12 @@ * * Bacula utility functions are in util.c * - * Version $Id$ */ #include "bacula.h" +#ifdef HAVE_LIBZ +#include +#endif static pthread_mutex_t timer_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -673,3 +675,79 @@ char *escape_filename(const char *file_path) return escaped_path; } + +/* + * Deflate or compress and input buffer. You must supply an + * output buffer sufficiently long and the length of the + * output buffer. Generally, if the output buffer is the + * same size as the input buffer, it should work (at least + * for text). + */ +int Zdeflate(char *in, int in_len, char *out, int &out_len) +{ +#ifdef HAVE_LIBZ + z_stream strm; + int ret; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit(&strm, 9); + if (ret != Z_OK) { + Dmsg0(200, "deflateInit error\n"); + (void)deflateEnd(&strm); + return ret; + } + + strm.next_in = (Bytef *)in; + strm.avail_in = in_len; + Dmsg1(200, "In: %d bytes\n", strm.avail_in); + strm.avail_out = out_len; + strm.next_out = (Bytef *)out; + ret = deflate(&strm, Z_FINISH); + out_len = out_len - strm.avail_out; + Dmsg1(200, "compressed=%d\n", out_len); + (void)deflateEnd(&strm); + return ret; +#else + return 1; +#endif +} + +/* + * Inflate or uncompress an input buffer. You must supply + * and output buffer and an output length sufficiently long + * or there will be an error. This uncompresses in one call. + */ +int Zinflate(char *in, int in_len, char *out, int &out_len) +{ +#ifdef HAVE_LIBZ + z_stream strm; + int ret; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.next_in = (Bytef *)in; + strm.avail_in = in_len; + ret = inflateInit(&strm); + if (ret != Z_OK) { + Dmsg0(200, "inflateInit error\n"); + (void)inflateEnd(&strm); + return ret; + } + + Dmsg1(200, "In len: %d bytes\n", strm.avail_in); + strm.avail_out = out_len; + strm.next_out = (Bytef *)out; + ret = inflate(&strm, Z_FINISH); + out_len -= strm.avail_out; + Dmsg1(200, "Uncompressed=%d\n", out_len); + (void)inflateEnd(&strm); + return ret; +#else + return 1; +#endif +} diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index e24291cce4..701d98e2c0 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -1,7 +1,7 @@ /* Bacula® - The Network Backup Solution - Copyright (C) 2000-2008 Free Software Foundation Europe e.V. + Copyright (C) 2000-2010 Free Software Foundation Europe e.V. The main author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -28,7 +28,6 @@ /* * Prototypes for lib directory of Bacula * - * Version $Id$ */ #ifndef __LIBPROTOS_H @@ -79,6 +78,8 @@ long long int strtoll (const char *ptr, char **endptr, int base); void read_state_file(char *dir, const char *progname, int port); int b_strerror(int errnum, char *buf, size_t bufsiz); char *escape_filename(const char *file_path); +int Zdeflate(char *in, int in_len, char *out, int &out_len); +int Zinflate(char *in, int in_len, char *out, int &out_len); /* bnet.c */ int32_t bnet_recv (BSOCK *bsock); -- 2.39.5