use axum::{ extract::{Path, Query, State}, http::StatusCode, response::{IntoResponse, Response}, Json, }; use colony_types::*; use sqlx::SqlitePool; use uuid::Uuid; use crate::db::*; use crate::state::AppState; // ── Error handling ── pub enum AppError { NotFound(String), Conflict(String), BadRequest(String), Internal(String), } impl IntoResponse for AppError { fn into_response(self) -> Response { let (status, message) = match self { AppError::NotFound(m) => (StatusCode::NOT_FOUND, m), AppError::Conflict(m) => (StatusCode::CONFLICT, m), AppError::BadRequest(m) => (StatusCode::BAD_REQUEST, m), AppError::Internal(m) => (StatusCode::INTERNAL_SERVER_ERROR, m), }; (status, Json(serde_json::json!({"error": message}))).into_response() } } impl From for AppError { fn from(e: sqlx::Error) -> Self { match &e { sqlx::Error::Database(db_err) if db_err.message().contains("UNIQUE") => { AppError::Conflict(format!("Already exists: {}", db_err.message())) } sqlx::Error::RowNotFound => AppError::NotFound("Not found".into()), _ => AppError::Internal(format!("Database error: {e}")), } } } type Result = std::result::Result; // ── User identity from ?user= param ── #[derive(Debug, serde::Deserialize)] pub struct UserParam { pub user: Option, } async fn resolve_user(db: &SqlitePool, param: &UserParam) -> Result { let username = param.user.as_deref().unwrap_or("benji"); let row = sqlx::query_scalar::<_, String>( "SELECT id FROM users WHERE username = ?", ) .bind(username) .fetch_optional(db) .await?; match row { Some(id) => Ok(id), None => Err(AppError::BadRequest(format!("Unknown user: {username}"))), } } // ── Health ── pub async fn health() -> &'static str { "ok" } // ── Channels ── pub async fn list_channels(State(state): State) -> Result>> { let rows = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels ORDER BY created_at") .fetch_all(&state.db) .await?; let channels: Vec = rows.iter().map(|r| r.to_api()).collect(); Ok(Json(channels)) } pub async fn list_users(State(state): State) -> Result>> { let rows = sqlx::query_as::<_, UserRow>("SELECT * FROM users ORDER BY created_at") .fetch_all(&state.db) .await?; Ok(Json(rows.iter().map(|r| r.to_api()).collect())) } pub async fn get_me( State(state): State, Query(user_param): Query, ) -> Result> { let username = user_param.user.as_deref().unwrap_or("benji"); let row = sqlx::query_as::<_, UserRow>("SELECT * FROM users WHERE username = ?") .bind(username) .fetch_optional(&state.db) .await?; match row { Some(r) => Ok(Json(r.to_api())), None => Err(AppError::NotFound(format!("User {username} not found"))), } } pub async fn create_channel( State(state): State, Query(user_param): Query, Json(body): Json, ) -> Result { let id = Uuid::new_v4().to_string(); let created_by = resolve_user(&state.db, &user_param).await?; sqlx::query("INSERT INTO channels (id, name, description, created_by) VALUES (?, ?, ?, ?)") .bind(&id) .bind(&body.name) .bind(&body.description) .bind(created_by) .execute(&state.db) .await?; let row = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels WHERE id = ?") .bind(&id) .fetch_one(&state.db) .await?; Ok((StatusCode::CREATED, Json(row.to_api()))) } pub async fn get_channel( State(state): State, Path(id): Path, ) -> Result> { let row = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels WHERE id = ?") .bind(&id) .fetch_optional(&state.db) .await?; match row { Some(r) => Ok(Json(r.to_api())), None => Err(AppError::NotFound(format!("Channel {id} not found"))), } } // ── Messages ── pub async fn list_messages( State(state): State, Path(channel_id): Path, Query(query): Query, ) -> Result>> { let mut sql = String::from( "SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \ FROM messages m JOIN users u ON m.user_id = u.id \ WHERE m.channel_id = ?", ); let mut binds: Vec = vec![channel_id.clone()]; if let Some(after_seq) = &query.after_seq { sql.push_str(" AND m.seq > ?"); binds.push(after_seq.to_string()); } if let Some(msg_type) = &query.r#type { sql.push_str(" AND m.type = ?"); binds.push(match msg_type { MessageType::Text => "text", MessageType::Code => "code", MessageType::Result => "result", MessageType::Error => "error", MessageType::Plan => "plan", } .to_string()); } if let Some(user_id) = &query.user_id { sql.push_str(" AND m.user_id = ?"); binds.push(user_id.to_string()); } sql.push_str(" ORDER BY m.seq ASC"); let mut q = sqlx::query_as::<_, MessageWithUserRow>(&sql); for b in &binds { q = q.bind(b); } let rows = q.fetch_all(&state.db).await?; let messages: Vec = rows.iter().map(|r| r.to_api_message()).collect(); Ok(Json(messages)) } pub async fn post_message( State(state): State, Path(channel_id): Path, Query(user_param): Query, Json(body): Json, ) -> Result { // Verify channel exists let channel_exists = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM channels WHERE id = ?", ) .bind(&channel_id) .fetch_one(&state.db) .await?; if channel_exists == 0 { return Err(AppError::NotFound(format!("Channel {channel_id} not found"))); } // Verify reply_to is in same channel (if provided) if let Some(ref reply_id) = body.reply_to { let reply_channel = sqlx::query_scalar::<_, String>( "SELECT channel_id FROM messages WHERE id = ?", ) .bind(reply_id.to_string()) .fetch_optional(&state.db) .await?; match reply_channel { None => return Err(AppError::BadRequest(format!("reply_to message {reply_id} not found"))), Some(ch) if ch != channel_id => { return Err(AppError::BadRequest("reply_to must reference a message in the same channel".into())); } _ => {} } } // Content length limit (64KB) if body.content.len() > 65_536 { return Err(AppError::BadRequest("Message content exceeds 64KB limit".into())); } let id = Uuid::new_v4().to_string(); let user_id = resolve_user(&state.db, &user_param).await?; let msg_type = match body.r#type { MessageType::Text => "text", MessageType::Code => "code", MessageType::Result => "result", MessageType::Error => "error", MessageType::Plan => "plan", }; let metadata_json = body .metadata .as_ref() .map(|m| serde_json::to_string(m).unwrap()); let reply_to = body.reply_to.map(|r| r.to_string()); // seq is AUTOINCREMENT — no race conditions, no manual tracking sqlx::query( "INSERT INTO messages (id, channel_id, user_id, type, content, metadata, reply_to) \ VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&id) .bind(&channel_id) .bind(&user_id) .bind(msg_type) .bind(&body.content) .bind(&metadata_json) .bind(&reply_to) .execute(&state.db) .await?; // Fetch the full message with user let row = sqlx::query_as::<_, MessageWithUserRow>( "SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \ FROM messages m JOIN users u ON m.user_id = u.id WHERE m.id = ?", ) .bind(&id) .fetch_one(&state.db) .await?; let message = row.to_api_message(); // Broadcast to WebSocket subscribers let tx = state.get_sender(&channel_id).await; let _ = tx.send(WsEvent::Message(message.clone())); // Populate inbox for mentioned users populate_inbox(&state.db, &id, &channel_id, &body.content, &user_id).await; Ok((StatusCode::CREATED, Json(message))) } pub async fn delete_message( State(state): State, Path((channel_id, msg_id)): Path<(String, String)>, Query(user_param): Query, ) -> Result { let user_id = resolve_user(&state.db, &user_param).await?; // Verify message exists and belongs to this user let row = sqlx::query_as::<_, MessageWithUserRow>( "SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \ FROM messages m JOIN users u ON m.user_id = u.id WHERE m.id = ? AND m.channel_id = ?", ) .bind(&msg_id) .bind(&channel_id) .fetch_optional(&state.db) .await?; let row = match row { Some(r) => r, None => return Err(AppError::NotFound("Message not found".into())), }; if row.user_id != user_id { return Err(AppError::BadRequest("Can only delete your own messages".into())); } // Soft delete sqlx::query("UPDATE messages SET deleted_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?") .bind(&msg_id) .execute(&state.db) .await?; // Broadcast delete event let tx = state.get_sender(&channel_id).await; let _ = tx.send(WsEvent::Delete { id: uuid::Uuid::parse_str(&msg_id).unwrap(), }); Ok(StatusCode::NO_CONTENT) } pub async fn restore_message( State(state): State, Path((channel_id, msg_id)): Path<(String, String)>, ) -> Result { // Restore soft-deleted message (any user can restore) let result = sqlx::query( "UPDATE messages SET deleted_at = NULL WHERE id = ? AND channel_id = ? AND deleted_at IS NOT NULL", ) .bind(&msg_id) .bind(&channel_id) .execute(&state.db) .await?; if result.rows_affected() == 0 { return Err(AppError::NotFound("Message not found or not deleted".into())); } // Fetch restored message let row = sqlx::query_as::<_, MessageWithUserRow>( "SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \ FROM messages m JOIN users u ON m.user_id = u.id WHERE m.id = ?", ) .bind(&msg_id) .fetch_one(&state.db) .await?; let message = row.to_api_message(); // Broadcast as edit (not Message — dedup would ignore same ID) let tx = state.get_sender(&channel_id).await; let _ = tx.send(WsEvent::Edit(message.clone())); Ok(Json(message)) } // ── Inbox ── pub async fn get_inbox( State(state): State, Query(query): Query, ) -> Result>> { let user_id = resolve_user(&state.db, &UserParam { user: Some(query.user) }).await?; let rows = sqlx::query_as::<_, InboxRow>( "SELECT i.id as inbox_id, i.message_id, i.channel_id as inbox_channel_id, i.trigger, i.created_at as i_created_at, m.id as msg_id, m.seq, m.channel_id as msg_channel_id, m.user_id, m.type, m.content, m.metadata, m.reply_to, m.created_at as msg_created_at, m.updated_at, m.deleted_at, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at, c.name as channel_name FROM inbox i JOIN messages m ON i.message_id = m.id JOIN users u ON m.user_id = u.id JOIN channels c ON i.channel_id = c.id WHERE i.user_id = ? AND i.acked_at IS NULL ORDER BY i.id ASC", ) .bind(&user_id) .fetch_all(&state.db) .await?; let items: Vec = rows.iter().map(|r| r.to_api()).collect(); Ok(Json(items)) } pub async fn ack_inbox( State(state): State, Query(user_param): Query, Json(body): Json, ) -> Result { let user_id = resolve_user(&state.db, &user_param).await?; let mut acked = 0i64; for id in &body.ids { let result = sqlx::query("UPDATE inbox SET acked_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ? AND user_id = ? AND acked_at IS NULL") .bind(id) .bind(&user_id) .execute(&state.db) .await?; acked += result.rows_affected() as i64; } Ok(Json(serde_json::json!({"acked": acked}))) } /// Populate inbox entries when a message is posted async fn populate_inbox(db: &SqlitePool, message_id: &str, channel_id: &str, content: &str, sender_id: &str) { let mentions = crate::db::parse_mentions(content); let mut notified: std::collections::HashSet = std::collections::HashSet::new(); for mention in &mentions { // Resolve mentioned user if let Ok(Some(uid)) = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE username = ?") .bind(mention) .fetch_optional(db) .await { if uid != sender_id && notified.insert(uid.clone()) { let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'mention')") .bind(&uid) .bind(message_id) .bind(channel_id) .execute(db) .await; } } // Handle @agents broadcast if mention == "agents" { let agents = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'agent' AND id != ?") .bind(sender_id) .fetch_all(db) .await .unwrap_or_default(); for agent_id in agents { if notified.insert(agent_id.clone()) { let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')") .bind(&agent_id) .bind(message_id) .bind(channel_id) .execute(db) .await; } } } // Handle @apes broadcast if mention == "apes" { let apes = sqlx::query_scalar::<_, String>("SELECT id FROM users WHERE role = 'ape' AND id != ?") .bind(sender_id) .fetch_all(db) .await .unwrap_or_default(); for ape_id in apes { if notified.insert(ape_id.clone()) { let _ = sqlx::query("INSERT INTO inbox (user_id, message_id, channel_id, trigger) VALUES (?, ?, ?, 'broadcast')") .bind(&ape_id) .bind(message_id) .bind(channel_id) .execute(db) .await; } } } } } // ── Inbox row type ── #[derive(Debug, sqlx::FromRow)] pub struct InboxRow { pub inbox_id: i64, pub message_id: String, pub inbox_channel_id: String, pub trigger: String, pub i_created_at: String, pub channel_name: String, // message fields pub msg_id: String, pub seq: i64, pub msg_channel_id: String, pub user_id: String, pub r#type: String, pub content: String, pub metadata: Option, pub reply_to: Option, pub msg_created_at: String, pub updated_at: Option, pub deleted_at: Option, // user fields pub u_id: String, pub username: String, pub display_name: String, pub role: String, pub u_created_at: String, } impl InboxRow { pub fn to_api(&self) -> InboxItem { let user_row = UserRow { id: self.u_id.clone(), username: self.username.clone(), display_name: self.display_name.clone(), role: self.role.clone(), password_hash: None, created_at: self.u_created_at.clone(), }; let msg_row = MessageRow { id: self.msg_id.clone(), seq: self.seq, channel_id: self.msg_channel_id.clone(), user_id: self.user_id.clone(), r#type: self.r#type.clone(), content: self.content.clone(), metadata: self.metadata.clone(), reply_to: self.reply_to.clone(), created_at: self.msg_created_at.clone(), updated_at: self.updated_at.clone(), deleted_at: self.deleted_at.clone(), }; InboxItem { id: self.inbox_id, message: msg_row.to_api(&user_row), channel_name: self.channel_name.clone(), trigger: self.trigger.clone(), created_at: self.i_created_at.parse().unwrap(), } } } // ── Joined row type for message + user ── #[derive(Debug, sqlx::FromRow)] pub struct MessageWithUserRow { pub id: String, pub seq: i64, pub channel_id: String, pub user_id: String, pub r#type: String, pub content: String, pub metadata: Option, pub reply_to: Option, pub created_at: String, pub updated_at: Option, pub deleted_at: Option, pub u_id: String, pub username: String, pub display_name: String, pub role: String, pub u_created_at: String, } impl MessageWithUserRow { pub fn to_api_message(&self) -> Message { let user_row = UserRow { id: self.u_id.clone(), username: self.username.clone(), display_name: self.display_name.clone(), role: self.role.clone(), password_hash: None, created_at: self.u_created_at.clone(), }; let msg_row = MessageRow { id: self.id.clone(), seq: self.seq, channel_id: self.channel_id.clone(), user_id: self.user_id.clone(), r#type: self.r#type.clone(), content: self.content.clone(), metadata: self.metadata.clone(), reply_to: self.reply_to.clone(), created_at: self.created_at.clone(), updated_at: self.updated_at.clone(), deleted_at: self.deleted_at.clone(), }; msg_row.to_api(&user_row) } }