Skip to content

Commit e6a6d8b

Browse files
committed
volume: add fsync threshold
volume server count how many object has been written, if the number of objects is bigger than threshold, sync the kernel cache.
1 parent 9af9f36 commit e6a6d8b

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

weed/command/volume.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
"github.com/chrislusf/seaweedfs/weed/glog"
1616
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
17-
"github.com/chrislusf/seaweedfs/weed/server"
17+
weed_server "github.com/chrislusf/seaweedfs/weed/server"
1818
"github.com/chrislusf/seaweedfs/weed/storage"
1919
"github.com/chrislusf/seaweedfs/weed/util"
2020
"google.golang.org/grpc/reflection"
@@ -44,6 +44,7 @@ type VolumeServerOptions struct {
4444
cpuProfile *string
4545
memProfile *string
4646
compactionMBPerSecond *int
47+
fsyncThreshold *int
4748
}
4849

4950
func init() {
@@ -64,6 +65,7 @@ func init() {
6465
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
6566
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
6667
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
68+
v.fsyncThreshold = cmdVolume.Flag.Int("fsyncThreshold", 0, "call fsync when have written fsyncThreshold objects, set to 0 will not call fsync")
6769
}
6870

6971
var cmdVolume = &Command{
@@ -156,6 +158,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
156158
v.whiteList,
157159
*v.fixJpgOrientation, *v.readRedirect,
158160
*v.compactionMBPerSecond,
161+
*v.fsyncThreshold,
159162
)
160163

161164
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)

weed/server/volume_server.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type VolumeServer struct {
2929
compactionBytePerSecond int64
3030
MetricsAddress string
3131
MetricsIntervalSec int
32+
fsyncThreshold int
3233
}
3334

3435
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -41,6 +42,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
4142
fixJpgOrientation bool,
4243
readRedirect bool,
4344
compactionMBPerSecond int,
45+
fsyncThreshold int,
4446
) *VolumeServer {
4547

4648
v := viper.GetViper()
@@ -62,9 +64,10 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
6264
ReadRedirect: readRedirect,
6365
grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
6466
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
67+
fsyncThreshold: fsyncThreshold,
6568
}
6669
vs.SeedMasterNodes = masterNodes
67-
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
70+
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind, vs.fsyncThreshold)
6871

6972
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
7073

weed/storage/store.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package storage
22

33
import (
44
"fmt"
5+
"sync"
56
"sync/atomic"
67

78
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -35,14 +36,18 @@ type Store struct {
3536
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
3637
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
3738
DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
39+
40+
fsyncThreshold int
41+
writtenFileCount int
42+
fsyncLock sync.Mutex
3843
}
3944

4045
func (s *Store) String() (str string) {
4146
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
4247
return
4348
}
4449

45-
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
50+
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType, fsyncThreshold int) (s *Store) {
4651
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
4752
s.Locations = make([]*DiskLocation, 0)
4853
for i := 0; i < len(dirnames); i++ {
@@ -57,6 +62,8 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
5762
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
5863
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
5964

65+
s.fsyncThreshold = fsyncThreshold
66+
6067
return
6168
}
6269
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
@@ -219,6 +226,15 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
219226
}
220227
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
221228
_, size, isUnchanged, err = v.writeNeedle(n)
229+
if s.fsyncThreshold != 0 {
230+
s.fsyncLock.Lock()
231+
s.writtenFileCount++
232+
if s.writtenFileCount > s.fsyncThreshold {
233+
v.dataFile.Sync()
234+
s.writtenFileCount = 0
235+
}
236+
s.fsyncLock.Unlock()
237+
}
222238
} else {
223239
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
224240
}

0 commit comments

Comments
 (0)