diff --git a/crates/colony/src/db.rs b/crates/colony/src/db.rs index 9b634e8..f845929 100644 --- a/crates/colony/src/db.rs +++ b/crates/colony/src/db.rs @@ -97,7 +97,11 @@ impl MessageRow { } else { self.content.clone() }, - mentions: parse_mentions(&self.content), + mentions: if self.deleted_at.is_some() { + vec![] + } else { + parse_mentions(&self.content) + }, metadata: self .metadata .as_ref() diff --git a/crates/colony/src/main.rs b/crates/colony/src/main.rs index fb540e4..2495736 100644 --- a/crates/colony/src/main.rs +++ b/crates/colony/src/main.rs @@ -4,9 +4,10 @@ mod state; mod ws; use axum::{routing::get, Router}; -use sqlx::sqlite::SqlitePoolOptions; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use state::AppState; use std::env; +use std::str::FromStr; use tower_http::services::{ServeDir, ServeFile}; #[tokio::main] @@ -14,19 +15,20 @@ async fn main() { let db_url = env::var("DATABASE_URL").unwrap_or_else(|_| "sqlite:colony.db?mode=rwc".into()); let port = env::var("PORT").unwrap_or_else(|_| "3001".into()); + let opts = SqliteConnectOptions::from_str(&db_url) + .expect("Invalid DATABASE_URL") + .create_if_missing(true) + .pragma("journal_mode", "WAL") + .pragma("foreign_keys", "ON"); + let pool = SqlitePoolOptions::new() .max_connections(5) - .connect(&db_url) + .connect_with(opts) .await .expect("Failed to connect to database"); eprintln!("colony: connected to {}", db_url); - sqlx::query("PRAGMA journal_mode=WAL") - .execute(&pool) - .await - .unwrap(); - sqlx::migrate!("./migrations") .run(&pool) .await diff --git a/crates/colony/src/routes.rs b/crates/colony/src/routes.rs index 675958e..4a651e4 100644 --- a/crates/colony/src/routes.rs +++ b/crates/colony/src/routes.rs @@ -228,6 +228,11 @@ pub async fn post_message( } } + // Content length limit (64KB) + if body.content.len() > 65_536 { + return Err(AppError::BadRequest("Message content exceeds 64KB limit".into())); + } + let id = Uuid::new_v4().to_string(); let user_id = resolve_user(&state.db, &user_param).await?; @@ -347,9 +352,9 @@ pub async fn restore_message( let message = row.to_api_message(); - // Broadcast as new message (restored) + // Broadcast as edit (not Message — dedup would ignore same ID) let tx = state.get_sender(&channel_id).await; - let _ = tx.send(WsEvent::Message(message.clone())); + let _ = tx.send(WsEvent::Edit(message.clone())); Ok(Json(message)) } diff --git a/crates/colony/src/ws.rs b/crates/colony/src/ws.rs index 8ccac3e..71ab28f 100644 --- a/crates/colony/src/ws.rs +++ b/crates/colony/src/ws.rs @@ -79,6 +79,11 @@ async fn handle_socket(socket: WebSocket, channel_id: String, state: AppState) { } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { eprintln!("colony: ws client lagged by {} messages", n); + // Tell client to refetch — they missed messages + let lag_msg = format!(r#"{{"event":"lag","missed":{}}}"#, n); + if sender.send(axum::extract::ws::Message::Text(lag_msg.into())).await.is_err() { + break; + } } Err(_) => break, // Channel closed } diff --git a/docs/tech-spec-colony-cli-2026-03-29.md b/docs/tech-spec-colony-cli-2026-03-29.md index 33e61d5..5c8ef2b 100644 --- a/docs/tech-spec-colony-cli-2026-03-29.md +++ b/docs/tech-spec-colony-cli-2026-03-29.md @@ -64,12 +64,13 @@ password = "Apes2026!" # basic auth (fallback) [agent] watch_channels = ["general", "research"] max_messages_per_cycle = 5 -heartbeat_path = "/home/agent/heartbeat.md" -memory_path = "/home/agent/memory/memory.md" +# Paths are relative to agent home dir (e.g. /home/agents/scout/) +heartbeat_path = "heartbeat.md" +memory_path = "memory/memory.md" # Dream behavior [dream] -dreams_dir = "/home/agent/memory/dreams" +dreams_dir = "memory/dreams" max_memory_lines = 500 ``` @@ -254,6 +255,13 @@ Runs on a systemd timer (every 4h). Consolidates memory and considers identity e 4. Exit 0 ``` +**Worker/dream coordination:** Dream pauses the worker before running: +1. `systemctl stop agent-{name}-worker` +2. Run dream cycle (edits memory.md, CLAUDE.md) +3. `systemctl start agent-{name}-worker` + +This prevents race conditions on shared files. + ### `colony-agent birth --instruction "purpose description"` Creates a new agent on the same VM (no new VM needed). diff --git a/ui/colony/src/App.tsx b/ui/colony/src/App.tsx index 77edf4f..da0cf2b 100644 --- a/ui/colony/src/App.tsx +++ b/ui/colony/src/App.tsx @@ -48,6 +48,7 @@ export default function App() { const [showScrollDown, setShowScrollDown] = useState(false); const scrollRef = useRef(null); const prevMsgCountRef = useRef(0); + const maxSeqRef = useRef(0); const activeChannelRef = useRef(activeChannelId); activeChannelRef.current = activeChannelId; @@ -58,14 +59,29 @@ export default function App() { setActiveChannelId((prev) => (prev ? prev : chs[0]?.id ?? null)); }, []); - const loadMessages = useCallback(async () => { + const loadMessages = useCallback(async (afterSeq?: number) => { const channelId = activeChannelRef.current; if (!channelId) return; - setLoading(true); + if (!afterSeq) setLoading(true); try { - const msgs = await getMessages(channelId); + const params = afterSeq ? { after_seq: afterSeq } : undefined; + const msgs = await getMessages(channelId, params); if (activeChannelRef.current === channelId) { - setMessages(msgs); + if (afterSeq) { + // Gap repair: merge new messages, dedup by id + setMessages((prev) => { + const existing = new Set(prev.map((m) => m.id)); + const fresh = msgs.filter((m) => !existing.has(m.id)); + return fresh.length ? [...prev, ...fresh] : prev; + }); + } else { + setMessages(msgs); + } + // Track highest seq for gap repair + for (const m of msgs) { + const s = Number(m.seq); + if (s > maxSeqRef.current) maxSeqRef.current = s; + } } } catch { // Silently ignore fetch errors @@ -80,14 +96,32 @@ export default function App() { if (prev.some((m) => m.id === msg.id)) return prev; return [...prev, msg]; }); + const s = Number(msg.seq); + if (s > maxSeqRef.current) maxSeqRef.current = s; }, []); - // On WS reconnect, refetch full history to catch missed messages + // WebSocket: replace edited/restored messages + const handleWsEdit = useCallback((msg: Message) => { + setMessages((prev) => prev.map((m) => (m.id === msg.id ? msg : m))); + }, []); + + // WebSocket: mark deleted messages + const handleWsDelete = useCallback((id: string) => { + setMessages((prev) => + prev.map((m) => + m.id === id + ? { ...m, content: "[deleted]", deleted_at: new Date().toISOString(), mentions: [] } + : m, + ), + ); + }, []); + + // On WS reconnect/lag, fetch only missed messages const handleWsReconnect = useCallback(() => { - loadMessages(); + loadMessages(maxSeqRef.current || undefined); }, [loadMessages]); - useChannelSocket(activeChannelId, handleWsMessage, handleWsReconnect); + useChannelSocket(activeChannelId, handleWsMessage, handleWsEdit, handleWsDelete, handleWsReconnect); useEffect(() => { loadChannels(); }, [loadChannels]); @@ -95,6 +129,7 @@ export default function App() { setMessages([]); setSelectedMessages([]); prevMsgCountRef.current = 0; + maxSeqRef.current = 0; loadMessages(); }, [activeChannelId, loadMessages]); @@ -108,10 +143,12 @@ export default function App() { if (vp) vp.scrollTo({ top: vp.scrollHeight, behavior: smooth ? "smooth" : "instant" }); } - // Auto-scroll only on new messages + // Auto-scroll only when user is near the bottom useEffect(() => { if (messages.length > prevMsgCountRef.current) { - scrollToBottom(); + const vp = getViewport(); + const nearBottom = !vp || (vp.scrollHeight - vp.scrollTop - vp.clientHeight < 150); + if (nearBottom) scrollToBottom(); } prevMsgCountRef.current = messages.length; }, [messages]); @@ -254,7 +291,6 @@ export default function App() { onDelete={async (chId, msgId) => { try { await deleteMessage(chId, msgId); - loadMessages(); } catch { // ignore } @@ -262,7 +298,6 @@ export default function App() { onRestore={async (chId, msgId) => { try { await restoreMessage(chId, msgId); - loadMessages(); } catch { // ignore } @@ -296,7 +331,6 @@ export default function App() { onClearReply={() => setSelectedMessages([])} onMessageSent={() => { setSelectedMessages([]); - loadMessages(); setTimeout(() => scrollToBottom(), 100); }} /> diff --git a/ui/colony/src/components/MessageItem.tsx b/ui/colony/src/components/MessageItem.tsx index d5e5cb0..f8e45f3 100644 --- a/ui/colony/src/components/MessageItem.tsx +++ b/ui/colony/src/components/MessageItem.tsx @@ -209,7 +209,7 @@ export function MessageItem({ message, compact, lastInGroup, replyTarget, onSele {/* Content */}
diff --git a/ui/colony/src/hooks/useChannelSocket.ts b/ui/colony/src/hooks/useChannelSocket.ts index 0ebdccf..48ea071 100644 --- a/ui/colony/src/hooks/useChannelSocket.ts +++ b/ui/colony/src/hooks/useChannelSocket.ts @@ -1,26 +1,44 @@ import { useEffect, useRef, useCallback } from "react"; import type { Message } from "@/types/Message"; -import { getCurrentUsername, getMessages } from "@/api"; +import { getCurrentUsername } from "@/api"; interface WsMessageEvent { event: "message"; data: Message; } +interface WsEditEvent { + event: "edit"; + data: Message; +} + +interface WsDeleteEvent { + event: "delete"; + data: { id: string }; +} + interface WsConnectedEvent { event: "connected"; } -type WsEvent = WsMessageEvent | WsConnectedEvent; +interface WsLagEvent { + event: "lag"; + missed: number; +} + +type WsEvent = WsMessageEvent | WsEditEvent | WsDeleteEvent | WsConnectedEvent | WsLagEvent; export function useChannelSocket( channelId: string | null, onMessage: (msg: Message) => void, + onEdit: (msg: Message) => void, + onDelete: (id: string) => void, onReconnect: () => void, ) { const wsRef = useRef(null); const reconnectTimer = useRef | null>(null); const intentionalClose = useRef(false); + const backoffMs = useRef(3000); const connect = useCallback(() => { if (!channelId) return; @@ -32,6 +50,7 @@ export function useChannelSocket( const ws = new WebSocket(`${protocol}//${host}/ws/${channelId}`); ws.onopen = () => { + backoffMs.current = 3000; // Reset backoff on successful connect ws.send(JSON.stringify({ type: "auth", user: getCurrentUsername(), @@ -43,8 +62,12 @@ export function useChannelSocket( const event: WsEvent = JSON.parse(e.data); if (event.event === "message") { onMessage(event.data); - } else if (event.event === "connected") { - // Refetch history on reconnect to catch missed messages + } else if (event.event === "edit") { + onEdit(event.data); + } else if (event.event === "delete") { + onDelete(event.data.id); + } else if (event.event === "connected" || event.event === "lag") { + // Refetch to catch missed messages onReconnect(); } } catch { @@ -57,19 +80,18 @@ export function useChannelSocket( }; ws.onclose = () => { - // Only reconnect if this wasn't an intentional teardown if (!intentionalClose.current) { - reconnectTimer.current = setTimeout(connect, 3000); + reconnectTimer.current = setTimeout(connect, backoffMs.current); + backoffMs.current = Math.min(backoffMs.current * 2, 30000); // Exponential backoff, max 30s } }; wsRef.current = ws; - }, [channelId, onMessage, onReconnect]); + }, [channelId, onMessage, onEdit, onDelete, onReconnect]); useEffect(() => { connect(); return () => { - // Mark as intentional so onclose doesn't reconnect intentionalClose.current = true; if (reconnectTimer.current) clearTimeout(reconnectTimer.current); wsRef.current?.close(); diff --git a/ui/colony/src/index.css b/ui/colony/src/index.css index 112263a..320433b 100644 --- a/ui/colony/src/index.css +++ b/ui/colony/src/index.css @@ -79,7 +79,7 @@ @apply border-border; } body { - @apply bg-background text-foreground font-mono; + @apply bg-background text-foreground font-mono antialiased; font-size: 13px; line-height: 1.6; }