Skip to content

Commit 5365bbd

Browse files
latest changes
1 parent 5396516 commit 5365bbd

File tree

2 files changed

+71
-93
lines changed

2 files changed

+71
-93
lines changed

src/operations/execute_operation.ts

Lines changed: 58 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,12 @@ import { TimeoutContext } from '../timeout';
3232
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
3333
import {
3434
abortable,
35-
exponentialBackoffDelayProvider,
35+
ExponentialBackoffProvider,
3636
maxWireVersion,
3737
supportsRetryableWrites
3838
} from '../utils';
3939
import { AggregateOperation } from './aggregate';
4040
import { AbstractOperation, Aspect } from './operation';
41-
import { RunCommandOperation } from './run_command';
4241

4342
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
4443
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -246,46 +245,48 @@ async function executeOperationWithRetries<
246245
session.incrementTransactionNumber();
247246
}
248247

249-
// The maximum number of retry attempts using regular retryable reads/writes logic (not including
250-
// SystemOverLoad error retries).
251-
const maxNonOverloadRetryAttempts = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
252248
let previousOperationError: MongoError | undefined;
253249
const deprioritizedServers = new DeprioritizedServers();
254-
let nonOverloadRetryAttempt = 0;
255250

256-
let systemOverloadRetryAttempt = 0;
257-
const maxSystemOverloadRetryAttempts = 5;
258-
const backoffDelayProvider = exponentialBackoffDelayProvider(
251+
const backoffDelayProvider = new ExponentialBackoffProvider(
259252
10_000, // MAX_BACKOFF
260253
100, // base backoff
261254
2 // backoff rate
262255
);
263256

264-
while (true) {
257+
for (
258+
let attempt = 0, maxAttempts = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
259+
attempt <= maxAttempts;
260+
attempt++,
261+
maxAttempts = previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
262+
? 5
263+
: maxAttempts
264+
) {
265265
if (previousOperationError) {
266-
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
267-
systemOverloadRetryAttempt += 1;
268-
269-
// if retryable writes or reads are not configured, throw.
270-
const isOperationConfiguredForRetry =
271-
(hasReadAspect && topology.s.options.retryReads) ||
272-
(hasWriteAspect && topology.s.options.retryWrites);
273-
const isRunCommand = operation instanceof RunCommandOperation;
274-
275-
if (
276-
// if the SystemOverloadError is not retryable, throw.
277-
!previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError) ||
278-
!(isOperationConfiguredForRetry || isRunCommand)
279-
) {
280-
throw previousOperationError;
281-
}
266+
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
267+
throw new MongoServerError({
268+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
269+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
270+
originalError: previousOperationError
271+
});
272+
}
282273

283-
// if we have exhausted overload retry attempts, throw.
284-
if (systemOverloadRetryAttempt > maxSystemOverloadRetryAttempts) {
285-
throw previousOperationError;
286-
}
274+
const isRetryable =
275+
// bulk write commands are retryable if all operations in the batch are retryable
276+
(operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) ||
277+
// if we have a retryable read or write operation, we can retry
278+
(hasWriteAspect && isRetryableWriteError(previousOperationError)) ||
279+
(hasReadAspect && isRetryableReadError(previousOperationError)) ||
280+
// if we have a retryable, system overloaded error, we can retry
281+
(previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
282+
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError));
283+
284+
if (!isRetryable) {
285+
throw previousOperationError;
286+
}
287287

288-
const { value: delayMS } = backoffDelayProvider.next();
288+
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
289+
const delayMS = backoffDelayProvider.getNextBackoffDuration();
289290

290291
// if the delay would exhaust the CSOT timeout, short-circuit.
291292
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
@@ -298,80 +299,49 @@ async function executeOperationWithRetries<
298299
);
299300
}
300301

301-
await setTimeout(delayMS);
302-
303302
if (!topology.tokenBucket.consume(RETRY_COST)) {
304303
throw previousOperationError;
305304
}
306305

