Skip to content

Commit 989aca8

Browse files
committed
feat: Add message observing status API
- Add separate message_observing_tracking table (decoupled from messages) - Implement Go API endpoint: GET /session/{id}/observing-status - Add Python SDK method: messages_observing_status() - Add TypeScript SDK method: messagesObservingStatus() - Returns counts of observed, in_process, pending messages Backend (Go): - Model layer: MessageObservingStatus, MessageObservingTracking - Repository layer: GORM with LEFT JOIN query - Service layer: Business logic wrapper - Handler layer: HTTP endpoint with unit tests - Router: URL registration - Dependency injection: Full wiring Python SDK: - Type: MessageObservingStatus (Pydantic) - Method: sessions.messages_observing_status() TypeScript SDK: - Schema: MessageObservingStatusSchema (Zod) - Type: MessageObservingStatus - Method: sessions.messagesObservingStatus() Database: - Migration: 002_create_message_observing_tracking.sql - Table: message_observing_tracking with FK to messages Tests: - Handler tests with mocks (all passing) - End-to-end testing pending migration
1 parent 5b3111f commit 989aca8

File tree

17 files changed

+601
-22
lines changed

17 files changed

+601
-22
lines changed

src/client/acontext-py/src/acontext/resources/sessions.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
LearningStatus,
1616
ListSessionsOutput,
1717
Message,
18+
MessageObservingStatus,
1819
Session,
1920
TokenCounts,
2021
)
@@ -348,3 +349,20 @@ def get_token_counts(self, session_id: str) -> TokenCounts:
348349
"""
349350
data = self._requester.request("GET", f"/session/{session_id}/token_counts")
350351
return TokenCounts.model_validate(data)
352+
def messages_observing_status(self, session_id: str) -> MessageObservingStatus:
353+
"""Get message observing status counts for a session.
354+
355+
Returns the count of messages by their observing status:
356+
observed, in_process, and pending.
357+
358+
Args:
359+
session_id: The UUID of the session.
360+
361+
Returns:
362+
MessageObservingStatus object containing observed, in_process,
363+
pending counts and updated_at timestamp.
364+
"""
365+
data = self._requester.request(
366+
"GET", f"/session/{session_id}/observing-status"
367+
)
368+
return MessageObservingStatus.model_validate(data)

src/client/acontext-py/src/acontext/types/session.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,19 @@ class TokenCounts(BaseModel):
249249
...,
250250
description="Total token count for all text and tool-call parts in a session",
251251
)
252+
253+
class MessageObservingStatus(BaseModel):
254+
"""Response model for message observing status."""
255+
256+
observed: int = Field(
257+
..., description="Number of messages with observed status"
258+
)
259+
in_process: int = Field(
260+
..., description="Number of messages with in_process status"
261+
)
262+
pending: int = Field(
263+
..., description="Number of messages with pending status"
264+
)
265+
updated_at: str = Field(
266+
..., description="Timestamp when the status was retrieved"
267+
)

src/client/acontext-ts/package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/client/acontext-ts/src/resources/sessions.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import {
1717
ListSessionsOutput,
1818
ListSessionsOutputSchema,
1919
Message,
20+
MessageObservingStatus,
21+
MessageObservingStatusSchema,
2022
MessageSchema,
2123
Session,
2224
SessionSchema,
@@ -264,5 +266,20 @@ export class SessionsAPI {
264266
const data = await this.requester.request('GET', `/session/${sessionId}/token_counts`);
265267
return TokenCountsSchema.parse(data);
266268
}
269+
270+
/**
271+
* Get message observing status counts for a session.
272+
*
273+
* Returns the count of messages by their observing status:
274+
* observed, in_process, and pending.
275+
*
276+
* @param sessionId - The UUID of the session.
277+
* @returns MessageObservingStatus object containing observed, in_process,
278+
* pending counts and updated_at timestamp.
279+
*/
280+
async messagesObservingStatus(sessionId: string): Promise<MessageObservingStatus> {
281+
const data = await this.requester.request('GET', `/session/${sessionId}/observing-status`);
282+
return MessageObservingStatusSchema.parse(data);
283+
}
267284
}
268285

src/client/acontext-ts/src/types/session.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,15 @@ export const TokenCountsSchema = z.object({
126126

127127
export type TokenCounts = z.infer<typeof TokenCountsSchema>;
128128

129+
export const MessageObservingStatusSchema = z.object({
130+
observed: z.number(),
131+
in_process: z.number(),
132+
pending: z.number(),
133+
updated_at: z.string(),
134+
});
135+
136+
export type MessageObservingStatus = z.infer<typeof MessageObservingStatusSchema>;
137+
129138
/**
130139
* Parameters for the remove_tool_result edit strategy.
131140
*/

src/server/api/go/cmd/server/main.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,20 @@ func main() {
9090
artifactHandler := do.MustInvoke[*handler.ArtifactHandler](inj)
9191
taskHandler := do.MustInvoke[*handler.TaskHandler](inj)
9292
toolHandler := do.MustInvoke[*handler.ToolHandler](inj)
93+
messageObservingHandler := do.MustInvoke[*handler.MessageObservingHandler](inj)
9394

9495
engine := router.NewRouter(router.RouterDeps{
95-
Config: cfg,
96-
DB: db,
97-
Log: log,
98-
SpaceHandler: spaceHandler,
99-
BlockHandler: blockHandler,
100-
SessionHandler: sessionHandler,
101-
DiskHandler: diskHandler,
102-
ArtifactHandler: artifactHandler,
103-
TaskHandler: taskHandler,
104-
ToolHandler: toolHandler,
96+
Config: cfg,
97+
DB: db,
98+
Log: log,
99+
SpaceHandler: spaceHandler,
100+
BlockHandler: blockHandler,
101+
SessionHandler: sessionHandler,
102+
DiskHandler: diskHandler,
103+
ArtifactHandler: artifactHandler,
104+
TaskHandler: taskHandler,
105+
ToolHandler: toolHandler,
106+
MessageObservingHandler: messageObservingHandler,
105107
})
106108

107109
addr := fmt.Sprintf("%s:%d", cfg.App.Host, cfg.App.Port)

src/server/api/go/internal/bootstrap/container.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ func BuildContainer() *do.Injector {
170170
return repo.NewTaskRepo(do.MustInvoke[*gorm.DB](i)), nil
171171
})
172172

173+
do.Provide(inj, func(i *do.Injector) (repo.MessageObservingRepo, error) {
174+
return repo.NewMessageObservingRepo(do.MustInvoke[*gorm.DB](i)), nil
175+
})
176+
173177
// Service
174178
do.Provide(inj, func(i *do.Injector) (service.SpaceService, error) {
175179
return service.NewSpaceService(
@@ -208,6 +212,11 @@ func BuildContainer() *do.Injector {
208212
do.MustInvoke[*zap.Logger](i),
209213
), nil
210214
})
215+
do.Provide(inj, func(i *do.Injector) (service.MessageObservingService, error) {
216+
return service.NewMessageObservingService(
217+
do.MustInvoke[repo.MessageObservingRepo](i),
218+
), nil
219+
})
211220

212221
// Handler
213222
do.Provide(inj, func(i *do.Injector) (*handler.SpaceHandler, error) {
@@ -240,6 +249,11 @@ func BuildContainer() *do.Injector {
240249
do.Provide(inj, func(i *do.Injector) (*handler.ToolHandler, error) {
241250
return handler.NewToolHandler(do.MustInvoke[*httpclient.CoreClient](i)), nil
242251
})
252+
do.Provide(inj, func(i *do.Injector) (*handler.MessageObservingHandler, error) {
253+
return handler.NewMessageObservingHandler(
254+
do.MustInvoke[service.MessageObservingService](i),
255+
), nil
256+
})
243257

244258
return inj
245259
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package handler
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/gin-gonic/gin"
7+
"github.com/memodb-io/Acontext/internal/modules/service"
8+
)
9+
10+
// MessageObservingHandler handles HTTP requests for message observing status
11+
type MessageObservingHandler struct {
12+
svc service.MessageObservingService
13+
}
14+
15+
// NewMessageObservingHandler creates a new message observing handler
16+
func NewMessageObservingHandler(svc service.MessageObservingService) *MessageObservingHandler {
17+
if svc == nil {
18+
panic("message observing service cannot be nil")
19+
}
20+
return &MessageObservingHandler{
21+
svc: svc,
22+
}
23+
}
24+
25+
// GetSessionObservingStatus handles GET /api/v1/sessions/:session_id/observing-status
26+
//
27+
// @Summary Get message observing status for a session
28+
// @Description Returns the count of observed, in_process, and pending messages
29+
// @Tags sessions
30+
// @Accept json
31+
// @Produce json
32+
// @Param session_id path string true "Session ID" format(uuid)
33+
// @Success 200 {object} model.MessageObservingStatus
34+
// @Failure 400 {object} map[string]string
35+
// @Failure 500 {object} map[string]string
36+
// @Router /api/v1/sessions/{session_id}/observing-status [get]
37+
func (h *MessageObservingHandler) GetSessionObservingStatus(c *gin.Context) {
38+
// Extract session_id from URL path
39+
sessionID := c.Param("session_id")
40+
41+
// Validate session ID
42+
if sessionID == "" {
43+
c.JSON(http.StatusBadRequest, gin.H{
44+
"error": "session_id is required",
45+
})
46+
return
47+
}
48+
49+
// Call service
50+
status, err := h.svc.GetSessionObservingStatus(c.Request.Context(), sessionID)
51+
if err != nil {
52+
c.JSON(http.StatusInternalServerError, gin.H{
53+
"error": err.Error(),
54+
})
55+
return
56+
}
57+
58+
// Return success response
59+
c.JSON(http.StatusOK, status)
60+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/gin-gonic/gin"
12+
"github.com/memodb-io/Acontext/internal/modules/model"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/mock"
15+
)
16+
17+
// MockMessageObservingService is a mock implementation of MessageObservingService
18+
type MockMessageObservingService struct {
19+
mock.Mock
20+
}
21+
22+
func (m *MockMessageObservingService) GetSessionObservingStatus(ctx context.Context, sessionID string) (*model.MessageObservingStatus, error) {
23+
args := m.Called(ctx, sessionID)
24+
if args.Get(0) == nil {
25+
return nil, args.Error(1)
26+
}
27+
return args.Get(0).(*model.MessageObservingStatus), args.Error(1)
28+
}
29+
30+
func TestMessageObservingHandler_GetSessionObservingStatus_Success(t *testing.T) {
31+
// Setup
32+
gin.SetMode(gin.TestMode)
33+
34+
mockService := new(MockMessageObservingService)
35+
handler := NewMessageObservingHandler(mockService)
36+
37+
sessionID := "550e8400-e29b-41d4-a716-446655440000"
38+
expectedStatus := &model.MessageObservingStatus{
39+
Observed: 10,
40+
InProcess: 5,
41+
Pending: 3,
42+
UpdatedAt: time.Now(),
43+
}
44+
45+
mockService.On("GetSessionObservingStatus", mock.Anything, sessionID).
46+
Return(expectedStatus, nil)
47+
48+
// Create test request
49+
w := httptest.NewRecorder()
50+
c, _ := gin.CreateTestContext(w)
51+
c.Params = gin.Params{
52+
{Key: "session_id", Value: sessionID},
53+
}
54+
req, _ := http.NewRequest("GET", "/session/"+sessionID+"/observing-status", nil)
55+
c.Request = req
56+
57+
// Execute
58+
handler.GetSessionObservingStatus(c)
59+
60+
// Assert
61+
assert.Equal(t, http.StatusOK, w.Code)
62+
mockService.AssertExpectations(t)
63+
64+
// Check response body contains expected values
65+
assert.Contains(t, w.Body.String(), `"observed":10`)
66+
assert.Contains(t, w.Body.String(), `"in_process":5`)
67+
assert.Contains(t, w.Body.String(), `"pending":3`)
68+
}
69+
70+
func TestMessageObservingHandler_GetSessionObservingStatus_EmptySessionID(t *testing.T) {
71+
// Setup
72+
gin.SetMode(gin.TestMode)
73+
74+
mockService := new(MockMessageObservingService)
75+
handler := NewMessageObservingHandler(mockService)
76+
77+
// Create test request with empty session_id
78+
w := httptest.NewRecorder()
79+
c, _ := gin.CreateTestContext(w)
80+
c.Params = gin.Params{
81+
{Key: "session_id", Value: ""},
82+
}
83+
req, _ := http.NewRequest("GET", "/session//observing-status", nil)
84+
c.Request = req
85+
86+
// Execute
87+
handler.GetSessionObservingStatus(c)
88+
89+
// Assert
90+
assert.Equal(t, http.StatusBadRequest, w.Code)
91+
assert.Contains(t, w.Body.String(), "session_id is required")
92+
mockService.AssertNotCalled(t, "GetSessionObservingStatus")
93+
}
94+
95+
func TestMessageObservingHandler_GetSessionObservingStatus_ServiceError(t *testing.T) {
96+
// Setup
97+
gin.SetMode(gin.TestMode)
98+
99+
mockService := new(MockMessageObservingService)
100+
handler := NewMessageObservingHandler(mockService)
101+
102+
sessionID := "550e8400-e29b-41d4-a716-446655440000"
103+
expectedError := errors.New("database connection failed")
104+
105+
mockService.On("GetSessionObservingStatus", mock.Anything, sessionID).
106+
Return(nil, expectedError)
107+
108+
// Create test request
109+
w := httptest.NewRecorder()
110+
c, _ := gin.CreateTestContext(w)
111+
c.Params = gin.Params{
112+
{Key: "session_id", Value: sessionID},
113+
}
114+
req, _ := http.NewRequest("GET", "/session/"+sessionID+"/observing-status", nil)
115+
c.Request = req
116+
117+
// Execute
118+
handler.GetSessionObservingStatus(c)
119+
120+
// Assert
121+
assert.Equal(t, http.StatusInternalServerError, w.Code)
122+
assert.Contains(t, w.Body.String(), "database connection failed")
123+
mockService.AssertExpectations(t)
124+
}
125+
126+
func TestNewMessageObservingHandler_NilService(t *testing.T) {
127+
// Should panic when service is nil
128+
assert.Panics(t, func() {
129+
NewMessageObservingHandler(nil)
130+
})
131+
}

0 commit comments

Comments
 (0)