Skip to content

Conversation

@slyt3
Copy link
Contributor

@slyt3 slyt3 commented Dec 17, 2025

Summary

Implements API endpoint to return message observing status counts (observed, in_process, pending) for sessions.

Following team feedback, architecture uses a separate message_observing_tracking table decoupled from messages table, with foreign key binding for data integrity.

API Endpoint: GET /api/v1/session/{session_id}/observing-status

Response Example:

{
  "observed": 10,
  "in_process": 5,
  "pending": 3,
  "updated_at": "2025-12-17T10:30:00Z"
}

Why we need this PR?

This should close Issue #75

Users need visibility into message processing status to:

  • Monitor message processing progress
  • Debug stuck or pending messages
  • Track system health and performance
  • Understand message pipeline state

Changes

Backend (Go):

  • Model layer: MessageObservingStatus (response), MessageObservingTracking (DB table)
  • Repository layer: GORM query with LEFT JOIN (untracked messages default to 'pending')
  • Service layer: Business logic wrapper with validation
  • Handler layer: HTTP endpoint with comprehensive unit tests
  • Router: URL registration at /session/{id}/observing-status
  • Dependency injection: Full wiring in bootstrap container

Python SDK:

  • Type: MessageObservingStatus (Pydantic BaseModel)
  • Method: client.sessions.messages_observing_status(session_id)

TypeScript SDK:

  • Schema: MessageObservingStatusSchema (Zod validation)
  • Type: MessageObservingStatus (TypeScript interface)
  • Method: client.sessions.messagesObservingStatus(sessionId)

Database:

  • Migration: 002_create_message_observing_tracking.sql
  • Table: message_observing_tracking with FK to messages(id) ON DELETE CASCADE
  • Indexes: On message_id and observing_status for query performance

Impact Areas

Which part of Acontext would this feature affect?

  • Client SDK (Python)
  • Client SDK (TypeScript)
  • Core Service
  • API Server
  • Dashboard
  • CLI Tool
  • Documentation
  • Other: ...

Implementation Tasks

  • Design and create separate tracking table schema
  • Implement Go API backend (Model → Repo → Service → Handler)
  • Add dependency injection wiring
  • Implement Python SDK method
  • Implement TypeScript SDK method
  • Write handler unit tests with mocks
  • Run database migration (pending guidance)
  • End-to-end testing after migration

Testing

Completed:

  • Go handler unit tests (all passing)
    • Success case with mock data
    • Empty session_id validation
    • Service error handling
    • Nil service panic test

Pending:

  • Database migration execution
  • Integration tests with real database
  • End-to-end API testing
  • Python SDK integration test
  • TypeScript SDK integration test

Note: Awaiting guidance on running migrations in development environment for full integration testing.


Key Files:

  • src/server/api/go/internal/modules/handler/message_observing_handler.go
  • src/server/api/go/internal/modules/repo/message_observing_repo_impl.go
  • src/client/acontext-py/src/acontext/resources/sessions.py
  • src/client/acontext-ts/src/resources/sessions.ts
  • src/server/core/migrations/002_create_message_observing_tracking.sql

Questions for Reviewers

  1. How should I run the database migration locally for integration testing?
  2. Should repository tests be added before or after migration?
  3. Any architectural feedback on the separate table approach?

@GenerQAQ
Copy link
Contributor

