Skip to content
This repository was archived by the owner on Feb 8, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,18 +258,16 @@ int MotrGC::delete_obj_from_gc(motr_gc_obj_info ginfo) {
int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
int rc = 0;
int max_entries = 10000;
int number_of_parts = 0;
int processed_parts = 0;
std::vector<std::string> keys(max_entries);
std::vector<bufferlist> vals(max_entries);

keys[0] = ginfo.name;
rc = store->next_query_by_name(ginfo.multipart_iname, keys, vals);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: next query failed. rc="
<< rc << dendl;
return rc;
}
number_of_parts = rc;
for (const auto& bl: vals) {
if (bl.length() == 0)
break;
Expand All @@ -290,15 +288,13 @@ int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
part_name.append(buff);

std::string obj_fqdn = ginfo.name + "." + part_name;
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time,
info.size, false, "");
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time, info.size);
rc = enqueue(gc_obj);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to push "
<< obj_fqdn << "into GC queue " << dendl;
continue;
}
processed_parts++;
bufferlist bl_del;
rc = store->do_idx_op_by_name(ginfo.multipart_iname,
M0_IC_DEL, part_name, bl_del);
Expand All @@ -312,10 +308,6 @@ int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
}
}

if (processed_parts == number_of_parts) {
store->delete_motr_idx_by_name(ginfo.multipart_iname);
}

return rc;
}

Expand Down
8 changes: 4 additions & 4 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ struct motr_gc_obj_info {
motr_gc_obj_info() {}
motr_gc_obj_info(const std::string& _tag, const std::string& _name, Meta& _mobj,
const std::time_t& _deletion_time, const std::uint64_t& _size,
bool _is_multipart, const std::string& _multipart_iname)
: tag(_tag), name(_name), mobj(_mobj),
deletion_time(_deletion_time), size(_size),
is_multipart(_is_multipart), multipart_iname(_multipart_iname) {}
const std::string& _multipart_iname="")
: tag(_tag), name(_name), mobj(_mobj), deletion_time(_deletion_time),
size(_size), is_multipart(_multipart_iname != ""),
multipart_iname(_multipart_iname) {}

void encode(bufferlist &bl) const {
ENCODE_START(11, 2, bl);
Expand Down
127 changes: 60 additions & 67 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,14 @@ int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_childre
}

