Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 2.0.{build}
version: 2.{build}
platform: x64
branches:
only:
Expand Down Expand Up @@ -31,10 +31,11 @@ deploy:
- provider: GitHub
auth_token:
secure: ImwOgsH/e1F+reDfqNIvoQ773FZHsjQt/4znrFdxUVrs1VNpFK9IUaW4hIL/yl4c
release: 'v$(appveyor_build_version)'
description: 'This is Emitter broker build v$(appveyor_build_version).'
draft: true
release: 'master'
description: 'This is v$(appveyor_build_version) pre-release which is automatically built on every commit to master.'
draft: false
prerelease: true
force_update: true
artifact: binary
on:
branch: master
5 changes: 4 additions & 1 deletion internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ func (s *Service) listen(addr *net.TCPAddr, conf *tls.Config) {

// Create new listener
logging.LogTarget("service", "starting the listener", addr)
l, err := listener.New(addr.String(), conf)
l, err := listener.New(addr.String(), listener.Config{
WriteRate: s.Config.Limit.WriteRate,
TLS: conf,
})
if err != nil {
panic(err)
}
Expand Down
11 changes: 8 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(filename string, stores ...cfg.SecretStore) *Config {
type Config struct {
ListenAddr string `json:"listen"` // The API port used for TCP & Websocket communication.
License string `json:"license"` // The license file to use for the broker.
Limit *LimitConfig `json:"limit,omitempty"` // Configuration for various limits such as message size.
Limit LimitConfig `json:"limit,omitempty"` // Configuration for various limits such as message size.
TLS *cfg.TLSConfig `json:"tls,omitempty"` // The API port used for Secure TCP & Websocket communication.
Cluster *ClusterConfig `json:"cluster,omitempty"` // The configuration for the clustering.
Storage *cfg.ProviderConfig `json:"storage,omitempty"` // The configuration for the storage provider.
Expand All @@ -101,7 +101,7 @@ type Config struct {

// MaxMessageBytes returns the configured max message size, must be smaller than 64K.
func (c *Config) MaxMessageBytes() int64 {
if c.Limit == nil || c.Limit.MessageSize <= 0 || c.Limit.MessageSize > maxMessageSize {
if c.Limit.MessageSize <= 0 || c.Limit.MessageSize > maxMessageSize {
return maxMessageSize
}
return int64(c.Limit.MessageSize)
Expand Down Expand Up @@ -159,8 +159,13 @@ type ClusterConfig struct {

// LimitConfig represents various limit configurations - such as message size.
type LimitConfig struct {
//Maximum message size allowed from/to the peer. Default if not specified is 64kB.

// Maximum message size allowed from/to the peer. Default if not specified is 64kB.
MessageSize int `json:"messageSize,omitempty"`

// The maximum socket write rate per connection. This does not limit QpS but instead
// can be used to scale throughput. Defaults to 60.
WriteRate int `json:"writeRate,omitempty"`
}

// LoadProvider loads a provider from the configuration or panics if the configuration is
Expand Down
8 changes: 6 additions & 2 deletions internal/network/listener/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ type Conn struct {
}

// NewConn creates a new sniffed connection.
func newConn(c net.Conn) *Conn {
func newConn(c net.Conn, writeRate int) *Conn {
if writeRate <= 0 || writeRate > 1000 {
writeRate = 60
}

conn := &Conn{
socket: c,
reader: sniffer{source: c},
limit: rate.New(100, time.Second),
limit: rate.New(writeRate, time.Second),
}

// TODO: see if we can get rid of this goroutine per connection
Expand Down
2 changes: 1 addition & 1 deletion internal/network/listener/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func TestConn(t *testing.T) {
conn := newConn(new(fakeConn))
conn := newConn(new(fakeConn), 0)
defer conn.Close()

assert.Equal(t, 0, conn.Len())
Expand Down
16 changes: 12 additions & 4 deletions internal/network/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,26 @@ var ErrListenerClosed = errListenerClosed("mux: listener closed")
// for readability of readTimeout
var noTimeout time.Duration

// Config represents the configuration of the listener.
type Config struct {
TLS *tls.Config // The TLS/SSL configuration.
WriteRate int // The maximum write rate (QPS) per connection.
}

// New announces on the local network address laddr. The syntax of laddr is
// "host:port", like "127.0.0.1:8080". If host is omitted, as in ":8080",
// New listens on all available interfaces instead of just the interface
// with the given host address. Listening on a hostname is not recommended
// because this creates a socket for at most one of its IP addresses.
func New(address string, config *tls.Config) (*Listener, error) {
func New(address string, config Config) (*Listener, error) {
l, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}

// If we have a TLS configuration provided, wrap the listener in TLS
if config != nil {
l = tls.NewListener(l, config)
if config.TLS != nil {
l = tls.NewListener(l, config.TLS)
}

return &Listener{
Expand All @@ -88,6 +94,7 @@ func New(address string, config *tls.Config) (*Listener, error) {
errorHandler: func(_ error) bool { return true },
closing: make(chan struct{}),
readTimeout: noTimeout,
config: config,
}, nil
}

Expand All @@ -104,6 +111,7 @@ type Listener struct {
closing chan struct{}
matchers []processor
readTimeout time.Duration
config Config
}

// Accept waits for and returns the next connection to the listener.
Expand Down Expand Up @@ -167,7 +175,7 @@ func (m *Listener) Serve() error {
func (m *Listener) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

muc := newConn(c)
muc := newConn(c, m.config.WriteRate)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/network/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func safeDial(t *testing.T, addr net.Addr) (*rpc.Client, func()) {
}

func testListener(t *testing.T) (*Listener, func()) {
l, err := New(":0", nil)
l, err := New(":0", Config{})
if err != nil {
t.Fatal(err)
}
Expand Down