fix: all 5 high-severity bugs from codex review

- use sqlx migrate!() instead of broken split(';') — triggers now work
- seq via AUTOINCREMENT — no race conditions, monotonic ordering
- replace ?since= with ?after_seq= — cursor-based, no timestamp format issues
- replace all unwrap() with typed errors (404, 409, 400, 500)
- reply_to same-channel enforced in route handler
- add biome for frontend linting

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-29 19:07:12 +02:00
parent e940afde52
commit b48232ca03
17 changed files with 331 additions and 88 deletions

View File

@@ -1,7 +1,7 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
response::{IntoResponse, Response},
Json,
};
use colony_types::*;
@@ -10,6 +10,41 @@ use uuid::Uuid;
use crate::db::*;
// ── Error handling ──
pub enum AppError {
NotFound(String),
Conflict(String),
BadRequest(String),
Internal(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match self {
AppError::NotFound(m) => (StatusCode::NOT_FOUND, m),
AppError::Conflict(m) => (StatusCode::CONFLICT, m),
AppError::BadRequest(m) => (StatusCode::BAD_REQUEST, m),
AppError::Internal(m) => (StatusCode::INTERNAL_SERVER_ERROR, m),
};
(status, Json(serde_json::json!({"error": message}))).into_response()
}
}
impl From<sqlx::Error> for AppError {
fn from(e: sqlx::Error) -> Self {
match &e {
sqlx::Error::Database(db_err) if db_err.message().contains("UNIQUE") => {
AppError::Conflict(format!("Already exists: {}", db_err.message()))
}
sqlx::Error::RowNotFound => AppError::NotFound("Not found".into()),
_ => AppError::Internal(format!("Database error: {e}")),
}
}
}
type Result<T> = std::result::Result<T, AppError>;
// ── Health ──
pub async fn health() -> &'static str {
@@ -18,22 +53,21 @@ pub async fn health() -> &'static str {
// ── Channels ──
pub async fn list_channels(State(db): State<SqlitePool>) -> impl IntoResponse {
pub async fn list_channels(State(db): State<SqlitePool>) -> Result<Json<Vec<Channel>>> {
let rows = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels ORDER BY created_at")
.fetch_all(&db)
.await
.unwrap();
.await?;
let channels: Vec<Channel> = rows.iter().map(|r| r.to_api()).collect();
Json(channels)
Ok(Json(channels))
}
pub async fn create_channel(
State(db): State<SqlitePool>,
Json(body): Json<CreateChannel>,
) -> impl IntoResponse {
) -> Result<impl IntoResponse> {
let id = Uuid::new_v4().to_string();
// Hardcoded to benji for now (no auth yet)
// Hardcoded to benji for now (no auth yet — S4 will extract from middleware)
let created_by = "00000000-0000-0000-0000-000000000001";
sqlx::query("INSERT INTO channels (id, name, description, created_by) VALUES (?, ?, ?, ?)")
@@ -42,31 +76,28 @@ pub async fn create_channel(
.bind(&body.description)
.bind(created_by)
.execute(&db)
.await
.unwrap();
.await?;
let row = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels WHERE id = ?")
.bind(&id)
.fetch_one(&db)
.await
.unwrap();
.await?;
(StatusCode::CREATED, Json(row.to_api()))
Ok((StatusCode::CREATED, Json(row.to_api())))
}
pub async fn get_channel(
State(db): State<SqlitePool>,
Path(id): Path<String>,
) -> impl IntoResponse {
) -> Result<Json<Channel>> {
let row = sqlx::query_as::<_, ChannelRow>("SELECT * FROM channels WHERE id = ?")
.bind(&id)
.fetch_optional(&db)
.await
.unwrap();
.await?;
match row {
Some(r) => Ok(Json(r.to_api())),
None => Err(StatusCode::NOT_FOUND),
None => Err(AppError::NotFound(format!("Channel {id} not found"))),
}
}
@@ -76,8 +107,7 @@ pub async fn list_messages(
State(db): State<SqlitePool>,
Path(channel_id): Path<String>,
Query(query): Query<MessageQuery>,
) -> impl IntoResponse {
// Build query dynamically based on filters
) -> Result<Json<Vec<Message>>> {
let mut sql = String::from(
"SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \
FROM messages m JOIN users u ON m.user_id = u.id \
@@ -85,9 +115,9 @@ pub async fn list_messages(
);
let mut binds: Vec<String> = vec![channel_id.clone()];
if let Some(since) = &query.since {
sql.push_str(" AND m.created_at > ?");
binds.push(since.to_rfc3339());
if let Some(after_seq) = &query.after_seq {
sql.push_str(" AND m.seq > ?");
binds.push(after_seq.to_string());
}
if let Some(msg_type) = &query.r#type {
sql.push_str(" AND m.type = ?");
@@ -97,7 +127,8 @@ pub async fn list_messages(
MessageType::Result => "result",
MessageType::Error => "error",
MessageType::Plan => "plan",
}.to_string());
}
.to_string());
}
if let Some(user_id) = &query.user_id {
sql.push_str(" AND m.user_id = ?");
@@ -106,24 +137,53 @@ pub async fn list_messages(
sql.push_str(" ORDER BY m.seq ASC");
// Use raw query with dynamic binds
let mut q = sqlx::query_as::<_, MessageWithUserRow>(&sql);
for b in &binds {
q = q.bind(b);
}
let rows = q.fetch_all(&db).await.unwrap();
let rows = q.fetch_all(&db).await?;
let messages: Vec<Message> = rows.iter().map(|r| r.to_api_message()).collect();
Json(messages)
Ok(Json(messages))
}
pub async fn post_message(
State(db): State<SqlitePool>,
Path(channel_id): Path<String>,
Json(body): Json<PostMessage>,
) -> impl IntoResponse {
) -> Result<impl IntoResponse> {
// Verify channel exists
let channel_exists = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM channels WHERE id = ?",
)
.bind(&channel_id)
.fetch_one(&db)
.await?;
if channel_exists == 0 {
return Err(AppError::NotFound(format!("Channel {channel_id} not found")));
}
// Verify reply_to is in same channel (if provided)
if let Some(ref reply_id) = body.reply_to {
let reply_channel = sqlx::query_scalar::<_, String>(
"SELECT channel_id FROM messages WHERE id = ?",
)
.bind(reply_id.to_string())
.fetch_optional(&db)
.await?;
match reply_channel {
None => return Err(AppError::BadRequest(format!("reply_to message {reply_id} not found"))),
Some(ch) if ch != channel_id => {
return Err(AppError::BadRequest("reply_to must reference a message in the same channel".into()));
}
_ => {}
}
}
let id = Uuid::new_v4().to_string();
// Hardcoded to benji for now (no auth yet)
// Hardcoded to benji for now (no auth yet — S4 will extract from middleware)
let user_id = "00000000-0000-0000-0000-000000000001";
let msg_type = match body.r#type {
@@ -134,24 +194,18 @@ pub async fn post_message(
MessageType::Plan => "plan",
};
let metadata_json = body.metadata.as_ref().map(|m| serde_json::to_string(m).unwrap());
let metadata_json = body
.metadata
.as_ref()
.map(|m| serde_json::to_string(m).unwrap());
let reply_to = body.reply_to.map(|r| r.to_string());
// Get next seq
let seq: i64 = sqlx::query_scalar::<_, i64>(
"SELECT COALESCE((SELECT next_seq FROM channel_seq WHERE channel_id = ?), 1)",
)
.bind(&channel_id)
.fetch_one(&db)
.await
.unwrap();
// seq is AUTOINCREMENT — no race conditions, no manual tracking
sqlx::query(
"INSERT INTO messages (id, seq, channel_id, user_id, type, content, metadata, reply_to) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO messages (id, channel_id, user_id, type, content, metadata, reply_to) \
VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(seq)
.bind(&channel_id)
.bind(user_id)
.bind(msg_type)
@@ -159,8 +213,7 @@ pub async fn post_message(
.bind(&metadata_json)
.bind(&reply_to)
.execute(&db)
.await
.unwrap();
.await?;
// Fetch the full message with user
let row = sqlx::query_as::<_, MessageWithUserRow>(
@@ -169,17 +222,15 @@ pub async fn post_message(
)
.bind(&id)
.fetch_one(&db)
.await
.unwrap();
.await?;
(StatusCode::CREATED, Json(row.to_api_message()))
Ok((StatusCode::CREATED, Json(row.to_api_message())))
}
// ── Joined row type for message + user ──
#[derive(Debug, sqlx::FromRow)]
pub struct MessageWithUserRow {
// message fields
pub id: String,
pub seq: i64,
pub channel_id: String,
@@ -191,7 +242,6 @@ pub struct MessageWithUserRow {
pub created_at: String,
pub updated_at: Option<String>,
pub deleted_at: Option<String>,
// user fields (aliased)
pub u_id: String,
pub username: String,
pub display_name: String,