Skip to content
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't yet looked at the comment left by @vbabanin, so it is possible that Slava covered the same, but given that we found a discrepancy between our behavior and the behavior a spec test expects, we should investigate the relevant specification change that we are missing / implemented incorrectly, and implement / fix it, instead of just doing an ad-hoc driver change without any reference to the specification requirement (I found no reference in this PR or in JAVA-6033).

Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public void close() {

@Override
public void run() {
ServerDescription currentServerDescription = unknownConnectingServerDescription(serverId, null);
ServerDescription unknownConnectingServerDescription = unknownConnectingServerDescription(serverId, null);
ServerDescription currentServerDescription = unknownConnectingServerDescription;
try {
while (!isClosed) {
ServerDescription previousServerDescription = currentServerDescription;
Expand All @@ -216,6 +217,12 @@ public void run() {
continue;
}

// For POLL mode, if we just established initial connection, do an immediate heartbeat
if (!shouldStreamResponses && connection != null && !connection.isClosed()
&& currentServerDescription.equals(connection.getInitialServerDescription())) {
continue;
Comment on lines +221 to +223
Copy link
Member

@vbabanin vbabanin Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we send two hello commands back-to-back but emit only one sequence of events (HearbeatStarted and "HearbeatSucceed") as if only one heartbeat was sent.

When establishing a new connection, the flow in this PR looks like this:

  1. At line 286, setupNewConnectionAndGetInitialDescription() calls logAndNotifyHeartbeatStarted() and sets alreadyLoggedHeartBeatStarted = true.
  2. We receive the handshake result and return it as the ServerDescription.
  3. We detect that this is the initial connection and immediately loop again.
  4. On the next loop iteration, we hit line 264, but alreadyLoggedHeartBeatStarted is still true, so we don’t log another “started” event.

We then call doHeartbeat(), which sends a second hello command, without waiting for the selection heartbeat timeout. When it succeeds at line 322, we log the “succeeded” event and reset the flag alreadyLoggedHeartBeatStarted = false.

As a result we get:

  • 1 ServerHeartbeatStartedEvent (for the first hello / handshake)
  • 2 hello commands sent back-to-back
  • 1 ServerHeartbeatSucceededEvent (for the second hello)

This conflicts with the SDAM spec, which requires a correlation between started and succeeded/failed events, and also seem to require treating an initial handshake as a heartbeat event:

“The driver MUST guarantee that every ServerHeartbeatStartedEvent has either a correlating ServerHeartbeatSucceededEvent or ServerHeartbeatFailedEvent.”

“The driver MUST publish a ServerHeartbeatStartedEvent before the driver opens a connection for the next heartbeat to a server…”

Judging by the above, the initial handshake appears to be treated as the first heartbeat. The unified tests confirm this expectation, for example, the serverMonitoringMode.yml unified test explicitly expects this sequence in streaming mode:

serverHeartbeatStartedEvent (awaited: false) - handshake
serverHeartbeatSucceededEvent (awaited: false) - handshake
serverHeartbeatStartedEvent (awaited: false) - next poll after waiting

If we run this test with the current implementation, we get:

HEARTBEAT STARTED connectionId{localValue:7} awaited: false
HEARTBEAT SUCCEED connectionId{localValue:7, serverValue:242} awaited: true

So the correlation between ServerHeartbeatStartedEvent and ServerHeartbeatSucceededEvent (in terms of awaited) does not match the expectations, and the test fails. These tests are currently ignored here:
https://github.com/vbabanin/mongo-java-driver/blob/5cb23ba8e2a9da83857f3e2f7bbb7e5cfc8dcbe2/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java#L410-L412 - the associated ticket is already closed, and I wasn’t able to find additional details on the reason for skipping these tests.

Given the above, it seems we should:

  • Align the behavior with the spec (that we missed before this PR) so that the handshake acts as the first heartbeat and we don’t send an extra hello.
  • Re-enable the ignored SDAM unified tests.

What do you think?

P.S: For the regular heartbeat, we send a ServerHeartbeatSucceededEvent right after receiving the regular heartbeat response. We could do the same for the initial connection: emit the succeeded event right after open, since open already includes receiving the handshake.

connection.open(operationContextFactory.create());
roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos());

logAndNotifyHeartbeatSucceeded(.....)                                                                                                           

}

logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().monitorUpdate(currentServerDescription);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.bson.BsonDocument;
import org.bson.ByteBufNIO;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;

Expand Down Expand Up @@ -254,6 +255,66 @@ public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) {
assertEquals(expectedEvents, events);
}

@Test
void shouldSendHeartbeatEventAfterInitialConnectionInPollMode() throws Exception {
// This test verifies that in POLL mode with a long heartbeat frequency,
// the monitor still fires a ServerHeartbeatSucceededEvent shortly after initial connection.
// This is critical for some unified tests like backpressure-network-error-fail.yml that rely on
// receiving an initial heartbeat event even with very long polling intervals.

// Given
ConnectionDescription connectionDescription = createDefaultConnectionDescription();
ServerDescription initialServerDescription = createDefaultServerDescription();
String helloResponse = "{"
+ LEGACY_HELLO_LOWER + ": true,"
+ "maxBsonObjectSize : 16777216, "
+ "maxMessageSizeBytes : 48000000, "
+ "maxWriteBatchSize : 1000, "
+ "localTime : ISODate(\"2016-04-05T20:36:36.082Z\"), "
+ "maxWireVersion : 4, "
+ "minWireVersion : 0, "
+ "ok : 1 "
+ "}";

InternalConnection mockConnection = mock(InternalConnection.class);
when(mockConnection.getDescription()).thenReturn(connectionDescription);
when(mockConnection.getInitialServerDescription()).thenReturn(initialServerDescription);
when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024)));
when(mockConnection.receive(any(), any())).thenReturn(BsonDocument.parse(helloResponse));

// When
TestServerMonitorListener listener = createTestServerMonitorListener();
// Create monitor explicitly in POLL mode with long heartbeat frequency
DefaultServerMonitor monitor = new DefaultServerMonitor(
new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder()
.heartbeatFrequency(10000, TimeUnit.MILLISECONDS)
.serverMonitoringMode(com.mongodb.connection.ServerMonitoringMode.POLL)
.addServerMonitorListener(listener)
.build(),
createConnectionFactory(mockConnection),
ClusterConnectionMode.SINGLE,
null,
false,
SameObjectProvider.initialized(mock(SdamServerDescriptionManager.class)),
OPERATION_CONTEXT_FACTORY);
monitor.start();
this.monitor = monitor;

// Wait for heartbeat event - should happen quickly despite long heartbeatFrequency
listener.waitForEvents(ServerHeartbeatSucceededEvent.class, event -> true, 1, Duration.ofSeconds(2));
ServerHeartbeatStartedEvent startedEvent = getEvent(ServerHeartbeatStartedEvent.class, listener);
ServerHeartbeatSucceededEvent succeededEvent = getEvent(ServerHeartbeatSucceededEvent.class, listener);

// Then
assertEquals(connectionDescription.getConnectionId(), startedEvent.getConnectionId());
assertEquals(connectionDescription.getConnectionId(), succeededEvent.getConnectionId());
assertEquals(BsonDocument.parse(helloResponse), succeededEvent.getReply());
assertTrue(succeededEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0);
// The event should not be awaited in POLL mode
Assertions.assertFalse(succeededEvent.isAwaited());
}


private InternalConnectionFactory createConnectionFactory(final InternalConnection connection) {
InternalConnectionFactory factory = mock(InternalConnectionFactory.class);
Expand Down