backend: inbox table + endpoints, mention-triggered inbox population
- Migration: inbox table with user_id, message_id, trigger, acked_at - GET /api/inbox?user= — returns unacked inbox items with full message + channel - POST /api/inbox/ack — ack items by ID array - post_message now calls populate_inbox() to create entries for @mentions - Handles @agents (all agents) and @apes (all apes) broadcasts - parse_mentions made public for reuse Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
1
.claude/scheduled_tasks.lock
Normal file
1
.claude/scheduled_tasks.lock
Normal file
@@ -0,0 +1 @@
|
||||
{"sessionId":"6df01b4f-363c-489f-b71c-23531c8f6831","pid":73181,"acquiredAt":1774816757211}
|
||||
@@ -97,6 +97,28 @@ pub enum WsEvent {
|
||||
Delete { id: Uuid },
|
||||
}
|
||||
|
||||
// ── Inbox types ──
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
#[ts(export)]
|
||||
pub struct InboxItem {
|
||||
pub id: i64,
|
||||
pub message: Message,
|
||||
pub channel_name: String,
|
||||
pub trigger: String,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct InboxQuery {
|
||||
pub user: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AckRequest {
|
||||
pub ids: Vec<i64>,
|
||||
}
|
||||
|
||||
// ── Query params ──
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
||||
11
crates/colony/migrations/20260329000002_inbox.sql
Normal file
11
crates/colony/migrations/20260329000002_inbox.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
CREATE TABLE IF NOT EXISTS inbox (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id TEXT NOT NULL REFERENCES users(id),
|
||||
message_id TEXT NOT NULL REFERENCES messages(id),
|
||||
channel_id TEXT NOT NULL,
|
||||
trigger TEXT NOT NULL CHECK (trigger IN ('mention', 'watch', 'broadcast')),
|
||||
acked_at TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_inbox_user_unacked ON inbox(user_id, acked_at);
|
||||
@@ -3,7 +3,7 @@ use sqlx::FromRow;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Extract @mentions from message content
|
||||
fn parse_mentions(content: &str) -> Vec<String> {
|
||||
pub fn parse_mentions(content: &str) -> Vec<String> {
|
||||
content
|
||||
.split_whitespace()
|
||||
.filter(|w| w.starts_with('@') && w.len() > 1)
|
||||
|
||||
@@ -59,6 +59,8 @@ async fn main() {
|
||||
"/api/channels/{channel_id}/messages/{msg_id}/restore",
|
||||
axum::routing::post(routes::restore_message),
|
||||
)
|
||||
.route("/api/inbox", get(routes::get_inbox))
|
||||
.route("/api/inbox/ack", axum::routing::post(routes::ack_inbox))
|
||||
.route("/ws/{channel_id}", get(ws::ws_handler))
|
||||
.fallback_service(
|
||||
ServeDir::new("static").fallback(ServeFile::new("static/index.html")),
|
||||
|
||||
@@ -257,7 +257,7 @@ pub async fn post_message(
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&channel_id)
|
||||
.bind(user_id)
|
||||
.bind(&user_id)
|
||||
.bind(msg_type)
|
||||
.bind(&body.content)
|
||||
.bind(&metadata_json)
|
||||
@@ -280,6 +280,9 @@ pub async fn post_message(
|
||||
let tx = state.get_sender(&channel_id).await;
|
||||
let _ = tx.send(WsEvent::Message(message.clone()));
|
||||
|
||||
// Populate inbox for mentioned users
|
||||
populate_inbox(&state.db, &id, &channel_id, &body.content, &user_id).await;
|
||||
|
||||
Ok((StatusCode::CREATED, Json(message)))
|
||||
}
|
||||
|
||||
@@ -359,6 +362,167 @@ pub async fn restore_message(
|
||||
Ok(Json(message))
|
||||
}
|
||||
|
||||
// ── Inbox ──
|
||||
|
||||
pub async fn get_inbox(
|
||||
State(state): State<AppState>,
|
||||
Query(query): Query<InboxQuery>,
|
||||
) -> Result<Json<Vec<InboxItem>>> {
|
||||
let user_id = resolve_user(&state.db, &UserParam { user: Some(query.user) }).await?;
|
||||
|
||||
let rows = sqlx::query_as::<_, InboxRow>(
|
||||
"SELECT i.id as inbox_id, i.message_id, i.channel_id as inbox_channel_id, i.trigger, i.created_at as i_created_at,
|
||||
m.id as msg_id, m.seq, m.channel_id as msg_channel_id, m.user_id, m.type, m.content, m.metadata, m.reply_to, m.created_at as msg_created_at, m.updated_at, m.deleted_at,
|
||||
u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at,
|
||||
c.name as channel_name
|
||||
FROM inbox i
|
||||
JOIN messages m ON i.message_id = m.id
|
||||
JOIN users u ON m.user_id = u.id
|
||||
JOIN channels c ON i.channel_id = c.id
|
||||
WHERE i.user_id = ? AND i.acked_at IS NULL
|
||||
ORDER BY i.id ASC",
|
||||
)
|
||||
.bind(&user_id)
|
||||
.fetch_all(&state.db)
|
||||
.await?;
|
||||
|
||||
let items: Vec<InboxItem> = rows.iter().map(|r| r.to_api()).collect();
|
||||
Ok(Json(items))
|
||||
}
|
||||
|
||||
pub async fn ack_inbox(
|
||||
State(state): State<AppState>,
|
||||
Json(body): Json<AckRequest>,
|
||||
) -> Result<impl IntoResponse> {
|
||||
for id in &body.ids {
|
||||
sqlx::query("UPDATE inbox SET acked_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ? AND acked_at IS NULL")
|
||||
.bind(id)
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
}
|
||||
Ok(Json(serde_json::json!({"acked": body.ids.len()})))
|
||||
}
|
||||
|
||||
/// Populate inbox entries when a message is posted
|
||||
async fn populate_inbox(db: &SqlitePool, message_id: &str, channel_id: &str, content: &str, sender_id: &str) {
|
||||
let mentions = crate::db::parse_mentions(content);
|
||||
|
||||
for mention in &mentions {
|
||||
// Resolve mentioned user
|
||||
if let Ok(Some(user_id)) = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE username = ?")
|
||||
.bind(mention)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
{
|
||||
if user_id != sender_id {
|
||||
let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'mention')")
|
||||
.bind(&user_id)
|
||||
.bind(message_id)
|
||||
.bind(channel_id)
|
||||
.execute(db)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle @agents broadcast
|
||||
if mention == "agents" {
|
||||
let agents = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'agent' AND id != ?")
|
||||
.bind(sender_id)
|
||||
.fetch_all(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
for agent_id in agents {
|
||||
let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')")
|
||||
.bind(&agent_id)
|
||||
.bind(message_id)
|
||||
.bind(channel_id)
|
||||
.execute(db)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle @apes broadcast
|
||||
if mention == "apes" {
|
||||
let apes = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'ape' AND id != ?")
|
||||
.bind(sender_id)
|
||||
.fetch_all(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
for ape_id in apes {
|
||||
let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')")
|
||||
.bind(&ape_id)
|
||||
.bind(message_id)
|
||||
.bind(channel_id)
|
||||
.execute(db)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Inbox row type ──
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct InboxRow {
|
||||
pub inbox_id: i64,
|
||||
pub message_id: String,
|
||||
pub inbox_channel_id: String,
|
||||
pub trigger: String,
|
||||
pub i_created_at: String,
|
||||
pub channel_name: String,
|
||||
// message fields
|
||||
pub msg_id: String,
|
||||
pub seq: i64,
|
||||
pub msg_channel_id: String,
|
||||
pub user_id: String,
|
||||
pub r#type: String,
|
||||
pub content: String,
|
||||
pub metadata: Option<String>,
|
||||
pub reply_to: Option<String>,
|
||||
pub msg_created_at: String,
|
||||
pub updated_at: Option<String>,
|
||||
pub deleted_at: Option<String>,
|
||||
// user fields
|
||||
pub u_id: String,
|
||||
pub username: String,
|
||||
pub display_name: String,
|
||||
pub role: String,
|
||||
pub u_created_at: String,
|
||||
}
|
||||
|
||||
impl InboxRow {
|
||||
pub fn to_api(&self) -> InboxItem {
|
||||
let user_row = UserRow {
|
||||
id: self.u_id.clone(),
|
||||
username: self.username.clone(),
|
||||
display_name: self.display_name.clone(),
|
||||
role: self.role.clone(),
|
||||
password_hash: None,
|
||||
created_at: self.u_created_at.clone(),
|
||||
};
|
||||
let msg_row = MessageRow {
|
||||
id: self.msg_id.clone(),
|
||||
seq: self.seq,
|
||||
channel_id: self.msg_channel_id.clone(),
|
||||
user_id: self.user_id.clone(),
|
||||
r#type: self.r#type.clone(),
|
||||
content: self.content.clone(),
|
||||
metadata: self.metadata.clone(),
|
||||
reply_to: self.reply_to.clone(),
|
||||
created_at: self.msg_created_at.clone(),
|
||||
updated_at: self.updated_at.clone(),
|
||||
deleted_at: self.deleted_at.clone(),
|
||||
};
|
||||
InboxItem {
|
||||
id: self.inbox_id,
|
||||
message: msg_row.to_api(&user_row),
|
||||
channel_name: self.channel_name.clone(),
|
||||
trigger: self.trigger.clone(),
|
||||
created_at: self.i_created_at.parse().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Joined row type for message + user ──
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
|
||||
Reference in New Issue
Block a user