// 3. Remove mp index??
string bucket_multipart_iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ret = store->delete_motr_idx_by_name(bucket_multipart_iname);
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
ret = store->delete_motr_idx_by_name(iname);
if (ret < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: failed to remove multipart.in-progress index rc=" << ret << dendl;
return ret;
}
iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ret = store->delete_motr_idx_by_name(iname);
if (ret < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: failed to remove multipart index rc=" << ret << dendl;
return ret;
Expand Down Expand Up @@ -1089,18 +1095,30 @@ int MotrBucket::create_multipart_indices()
int rc;
string tenant_bkt_name = get_bucket_name(info.bucket.tenant, info.bucket.name);

// Bucket multipart index stores in-progress multipart uploads.
// There are two additional indexes per bucket for multiparts:
// one for in-progress uploads, another one for completed uploads.

// Key is the object name + upload_id, value is a rgw_bucket_dir_entry.
// An entry is inserted when a multipart upload is initialised (
// MotrMultipartUpload::init()) and will be removed when the upload
// is completed (MotrMultipartUpload::complete()).
// MotrBucket::list_multiparts() will scan this index to return all
// in-progress multipart uploads in the bucket.
string bucket_multipart_iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
rc = store->create_motr_idx_by_name(bucket_multipart_iname);
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->create_motr_idx_by_name(iname);
if (rc < 0) {
ldout(store->cctx, LOG_ERROR) <<__func__
<< ": ERROR: failed to create bucket in-progress multiparts index "
<< iname << ", rc=" << rc << dendl;
return rc;
}

iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
rc = store->create_motr_idx_by_name(iname);
if (rc < 0) {
ldout(store->cctx, LOG_ERROR) <<__func__ << ": ERROR: failed to create bucket multipart index "
<< bucket_multipart_iname << dendl;
ldout(store->cctx, LOG_ERROR) <<__func__
<< ": ERROR: failed to create bucket multiparts index "
<< iname << ", rc=" << rc << dendl;
return rc;
}

Expand Down Expand Up @@ -1448,7 +1466,7 @@ int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp,
string tenant_bkt_name = get_bucket_name(this->get_tenant(), this->get_name());

string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
key_vec[0].clear();
key_vec[0].assign(marker.begin(), marker.end());
rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec, prefix, delim);
Expand Down Expand Up @@ -2226,7 +2244,8 @@ int MotrObject::MotrDeleteOp::create_delete_marker(const DoutPrefixProvider* dpp
int MotrObject::remove_mobj_and_index_entry(
const DoutPrefixProvider* dpp, rgw_bucket_dir_entry& ent,
std::string delete_key, std::string bucket_index_iname,
std::string bucket_name) {
std::string bucket_name)
{
int rc;
bufferlist bl;
uint64_t size_rounded = 0;
Expand All @@ -2242,14 +2261,12 @@ int MotrObject::remove_mobj_and_index_entry(
if (rc < 0) {
ldpp_dout(dpp, 0) <<__func__<< ": ERROR: get_upload_id failed. rc=" << rc << dendl;
} else {
std::string obj_fqdn = bucket_name + "/" + delete_key;
std::string obj_part_iname = "motr.rgw.object." + bucket_name + "." +
this->get_name() + "." + upload_id +
".parts";
ldpp_dout(dpp, 20) << __func__ << ": object part index=" << obj_part_iname << dendl;
std::string obj_fqdn = this->get_name() + "." + upload_id;
std::string iname = "motr.rgw.bucket." + bucket_name + ".multiparts";
ldpp_dout(dpp, 20) << __func__ << ": object part index=" << iname << dendl;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(upload_id, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, true, obj_part_iname);
ent.meta.size, iname);
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
Expand Down Expand Up @@ -2280,7 +2297,7 @@ int MotrObject::remove_mobj_and_index_entry(
std::string obj_fqdn = bucket_name + "/" + delete_key;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, false, "");
ent.meta.size);
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
Expand Down Expand Up @@ -2710,10 +2727,9 @@ int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz)
}
expected_obj_size = sz;
chunk_io_sz = expected_obj_size;
if (expected_obj_size > MAX_ACC_SIZE) {
if (expected_obj_size > MAX_ACC_SIZE)
// Cap it to MAX_ACC_SIZE
chunk_io_sz = MAX_ACC_SIZE;
}

ldpp_dout(dpp, 20) <<__func__ << ": key=" << this->get_key().to_str()
<< " size=" << sz << " meta:oid=[0x" << std::hex
Expand Down Expand Up @@ -2941,11 +2957,10 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
available_data = io_ctxt.total_bufer_sz;
}
bs = this->get_optimal_bs(chunk_io_sz);
if (bs < chunk_io_sz) {
if (bs < chunk_io_sz)
chunk_io_sz = bs;
}
int64_t remaining_bytes =
expected_obj_size - processed_bytes;

int64_t remaining_bytes = expected_obj_size - processed_bytes;
// Check if this is the last io of the original object size
if (remaining_bytes <= 0)
last_io = true;
Expand Down Expand Up @@ -3988,10 +4003,7 @@ int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp, std::string
<< ", rc=" << rc << dendl;
}

// Delete object part index.
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
return store->delete_motr_idx_by_name(obj_part_iname);
return rc;
}

int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
Expand All @@ -4004,7 +4016,7 @@ int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
meta_obj = get_meta_obj();
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
if (rc < 0) {
Expand Down Expand Up @@ -4065,6 +4077,7 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,

owner = _owner;

string tenant_bkt_name = get_bucket_name(bucket->get_tenant(), bucket->get_name());
do {
char buf[33];
string tmp_obj_name;
Expand Down Expand Up @@ -4110,9 +4123,8 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
encode(attrs, bl);
// Insert an entry into bucket multipart index so it is not shown
// when listing a bucket.
string tenant_bkt_name = get_bucket_name(obj->get_bucket()->get_tenant(), obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_PUT, obj->get_key().to_str(), bl);
} while (rc == -EEXIST);
Expand All @@ -4121,22 +4133,6 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: index opration failed, M0_IC_PUT rc="<< rc << dendl;
return rc;
}
string tenant_bkt_name = get_bucket_name(bucket->get_tenant(), bucket->get_name());
//This is multipart init, you will always have upload id here.
string upload_id = get_upload_id();

// Create object part index.
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
rc = store->create_motr_idx_by_name(obj_part_iname);
if (rc == -EEXIST)
rc = 0;
if (rc < 0) {
// TODO: clean the bucket index entry
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: Failed to create object multipart index " << obj_part_iname << dendl;
return rc;
}

// Add one to the object_count of the current bucket stats
// Size will be added when parts are uploaded
Expand Down Expand Up @@ -4187,15 +4183,14 @@ int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *
}
}

string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << iname << dendl;
key_vec[0].clear();
key_vec[0] = "part.";
key_vec[0] = mp_obj.get_key() + "." + upload_id;
char buf[32];
snprintf(buf, sizeof(buf), "%08d", marker + 1);
snprintf(buf, sizeof(buf), ".%08d", marker + 1);
key_vec[0].append(buf);
rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec);
rc = store->next_query_by_name(iname, key_vec, val_vec);
if (rc < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: NEXT query failed. rc=" << rc << dendl;
return rc;
Expand Down Expand Up @@ -4390,7 +4385,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
meta_obj = get_meta_obj();
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
ldpp_dout(dpp, 20) <<__func__ << ": read entry from bucket multipart index rc=" << rc << dendl;
Expand Down Expand Up @@ -4494,7 +4489,7 @@ int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield
bufferlist bl;
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
int rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
if (rc < 0) {
Expand Down Expand Up @@ -4621,21 +4616,19 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
encode(attrs, bl);
part_obj->meta.encode(bl);

string p = "part.";
char buf[32];
snprintf(buf, sizeof(buf), "%08d", (int)part_num);
p.append(buf);
string tenant_bkt_name = get_bucket_name(head_obj->get_bucket()->get_tenant(), head_obj->get_bucket()->get_name());
//This is a MultipartComplete operation so this should always have valid upload id.
string upload_id_str = upload_id;
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." +
head_obj->get_key().to_str() + "." + upload_id_str + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
string part = head_obj->get_name() + "." + upload_id;
char buf[32];
snprintf(buf, sizeof(buf), ".%08d", (int)part_num);
part.append(buf);

// Before updating object part index with entry for new part, check if
// old part exists. Perform M0_IC_GET operation on object part index.
string tenant_bkt_name = get_bucket_name(head_obj->get_bucket()->get_tenant(),
head_obj->get_bucket()->get_name());
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
bufferlist old_part_check_bl;
rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_GET, p, old_part_check_bl);
rc = store->do_idx_op_by_name(iname, M0_IC_GET, part, old_part_check_bl);
if (rc == 0 && old_part_check_bl.length() > 0) {
// Old part exists. Try to delete it.
RGWUploadPartInfo old_part_info;
Expand All @@ -4644,7 +4637,7 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
head_obj->get_key().to_str() +
".part." + std::to_string(part_num);
std::unique_ptr<MotrObject> old_part_obj =
std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name),head_obj->get_bucket());
std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket());
if (old_part_obj == nullptr)
return -ENOMEM;

Expand All @@ -4661,14 +4654,14 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
// Delete old object
rc = old_mobj->delete_mobj(dpp);
if (rc == 0) {
ldpp_dout(dpp, 20) <<__func__ << ": Old part [" << p << "] deleted succesfully" << dendl;
ldpp_dout(dpp, 20) <<__func__ << ": Old part [" << part << "] deleted succesfully" << dendl;
} else {
ldpp_dout(dpp, 0) <<__func__ << ": Failed to delete old part [" << p << "]. Error=" << rc << dendl;
ldpp_dout(dpp, 0) <<__func__ << ": Failed to delete old part [" << part << "], rc=" << rc << dendl;
return rc;
}
}

rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl);
rc = store->do_idx_op_by_name(iname, M0_IC_PUT, part, bl);
if (rc < 0) {
ldpp_dout(dpp, 0) <<__func__ << ": failed to add part obj in part index, rc=" << rc << dendl;
return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class MotrObject : public Object {
uint64_t part_num;
// Object size as available from Content-Length header
uint64_t expected_obj_size = 0;
uint64_t chunk_io_sz = 0;
int64_t chunk_io_sz = 0;
// Total Number of bytes processed so far
uint64_t processed_bytes = 0;
struct AccumulateIOCtxt io_ctxt = {};
Expand Down