Skip to content

upgrade for parallel put-object operations and add ruby version limits. #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ UploadPartCopy,AbortMultipartUpload,ListMultipartUpload,ListParts

- 下载 [oss-emulator](https://github.com/aliyun/oss-emulator)

- 运行。进入 *oss-emulator* 目录, 执行命令 `ruby bin/emulator -r store -p 8080`。
- 运行。进入 *oss-emulator* 目录, 执行命令 `ruby bin/emulator -r store`, 默认监听端口为80; 如果要指定其他端口(比如8080), 则执行命令`ruby bin/emulator -r store -p 8080`。

### Windows

Expand All @@ -73,13 +73,17 @@ UploadPartCopy,AbortMultipartUpload,ListMultipartUpload,ListParts

- 下载 [oss-emulator](https://github.com/aliyun/oss-emulator)

- 运行。进入 *oss-emulator* 目录, 执行命令 `ruby bin/emulator -r store -p 8080`。
- 运行。进入 *oss-emulator* 目录, 执行命令 `ruby bin/emulator -r store`, 默认监听端口为80; 如果要指定其他端口(比如8080), 则执行命令`ruby bin/emulator -r store -p 8080`。

## 使用示例

### ossutil

- 方法一:直接在命令行中携带参数, 其中endpoint设置为oss-emulator的IP; AccessKeyId和AccessKeySecret如下, 也可以不填。 如:
- 方法一:直接在命令行中携带参数, 其中endpoint设置为oss-emulator的IP; AccessKeyId和AccessKeySecret如下, 也可以不填; 默认连接到服务器端口80。 如:
```
ossutil -e http://192.168.0.1 -i AccessKeyId -k AccessKeySecret ls oss://bucket
```
如果要指定连接到其他服务器端口(比如8080), 则按照如下命令来执行:
```
ossutil -e http://192.168.0.1:8080 -i AccessKeyId -k AccessKeySecret ls oss://bucket
```
Expand All @@ -94,8 +98,16 @@ UploadPartCopy,AbortMultipartUpload,ListMultipartUpload,ListParts

### Python SDK

- *Python SDK* 连接 oss-emulator 代码的如下, 其中endpoint设置为 oss-emulator 的IP, AccessKeyId和AccessKeySecret如下, 也可以不填。
- *Python SDK* 连接 oss-emulator 代码的如下, 其中endpoint设置为 oss-emulator 的IP, AccessKeyId和AccessKeySecret如下, 也可以不填; 默认连接到服务器端口80。

```
import oss2

auth = oss2.Auth('AccessKeySecret', 'AccessKeySecret')
bucket = oss2.Bucket(auth, 'http://192.168.0.1', 'MyBucketName')
bucket.create_bucket()
```
- 如果要指定连接到其他服务器端口(比如8080), 则按照如下代码来编写:
```
import oss2

Expand Down
4 changes: 4 additions & 0 deletions lib/emulator/cmdline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def server
Config.init()
Config.set_quiet_mode(options[:quiet])
Config.set_log_level(options[:loglevel].downcase) if options[:loglevel]

if RUBY_VERSION < '2.2.8' && RUBY_ENGINE == 'ruby'
abort("The Ruby version should be above 2.2.8 , current Ruby version is #{RUBY_VERSION} . ")
end

if options[:root]
root_dir = File.expand_path(options[:root])
Expand Down
1 change: 1 addition & 0 deletions lib/emulator/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module OssEmulator
# Log.set_quiet_mode(false)
# Log.info("something", 'green')
# Log.fatal("something", 'red')
# 打印调试信息 ruby emulator -r ../store -q false -L debug
class Log

LOG_DEFAULT_DIR = "../log"
Expand Down
15 changes: 8 additions & 7 deletions lib/emulator/multipart.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "rexml/document"
require 'emulator/config'
require 'emulator/util'
require 'emulator/request'
require 'emulator/response'
include REXML
include Comparable
Expand All @@ -15,9 +16,6 @@ def self.initiate_multipart_upload(bucket, object, response)
# NoSuchBucket
return if OssResponse.response_no_such_bucket(response, bucket)

# delete object
OssUtil.delete_object_file_and_dir(bucket, object)

dataset = {
cmd: Request::POST_INIT_MULTIPART_UPLOAD,
bucket: bucket,
Expand All @@ -30,9 +28,10 @@ def self.initiate_multipart_upload(bucket, object, response)

# UploadPart
def self.upload_part(req, query, request, response)
part_number = query['partNumber'].first
part_number = query['partNumber'].first
uploadId = query['uploadId'].first

Object.put_object(req.bucket, req.object, request, response, part_number)
Object.put_object(req.bucket, req.object, request, response, part_number, uploadId)
end

# CompleteMultipartUpload
Expand All @@ -48,14 +47,16 @@ def self.complete_multipart_upload(req, request, response)
end

object_dir = File.join(Config.store, req.bucket, req.object)
base_obj_part_filename = File.join(object_dir, Store::OBJECT_CONTENT_PREFIX)
temp_subdir = req.query_parser['uploadId'].first
temp_object_dir = File.join(object_dir, temp_subdir)
base_obj_part_filename = File.join(temp_object_dir, Store::OBJECT_CONTENT_PREFIX)
complete_file_size = 0
parts.each do |part|
part_filename = "#{base_obj_part_filename}#{part[:number]}"
complete_file_size += File.size(part_filename)
end

options = { :size => complete_file_size, :part_size => File.size(File.join(object_dir, Store::OBJECT_CONTENT)) }
options = { :temp_dir => temp_subdir, :size => complete_file_size, :part_size => File.size(File.join(temp_object_dir, Store::OBJECT_CONTENT)) }
dataset = OssUtil.put_object_metadata(req.bucket, req.object, request, options)

dataset[:cmd] = Request::POST_COMPLETE_MULTIPART_UPLOAD
Expand Down
22 changes: 16 additions & 6 deletions lib/emulator/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module OssEmulator
module Object

# PutObject
def self.put_object(bucket, object, request, response, part_number=nil)
def self.put_object(bucket, object, request, response, part_number=nil, uploadId=nil)
# NoSuchBucket
return if OssResponse.response_no_such_bucket(response, bucket)

Expand Down Expand Up @@ -44,13 +44,22 @@ def self.put_object(bucket, object, request, response, part_number=nil)
end

obj_dir = File.join(Config.store, bucket, object)
temp_subdir = ""
temp_obj_dir = ""
Log.debug("request.header['authorization']=#{request.header['authorization']}")
if part_number
object_content_filename = File.join(obj_dir, "#{Store::OBJECT_CONTENT_PREFIX}#{part_number}")
temp_subdir = uploadId
Log.debug("temp_subdir=#{temp_subdir}", 'green')
temp_obj_dir = File.join(obj_dir, temp_subdir)
object_content_filename = File.join(temp_obj_dir, "#{Store::OBJECT_CONTENT_PREFIX}#{part_number}")
else
OssUtil.delete_object_file_and_dir(bucket, object)
object_content_filename = File.join(obj_dir, Store::OBJECT_CONTENT)
#OssUtil.delete_object_file_and_dir(bucket, object)
temp_subdir = request.header['authorization'].first.split(':')[1].gsub(/[^a-zA-Z0-9]/, '')
Log.debug("temp_subdir=#{temp_subdir}", 'green')
temp_obj_dir = File.join(obj_dir, temp_subdir)
object_content_filename = File.join(temp_obj_dir, Store::OBJECT_CONTENT)
end
FileUtils.mkdir_p(obj_dir) unless File.exist?(obj_dir)
FileUtils.mkdir_p(temp_obj_dir) unless File.exist?(temp_obj_dir)
f_object_content = File.new(object_content_filename, 'a')
f_object_content.binmode

Expand Down Expand Up @@ -83,7 +92,8 @@ def self.put_object(bucket, object, request, response, part_number=nil)

dataset = {}
# put object metadata if not multipart upload
dataset = OssUtil.put_object_metadata(bucket, object, request) unless part_number
option = { temp_dir: temp_subdir }
dataset = OssUtil.put_object_metadata(bucket, object, request, option) unless part_number

dataset[:cmd] = Request::PUT_OBJECT
OssResponse.response_ok(response, dataset)
Expand Down
55 changes: 52 additions & 3 deletions lib/emulator/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'time'
require 'fileutils'
require 'builder'
require "thread"
require 'emulator/config'

module OssEmulator
Expand Down Expand Up @@ -159,16 +160,20 @@ def self.get_bucket_list_objects(lbr, req)
end #get_bucket_list_objects

def self.delete_object_file_and_dir(bucket, object)
Log.debug("delete_object_file_and_dir", 'yellow')
bucket_dir = File.join(Config.store, bucket)
object_dir = File.join(bucket_dir, object)

object_metadata_filename = File.join(object_dir, Store::OBJECT_METADATA)
FileUtils.rm_rf(object_metadata_filename) if File.exist?(object_metadata_filename)
object_content_filename = File.join(object_dir, Store::OBJECT_CONTENT_PREFIX)
FileUtils.rm_rf("#{object_content_filename}*") if File.exist?(object_content_filename)

current_level_folder = object_dir
while File.exist?(current_level_folder)
return if current_level_folder==bucket_dir
Find.find(current_level_folder) do |filename|
Log.debug("filename", 'blue')
return if filename.include?(Store::OBJECT_METADATA)
end

Expand All @@ -179,8 +184,18 @@ def self.delete_object_file_and_dir(bucket, object)

def self.put_object_metadata(bucket, object, request, options = nil)
obj_dir = File.join(Config.store, bucket, object)
content_filename = File.join(obj_dir, Store::OBJECT_CONTENT)
Log.raise("put_object_metadata : Object content file does not exist in put_object_metadata : #{obj_dir}") unless File.exist?(content_filename)

temp_obj_dir = obj_dir
temp_subdir = ''
if options!=nil && options.include?(:temp_dir)
temp_subdir = options[:temp_dir] if options.include?(:temp_dir)
temp_obj_dir = File.join(obj_dir, temp_subdir)
Log.debug("obj_dir=#{obj_dir}, options=#{options}, temp_subdir=#{temp_subdir}")
end

content_filename = File.join(temp_obj_dir, Store::OBJECT_CONTENT)

Log.raise("put_object_metadata : Object content file does not exist in put_object_metadata : #{temp_obj_dir}") unless File.exist?(content_filename)

# construct metadata
metadata = {}
Expand Down Expand Up @@ -220,11 +235,45 @@ def self.put_object_metadata(bucket, object, request, options = nil)
end

# store metadata to file
metadata_file = File.join(obj_dir, Store::OBJECT_METADATA)
metadata_file = File.join(temp_obj_dir, Store::OBJECT_METADATA)
File.open(metadata_file, 'w') do |f|
f << YAML::dump(metadata)
end

# define the dynamic mutex var
if temp_subdir!=''
# bucket, object, str_var_mutex = "@mutex_#{temp_subdir}"
bucket_var = bucket.gsub(/\W/, '_')
object_var = object.gsub(/\W/, '_')
str_var_mutex = "@mutex_#{bucket_var}_#{object_var}"
mutex_set = instance_variable_set(str_var_mutex, Mutex.new)
thread_id = Thread.current
mutex_status = instance_variable_get(str_var_mutex).locked?
Log.debug("#{thread_id} :: Generate Mutex=#{mutex_set},status=#{mutex_status}; #{str_var_mutex}", 'blue')

instance_variable_get(str_var_mutex).synchronize{
# remove old object files
Log.debug("#{thread_id} :: #{str_var_mutex} : enter synchronize block and remove old object. ", 'blue')
list_oldfiles = Dir.entries(obj_dir)
list_oldfiles.each do |f|
if f.include?("_object_oss_aliyun_ALIBABA")
filepath = "#{obj_dir}/#{f}"
Log.debug(filepath, "yellow")
FileUtils.rm_rf(filepath)
end
end

# move object files from temp_dir to normal dir
Log.info("#{thread_id} :: #{str_var_mutex} : move temp_object to normal object. ", 'blue')
list_move = Dir.entries(temp_obj_dir)
list_move.each do |f|
FileUtils.mv "#{temp_obj_dir}/#{f}",obj_dir if !File.directory?(f)
end
FileUtils.rm_rf(temp_obj_dir) if File.exist?(temp_obj_dir)
Log.debug("#{thread_id} :: #{str_var_mutex} : quit synchronize block. ", 'blue')
}
end

metadata
end

Expand Down