Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 39 additions & 56 deletions services/ymax-planner/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ export const processPortfolioEvents = async (
const flowId = flowIdFromKey(flowKey);
const scope = [portfolioId, flowId] as const;
const { policyVersion, rebalanceCount, targetAllocation } = portfolioStatus;
const conditions = [policyVersion, rebalanceCount] as const;
const versions = [policyVersion, rebalanceCount] as const;

const currentBalances = await getNonDustBalances(
portfolioStatus,
depositBrand,
balanceQueryPowers,
);
const errorContext = {
const logContext = {
path,
flowKey,
flowDetail,
Expand All @@ -304,40 +304,37 @@ export const processPortfolioEvents = async (
rebalanceCount,
targetAllocation,
};
const plannerContext = {
...errorContext,
// @ts-expect-error "amount" is not present on all varieties of
// FlowDetail, but we need it here when it is present (i.e., for types
// "deposit" and "withdraw" and it's harmless otherwise.
amount: flowDetail.amount,
network,
brand: depositBrand,
feeBrand,
gasEstimator,
};

const { network: _network, ...logContext } = plannerContext;
logger.debug(`Starting flow`, flowDetail, inspectForStdout(logContext));
const settle = async <M extends string & keyof PortfolioPlanner>(
methodName: M,
args: PortfolioPlanner[M] extends (...args: infer Args) => any
? Args
: never,
extraDetails?: object,
) => {
const planReceiver = walletStore.get<PortfolioPlanner>('planner', {
sendOnly: true,
});
const txOpts = { sendOnly: true };
const planReceiver = walletStore.get<PortfolioPlanner>('planner', txOpts);
const { tx, id } = await planReceiver[methodName]!(...args);
// The transaction has been submitted, but we won't know about a rejection
// for at least another block.
// tx has been submitted, but we won't know its fate until a future block.
if (!isDryRun) {
void getWalletInvocationUpdate(id as any).catch(err => {
logger.warn(`⚠️ Failure for ${methodName}`, args, err);
});
}
return tx;
const details = inspectForStdout({ ...logContext, ...extraDetails });
logger.info(methodName, flowDetail, currentBalances, details, tx);
};

const plannerContext = {
...logContext,
// @ts-expect-error "amount" is not present on all varieties of
// FlowDetail, but we need it here when it is present (i.e., for types
// "deposit" and "withdraw" and it's harmless otherwise.
amount: flowDetail.amount,
network,
brand: depositBrand,
feeBrand,
gasEstimator,
};
try {
let steps: MovementDesc[];
const { type } = flowDetail;
Expand All @@ -351,43 +348,29 @@ export const processPortfolioEvents = async (
case 'withdraw':
steps = await planWithdrawFromAllocations(plannerContext);
break;
default: {
default:
logger.warn(`⚠️ Unknown flow type ${type}`);
return;
}
}
(errorContext as any).steps = steps;

const tx = await (steps.length === 0
? settle('rejectPlan', [
...scope,
'Nothing to do for this operation.',
...conditions,
])
: settle('resolvePlan', [...scope, steps, ...conditions]));
logger.info(
`Resolving`,
flowDetail,
currentBalances,
inspectForStdout({
policyVersion,
rebalanceCount,
targetAllocation,
steps,
}),
tx,
);
(logContext as any).steps = steps;

if (steps.length > 0) {
await settle('resolvePlan', [...scope, steps, ...versions], { steps });
} else {
const reason = 'Nothing to do for this operation.';
await settle('rejectPlan', [...scope, reason, ...versions]);
}
} catch (err) {
annotateError(err, inspect(logContext, { depth: 4 }));
if (err instanceof UserInputError || err instanceof NoSolutionError) {
try {
await settle('rejectPlan', [...scope, err.message, ...conditions]);
} catch (settleErr) {
// eslint-disable-next-line no-ex-assign
err = AggregateError([err, settleErr]);
}
await settle('rejectPlan', [...scope, err.message, ...versions], {
cause: err,
}).catch(err2 => {
throw AggregateError([err, err2]);
});
} else {
throw err;
}
annotateError(err, inspect(errorContext, { depth: 4 }));
throw err;
}
};
const handledPortfolioKeys = new Set<string>();
Expand All @@ -413,7 +396,7 @@ export const processPortfolioEvents = async (
}

// If this (portfolio, flows) data hasn't changed since our last
// submission, there's no point in trying again.
// successful submission, there's no point in trying again.
memory.snapshots ||= new Map();
const oldState = memory.snapshots.get(portfolioKey);
const oldFingerprint = oldState?.fingerprint;
Expand All @@ -424,7 +407,6 @@ export const processPortfolioEvents = async (
oldState.repeats += 1;
return;
}
memory.snapshots.set(portfolioKey, { fingerprint, repeats: 0 });

// If any in-progress flows need activation (as indicated by not having
// its own dedicated vstorage data), then find the first such flow and
Expand All @@ -439,8 +421,9 @@ export const processPortfolioEvents = async (
portfolioKey, flowKey,
() => startFlow(status, portfolioKey, flowKey, flowDetail),
);
return;
break;
}
memory.snapshots.set(portfolioKey, { fingerprint, repeats: 0 });
} catch (err) {
const age = blockHeight - eventRecord.blockHeight;
if (err.code === STALE_RESPONSE) {
Expand Down
64 changes: 42 additions & 22 deletions services/ymax-planner/test/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,14 @@ const fakeSigningSmartWalletKit = async (
['action', 'fee', 'memo', 'signerData']
>[];
const getBridgeSends = () => harden([...bridgeSends]);
const mockDeliverTxResponseProto = {
constructor: undefined,
then: undefined,
toJSON: undefined,
valueOf: undefined,
[Symbol.iterator]: undefined,
[Symbol.toStringTag]: 'MockDeliverTxResponse',
};
const sendBridgeAction: SigningSmartWalletKit['sendBridgeAction'] = async (
action,
fee,
Expand All @@ -322,8 +330,9 @@ const fakeSigningSmartWalletKit = async (
{ code: 0, height, transactionHash },
{
get: (target: object, key: string | symbol) => {
if (key === 'then') return undefined;
if (Object.hasOwn(target, key)) return target[key];
for (const obj of [target, mockDeliverTxResponseProto]) {
if (Object.hasOwn(obj, key)) return obj[key];
}
Fail`Not implemented: DeliverTxResponse ${q(key)}`;
},
},
Expand Down Expand Up @@ -466,6 +475,7 @@ const fakePortfolioKit = async ({

return {
blockHeight: getBlockHeight(),
portfoliosPathPrefix,
portfolioId,
portfolioPath,
initialPortfolioStatus,
Expand Down Expand Up @@ -600,35 +610,47 @@ test.serial(
);

test.serial('startFlow logs include traceId prefix', async t => {
debugger;
const kit = await fakePortfolioKit({
accounts: { noble: AmountMath.make(depositBrand, 1_000_000n) },
});
const {
blockHeight,
portfoliosPathPrefix,
portfolioId,
portfolioPath,
initialPortfolioStatus,
powers,
} = kit;
const { updateVstorage } = kit.testPowers;

const flowId = 2;
const portfolioStatus = harden({
...initialPortfolioStatus,
const fakeWithdrawFlow = (flowId: number, amountValue: bigint) => ({
flowCount: 1,
flowsRunning: {
[`flow${flowId}`]: {
type: 'withdraw',
amount: AmountMath.make(depositBrand, 1_000_000n),
amount: AmountMath.make(depositBrand, amountValue),
},
},
});
updateVstorage(portfolioPath, 'set', { object: portfolioStatus, wrap: true });

const portfolioKey = `portfolio${portfolioId}`;
const flowKey = `flow${flowId}`;
const tracePrefix = `[${portfolioKey}.${flowKey}] `;
const portfolioStatus = harden({
...initialPortfolioStatus,
...fakeWithdrawFlow(4, 2_000_000n),
});
updateVstorage(portfolioPath, 'set', { object: portfolioStatus, wrap: true });

const portfolioId2 = portfolioId + 1;
const portfolioKey2 = `portfolio${portfolioId2}`;
const portfolioPath2 = `${portfoliosPathPrefix}.${portfolioKey2}`;
const portfolioStatus2 = {
...initialPortfolioStatus,
...fakeWithdrawFlow(2, 1_000_000n),
};
updateVstorage(portfolioPath2, 'set', {
object: portfolioStatus2,
wrap: true,
});

const captured: Array<{ level: 'debug' | 'info'; args: any[] }> = [];
const originalLogTarget = console;
Expand All @@ -639,13 +661,11 @@ test.serial('startFlow logs include traceId prefix', async t => {
info: (...args: any[]) => captured.push({ level: 'info', args }),
});

const vstorageEventDetail = makeVstorageEventDetail(
blockHeight,
portfolioPath,
portfolioStatus,
);
await processPortfolioEvents(
[vstorageEventDetail],
[
makeVstorageEventDetail(blockHeight, portfolioPath, portfolioStatus),
makeVstorageEventDetail(blockHeight, portfolioPath2, portfolioStatus2),
],
blockHeight,
{ deferrals: [] },
powers,
Expand All @@ -656,13 +676,13 @@ test.serial('startFlow logs include traceId prefix', async t => {

const tracedLogs = captured.filter(
({ level, args }) =>
['debug', 'info'].includes(level) && args[0] === tracePrefix,
);
t.true(tracedLogs.length >= 2, 'captured start and completion logs');
t.true(
tracedLogs.every(entry => entry.args[0] === tracePrefix),
'all traced logs include the trace prefix',
['debug', 'info'].includes(level) &&
/\[portfolio[0-9]+[.]flow[0-9]+\] /.test(args[0]),
);
arrayIsLike(t, tracedLogs, [
{ args: { 0: `[${portfolioKey}.flow4] `, 1: 'rejectPlan', length: 6 } },
{ args: { 0: `[${portfolioKey2}.flow2] `, 1: 'resolvePlan', length: 6 } },
]);
});

/**
Expand Down
Loading