diff --git a/crates/colony-types/src/lib.rs b/crates/colony-types/src/lib.rs index f7f3794..0b0b603 100644 --- a/crates/colony-types/src/lib.rs +++ b/crates/colony-types/src/lib.rs @@ -124,6 +124,8 @@ pub struct AckRequest { #[derive(Debug, Deserialize)] pub struct MessageQuery { pub after_seq: Option, + pub before_seq: Option, + pub limit: Option, pub r#type: Option, pub user_id: Option, } diff --git a/crates/colony/src/routes.rs b/crates/colony/src/routes.rs index 67d470b..54a181d 100644 --- a/crates/colony/src/routes.rs +++ b/crates/colony/src/routes.rs @@ -186,6 +186,11 @@ pub async fn list_messages( Path(channel_id): Path, Query(query): Query, ) -> Result>> { + // Reject conflicting cursors + if query.after_seq.is_some() && query.before_seq.is_some() { + return Err(AppError::BadRequest("Cannot use both after_seq and before_seq".into())); + } + let mut sql = String::from( "SELECT m.*, u.id as u_id, u.username, u.display_name, u.role, u.created_at as u_created_at \ FROM messages m JOIN users u ON m.user_id = u.id \ @@ -193,10 +198,16 @@ pub async fn list_messages( ); let mut binds: Vec = vec![channel_id.clone()]; + // Cursor filtering if let Some(after_seq) = &query.after_seq { sql.push_str(" AND m.seq > ?"); binds.push(after_seq.to_string()); } + if let Some(before_seq) = &query.before_seq { + sql.push_str(" AND m.seq < ?"); + binds.push(before_seq.to_string()); + } + if let Some(msg_type) = &query.r#type { sql.push_str(" AND m.type = ?"); binds.push(match msg_type { @@ -213,7 +224,21 @@ pub async fn list_messages( binds.push(user_id.to_string()); } - sql.push_str(" ORDER BY m.seq ASC"); + // When limit is set without after_seq, fetch the LATEST messages + // (ORDER BY DESC LIMIT N, then reverse to chronological order) + let use_desc = query.limit.is_some() && query.after_seq.is_none(); + + if use_desc { + sql.push_str(" ORDER BY m.seq DESC"); + } else { + sql.push_str(" ORDER BY m.seq ASC"); + } + + // Apply limit (capped at 1000) + if let Some(limit) = &query.limit { + let capped = (*limit).min(1000).max(1); + sql.push_str(&format!(" LIMIT {}", capped)); + } let mut q = sqlx::query_as::<_, MessageWithUserRow>(&sql); for b in &binds { @@ -221,7 +246,13 @@ pub async fn list_messages( } let rows = q.fetch_all(&state.db).await?; - let messages: Vec = rows.iter().map(|r| r.to_api_message()).collect(); + let mut messages: Vec = rows.iter().map(|r| r.to_api_message()).collect(); + + // Reverse DESC results back to chronological order + if use_desc { + messages.reverse(); + } + Ok(Json(messages)) } diff --git a/ui/colony/src/App.tsx b/ui/colony/src/App.tsx index da0cf2b..bdc271c 100644 --- a/ui/colony/src/App.tsx +++ b/ui/colony/src/App.tsx @@ -1,4 +1,4 @@ -import { useCallback, useEffect, useRef, useState } from "react"; +import { useCallback, useEffect, useLayoutEffect, useRef, useState } from "react"; import type { Channel } from "@/types/Channel"; import type { Message } from "@/types/Message"; import { getChannels, getMessages, getCurrentUsername, deleteMessage, restoreMessage } from "@/api"; @@ -43,95 +43,22 @@ export default function App() { const [activeChannelId, setActiveChannelId] = useState(null); const [messages, setMessages] = useState([]); const [loading, setLoading] = useState(false); + const [loadingOlder, setLoadingOlder] = useState(false); + const [hasMoreBefore, setHasMoreBefore] = useState(true); const [selectedMessages, setSelectedMessages] = useState<{ id: string; username: string; content: string }[]>([]); const [sheetOpen, setSheetOpen] = useState(false); const [showScrollDown, setShowScrollDown] = useState(false); const scrollRef = useRef(null); const prevMsgCountRef = useRef(0); const maxSeqRef = useRef(0); + const initialLoadRef = useRef(true); + const pendingPrependRef = useRef(false); + const prependScrollHeightRef = useRef(0); const activeChannelRef = useRef(activeChannelId); activeChannelRef.current = activeChannelId; - const loadChannels = useCallback(async () => { - const chs = await getChannels(); - setChannels(chs); - setActiveChannelId((prev) => (prev ? prev : chs[0]?.id ?? null)); - }, []); - - const loadMessages = useCallback(async (afterSeq?: number) => { - const channelId = activeChannelRef.current; - if (!channelId) return; - if (!afterSeq) setLoading(true); - try { - const params = afterSeq ? { after_seq: afterSeq } : undefined; - const msgs = await getMessages(channelId, params); - if (activeChannelRef.current === channelId) { - if (afterSeq) { - // Gap repair: merge new messages, dedup by id - setMessages((prev) => { - const existing = new Set(prev.map((m) => m.id)); - const fresh = msgs.filter((m) => !existing.has(m.id)); - return fresh.length ? [...prev, ...fresh] : prev; - }); - } else { - setMessages(msgs); - } - // Track highest seq for gap repair - for (const m of msgs) { - const s = Number(m.seq); - if (s > maxSeqRef.current) maxSeqRef.current = s; - } - } - } catch { - // Silently ignore fetch errors - } finally { - setLoading(false); - } - }, []); - - // WebSocket: append new messages in real-time - const handleWsMessage = useCallback((msg: Message) => { - setMessages((prev) => { - if (prev.some((m) => m.id === msg.id)) return prev; - return [...prev, msg]; - }); - const s = Number(msg.seq); - if (s > maxSeqRef.current) maxSeqRef.current = s; - }, []); - - // WebSocket: replace edited/restored messages - const handleWsEdit = useCallback((msg: Message) => { - setMessages((prev) => prev.map((m) => (m.id === msg.id ? msg : m))); - }, []); - - // WebSocket: mark deleted messages - const handleWsDelete = useCallback((id: string) => { - setMessages((prev) => - prev.map((m) => - m.id === id - ? { ...m, content: "[deleted]", deleted_at: new Date().toISOString(), mentions: [] } - : m, - ), - ); - }, []); - - // On WS reconnect/lag, fetch only missed messages - const handleWsReconnect = useCallback(() => { - loadMessages(maxSeqRef.current || undefined); - }, [loadMessages]); - - useChannelSocket(activeChannelId, handleWsMessage, handleWsEdit, handleWsDelete, handleWsReconnect); - - useEffect(() => { loadChannels(); }, [loadChannels]); - - useEffect(() => { - setMessages([]); - setSelectedMessages([]); - prevMsgCountRef.current = 0; - maxSeqRef.current = 0; - loadMessages(); - }, [activeChannelId, loadMessages]); + const PAGE_SIZE = 100; function getViewport() { const el = scrollRef.current as unknown as HTMLElement | null; @@ -143,9 +70,175 @@ export default function App() { if (vp) vp.scrollTo({ top: vp.scrollHeight, behavior: smooth ? "smooth" : "instant" }); } - // Auto-scroll only when user is near the bottom + function updateSeqRefs(msgs: Message[]) { + for (const m of msgs) { + const s = Number(m.seq); + if (s > maxSeqRef.current) maxSeqRef.current = s; + } + } + + const loadChannels = useCallback(async () => { + const chs = await getChannels(); + setChannels(chs); + setActiveChannelId((prev) => (prev ? prev : chs[0]?.id ?? null)); + }, []); + + // Initial load: fetch latest PAGE_SIZE messages + const loadInitialMessages = useCallback(async () => { + const channelId = activeChannelRef.current; + if (!channelId) return; + setLoading(true); + try { + const msgs = await getMessages(channelId, { limit: PAGE_SIZE }); + if (activeChannelRef.current === channelId) { + setMessages(msgs); + setHasMoreBefore(msgs.length >= PAGE_SIZE); + updateSeqRefs(msgs); + initialLoadRef.current = true; + } + } catch { + // ignore + } finally { + setLoading(false); + } + }, []); + + // Load older messages (scroll-up pagination) + const loadOlderMessages = useCallback(async () => { + const channelId = activeChannelRef.current; + if (!channelId || loadingOlder || !hasMoreBefore) return; + setLoadingOlder(true); + + // Save scroll height before prepend + const vp = getViewport(); + if (vp) prependScrollHeightRef.current = vp.scrollHeight; + + try { + // Get the lowest seq from current messages + let lowestSeq = Infinity; + setMessages((prev) => { + for (const m of prev) { + const s = Number(m.seq); + if (s < lowestSeq) lowestSeq = s; + } + return prev; + }); + + if (lowestSeq === Infinity) { setLoadingOlder(false); return; } + + const msgs = await getMessages(channelId, { before_seq: lowestSeq, limit: PAGE_SIZE }); + if (activeChannelRef.current === channelId && msgs.length > 0) { + setHasMoreBefore(msgs.length >= PAGE_SIZE); + pendingPrependRef.current = true; + setMessages((prev) => { + const existing = new Set(prev.map((m) => m.id)); + const fresh = msgs.filter((m) => !existing.has(m.id)); + return fresh.length ? [...fresh, ...prev] : prev; + }); + } else { + setHasMoreBefore(false); + } + } catch { + // ignore + } finally { + setLoadingOlder(false); + } + }, [loadingOlder, hasMoreBefore]); + + // Gap repair: loop after_seq fetches until caught up + const repairGap = useCallback(async () => { + const channelId = activeChannelRef.current; + if (!channelId || !maxSeqRef.current) return; + + let cursor = maxSeqRef.current; + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const msgs = await getMessages(channelId, { after_seq: cursor, limit: PAGE_SIZE }); + if (activeChannelRef.current !== channelId || msgs.length === 0) break; + setMessages((prev) => { + const existing = new Set(prev.map((m) => m.id)); + const fresh = msgs.filter((m) => !existing.has(m.id)); + return fresh.length ? [...prev, ...fresh] : prev; + }); + updateSeqRefs(msgs); + cursor = maxSeqRef.current; + if (msgs.length < PAGE_SIZE) break; // caught up + } catch { + break; + } + } + }, []); + + // WebSocket handlers + const handleWsMessage = useCallback((msg: Message) => { + setMessages((prev) => { + if (prev.some((m) => m.id === msg.id)) return prev; + return [...prev, msg]; + }); + const s = Number(msg.seq); + if (s > maxSeqRef.current) maxSeqRef.current = s; + }, []); + + const handleWsEdit = useCallback((msg: Message) => { + setMessages((prev) => prev.map((m) => (m.id === msg.id ? msg : m))); + }, []); + + const handleWsDelete = useCallback((id: string) => { + setMessages((prev) => + prev.map((m) => + m.id === id + ? { ...m, content: "[deleted]", deleted_at: new Date().toISOString(), mentions: [] } + : m, + ), + ); + }, []); + + // On reconnect/lag, repair the gap (don't refetch everything) + const handleWsReconnect = useCallback(() => { + repairGap(); + }, [repairGap]); + + useChannelSocket(activeChannelId, handleWsMessage, handleWsEdit, handleWsDelete, handleWsReconnect); + + useEffect(() => { loadChannels(); }, [loadChannels]); + + // Channel switch: reset and load latest useEffect(() => { - if (messages.length > prevMsgCountRef.current) { + setMessages([]); + setSelectedMessages([]); + prevMsgCountRef.current = 0; + maxSeqRef.current = 0; + setHasMoreBefore(true); + initialLoadRef.current = true; + loadInitialMessages(); + }, [activeChannelId, loadInitialMessages]); + + // Scroll to bottom on initial load + useEffect(() => { + if (initialLoadRef.current && messages.length > 0) { + initialLoadRef.current = false; + // Use requestAnimationFrame to ensure DOM has rendered + requestAnimationFrame(() => scrollToBottom()); + } + }, [messages]); + + // Maintain scroll position after prepending older messages + useLayoutEffect(() => { + if (pendingPrependRef.current) { + pendingPrependRef.current = false; + const vp = getViewport(); + if (vp) { + const newHeight = vp.scrollHeight; + const delta = newHeight - prependScrollHeightRef.current; + vp.scrollTop += delta; + } + } + }, [messages]); + + // Auto-scroll on new WS messages only when near bottom + useEffect(() => { + if (!initialLoadRef.current && messages.length > prevMsgCountRef.current && !pendingPrependRef.current) { const vp = getViewport(); const nearBottom = !vp || (vp.scrollHeight - vp.scrollTop - vp.clientHeight < 150); if (nearBottom) scrollToBottom(); @@ -153,13 +246,17 @@ export default function App() { prevMsgCountRef.current = messages.length; }, [messages]); - // Track scroll position for scroll-down button + // Track scroll position for scroll-down button + trigger upward pagination useEffect(() => { const vp = getViewport(); if (!vp) return; function onScroll() { const v = vp!; setShowScrollDown(v.scrollHeight - v.scrollTop - v.clientHeight > 150); + // Trigger load-older when near top + if (v.scrollTop < 200 && !loadingOlder && hasMoreBefore) { + loadOlderMessages(); + } } vp.addEventListener("scroll", onScroll, { passive: true }); return () => vp.removeEventListener("scroll", onScroll); @@ -243,7 +340,18 @@ export default function App() { no messages yet — start typing below ) : ( - messages.map((msg, i) => { + <> + {loadingOlder && ( +
+ loading older... +
+ )} + {!hasMoreBefore && messages.length > 0 && ( +
+ beginning of conversation +
+ )} + {messages.map((msg, i) => { const prev = i > 0 ? messages[i - 1] : null; const next = i < messages.length - 1 ? messages[i + 1] : null; const sameSender = prev && prev.user.username === msg.user.username; @@ -305,7 +413,8 @@ export default function App() { /> ); - }) + })} + )} diff --git a/ui/colony/src/api.ts b/ui/colony/src/api.ts index fd94aa7..1685b24 100644 --- a/ui/colony/src/api.ts +++ b/ui/colony/src/api.ts @@ -50,10 +50,12 @@ export async function createChannel(body: CreateChannel): Promise { export async function getMessages( channelId: string, - params?: { after_seq?: number; type?: string; user_id?: string }, + params?: { after_seq?: number; before_seq?: number; limit?: number; type?: string; user_id?: string }, ): Promise { const query = new URLSearchParams(); if (params?.after_seq) query.set("after_seq", String(params.after_seq)); + if (params?.before_seq) query.set("before_seq", String(params.before_seq)); + if (params?.limit) query.set("limit", String(params.limit)); if (params?.type) query.set("type", params.type); if (params?.user_id) query.set("user_id", params.user_id); const qs = query.toString();