Skip to content

Commit 3c81293

Browse files
authored
Improve subscriptions for observable handling (#6236)
* fix: subscribeNewHeads for improved observable handling * fix: replace switchMap with mergeMap for improved subscription handling
1 parent ce6f698 commit 3c81293

File tree

4 files changed

+8
-8
lines changed

4 files changed

+8
-8
lines changed

packages/api-derive/src/chain/subscribeFinalizedBlocks.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { Observable } from 'rxjs';
55
import type { SignedBlockExtended } from '../type/types.js';
66
import type { DeriveApi } from '../types.js';
77

8-
import { switchMap } from 'rxjs';
8+
import { mergeMap } from 'rxjs';
99

1010
import { memo } from '../util/index.js';
1111

@@ -22,7 +22,7 @@ import { memo } from '../util/index.js';
2222
export function subscribeFinalizedBlocks (instanceId: string, api: DeriveApi): () => Observable<SignedBlockExtended> {
2323
return memo(instanceId, (): Observable<SignedBlockExtended> =>
2424
api.derive.chain.subscribeFinalizedHeads().pipe(
25-
switchMap((header) =>
25+
mergeMap((header) =>
2626
api.derive.chain.getBlock(header.createdAtHash || header.hash)
2727
)
2828
)

packages/api-derive/src/chain/subscribeFinalizedHeads.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { Observable } from 'rxjs';
55
import type { Hash, Header } from '@polkadot/types/interfaces';
66
import type { DeriveApi } from '../types.js';
77

8-
import { from, of, switchMap } from 'rxjs';
8+
import { from, mergeMap, of, switchMap } from 'rxjs';
99

1010
import { memo } from '../util/index.js';
1111

@@ -43,7 +43,7 @@ export function subscribeFinalizedHeads (instanceId: string, api: DeriveApi): ()
4343
let prevHash: Hash | null = null;
4444

4545
return api.rpc.chain.subscribeFinalizedHeads().pipe(
46-
switchMap((header) => {
46+
mergeMap((header) => {
4747
const endHash = prevHash;
4848
const startHash = header.parentHash;
4949

packages/api-derive/src/chain/subscribeNewBlocks.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { Observable } from 'rxjs';
55
import type { SignedBlockExtended } from '../type/types.js';
66
import type { DeriveApi } from '../types.js';
77

8-
import { switchMap } from 'rxjs';
8+
import { mergeMap } from 'rxjs';
99

1010
import { memo } from '../util/index.js';
1111

@@ -22,7 +22,7 @@ import { memo } from '../util/index.js';
2222
export function subscribeNewBlocks (instanceId: string, api: DeriveApi): () => Observable<SignedBlockExtended> {
2323
return memo(instanceId, (): Observable<SignedBlockExtended> =>
2424
api.derive.chain.subscribeNewHeads().pipe(
25-
switchMap((header) =>
25+
mergeMap((header) =>
2626
api.derive.chain.getBlock(header.createdAtHash || header.hash)
2727
)
2828
)

packages/api-derive/src/chain/subscribeNewHeads.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { Observable } from 'rxjs';
55
import type { HeaderExtended } from '../type/types.js';
66
import type { DeriveApi } from '../types.js';
77

8-
import { map, switchMap } from 'rxjs';
8+
import { map, mergeMap } from 'rxjs';
99

1010
import { createHeaderExtended } from '../type/index.js';
1111
import { memo } from '../util/index.js';
@@ -25,7 +25,7 @@ import { getAuthorDetails } from './util.js';
2525
export function subscribeNewHeads (instanceId: string, api: DeriveApi): () => Observable<HeaderExtended> {
2626
return memo(instanceId, (): Observable<HeaderExtended> =>
2727
api.rpc.chain.subscribeNewHeads().pipe(
28-
switchMap((header) =>
28+
mergeMap((header) =>
2929
getAuthorDetails(api, header)
3030
),
3131
map(([header, validators, author]): HeaderExtended => {

0 commit comments

Comments
 (0)