307-
server = await topology.selectServer(selector, {
308-
session,
309-
operationName: operation.commandName,
310-
deprioritizedServers,
311-
signal: operation.options.signal
312-
});
313-
} else {
314-
nonOverloadRetryAttempt++;
315-
// we have no more retry attempts, throw.
316-
if (nonOverloadRetryAttempt >= maxNonOverloadRetryAttempts) {
317-
throw previousOperationError;
318-
}
319-
320-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
321-
throw new MongoServerError({
322-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
323-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
324-
originalError: previousOperationError
325-
});
326-
}
306+
await setTimeout(delayMS);
307+
}
327308

328-
if (
329-
(operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) ||
330-
(hasWriteAspect && !isRetryableWriteError(previousOperationError)) ||
331-
(hasReadAspect && !isRetryableReadError(previousOperationError))
332-
) {
333-
throw previousOperationError;
334-
}
309+
if (
310+
previousOperationError instanceof MongoNetworkError &&
311+
operation.hasAspect(Aspect.CURSOR_CREATING) &&
312+
session != null &&
313+
session.isPinned &&
314+
!session.inTransaction()
315+
) {
316+
session.unpin({ force: true, forceClear: true });
317+
}
335318

336-
if (
337-
previousOperationError instanceof MongoNetworkError &&
338-
operation.hasAspect(Aspect.CURSOR_CREATING) &&
339-
session != null &&
340-
session.isPinned &&
341-
!session.inTransaction()
342-
) {
343-
session.unpin({ force: true, forceClear: true });
344-
}
319+
server = await topology.selectServer(selector, {
320+
session,
321+
operationName: operation.commandName,
322+
deprioritizedServers,
323+
signal: operation.options.signal
324+
});
345325

346-
server = await topology.selectServer(selector, {
347-
session,
348-
operationName: operation.commandName,
349-
deprioritizedServers,
350-
signal: operation.options.signal
351-
});
352-
353-
if (hasWriteAspect && !supportsRetryableWrites(server)) {
354-
throw new MongoUnexpectedServerResponseError(
355-
'Selected server does not support retryable writes'
356-
);
357-
}
326+
if (hasWriteAspect && !supportsRetryableWrites(server)) {
327+
throw new MongoUnexpectedServerResponseError(
328+
'Selected server does not support retryable writes'
329+
);
358330
}
359331
}
360332

361333
operation.server = server;
362334

363335
try {
336+
const isRetry = attempt > 0;
337+
364338
// If attempt > 0 and we are command batching we need to reset the batch.
365-
if (
366-
(nonOverloadRetryAttempt > 0 || systemOverloadRetryAttempt > 0) &&
367-
operation.hasAspect(Aspect.COMMAND_BATCHING)
368-
) {
339+
if (isRetry && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
369340
operation.resetBatch();
370341
}
371342

372343
try {
373344
const result = await server.command(operation, timeoutContext);
374-
const isRetry = nonOverloadRetryAttempt > 0 || systemOverloadRetryAttempt > 0;
375345
topology.tokenBucket.deposit(
376346
isRetry
377347
? // on successful retry, deposit the retry cost + the refresh rate.
@@ -404,4 +374,6 @@ async function executeOperationWithRetries<
404374
timeoutContext.clear();
405375
}
406376
}
377+
378+
throw previousOperationError ?? new MongoRuntimeError('ahh');
407379
}

src/utils.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,12 +1432,18 @@ export async function abortable<T>(
14321432
}
14331433
}
14341434

1435-
export function* exponentialBackoffDelayProvider(
1436-
maxBackoff: number,
1437-
baseBackoff: number,
1438-
backoffIncreaseRate: number
1439-
): Generator<number> {
1440-
for (let i = 0; ; i++) {
1441-
yield Math.random() * Math.min(maxBackoff, baseBackoff * backoffIncreaseRate ** i);
1435+
export class ExponentialBackoffProvider {
1436+
constructor(
1437+
public readonly maxBackoff: number,
1438+
public readonly baseBackoff: number,
1439+
public readonly backoffIncreaseRate: number,
1440+
public iteration = 0
1441+
) {}
1442+
1443+
getNextBackoffDuration(): number {
1444+
return (
1445+
Math.random() *
1446+
Math.min(this.maxBackoff, this.baseBackoff * this.backoffIncreaseRate ** this.iteration)
1447+
);
14421448
}
14431449
}

0 commit comments

Comments
 (0)