diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 0000000..98dac30 --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"6df01b4f-363c-489f-b71c-23531c8f6831","pid":73181,"acquiredAt":1774816757211} \ No newline at end of file diff --git a/crates/colony-types/src/lib.rs b/crates/colony-types/src/lib.rs index 1aa3cee..f0d47ae 100644 --- a/crates/colony-types/src/lib.rs +++ b/crates/colony-types/src/lib.rs @@ -97,6 +97,28 @@ pub enum WsEvent { Delete { id: Uuid }, } +// ── Inbox types ── + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export)] +pub struct InboxItem { + pub id: i64, + pub message: Message, + pub channel_name: String, + pub trigger: String, + pub created_at: DateTime, +} + +#[derive(Debug, Deserialize)] +pub struct InboxQuery { + pub user: String, +} + +#[derive(Debug, Deserialize)] +pub struct AckRequest { + pub ids: Vec, +} + // ── Query params ── #[derive(Debug, Deserialize)] diff --git a/crates/colony/migrations/20260329000002_inbox.sql b/crates/colony/migrations/20260329000002_inbox.sql new file mode 100644 index 0000000..4759604 --- /dev/null +++ b/crates/colony/migrations/20260329000002_inbox.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS inbox ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL REFERENCES users(id), + message_id TEXT NOT NULL REFERENCES messages(id), + channel_id TEXT NOT NULL, + trigger TEXT NOT NULL CHECK (trigger IN ('mention', 'watch', 'broadcast')), + acked_at TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) +); + +CREATE INDEX IF NOT EXISTS idx_inbox_user_unacked ON inbox(user_id, acked_at); diff --git a/crates/colony/src/db.rs b/crates/colony/src/db.rs index f845929..a327574 100644 --- a/crates/colony/src/db.rs +++ b/crates/colony/src/db.rs @@ -3,7 +3,7 @@ use sqlx::FromRow; use uuid::Uuid; /// Extract @mentions from message content -fn parse_mentions(content: &str) -> Vec { +pub fn parse_mentions(content: &str) -> Vec { content .split_whitespace() .filter(|w| w.starts_with('@') && w.len() > 1) diff --git a/crates/colony/src/main.rs b/crates/colony/src/main.rs index 2495736..b6e80df 100644 --- a/crates/colony/src/main.rs +++ b/crates/colony/src/main.rs @@ -59,6 +59,8 @@ async fn main() { "/api/channels/{channel_id}/messages/{msg_id}/restore", axum::routing::post(routes::restore_message), ) + .route("/api/inbox", get(routes::get_inbox)) + .route("/api/inbox/ack", axum::routing::post(routes::ack_inbox)) .route("/ws/{channel_id}", get(ws::ws_handler)) .fallback_service( ServeDir::new("static").fallback(ServeFile::new("static/index.html")), diff --git a/crates/colony/src/routes.rs b/crates/colony/src/routes.rs index 4a651e4..255cb77 100644 --- a/crates/colony/src/routes.rs +++ b/crates/colony/src/routes.rs @@ -257,7 +257,7 @@ pub async fn post_message( ) .bind(&id) .bind(&channel_id) - .bind(user_id) + .bind(&user_id) .bind(msg_type) .bind(&body.content) .bind(&metadata_json) @@ -280,6 +280,9 @@ pub async fn post_message( let tx = state.get_sender(&channel_id).await; let _ = tx.send(WsEvent::Message(message.clone())); + // Populate inbox for mentioned users + populate_inbox(&state.db, &id, &channel_id, &body.content, &user_id).await; + Ok((StatusCode::CREATED, Json(message))) } @@ -359,6 +362,167 @@ pub async fn restore_message( Ok(Json(message)) } +// ── Inbox ── + +pub async fn get_inbox( + State(state): State, + Query(query): Query, +) -> Result>> { + let user_id = resolve_user(&state.db, &UserParam { user: Some(query.user) }).await?; + + let rows = sqlx::query_as::<_, InboxRow>( + "SELECT i.id as inbox_id, i.message_id, i.channel_id as inbox_channel_id, i.trigger, i.created_at as i_created_at, + m.id as msg_id, m.seq, m.channel_id as msg_channel_id, m.user_id, m.type, m.content, m.metadata, m.reply_to, m.created_at as msg_created_at, m.updated_at, m.deleted_at, + u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at, + c.name as channel_name + FROM inbox i + JOIN messages m ON i.message_id = m.id + JOIN users u ON m.user_id = u.id + JOIN channels c ON i.channel_id = c.id + WHERE i.user_id = ? AND i.acked_at IS NULL + ORDER BY i.id ASC", + ) + .bind(&user_id) + .fetch_all(&state.db) + .await?; + + let items: Vec = rows.iter().map(|r| r.to_api()).collect(); + Ok(Json(items)) +} + +pub async fn ack_inbox( + State(state): State, + Json(body): Json, +) -> Result { + for id in &body.ids { + sqlx::query("UPDATE inbox SET acked_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ? AND acked_at IS NULL") + .bind(id) + .execute(&state.db) + .await?; + } + Ok(Json(serde_json::json!({"acked": body.ids.len()}))) +} + +/// Populate inbox entries when a message is posted +async fn populate_inbox(db: &SqlitePool, message_id: &str, channel_id: &str, content: &str, sender_id: &str) { + let mentions = crate::db::parse_mentions(content); + + for mention in &mentions { + // Resolve mentioned user + if let Ok(Some(user_id)) = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE username = ?") + .bind(mention) + .fetch_optional(db) + .await + { + if user_id != sender_id { + let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'mention')") + .bind(&user_id) + .bind(message_id) + .bind(channel_id) + .execute(db) + .await; + } + } + + // Handle @agents broadcast + if mention == "agents" { + let agents = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'agent' AND id != ?") + .bind(sender_id) + .fetch_all(db) + .await + .unwrap_or_default(); + for agent_id in agents { + let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')") + .bind(&agent_id) + .bind(message_id) + .bind(channel_id) + .execute(db) + .await; + } + } + + // Handle @apes broadcast + if mention == "apes" { + let apes = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'ape' AND id != ?") + .bind(sender_id) + .fetch_all(db) + .await + .unwrap_or_default(); + for ape_id in apes { + let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')") + .bind(&ape_id) + .bind(message_id) + .bind(channel_id) + .execute(db) + .await; + } + } + } +} + +// ── Inbox row type ── + +#[derive(Debug, sqlx::FromRow)] +pub struct InboxRow { + pub inbox_id: i64, + pub message_id: String, + pub inbox_channel_id: String, + pub trigger: String, + pub i_created_at: String, + pub channel_name: String, + // message fields + pub msg_id: String, + pub seq: i64, + pub msg_channel_id: String, + pub user_id: String, + pub r#type: String, + pub content: String, + pub metadata: Option, + pub reply_to: Option, + pub msg_created_at: String, + pub updated_at: Option, + pub deleted_at: Option, + // user fields + pub u_id: String, + pub username: String, + pub display_name: String, + pub role: String, + pub u_created_at: String, +} + +impl InboxRow { + pub fn to_api(&self) -> InboxItem { + let user_row = UserRow { + id: self.u_id.clone(), + username: self.username.clone(), + display_name: self.display_name.clone(), + role: self.role.clone(), + password_hash: None, + created_at: self.u_created_at.clone(), + }; + let msg_row = MessageRow { + id: self.msg_id.clone(), + seq: self.seq, + channel_id: self.msg_channel_id.clone(), + user_id: self.user_id.clone(), + r#type: self.r#type.clone(), + content: self.content.clone(), + metadata: self.metadata.clone(), + reply_to: self.reply_to.clone(), + created_at: self.msg_created_at.clone(), + updated_at: self.updated_at.clone(), + deleted_at: self.deleted_at.clone(), + }; + InboxItem { + id: self.inbox_id, + message: msg_row.to_api(&user_row), + channel_name: self.channel_name.clone(), + trigger: self.trigger.clone(), + created_at: self.i_created_at.parse().unwrap(), + } + } +} + // ── Joined row type for message + user ── #[derive(Debug, sqlx::FromRow)]