2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Routines for writing to the Cloud using S3 protocol.
21 * NOTE!!! This cloud driver is not compatible with
22 * any disk-changer script for changing Volumes.
23 * It does however work with Bacula Virtual autochangers.
25 * Written by Kern Sibbald, May MMXVI
28 #include "s3_driver.h"
/* Debug level used by all Dmsg() trace output in this driver */
32 static const int dbglvl = 100;
/*
 * Human-readable names for libs3 S3Status codes, indexed directly by the
 * numeric status value (see responseCompleteCallback below).
 * NOTE(review): this listing is elided -- entries between the visible lines
 * are missing from this view.  The table must remain in exact one-to-one
 * order with the S3Status enum in libs3.h, or lookups will print the wrong
 * error name; verify against the installed libs3 header.
 */
33 static const char *S3Errors[] = {
38 "InvalidBucketNameTooLong",
39 "InvalidBucketNameFirstCharacter",
40 "InvalidBucketNameCharacter",
41 "InvalidBucketNameCharacterSequence",
42 "InvalidBucketNameTooShort",
43 "InvalidBucketNameDotQuadNotation",
45 "FailedToInitializeRequest",
46 "MetaDataHeadersTooLong",
53 "CacheControlTooLong",
54 "BadContentDispositionFilename",
55 "ContentDispositionFilenameTooLong",
57 "ContentEncodingTooLong",
61 "IfNotMatchETagTooLong",
66 "EmailAddressTooLong",
68 "UserDisplayNameTooLong",
71 "TargetBucketTooLong",
72 "TargetPrefixTooLong",
76 "XmlDocumentTooLarge",
79 "ServerFailedVerification",
84 "AmbiguousGrantByEmailAddress",
86 "BucketAlreadyExists",
87 "BucketAlreadyOwnedByYou",
89 "CredentialsNotSupported",
90 "CrossLocationLoggingProhibited",
94 "IllegalVersioningConfigurationException",
96 "IncorrectNumberOfFilesInPostRequest",
100 "InvalidAddressingHeader",
103 "InvalidBucketState",
105 "InvalidLocationConstraint",
106 "InvalidObjectState",
110 "InvalidPolicyDocument",
114 "InvalidSOAPRequest",
115 "InvalidStorageClass",
116 "InvalidTargetBucketForLogging",
121 "MalformedPOSTRequest",
123 "MaxMessageLengthExceeded",
124 "MaxPostPreDataLengthExceededError",
128 "MissingContentLength",
129 "MissingRequestBodyError",
130 "MissingSecurityElement",
131 "MissingSecurityHeader",
132 "NoLoggingStatusForKey",
135 "NoSuchLifecycleConfiguration",
140 "NotSuchBucketPolicy",
143 "PreconditionFailed",
145 "RestoreAlreadyInProgress",
146 "RequestIsNotMultiPartContent",
148 "RequestTimeTooSkewed",
149 "RequestTorrentOfBucketError",
150 "SignatureDoesNotMatch",
151 "ServiceUnavailable",
154 "TokenRefreshRequired",
157 "UnresolvableGrantByEmailAddress",
158 "UserKeyMustBeSpecified",
160 "HttpErrorMovedTemporarily",
161 "HttpErrorBadRequest",
162 "HttpErrorForbidden",
/* Number of entries in S3Errors[]; also used as a clamp value in
 * responseCompleteCallback -- see the review note there. */
169 #define S3ErrorsSize (sizeof(S3Errors)/sizeof(char *))
174 * Our Bacula context for s3_xxx callbacks
175 * NOTE: only items needed for particular callback are set
/* NOTE(review): the struct header and most member declarations are elided
 * from this view; visible members are inferred from the initializer lists:
 * jcr, xfer, errMsg, parts, isTruncated, nextMarker, obj_len, caller,
 * infile, outfile, volumes, status, limit. */
191 bwlimit *limit; /* Used to control the bandwidth */
/* Constructor used when no transfer object exists (list/truncate paths):
 * errMsg aliases the caller-supplied POOLMEM error buffer. */
192 bacula_ctx(POOLMEM *&err) : jcr(NULL), xfer(NULL), errMsg(err), parts(NULL),
193 isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
194 infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
/* Constructor used for upload/download paths: errMsg aliases the
 * transfer's own message buffer (t->m_message). */
196 bacula_ctx(transfer *t) : jcr(NULL), xfer(t), errMsg(t->m_message), parts(NULL),
197 isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
198 infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
203 /* Imported functions */
204 const char *mode_to_str(int mode);
206 /* Forward referenced functions */
208 /* Const and Static definitions */
/* Forward declarations for the two generic libs3 response callbacks
 * (full parameter lists elided in this view). */
210 static S3Status responsePropertiesCallback(
211 const S3ResponseProperties *properties,
214 static void responseCompleteCallback(
216 const S3ErrorDetails *oops,
/* Generic handler pair passed to libs3 calls that need no data callback
 * (e.g. S3_delete_object in truncate_cloud_volume). */
220 S3ResponseHandler responseHandler =
222 &responsePropertiesCallback,
223 &responseCompleteCallback
/*
 * libs3 properties callback: records the remote object's size and
 * mtime on the active transfer so the caller can verify the result.
 * callbackData is always a bacula_ctx* in this driver.
 */
229 static S3Status responsePropertiesCallback(
230 const S3ResponseProperties *properties,
233 bacula_ctx *ctx = (bacula_ctx *)callbackData;
/* Only transfers care about result properties; list operations pass
 * a ctx with xfer == NULL and skip this. */
235 if (ctx->xfer && properties) {
236 if (properties->contentLength > 0) {
237 ctx->xfer->m_res_size = properties->contentLength;
239 if (properties->lastModified > 0) {
240 ctx->xfer->m_res_mtime = properties->lastModified;
/*
 * libs3 completion callback: stores the final S3Status in the context
 * and formats a readable error message (with furtherDetails when libs3
 * provides them) into ctx->errMsg.
 */
246 static void responseCompleteCallback(
248 const S3ErrorDetails *oops,
251 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
256 ctx->status = status; /* return completion status */
/* NOTE(review): clamping an out-of-range status to S3ErrorsSize and then
 * indexing S3Errors[status] below reads one element past the end of the
 * visible table unless the (elided) listing ends with an extra sentinel
 * entry -- verify; otherwise this should be ">= S3ErrorsSize" with a
 * clamp to S3ErrorsSize - 1. */
258 if (status < 0 || status > S3ErrorsSize) {
259 status = (S3Status)S3ErrorsSize;
263 msg = S3Errors[status];
/* Only report failures, and only when the caller supplied a buffer */
265 if ((status != S3StatusOK) && ctx->errMsg) {
266 if (oops->furtherDetails) {
267 Mmsg(ctx->errMsg, "%s ERR=%s\n"
268 "furtherDetails=%s\n", ctx->caller, msg, oops->furtherDetails);
269 Dmsg1(dbglvl, "%s", ctx->errMsg);
271 Mmsg(ctx->errMsg, "%s ERR=%s\n", ctx->caller, msg);
272 Dmsg1(dbglvl, "%s", ctx->errMsg);
/*
 * libs3 upload data callback: fills buf with up to buf_len bytes read
 * from ctx->infile, decrementing ctx->obj_len (bytes remaining) and
 * applying the upload bandwidth limit when one is configured.
 * Returns the byte count (elided here) so libs3 knows how much to send;
 * a cancelled job aborts the transfer.
 */
281 static int putObjectCallback(int buf_len, char *buf, void *callbackCtx)
283 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
288 if (ctx->xfer->is_cancelled()) {
289 Mmsg(ctx->errMsg, _("Job cancelled.\n"));
/* Never read more than the bytes still owed for this object */
293 read_len = (ctx->obj_len > buf_len) ? buf_len : ctx->obj_len;
294 rbytes = fread(buf, 1, read_len, ctx->infile);
295 Dmsg5(dbglvl, "%s thread=%lu rbytes=%d bufsize=%u remlen=%lu\n",
296 ctx->caller, pthread_self(), rbytes, buf_len, ctx->obj_len);
/* Short read: report the errno from the input file (error path elided) */
299 Mmsg(ctx->errMsg, "%s Error reading input file: ERR=%s\n",
300 ctx->caller, be.bstrerror());
303 ctx->obj_len -= rbytes;
/* Throttle only when an upload bandwidth limit is set (ctx->limit != NULL) */
306 ctx->limit->control_bwlimit(rbytes);
/* Handler wiring putObjectCallback into libs3 (initializer elided) */
314 S3PutObjectHandler putObjectHandler =
322 * Put a cache object into the cloud
/*
 * Uploads the local cache file cache_fname to the cloud object
 * cloud_fname via S3_put_object, then re-lists the uploaded part to
 * record its size/mtime on the transfer.  Returns the S3Status of the
 * operation (return statements elided in this view).
 */
324 S3Status s3_driver::put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname)
327 bacula_ctx ctx(xfer);
328 ctx.limit = upload_limit.use_bwlimit() ? &upload_limit : NULL;
/* The object length sent to S3 must match the file size exactly */
331 if (lstat(cache_fname, &statbuf) == -1) {
333 Mmsg2(ctx.errMsg, "Failed to stat file %s. ERR=%s\n",
334 cache_fname, be.bstrerror());
338 ctx.obj_len = statbuf.st_size;
340 if (!(ctx.infile = bfopen(cache_fname, "r"))) {
342 Mmsg2(ctx.errMsg, "Failed to open input file %s. ERR=%s\n",
343 cache_fname, be.bstrerror());
/* caller tag is prefixed to any error built in responseCompleteCallback */
347 ctx.caller = "S3_put_object";
348 S3_put_object(&s3ctx, cloud_fname, ctx.obj_len, NULL, NULL,
349 &putObjectHandler, &ctx);
356 /* no error so far -> retrieve uploaded part info */
357 if (ctx.errMsg[0] == 0) {
359 get_cloud_volume_parts_list(xfer->m_dcr, cloud_fname, &parts, ctx.errMsg);
/* cloud_fname names a single part, so the list holds at most one
 * usable entry -- take the first non-NULL slot and stop */
360 for (int i=1; i <= parts.last_index() ; i++) {
361 cloud_part *p = (cloud_part *)parts.get(i);
363 xfer->m_res_size = p->size;
364 xfer->m_res_mtime = p->mtime;
365 break; /* not need to go further */
/*
 * libs3 download data callback: writes the received buffer to the
 * local cache file ctx->outfile, applying the download bandwidth
 * limit when configured.  Aborts the transfer on cancellation, write
 * error, or short write.
 */
373 static S3Status getObjectDataCallback(int buf_len, const char *buf,
376 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
380 if (ctx->xfer->is_cancelled()) {
381 Mmsg(ctx->errMsg, _("Job cancelled.\n"));
382 return S3StatusAbortedByCallback;
384 /* Write buffer to output file */
385 wbytes = fwrite(buf, 1, buf_len, ctx->outfile);
/* Write error: report errno (surrounding check elided in this view) */
388 Mmsg(ctx->errMsg, "%s Error writing output file: ERR=%s\n",
389 ctx->caller, be.bstrerror());
390 return S3StatusAbortedByCallback;
/* Throttle only when a download bandwidth limit is set (ctx->limit != NULL) */
394 ctx->limit->control_bwlimit(wbytes);
/* A short write aborts the download; full write continues */
396 return ((wbytes < buf_len) ?
397 S3StatusAbortedByCallback : S3StatusOK);
/*
 * Downloads cloud_fname into the local cache file cache_fname using
 * S3_get_object with no range or conditional restrictions (startByte
 * and byteCount are 0 -> whole object).  Returns true when no error
 * message was produced.
 */
401 bool s3_driver::get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname)
/* Unused conditional-GET parameters: fetch unconditionally */
403 int64_t ifModifiedSince = -1;
404 int64_t ifNotModifiedSince = -1;
405 const char *ifMatch = 0;
406 const char *ifNotMatch = 0;
407 uint64_t startByte = 0;
408 uint64_t byteCount = 0;
409 bacula_ctx ctx(xfer);
410 ctx.limit = download_limit.use_bwlimit() ? &download_limit : NULL;
413 /* Initialize handlers */
414 S3GetConditions getConditions = {
420 S3GetObjectHandler getObjectHandler = {
421 { &responsePropertiesCallback, &responseCompleteCallback },
422 &getObjectDataCallback
426 /* see if cache file already exists */
428 if (lstat(cache_fname, &buf) == -1) {
429 ctx.outfile = bfopen(cache_fname, "w");
431 /* Exists so truncate and write from beginning */
432 ctx.outfile = bfopen(cache_fname, "r+");
/* NOTE(review): mode "r+" does not truncate; comment above says
 * "truncate" -- presumably the part is rewritten in full from offset 0,
 * but a shorter re-download would leave stale trailing bytes; verify. */
437 Mmsg2(ctx.errMsg, "Could not open cache file %s. ERR=%s\n",
438 cache_fname, be.bstrerror());
443 ctx.caller = "S3_get_object";
444 S3_get_object(&s3ctx, cloud_fname, &getConditions, startByte,
445 byteCount, 0, &getObjectHandler, &ctx);
/* fclose() flushes buffered data; its failure is a real write error */
447 if (fclose(ctx.outfile) < 0) {
449 Mmsg2(ctx.errMsg, "Error closing cache file %s: %s\n",
450 cache_fname, be.bstrerror());
/* Success == no error message accumulated anywhere above */
454 return (ctx.errMsg[0] == 0);
/*
 * Deletes every part listed in trunc_parts for VolumeName from the
 * cloud (S3_delete_object per part).  Stops early on job cancellation
 * or the first S3 error; err receives any failure text.  Returns true
 * when err is still empty at the end.
 */
460 bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err)
467 int last_index = (int)trunc_parts->last_index();
468 POOLMEM *cloud_fname = get_pool_memory(PM_FNAME);
469 for (int i=1; (i<=last_index); i++) {
/* ilist slots can be sparse; skip holes (continue elided in view) */
470 if (!trunc_parts->get(i)) {
473 if (ctx.jcr->is_canceled()) {
474 Mmsg(err, _("Job cancelled.\n"));
477 /* don't forget to specify the volume name is the object path */
478 make_cloud_filename(cloud_fname, VolumeName, i);
479 Dmsg1(dbglvl, "Object to truncate: %s\n", cloud_fname);
480 ctx.caller = "S3_delete_object";
481 S3_delete_object(&s3ctx, cloud_fname, 0, &responseHandler, &ctx);
482 if (ctx.status != S3StatusOK) {
483 /* error message should have been filled within response cb */
489 free_pool_memory(cloud_fname);
490 bfree_and_null(ctx.nextMarker);
491 return (err[0] == 0);
/*
 * Builds the cloud object path "VolumeName/part.<apart>" into filename
 * (POOLMEM, resized as needed by add_vol_and_part).
 */
494 void s3_driver::make_cloud_filename(POOLMEM *&filename,
495 const char *VolumeName, uint32_t apart)
499 dev->add_vol_and_part(filename, VolumeName, "part", apart);
500 Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename);
/*
 * True when the given upload status is a transient connection failure
 * worth retrying (used by copy_cache_part_to_cloud's retry loop).
 */
503 bool s3_driver::retry_put_object(S3Status status)
506 status == S3StatusFailedToConnect ||
507 status == S3StatusConnectionFailed
512 * Copy a single cache part to the cloud
/*
 * Uploads one transfer's cache part, retrying up to max_upload_retries
 * times on transient connection failures.  Returns true on S3StatusOK.
 */
514 bool s3_driver::copy_cache_part_to_cloud(transfer *xfer)
517 POOLMEM *cloud_fname = get_pool_memory(PM_FNAME);
518 make_cloud_filename(cloud_fname, xfer->m_volume_name, xfer->m_part);
519 uint32_t retry = max_upload_retries;
520 S3Status status = S3StatusOK;
/* do { ... } retry loop; NOTE(review): the decrement of `retry` is on an
 * elided line -- confirm it exists, otherwise `retry>0` never terminates
 * the loop on persistent connection failures. */
522 status = put_object(xfer, xfer->m_cache_fname, cloud_fname);
524 } while (retry_put_object(status) && (retry>0));
525 free_pool_memory(cloud_fname);
526 return (status == S3StatusOK);
530 * Copy a single object (part) from the cloud to the cache
/*
 * Downloads one part of the transfer's volume into its cache file.
 * Thin wrapper over get_cloud_object; returns its boolean result
 * (return statement elided in this view).
 */
532 bool s3_driver::copy_cloud_part_to_cache(transfer *xfer)
535 POOLMEM *cloud_fname = get_pool_memory(PM_FNAME);
536 make_cloud_filename(cloud_fname, xfer->m_volume_name, xfer->m_part);
537 bool rtn = get_cloud_object(xfer, cloud_fname, xfer->m_cache_fname);
538 free_pool_memory(cloud_fname);
543 * NOTE: See the SD Cloud resource in stored_conf.h
/*
 * One-time driver initialization: copies device/cloud resource pointers,
 * fills the libs3 S3BucketContext from the Cloud resource, chooses the
 * I/O buffer size, and calls S3_initialize.  On S3 init failure the
 * error is reported via dev->errmsg / Qmsg / Tmsg.
 */
546 bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
550 dev = adev; /* copy cloud device pointer */
551 device = adevice; /* copy device resource pointer */
552 cloud = device->cloud; /* local pointer to cloud definition */
554 /* Setup bucket context for S3 lib */
555 s3ctx.hostName = cloud->host_name;
556 s3ctx.bucketName = cloud->bucket_name;
557 s3ctx.protocol = (S3Protocol)cloud->protocol;
558 s3ctx.uriStyle = (S3UriStyle)cloud->uri_style;
559 s3ctx.accessKeyId = cloud->access_key;
560 s3ctx.secretAccessKey = cloud->secret_key;
561 s3ctx.authRegion = cloud->region;
563 /* File I/O buffer */
564 buf_len = dev->max_block_size;
/* Fall back when the device has no max_block_size (condition elided) */
566 buf_len = DEFAULT_BLOCK_SIZE;
569 if ((status = S3_initialize("s3", S3_INIT_ALL, s3ctx.hostName)) != S3StatusOK) {
570 Mmsg1(dev->errmsg, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status));
571 Qmsg1(jcr, M_FATAL, 0, "%s", dev->errmsg);
572 Tmsg1(0, "%s", dev->errmsg);
/* Announce the driver/host/bucket in the job log at job start */
578 bool s3_driver::start_of_job(DCR *dcr)
580 Jmsg(dcr->jcr, M_INFO, 0, _("Using S3 cloud driver Host=%s Bucket=%s\n"),
581 s3ctx.hostName, s3ctx.bucketName);
/* End-of-job hook (body elided in this view) */
585 bool s3_driver::end_of_job(DCR *dcr)
591 * Note, dcr may be NULL
/* Driver termination hook (body elided in this view) */
593 bool s3_driver::term(DCR *dcr)
602 * libs3 callback for get_cloud_volume_parts_list()
/*
 * Invoked per page of S3_list_bucket results: for every object whose
 * key contains "part.", records a cloud_part (index parsed from the
 * digits after "part.", plus mtime/size) into ctx->parts keyed by index.
 * Saves pagination state (isTruncated / nextMarker) for the caller's
 * while loop, and aborts on job cancellation.
 */
604 static S3Status partslistBucketCallback(
606 const char *nextMarker,
608 const S3ListBucketContent *object,
609 int commonPrefixesCount,
610 const char **commonPrefixes,
613 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
616 for (int i = 0; ctx->parts && (i < numObj); i++) {
617 const S3ListBucketContent *obj = &(object[i]);
618 const char *ext=strstr(obj->key, "part.");
/* NOTE(review): `obj` is &object[i] and can never be NULL, and
 * obj->key was already dereferenced by strstr on the line above --
 * the `obj &&` test is dead code. */
619 if (obj && ext!=NULL) {
620 cloud_part *part = (cloud_part*) malloc(sizeof(cloud_part));
/* ext points at "part.NNN"; atoi(&ext[5]) parses NNN as the part index */
622 part->index = atoi(&(ext[5]));
623 part->mtime = obj->lastModified;
624 part->size = obj->size;
625 ctx->parts->put(part->index, part);
/* Remember pagination state so the caller can fetch the next page */
629 ctx->isTruncated = isTruncated;
630 if (ctx->nextMarker) {
631 bfree_and_null(ctx->nextMarker);
634 ctx->nextMarker = bstrdup(nextMarker);
638 if (ctx->jcr->is_canceled()) {
639 Mmsg(ctx->errMsg, _("Job cancelled.\n"));
640 return S3StatusAbortedByCallback;
/* Handler wiring partslistBucketCallback into libs3 */
645 S3ListBucketHandler partslistBucketHandler =
648 &partslistBucketCallback
/*
 * Fills `parts` with all cloud parts of VolumeName by paging through
 * S3_list_bucket (prefix = VolumeName) until isTruncated clears.
 * err receives failure text; returns true on success (final return
 * elided in this view).
 */
651 bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err)
656 if (!parts || strlen(VolumeName) == 0) {
657 pm_strcpy(err, "Invalid argument");
664 ctx.isTruncated = 1; /* pass into the while loop at least once */
665 ctx.caller = "S3_list_bucket";
/* Each iteration fetches one page; the callback updates isTruncated
 * and nextMarker for the next round */
666 while (ctx.isTruncated!=0) {
668 S3_list_bucket(&s3ctx, VolumeName, ctx.nextMarker, NULL, 0, NULL,
669 &partslistBucketHandler, &ctx);
670 if (ctx.status != S3StatusOK) {
/* NOTE(review): ctx.status indexes S3Errors[] here with no bounds
 * clamp (unlike responseCompleteCallback) -- a status beyond the table
 * reads out of bounds; verify against the libs3 S3Status range. */
671 pm_strcpy(err, S3Errors[ctx.status]);
672 bfree_and_null(ctx.nextMarker);
676 bfree_and_null(ctx.nextMarker);
682 * libs3 callback for get_cloud_volumes_list()
/*
 * Invoked per page of a delimiter-based ("/") bucket listing: each
 * commonPrefix is a volume directory name; the trailing '/' is stripped
 * and the copy appended to ctx->volumes (alist takes ownership of the
 * bstrdup'd string).  Pagination state is saved exactly as in
 * partslistBucketCallback; aborts on job cancellation.
 */
684 static S3Status volumeslistBucketCallback(
686 const char *nextMarker,
688 const S3ListBucketContent *object,
689 int commonPrefixesCount,
690 const char **commonPrefixes,
693 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
696 for (int i = 0; ctx->volumes && (i < commonPrefixesCount); i++) {
697 char *cp = bstrdup(commonPrefixes[i]);
/* Drop the trailing '/' delimiter from the prefix */
698 cp[strlen(cp)-1] = 0;
699 ctx->volumes->append(cp);
702 ctx->isTruncated = isTruncated;
703 if (ctx->nextMarker) {
704 bfree_and_null(ctx->nextMarker);
707 ctx->nextMarker = bstrdup(nextMarker);
711 if (ctx->jcr->is_canceled()) {
712 Mmsg(ctx->errMsg, _("Job cancelled.\n"));
713 return S3StatusAbortedByCallback;
/* Handler wiring volumeslistBucketCallback into libs3 */
718 S3ListBucketHandler volumeslistBucketHandler =
721 &volumeslistBucketCallback
/*
 * Fills `volumes` with the volume names found at the top level of the
 * bucket by paging through S3_list_bucket with delimiter "/" (volumes
 * appear as common prefixes).  Returns true when err stayed empty.
 */
724 bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
730 pm_strcpy(err, "Invalid argument");
735 ctx.volumes = volumes;
737 ctx.isTruncated = 1; /* pass into the while loop at least once */
738 ctx.caller = "S3_list_bucket";
/* Page until the callback reports the listing is complete */
739 while (ctx.isTruncated!=0) {
741 S3_list_bucket(&s3ctx, NULL, ctx.nextMarker, "/", 0, NULL,
742 &volumeslistBucketHandler, &ctx);
743 if (ctx.status != S3StatusOK) {
747 bfree_and_null(ctx.nextMarker);
748 return (err[0] == 0);
/* Forward declaration so the handler below can reference the callback */
752 static S3Status listBucketCallback(
754 const char *nextMarker,
756 const S3ListBucketContent *contents,
757 int commonPrefixesCount,
758 const char **commonPrefixes,
761 S3ListBucketHandler listBucketHandler =
769 * List content of a bucket
/*
 * Debug/diagnostic listing callback: prints each object's key, size and
 * last-modified time (UTC, ISO-8601) to the console, with a one-time
 * column header.  Aborts on job cancellation.
 */
771 static S3Status listBucketCallback(
773 const char *nextMarker,
775 const S3ListBucketContent *contents,
776 int commonPrefixesCount,
777 const char **commonPrefixes,
780 bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
/* Header printed once per listing (guard condition elided in view) */
782 Pmsg1(000, "\n%-22s", " Object Name");
783 Pmsg2(000, " %-5s %-20s", "Size", " Last Modified");
784 Pmsg0(000, "\n---------------------- ----- --------------------\n");
785 print_hdr = false; /* print header once only */
788 for (int i = 0; i < numObj; i++) {
791 const S3ListBucketContent *content = &(contents[i]);
792 time_t t = (time_t) content->lastModified;
/* NOTE(review): gmtime() returns a shared static buffer and is not
 * reentrant -- gmtime_r() would be safer if this can run from
 * multiple transfer threads; verify the calling context. */
793 strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
794 sprintf(sizebuf, "%5llu", (unsigned long long) content->size);
795 Pmsg3(000, "%-22s %s %s\n", content->key, sizebuf, timebuf);
798 if (ctx->jcr->is_canceled()) {
799 Mmsg(ctx->errMsg, _("Job cancelled.\n"));
800 return S3StatusAbortedByCallback;
806 #endif /* HAVE_LIBS3 */