package com.oiusa.las.web; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.StandardOpenOption; import java.util.Locale; import org.eclipse.microprofile.context.ManagedExecutor; import org.jboss.logging.Logger; import com.oiusa.las.model.LasFile; import com.oiusa.las.service.FileStore; import jakarta.inject.Inject; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.sse.Sse; import jakarta.ws.rs.sse.SseEventSink; /** * Case-insensitive substring search streamed over SSE: scans the file once and emits a {@code match} * event per hit (capped at {@code max}), periodic {@code progress} events, and a final {@code done}. * Sequential single-pass scan = bounded memory even for 12 GB files. The client closes the stream to * cancel an in-flight search. */ @jakarta.ws.rs.Path("/api/files") public class SearchResource { private static final Logger LOG = Logger.getLogger(SearchResource.class); private static final int SNIPPET_MAX = 2000; @Inject FileStore store; @Inject ManagedExecutor executor; public record Match(long line, String text) {} public record Progress(long scanned, int matches) {} public record Done(long scanned, int matches, boolean truncated) {} @GET @jakarta.ws.rs.Path("/{id}/search") @Produces(MediaType.SERVER_SENT_EVENTS) public void search(@PathParam("id") String id, @QueryParam("q") String q, @QueryParam("max") @DefaultValue("500") int max, @Context SseEventSink sink, @Context Sse sse) { final LasFile f = store.get(id); if (f == null || q == null || q.isEmpty()) { sink.send(sse.newEventBuilder().name("done") .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Done.class, new Done(0, 0, false)).build()); sink.close(); return; } final int cap = Math.max(1, Math.min(10000, max)); final String needle = q.toLowerCase(Locale.ROOT); executor.execute(() -> runSearch(f, sink, sse, needle, cap)); } private void runSearch(LasFile f, SseEventSink sink, Sse sse, String needle, int cap) { long line = 0; int matches = 0; boolean truncated = false; try (FileChannel ch = FileChannel.open(f.path, StandardOpenOption.READ)) { BufferedReader r = new BufferedReader( new InputStreamReader(Channels.newInputStream(ch), StandardCharsets.ISO_8859_1), 1 << 20); String ln; while ((ln = r.readLine()) != null) { if (sink.isClosed()) return; if (ln.toLowerCase(Locale.ROOT).contains(needle)) { String snippet = ln.length() > SNIPPET_MAX ? ln.substring(0, SNIPPET_MAX) + "…" : ln; sink.send(sse.newEventBuilder().name("match") .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Match.class, new Match(line, snippet)).build()) .toCompletableFuture().get(); if (++matches >= cap) { truncated = true; break; } } line++; if ((line & 0x3FFFF) == 0) { // ~every 262k lines sink.send(sse.newEventBuilder().name("progress") .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Progress.class, new Progress(line, matches)).build()); } } if (!sink.isClosed()) { sink.send(sse.newEventBuilder().name("done") .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Done.class, new Done(line, matches, truncated)).build()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { LOG.debugf(e, "search ended for %s", f.id); } finally { try { sink.close(); } catch (Exception ignore) { } } } }