Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion actor/actor_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func toActorRef(actorRef *internalpb.Actor) ActorRef {
name: addr.Name(),
kind: actorRef.GetType(),
address: addr,
isSingleton: actorRef.GetIsSingleton(),
isSingleton: actorRef.Singleton != nil,
relocatable: actorRef.GetRelocatable(),
}
}
Expand Down
63 changes: 49 additions & 14 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ type ActorSystem interface {
removePeerActor(ctx context.Context, actorName string) error
removePeerGrain(ctx context.Context, grainID *internalpb.GrainId) error
passivationManager() *passivationManager
removeNodeLeft(address string)
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -2097,11 +2098,35 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[
return nil, connect.NewError(connect.CodeInternal, err)
}

if msg.GetIsSingleton() {
if err := x.SpawnSingleton(ctx, msg.GetActorName(), actor); err != nil {
wrapSpawnErr := func(err error) error {
if errors.Is(err, gerrors.ErrActorAlreadyExists) || errors.Is(err, gerrors.ErrSingletonAlreadyExists) {
return connect.NewError(connect.CodeAlreadyExists, err)
}
if cluster.IsQuorumError(err) {
return connect.NewError(connect.CodeUnavailable, cluster.NormalizeQuorumError(err))
}
return connect.NewError(connect.CodeInternal, err)
}

if msg.GetSingleton() != nil {
// define singleton options
singletonOpts := []ClusterSingletonOption{
WithSingletonSpawnTimeout(msg.GetSingleton().GetSpawnTimeout().AsDuration()),
WithSingletonSpawnWaitInterval(msg.GetSingleton().GetWaitInterval().AsDuration()),
WithSingletonSpawnRetries(int(msg.GetSingleton().GetMaxRetries())),
}

if msg.GetRole() != "" {
singletonOpts = append(singletonOpts, WithSingletonRole(msg.GetRole()))
}

if err := x.SpawnSingleton(ctx, msg.GetActorName(), actor, singletonOpts...); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
return nil, wrapSpawnErr(err)
}

logger.Infof("actor=(%s) successfully created on [host=%s, port=%d]", msg.GetActorName(), msg.GetHost(), msg.GetPort())
return connect.NewResponse(new(internalpb.RemoteSpawnResponse)), nil
}

opts := []SpawnOption{
Expand Down Expand Up @@ -2138,7 +2163,7 @@ func (x *actorSystem) RemoteSpawn(ctx context.Context, request *connect.Request[

if _, err = x.Spawn(ctx, msg.GetActorName(), actor, opts...); err != nil {
logger.Errorf("failed to create actor=(%s) on [host=%s, port=%d]: reason: (%v)", msg.GetActorName(), msg.GetHost(), msg.GetPort(), err)
return nil, connect.NewError(connect.CodeInternal, err)
return nil, wrapSpawnErr(err)
}

logger.Infof("actor=(%s) successfully created on [host=%s, port=%d]", msg.GetActorName(), msg.GetHost(), msg.GetPort())
Expand Down Expand Up @@ -2589,6 +2614,7 @@ func (x *actorSystem) setupCluster() error {
cluster.WithDataTableSize(x.clusterConfig.tableSize),
cluster.WithBootstrapTimeout(x.clusterConfig.bootstrapTimeout),
cluster.WithRoutingTableInterval(x.clusterConfig.clusterStateSyncInterval),
cluster.WithBalancerInterval(x.clusterConfig.clusterBalancerInterval),
)

for _, kind := range x.clusterConfig.kinds.Values() {
Expand Down Expand Up @@ -2930,7 +2956,7 @@ func (x *actorSystem) replicateActors() {
if !x.isStopping() && x.InCluster() {
ctx := context.Background()
cluster := x.getCluster()
if actor.GetIsSingleton() {
if actor.GetSingleton() != nil {
kind := actor.GetType()
role := actor.GetRole()
if role != "" {
Expand Down Expand Up @@ -3440,15 +3466,6 @@ func (x *actorSystem) spawnDeadletter(ctx context.Context) error {
func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName string, actor Actor, singleton bool, singletonRole *string) error {
// check the existence of the actor given the kind prior to creating it
if x.clusterEnabled.Load() {
// here we make sure in cluster mode that the given actor is uniquely created
exists, err := x.cluster.ActorExists(ctx, actorName)
if err != nil {
return err
}
if exists {
return gerrors.NewErrActorAlreadyExists(actorName)
}

// a singleton actor must only have one instance at a given time of its kind
// in the whole cluster
if singleton {
Expand All @@ -3466,7 +3483,18 @@ func (x *actorSystem) checkSpawnPreconditions(ctx context.Context, actorName str
if id != "" {
return gerrors.ErrSingletonAlreadyExists
}
}

// here we make sure in cluster mode that the given actor is uniquely created
exists, err := x.cluster.ActorExists(ctx, actorName)
if err != nil {
return err
}
if exists {
return gerrors.NewErrActorAlreadyExists(actorName)
}

if singleton {
return nil
}
}
Expand Down Expand Up @@ -3812,6 +3840,13 @@ func (x *actorSystem) passivationManager() *passivationManager {
return passivator
}

// removeNodeLeft removes the given left node from the actor system
func (x *actorSystem) removeNodeLeft(address string) {
x.locker.RLock()
x.rebalancedNodes.Remove(address)
x.locker.RUnlock()
}

func (x *actorSystem) registerMetrics() error {
if x.metricProvider != nil && x.metricProvider.Meter() != nil {
meter := x.metricProvider.Meter()
Expand Down
131 changes: 115 additions & 16 deletions actor/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tochemey/olric"
"github.com/travisjeffery/go-dynaport"
"go.opentelemetry.io/otel"
noopmetric "go.opentelemetry.io/otel/metric/noop"
Expand All @@ -57,6 +58,7 @@ import (
"golang.org/x/net/http2/h2c"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/tochemey/goakt/v3/address"
Expand Down Expand Up @@ -1758,7 +1760,7 @@ func TestActorSystem(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: true,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
Expand Down Expand Up @@ -1792,6 +1794,99 @@ func TestActorSystem(t *testing.T) {
)
})

t.Run("RemoteSpawn maps already exists errors", func(t *testing.T) {
ctx := context.Background()
system := MockSingletonClusterReadyActorSystem(t)
system.remoteConfig = remote.NewConfig("127.0.0.1", 9001)

clusterMock := mockscluster.NewCluster(t)
system.locker.Lock()
system.cluster = clusterMock
system.locker.Unlock()

system.registry.Register(new(MockActor))
actorType := registry.Name(new(MockActor))

clusterMock.EXPECT().ActorExists(mock.Anything, "actor").Return(true, nil).Once()

request := connect.NewRequest(&internalpb.RemoteSpawnRequest{
Host: system.remoteConfig.BindAddr(),
Port: int32(system.remoteConfig.BindPort()),
ActorName: "actor",
ActorType: actorType,
Singleton: nil,
Relocatable: true,
})

_, err := system.RemoteSpawn(ctx, request)
require.Error(t, err)
require.Equal(t, connect.CodeAlreadyExists, connect.CodeOf(err))
require.ErrorIs(t, err, gerrors.ErrActorAlreadyExists)
})

t.Run("RemoteSpawn maps quorum errors", func(t *testing.T) {
ctx := context.Background()
system := MockSingletonClusterReadyActorSystem(t)
system.remoteConfig = remote.NewConfig("127.0.0.1", 9002)

clusterMock := mockscluster.NewCluster(t)
system.locker.Lock()
system.cluster = clusterMock
system.locker.Unlock()

system.registry.Register(new(MockActor))
actorType := registry.Name(new(MockActor))

clusterMock.EXPECT().ActorExists(mock.Anything, "actor").Return(false, olric.ErrWriteQuorum).Once()

request := connect.NewRequest(&internalpb.RemoteSpawnRequest{
Host: system.remoteConfig.BindAddr(),
Port: int32(system.remoteConfig.BindPort()),
ActorName: "actor",
ActorType: actorType,
Singleton: nil,
Relocatable: true,
})

_, err := system.RemoteSpawn(ctx, request)
require.Error(t, err)
require.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
require.ErrorIs(t, err, gerrors.ErrWriteQuorum)
})
t.Run("RemoteSpawn singleton short-circuits standard spawn", func(t *testing.T) {
ctx := context.Background()
system := MockSingletonClusterReadyActorSystem(t)
system.remoteConfig = remote.NewConfig("127.0.0.1", 9003)

clusterMock := mockscluster.NewCluster(t)
system.locker.Lock()
system.cluster = clusterMock
system.locker.Unlock()

system.registry.Register(new(MockActor))
actorType := registry.Name(new(MockActor))

clusterMock.EXPECT().IsLeader(mock.Anything).Return(true).Once()
clusterMock.EXPECT().LookupKind(mock.Anything, actorType).Return("", nil).Once()
clusterMock.EXPECT().ActorExists(mock.Anything, "singleton").Return(false, nil).Once()

request := connect.NewRequest(&internalpb.RemoteSpawnRequest{
Host: system.remoteConfig.BindAddr(),
Port: int32(system.remoteConfig.BindPort()),
ActorName: "singleton",
ActorType: actorType,
Singleton: &internalpb.SingletonSpec{
SpawnTimeout: durationpb.New(2 * time.Second),
WaitInterval: durationpb.New(200 * time.Millisecond),
MaxRetries: int32(2),
},
Relocatable: true,
})

_, err := system.RemoteSpawn(ctx, request)
require.NoError(t, err)
})

t.Run("With CoordinatedShutdown with ShouldFail strategy", func(t *testing.T) {
ctx := context.TODO()
executionCount := atomic.NewInt32(0)
Expand Down Expand Up @@ -6120,7 +6215,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -6238,7 +6333,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: registry.Name(actor),
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: true,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -6300,7 +6395,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{new(MockDependency)},
Expand Down Expand Up @@ -6348,7 +6443,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, sys.Host(), int(sys.Port()), request)
Expand Down Expand Up @@ -6506,7 +6601,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
Expand Down Expand Up @@ -6555,7 +6650,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
Expand Down Expand Up @@ -6638,7 +6733,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
Expand Down Expand Up @@ -6713,7 +6808,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: "",
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
}
err = remoting.RemoteSpawn(ctx, host, remotingPort, request)
Expand Down Expand Up @@ -6758,7 +6853,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -6817,7 +6912,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -6900,7 +6995,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -6983,7 +7078,7 @@ func TestRemotingSpawn(t *testing.T) {
request := &remote.SpawnRequest{
Name: actorName,
Kind: "actor.exchanger",
Singleton: false,
Singleton: nil,
Relocatable: false,
EnableStashing: false,
Dependencies: []extension.Dependency{dependency},
Expand Down Expand Up @@ -7312,9 +7407,13 @@ func TestReplicateActors_ErrorPaths(t *testing.T) {
// Expect PutKind to fail for singleton actors and trigger the warning path.
addr := address.New("singleton", "test-replication", "127.0.0.1", 8080)
singleton := &internalpb.Actor{
Address: addr.String(),
Type: "singleton-kind",
IsSingleton: true,
Address: addr.String(),
Type: "singleton-kind",
Singleton: &internalpb.SingletonSpec{
SpawnTimeout: durationpb.New(time.Second),
WaitInterval: durationpb.New(500 * time.Millisecond),
MaxRetries: int32(3),
},
}
clusterMock.EXPECT().PutKind(mock.Anything, singleton.GetType()).Return(fmt.Errorf("put kind failure")).Once()

Expand Down
Loading