The Message model has a session_task_process_status field, which is used to track the processing status of messages:

  • pending: waiting for processing
  • running: in progress
  • success: Processing successful
  • failed: Processing failed
    async def process_session_pending_message(
    project_config: ProjectConfig, project_id: asUUID, session_id: asUUID
    ) -> Result[None]:
    pending_message_ids = None
    try:
    async with DB_CLIENT.get_session_context() as session:
    r = await MD.get_message_ids(
    session,
    session_id,
    limit=(
    project_config.project_session_message_buffer_max_overflow
    + project_config.project_session_message_buffer_max_turns
    ),
    asc=True,
    )
    pending_message_ids, eil = r.unpack()
    if eil:
    return r
    if not pending_message_ids:
    return Result.resolve(None)
    await MD.update_message_status_to(
    session, pending_message_ids, TaskStatus.RUNNING
    )
    LOG.info(f"Unpending {len(pending_message_ids)} session messages to process")
    async with DB_CLIENT.get_session_context() as session:
    r = await MD.fetch_messages_data_by_ids(session, pending_message_ids)
    messages, eil = r.unpack()
    if eil:
    return r
    r = await MD.fetch_previous_messages_by_datetime(
    session,
    session_id,
    messages[0].created_at,
    limit=project_config.project_session_message_use_previous_messages_turns,
    )
    messages_data = [
    MessageBlob(
    message_id=m.id, role=m.role, parts=m.parts, task_id=m.task_id
    )
    for m in messages
    ]
    r = await AT.task_agent_curd(
    project_id,
    session_id,
    messages_data,
    max_iterations=project_config.default_task_agent_max_iterations,
    previous_progress_num=project_config.default_task_agent_previous_progress_num,
    )
    after_status = TaskStatus.SUCCESS
    if not r.ok():
    after_status = TaskStatus.FAILED
    async with DB_CLIENT.get_session_context() as session:
    await MD.update_message_status_to(
    session, pending_message_ids, after_status
    )
    return r
    except Exception as e:
    if pending_message_ids is None:
    raise e
    LOG.error(
    f"Exception while processing session pending message: {e}, rollback {len(pending_message_ids)} message status to failed"
    )
    async with DB_CLIENT.get_session_context() as session:
    await MD.update_message_status_to(
    session, pending_message_ids, TaskStatus.FAILED
    )
    raise e

- Query existing session_task_process_status field from messages table
- Map values: success→observed, running→in_process, pending→pending
- Implement Go API endpoint: GET /session/{id}/observing-status
- Add Python SDK method: messages_observing_status()
- Add TypeScript SDK method: messagesObservingStatus()
- Returns counts of observed, in_process, pending messages

Backend (Go):
- Model layer: MessageObservingStatus response type
- Repository layer: Direct query on messages table
- Service layer: Business logic wrapper
- Handler layer: HTTP endpoint with unit tests
- Router: URL registration
- Dependency injection: Full wiring

Python SDK:
- Type: MessageObservingStatus (Pydantic)
- Method: sessions.messages_observing_status()

TypeScript SDK:
- Schema: MessageObservingStatusSchema (Zod)
- Type: MessageObservingStatus
- Method: sessions.messagesObservingStatus()

Tests:
- Handler tests with mocks (all passing)

Note: Uses existing session_task_process_status field. No database migration needed.
@slyt3 slyt3 force-pushed the feat/message-observing-status branch from 989aca8 to 55dd818 Compare December 18, 2025 09:41
@slyt3
Copy link
Contributor Author

slyt3 commented Dec 18, 2025

@gusye1234 @GenerQAQ

Updated Implementation

Changed the approach how @gus asked:

What Changed:

  • Now uses existing session_task_process_status field from messages table
  • Maps values: successobserved, runningin_process, pendingpending
  • Removed separate message_observing_tracking table
  • Removed database migration

The session_task_process_status field already tracks message processing status with different names. API/SDK layer now maps these to the requested format.

@slyt3
Copy link
Contributor Author

slyt3 commented Dec 18, 2025

I tested in Docker env and tested the endpoints, everythings works as expected

Copy link
Contributor

@gusye1234 gusye1234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not introduce a new handler for this api, just use the session handler please.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed the async session client

ArtifactHandler: artifactHandler,
TaskHandler: taskHandler,
ToolHandler: toolHandler,
MessageObservingHandler: messageObservingHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to introduce a new handler just for this api, this api belongs to session handler

@slyt3
Copy link
Contributor Author

slyt3 commented Dec 18, 2025

Let's not introduce a new handler for this api, just use the session handler please.

okey, gonna keep session-related endpoints togerther, will move to method to SessionHandler later this day.

but should i keep the service/repo layers separate or consolidate to session service/repo too?

yeah forgot to add async version in async_sessions.py ngl missed that.

will update and push later this day. thanks

@gusye1234
Copy link
Contributor

gusye1234 commented Dec 18, 2025

but should i keep the service/repo layers separate or consolidate to session service/repo too?

Oops, miss this.

I don't think you need to add new files in modules/repo and modules/model, those are for ORM.

You can just simply add this function and the struct in modules/handle/session.go, just like every other session apis.

@GenerQAQ , can you look up this PR and back me up? I think we have no need to add new file in /repo and /model in this PR, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants