Skip to content

Commit dca92fd

Browse files
authored
add disableFatalDefects option to cluster entities (#4750)
1 parent ba63604 commit dca92fd

File tree

9 files changed

+164
-63
lines changed

9 files changed

+164
-63
lines changed

.changeset/spicy-zoos-boil.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/cluster": patch
3+
"@effect/rpc": patch
4+
---
5+
6+
add disableFatalDefects option to cluster entities

.changeset/tricky-stars-worry.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": patch
3+
---
4+
5+
add Workflow.CaptureDefects annotation, to configure defect behaviour

packages/cluster/src/Entity.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import * as Layer from "effect/Layer"
1818
import * as Mailbox from "effect/Mailbox"
1919
import * as Option from "effect/Option"
2020
import * as Predicate from "effect/Predicate"
21+
import type * as Schedule from "effect/Schedule"
2122
import { Scope } from "effect/Scope"
2223
import type * as Stream from "effect/Stream"
2324
import type {
@@ -127,6 +128,8 @@ export interface Entity<in out Rpcs extends Rpc.Any> extends Equal.Equal {
127128
readonly maxIdleTime?: DurationInput | undefined
128129
readonly concurrency?: number | "unbounded" | undefined
129130
readonly mailboxCapacity?: number | "unbounded" | undefined
131+
readonly disableFatalDefects?: boolean | undefined
132+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
130133
}
131134
): Layer.Layer<
132135
never,
@@ -163,6 +166,8 @@ export interface Entity<in out Rpcs extends Rpc.Any> extends Equal.Equal {
163166
options?: {
164167
readonly maxIdleTime?: DurationInput | undefined
165168
readonly mailboxCapacity?: number | "unbounded" | undefined
169+
readonly disableFatalDefects?: boolean | undefined
170+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
166171
}
167172
): Layer.Layer<
168173
never,
@@ -234,6 +239,9 @@ const Proto = {
234239
options?: {
235240
readonly maxIdleTime?: DurationInput | undefined
236241
readonly concurrency?: number | "unbounded" | undefined
242+
readonly mailboxCapacity?: number | "unbounded" | undefined
243+
readonly disableFatalDefects?: boolean | undefined
244+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
237245
}
238246
): Layer.Layer<
239247
never,
@@ -276,6 +284,9 @@ const Proto = {
276284
>,
277285
options?: {
278286
readonly maxIdleTime?: DurationInput | undefined
287+
readonly mailboxCapacity?: number | "unbounded" | undefined
288+
readonly disableFatalDefects?: boolean | undefined
289+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
279290
}
280291
) {
281292
const buildHandlers = Effect.gen(this, function*() {

packages/cluster/src/Sharding.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ export class Sharding extends Context.Tag("@effect/cluster/Sharding")<Sharding,
104104
readonly maxIdleTime?: DurationInput | undefined
105105
readonly concurrency?: number | "unbounded" | undefined
106106
readonly mailboxCapacity?: number | "unbounded" | undefined
107+
readonly disableFatalDefects?: boolean | undefined
108+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
107109
}
108110
) => Effect.Effect<
109111
void,

packages/cluster/src/internal/entityManager.ts

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ export const make = Effect.fnUntraced(function*<
8888
readonly maxIdleTime?: DurationInput | undefined
8989
readonly concurrency?: number | "unbounded" | undefined
9090
readonly mailboxCapacity?: number | "unbounded" | undefined
91+
readonly disableFatalDefects?: boolean | undefined
92+
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown, never> | undefined
9193
}
9294
) {
9395
const config = yield* ShardingConfig
@@ -97,6 +99,9 @@ export const make = Effect.fnUntraced(function*<
9799
const mailboxCapacity = options.mailboxCapacity ?? config.entityMailboxCapacity
98100
const clock = yield* Effect.clock
99101
const context = yield* Effect.context<Rpc.Context<Rpcs> | Rpc.Middleware<Rpcs> | RX>()
102+
const retryDriver = yield* Schedule.driver(
103+
options.defectRetryPolicy ? Schedule.andThen(options.defectRetryPolicy, defaultRetryPolicy) : defaultRetryPolicy
104+
)
100105

101106
const activeServers = new Map<EntityId, EntityState>()
102107

@@ -141,6 +146,7 @@ export const make = Effect.fnUntraced(function*<
141146
const server = yield* RpcServer.makeNoSerialization(entity.protocol, {
142147
spanPrefix: `${entity.type}(${address.entityId})`,
143148
concurrency: options.concurrency ?? 1,
149+
disableFatalDefects: options.disableFatalDefects,
144150
onFromServer(response): Effect.Effect<void> {
145151
switch (response._tag) {
146152
case "Exit": {
@@ -210,19 +216,7 @@ export const make = Effect.fnUntraced(function*<
210216
))
211217
}
212218
case "Defect": {
213-
const effect = writeRef.unsafeRebuild()
214-
defectRequestIds = Array.from(activeRequests.keys())
215-
return Effect.logError("Defect in entity, restarting", Cause.die(response.defect)).pipe(
216-
Effect.andThen(effect.pipe(
217-
Effect.tapErrorCause(Effect.logError),
218-
Effect.retry(Schedule.spaced(500))
219-
)),
220-
Effect.annotateLogs({
221-
module: "EntityManager",
222-
address,
223-
runner: options.runnerAddress
224-
})
225-
)
219+
return Effect.forkIn(onDefect(Cause.die(response.defect)), managerScope)
226220
}
227221
case "ClientEnd": {
228222
return endLatch.open
@@ -259,6 +253,21 @@ export const make = Effect.fnUntraced(function*<
259253
})
260254
)
261255

256+
function onDefect(cause: Cause.Cause<never>): Effect.Effect<void> {
257+
const effect = writeRef.unsafeRebuild()
258+
defectRequestIds = Array.from(activeRequests.keys())
259+
return Effect.logError("Defect in entity, restarting", cause).pipe(
260+
Effect.andThen(Effect.ignore(retryDriver.next(void 0))),
261+
Effect.andThen(effect),
262+
Effect.annotateLogs({
263+
module: "EntityManager",
264+
address,
265+
runner: options.runnerAddress
266+
}),
267+
Effect.catchAllCause(onDefect)
268+
)
269+
}
270+
262271
const state: EntityState = {
263272
address,
264273
mailboxGauge: ClusterMetrics.mailboxSize.pipe(
@@ -482,6 +491,10 @@ export const make = Effect.fnUntraced(function*<
482491
})
483492
})
484493

494+
const defaultRetryPolicy = Schedule.exponential(500, 1.5).pipe(
495+
Schedule.union(Schedule.spaced("10 seconds"))
496+
)
497+
485498
const makeMessageSchema = <Rpcs extends Rpc.Any>(entity: Entity<Rpcs>): Schema.Schema<
486499
{
487500
readonly _tag: "IncomingRequest"

packages/cluster/test/Sharding.test.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,21 @@ import {
99
Snowflake
1010
} from "@effect/cluster"
1111
import { assert, describe, expect, it } from "@effect/vitest"
12-
import { Array, Cause, Chunk, Effect, Exit, Fiber, FiberId, Layer, Mailbox, Option, Stream, TestClock } from "effect"
12+
import {
13+
Array,
14+
Cause,
15+
Chunk,
16+
Effect,
17+
Exit,
18+
Fiber,
19+
FiberId,
20+
Layer,
21+
Mailbox,
22+
MutableRef,
23+
Option,
24+
Stream,
25+
TestClock
26+
} from "effect"
1327
import { TestEntity, TestEntityNoState, TestEntityState, User } from "./TestEntity.js"
1428

1529
describe.concurrent("Sharding", () => {
@@ -496,6 +510,18 @@ describe.concurrent("Sharding", () => {
496510
)
497511
expect(error._tag).toEqual("EntityNotManagedByRunner")
498512
}).pipe(Effect.provide(TestShardingWithoutEntities)))
513+
514+
it.scoped("restart on defect", () =>
515+
Effect.gen(function*() {
516+
yield* TestClock.adjust(1)
517+
const state = yield* TestEntityState
518+
const makeClient = yield* TestEntity.client
519+
const client = makeClient("1")
520+
MutableRef.set(state.defectTrigger, true)
521+
const result = yield* client.GetUser({ id: 123 })
522+
expect(result).toEqual(new User({ id: 123, name: "User 123" }))
523+
expect(state.layerBuilds.current).toEqual(2)
524+
}).pipe(Effect.provide(TestSharding)))
499525
})
500526

501527
const TestShardingConfig = ShardingConfig.layer({

packages/cluster/test/TestEntity.ts

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { Envelope } from "@effect/cluster"
22
import { ClusterSchema, Entity } from "@effect/cluster"
33
import type { RpcGroup } from "@effect/rpc"
44
import { Rpc, RpcSchema } from "@effect/rpc"
5-
import { Effect, Layer, Mailbox, Option, PrimaryKey, Schema, Stream } from "effect"
5+
import { Effect, Layer, Mailbox, MutableRef, Option, PrimaryKey, Schedule, Schema, Stream } from "effect"
66

77
export class User extends Schema.Class<User>("User")({
88
id: Schema.Number,
@@ -58,61 +58,74 @@ export class TestEntityState extends Effect.Service<TestEntityState>()("TestEnti
5858
RpcGroup.Rpcs<typeof TestEntity.protocol> extends infer R ? R extends Rpc.Any ? Envelope.Request<R> : never
5959
: never
6060
>()
61+
const defectTrigger = MutableRef.make(false)
62+
const layerBuilds = MutableRef.make(0)
6163

6264
return {
6365
messages,
6466
streamMessages,
6567
envelopes,
66-
interrupts
68+
interrupts,
69+
defectTrigger,
70+
layerBuilds
6771
} as const
6872
})
6973
}) {}
7074

71-
export const TestEntityNoState = TestEntity.toLayer(Effect.gen(function*() {
72-
const state = yield* TestEntityState
75+
export const TestEntityNoState = TestEntity.toLayer(
76+
Effect.gen(function*() {
77+
const state = yield* TestEntityState
7378

74-
const never = (envelope: any) =>
75-
Effect.suspend(() => {
76-
state.envelopes.unsafeOffer(envelope)
77-
return Effect.never
78-
}).pipe(Effect.onInterrupt(() => {
79-
state.interrupts.unsafeOffer(envelope)
80-
return Effect.void
81-
}))
82-
return {
83-
GetUser: (envelope) =>
84-
Effect.sync(() => {
79+
MutableRef.update(state.layerBuilds, (count) => count + 1)
80+
81+
const never = (envelope: any) =>
82+
Effect.suspend(() => {
83+
state.envelopes.unsafeOffer(envelope)
84+
return Effect.never
85+
}).pipe(Effect.onInterrupt(() => {
86+
state.interrupts.unsafeOffer(envelope)
87+
return Effect.void
88+
}))
89+
return {
90+
GetUser: (envelope) =>
91+
Effect.sync(() => {
92+
state.envelopes.unsafeOffer(envelope)
93+
if (state.defectTrigger.current) {
94+
MutableRef.set(state.defectTrigger, false)
95+
throw new Error("User not found")
96+
}
97+
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
98+
}),
99+
GetUserVolatile: (envelope) =>
100+
Effect.sync(() => {
101+
state.envelopes.unsafeOffer(envelope)
102+
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
103+
}),
104+
Never: never,
105+
NeverFork: (envelope) => Rpc.fork(never(envelope)),
106+
NeverVolatile: never,
107+
RequestWithKey: (envelope) => {
85108
state.envelopes.unsafeOffer(envelope)
86-
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
87-
}),
88-
GetUserVolatile: (envelope) =>
89-
Effect.sync(() => {
109+
return Effect.orDie(state.messages.take)
110+
},
111+
StreamWithKey: (envelope) => {
112+
let sequence = envelope.lastSentChunkValue.pipe(
113+
Option.map((value) => value + 1),
114+
Option.getOrElse(() => 0)
115+
)
116+
return Mailbox.toStream(state.streamMessages).pipe(
117+
Stream.map(() => sequence++)
118+
)
119+
},
120+
GetAllUsers: (envelope) => {
90121
state.envelopes.unsafeOffer(envelope)
91-
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
92-
}),
93-
Never: never,
94-
NeverFork: (envelope) => Rpc.fork(never(envelope)),
95-
NeverVolatile: never,
96-
RequestWithKey: (envelope) => {
97-
state.envelopes.unsafeOffer(envelope)
98-
return Effect.orDie(state.messages.take)
99-
},
100-
StreamWithKey: (envelope) => {
101-
let sequence = envelope.lastSentChunkValue.pipe(
102-
Option.map((value) => value + 1),
103-
Option.getOrElse(() => 0)
104-
)
105-
return Mailbox.toStream(state.streamMessages).pipe(
106-
Stream.map(() => sequence++)
107-
)
108-
},
109-
GetAllUsers: (envelope) => {
110-
state.envelopes.unsafeOffer(envelope)
111-
return Stream.fromIterable(envelope.payload.ids.map((id) => new User({ id, name: `User ${id}` }))).pipe(
112-
Stream.rechunk(1)
113-
)
122+
return Stream.fromIterable(envelope.payload.ids.map((id) => new User({ id, name: `User ${id}` }))).pipe(
123+
Stream.rechunk(1)
124+
)
125+
}
114126
}
115-
}
116-
}))
127+
}),
128+
{ defectRetryPolicy: Schedule.forever }
129+
)
117130

118131
export const TestEntityLayer = TestEntityNoState.pipe(Layer.provideMerge(TestEntityState.Default))

packages/rpc/src/RpcServer.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
7575
readonly spanPrefix?: string | undefined
7676
readonly disableClientAcks?: boolean | undefined
7777
readonly concurrency?: number | "unbounded" | undefined
78+
readonly disableFatalDefects?: boolean | undefined
7879
}
7980
) => Effect.Effect<
8081
RpcServer<Rpcs>,
@@ -89,13 +90,15 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
8990
readonly spanPrefix?: string | undefined
9091
readonly disableClientAcks?: boolean | undefined
9192
readonly concurrency?: number | "unbounded" | undefined
93+
readonly disableFatalDefects?: boolean | undefined
9294
}
9395
) {
9496
const enableTracing = options.disableTracing !== true
9597
const enableSpanPropagation = options.disableSpanPropagation !== true
9698
const supportsAck = options.disableClientAcks !== true
9799
const spanPrefix = options.spanPrefix ?? "RpcServer"
98100
const concurrency = options.concurrency ?? "unbounded"
101+
const disableFatalDefects = options.disableFatalDefects ?? false
99102
const context = yield* Effect.context<Rpc.ToHandler<Rpcs> | Scope.Scope>()
100103
const scope = Context.get(context, Scope.Scope)
101104
const fiberSet = yield* FiberSet.make()
@@ -261,6 +264,9 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
261264
},
262265
onFailure: (cause) => {
263266
responded = true
267+
if (!disableFatalDefects && Cause.isDie(cause)) {
268+
return sendDefect(client, Cause.squash(cause))
269+
}
264270
return options.onFromServer({
265271
_tag: "Exit",
266272
clientId: client.id,

0 commit comments

Comments
 (0)