"""Tests for the FastAPI inference server in main.py.

Covers the health endpoint, the OpenAI-style /v1/chat/completions endpoint
(non-streaming and SSE streaming), stream cancellation, resume via
Last-Event-ID, optional SQLite session persistence, and the /ktp-ocr/
endpoint.
"""
import json
import time
from contextlib import contextmanager

import pytest
from fastapi.testclient import TestClient

import main


class FakeEngine:
    """Minimal stand-in for the real inference engine.

    Echoes prompt text back, raises ValueError when a prompt contains
    "PARSE_ERR", and reports context accounting the same way the real
    engine does, so the server can be exercised without loading a model.
    """

    def __init__(self, model_id="fake-model"):
        self.model_id = model_id
        self.last_context_info = {
            "compressed": False,
            "prompt_tokens": 5,
            "max_context": 8192,
            "budget": 7900,
            "strategy": "truncate",
            "dropped_messages": 0,
        }

    def infer(self, messages, max_tokens, temperature):
        # Simulate parse error pathway when special trigger is present
        if messages and isinstance(messages[0].get("content"), str) and "PARSE_ERR" in messages[0]["content"]:
            raise ValueError("Simulated parse error")
        # Return echo content for deterministic test
        parts = []
        for m in messages:
            c = m.get("content", "")
            if isinstance(c, list):
                for p in c:
                    if isinstance(p, dict) and p.get("type") == "text":
                        parts.append(p.get("text", ""))
            elif isinstance(c, str):
                parts.append(c)
        txt = " ".join(parts) or "OK"
        # Simulate context accounting changing with request
        self.last_context_info = {
            "compressed": False,
            "prompt_tokens": max(1, len(txt.split())),
            "max_context": 8192,
            "budget": 7900,
            "strategy": "truncate",
            "dropped_messages": 0,
        }
        return f"OK: {txt}"

    def infer_stream(self, messages, max_tokens, temperature, cancel_event=None):
        # simple two-piece stream; respects cancel_event if set during streaming
        outputs = ["hello", " world"]
        for piece in outputs:
            if cancel_event is not None and cancel_event.is_set():
                break
            yield piece
            # tiny delay to allow cancel test to interleave
            time.sleep(0.01)

    def get_context_report(self):
        return {
            "compressionEnabled": True,
            "strategy": "truncate",
            "safetyMargin": 256,
            "modelMaxContext": 8192,
            "tokenizerModelMaxLength": 8192,
            "last": self.last_context_info,
        }


@contextmanager
def patched_engine():
    # Patch global engine so server does not load real model
    prev_engine = main._engine
    prev_err = main._engine_error
    fake = FakeEngine()
    main._engine = fake
    main._engine_error = None
    try:
        yield fake
    finally:
        main._engine = prev_engine
        main._engine_error = prev_err


def get_client():
    return TestClient(main.app)


def test_health_ready_and_context():
    with patched_engine():
        client = get_client()
        r = client.get("/health")
        assert r.status_code == 200
        body = r.json()
        assert body["ok"] is True
        assert body["modelReady"] is True
        assert body["modelId"] == "fake-model"
        # context block exists with required fields
        ctx = body["context"]
        assert ctx["compressionEnabled"] is True
        assert "last" in ctx
        assert isinstance(ctx["last"].get("prompt_tokens"), int)


def test_health_with_engine_error():
    # simulate model load error path
    prev_engine = main._engine
    prev_err = main._engine_error
    try:
        main._engine = None
        main._engine_error = "boom"
        client = get_client()
        r = client.get("/health")
        assert r.status_code == 200
        body = r.json()
        assert body["modelReady"] is False
        assert body["error"] == "boom"
    finally:
        main._engine = prev_engine
        main._engine_error = prev_err


def test_chat_non_stream_validation():
    with patched_engine():
        client = get_client()
        # an empty messages list should be rejected with 400
        r = client.post("/v1/chat/completions", json={"messages": []})
        assert r.status_code == 400


def test_chat_non_stream_success_and_usage_context():
    with patched_engine():
        client = get_client()
        payload = {
            "messages": [{"role": "user", "content": "Hello Qwen"}],
            "max_tokens": 8,
            "temperature": 0.0,
        }
        r = client.post("/v1/chat/completions", json=payload)
        assert r.status_code == 200
        body = r.json()
        assert body["object"] == "chat.completion"
        assert body["choices"][0]["message"]["content"].startswith("OK:")
        # usage prompt_tokens filled from engine.last_context_info
        assert body["usage"]["prompt_tokens"] >= 1
        # response includes context echo
        assert "context" in body
        assert "prompt_tokens" in body["context"]


def test_chat_non_stream_parse_error_to_400():
    with patched_engine():
        client = get_client()
        payload = {
            "messages": [{"role": "user", "content": "PARSE_ERR trigger"}],
            "max_tokens": 4,
        }
        r = client.post("/v1/chat/completions", json=payload)
        # ValueError in engine -> 400 per API contract
        assert r.status_code == 400


def read_sse_lines(resp):
    # Utility to parse event-stream into list of data payloads (including [DONE])
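    # A typical stream looks like the following (frame shape inferred from
    # the assertions below; the authoritative format is in main.py):
    #   data: {"object": "chat.completion.chunk", "choices": [{"delta": {"content": "hello"}}]}
    #
    #   data: [DONE]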
    lines = []
    buf = b""

    # Starlette TestClient (httpx) responses expose iter_bytes()/iter_raw(), not requests.iter_content().
    # Fall back to available iterator or to full content if streaming isn't supported.
    iterator = None
    for name in ("iter_bytes", "iter_raw", "iter_content"):
        it = getattr(resp, name, None)
        if callable(it):
            iterator = it
            break

    if iterator is None:
        data = getattr(resp, "content", b"")
        if isinstance(data, str):
            data = data.encode("utf-8", "ignore")
        buf = data
    else:
        for chunk in iterator():
            if not chunk:
                continue
            if isinstance(chunk, str):
                chunk = chunk.encode("utf-8", "ignore")
            buf += chunk
            while b"\n\n" in buf:
                frame, buf = buf.split(b"\n\n", 1)
                # keep original frame text for asserts
                lines.append(frame.decode("utf-8", errors="ignore"))

    # Drain any leftover
    if buf:
        lines.append(buf.decode("utf-8", errors="ignore"))
    return lines
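

def extract_content_chunks(lines):
    # Shared helper for the resume tests: collect the delta-content strings
    # from SSE data frames, skipping frames without content (role deltas,
    # finish chunks, [DONE]) and any malformed JSON.
    chunks = []
    for line in lines:
        if line.startswith("data: "):
            try:
                data = json.loads(line[6:])
                if "choices" in data and data["choices"][0].get("delta", {}).get("content"):
                    chunks.append(data["choices"][0]["delta"]["content"])
            except (json.JSONDecodeError, KeyError):
                continue
    return chunks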


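# Resume protocol exercised by the streaming tests below: a client re-POSTs
# the same request with a "Last-Event-ID: <session_id>:<index>" header, and
# the server replays any buffered events after <index> before continuing or
# terminating with [DONE].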
def test_chat_stream_sse_flow_and_resume():
    with patched_engine():
        client = get_client()
        payload = {
            "session_id": "s1",
            "stream": True,
            "messages": [{"role": "user", "content": "stream please"}],
            "max_tokens": 8,
            "temperature": 0.2,
        }
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            assert resp.status_code == 200
            lines = read_sse_lines(resp)
        # Must contain role delta, content pieces, finish chunk, and [DONE]
        joined = "\n".join(lines)
        assert "delta" in joined
        assert "[DONE]" in joined

        # Resume from event index 0 should receive at least one subsequent event
        headers = {"Last-Event-ID": "s1:0"}
        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp2:
            assert resp2.status_code == 200
            lines2 = read_sse_lines(resp2)
        assert any("data:" in line for line in lines2)
        assert "[DONE]" in "\n".join(lines2)

        # Invalid Last-Event-ID format should not crash (covered by try/except)
        headers_bad = {"Last-Event-ID": "not-an-index"}
        with client.stream("POST", "/v1/chat/completions", headers=headers_bad, json=payload) as resp3:
            assert resp3.status_code == 200
            _ = read_sse_lines(resp3)  # just ensure no crash


def test_cancel_endpoint_stops_generation():
    with patched_engine():
        client = get_client()
        payload = {
            "session_id": "to-cancel",
            "stream": True,
            "messages": [{"role": "user", "content": "cancel me"}],
        }
        # Start streaming in background (client.stream keeps the connection open)
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            # Immediately cancel
            rc = client.post("/v1/cancel/to-cancel")
            assert rc.status_code == 200
            # Stream should end with [DONE] without hanging
            lines = read_sse_lines(resp)
            assert "[DONE]" in "\n".join(lines)


def test_cancel_unknown_session_is_ok():
    with patched_engine():
        client = get_client()
        rc = client.post("/v1/cancel/does-not-exist")
        # Endpoint returns ok regardless (idempotent, operationally safe)
        assert rc.status_code == 200


def test_edge_large_last_event_id_after_finish_yields_done():
    with patched_engine():
        client = get_client()
        payload = {
            "session_id": "done-session",
            "stream": True,
            "messages": [{"role": "user", "content": "edge"}],
        }
        # Complete a run
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            _ = read_sse_lines(resp)
        # Resume with huge index; should return DONE quickly
        headers = {"Last-Event-ID": "done-session:99999"}
        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp2:
            lines2 = read_sse_lines(resp2)
        assert "[DONE]" in "\n".join(lines2)


def test_stream_resume_basic_functionality():
    """Test that basic streaming resume functionality works correctly"""
    with patched_engine():
        client = get_client()
        session_id = "resume-basic-test"
        payload = {
            "session_id": session_id,
            "stream": True,
            "messages": [{"role": "user", "content": "test resume"}],
            "max_tokens": 100,
        }

        # Complete a streaming session first
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            complete_lines = read_sse_lines(resp)

        # Verify we got some data lines
        data_lines = [line for line in complete_lines if line.startswith("data: ")]
        assert len(data_lines) > 0, "Should have received some data lines in complete session"

        # Test resume from index 0 (should replay everything)
        headers = {"Last-Event-ID": f"{session_id}:0"}
        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp2:
            resume_lines = read_sse_lines(resp2)

        # Should get data lines again
        resume_data_lines = [line for line in resume_lines if line.startswith("data: ")]
        assert len(resume_data_lines) > 0, "Should have received data lines on resume"

        # Should end with [DONE]
        assert any("[DONE]" in line for line in resume_lines), "Resume should end with [DONE]"


def test_stream_resume_preserves_exact_chunk_order():
    """Test that resume maintains exact chunk order and content"""
    with patched_engine():
        client = get_client()
        session_id = "order-test"
        payload = {
            "session_id": session_id,
            "stream": True,
            "messages": [{"role": "user", "content": "test order"}],
        }

        # Get complete session with default chunks
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            complete_lines = read_sse_lines(resp)

        complete_chunks = extract_content_chunks(complete_lines)

        # Skip test if no chunks received (default FakeEngine may not produce content chunks)
        if len(complete_chunks) == 0:
            pytest.skip("Default FakeEngine does not produce content chunks for this test")

        # Test resume from middle point
        resume_point = min(1, len(complete_chunks) - 1)  # Resume after first chunk, or 0 if only 1 chunk
        headers = {"Last-Event-ID": f"{session_id}:{resume_point}"}

        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
            resume_lines = read_sse_lines(resp)

        resume_chunks = extract_content_chunks(resume_lines)

        # Should get remaining chunks after resume point
        expected_resume_chunks = complete_chunks[resume_point:]

        assert resume_chunks == expected_resume_chunks, (
            f"Order preservation failed. Resume chunks: {resume_chunks}, "
            f"Expected: {expected_resume_chunks}"
        )


def test_stream_resume_with_partial_disconnect():
    """Test resume when client disconnects mid-stream and reconnects"""
    with patched_engine():
        client = get_client()
        session_id = "disconnect-test"
        payload = {
            "session_id": session_id,
            "stream": True,
            "messages": [{"role": "user", "content": "test disconnect"}],
        }

        # Complete a session first to populate buffers
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            complete_lines = read_sse_lines(resp)

        # Extract content chunks from the completed session
        received_chunks = extract_content_chunks(complete_lines)

        if len(received_chunks) < 2:
            pytest.skip("Not enough chunks received for disconnect test")

        # Simulate disconnect after first chunk
        resume_index = 0  # Resume from beginning
        headers = {"Last-Event-ID": f"{session_id}:{resume_index}"}

        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
            resume_lines = read_sse_lines(resp)

        resume_chunks = extract_content_chunks(resume_lines)

        # Should get all chunks again (resume from 0 replays everything)
        assert resume_chunks == received_chunks, (
            f"Resume from beginning failed. Got: {resume_chunks}, Expected: {received_chunks}"
        )


def test_stream_resume_buffer_overflow_handling():
    """Test resume when buffer overflows and older chunks are lost"""
    with patched_engine():
        client = get_client()
        session_id = "overflow-test"
        payload = {
            "session_id": session_id,
            "stream": True,
            "messages": [{"role": "user", "content": "test overflow"}],
        }

        # Complete a session with default chunks
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            complete_lines = read_sse_lines(resp)

        # Count total chunks produced
        total_chunks = sum(
            1 for line in complete_lines
            if line.startswith("data: ") and '"content"' in line
        )

        if total_chunks == 0:
            pytest.skip("No content chunks produced by default engine")

        # Try to resume from early index
        early_resume_index = min(1, total_chunks - 1)  # Resume after first chunk
        headers = {"Last-Event-ID": f"{session_id}:{early_resume_index}"}

        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
            resume_lines = read_sse_lines(resp)

        resume_chunks = extract_content_chunks(resume_lines)

        # Should get chunks from resume point onwards, or [DONE] if buffer overflowed
        if resume_chunks:
            # Verify we got some chunks
            assert len(resume_chunks) > 0, "Should get some chunks on resume"
        else:
            # If no chunks, should at least get [DONE]
            assert any("[DONE]" in line for line in resume_lines), "Should get [DONE] even with buffer overflow"


def test_stream_resume_concurrent_sessions_isolation():
    """Test that resume works correctly with multiple concurrent sessions"""
    with patched_engine():
        client = get_client()

        # Create multiple concurrent sessions with different session IDs
        session_ids = ["session_A", "session_B", "session_C"]
        sessions_data = {}

        for sid in session_ids:
            payload = {
                "session_id": sid,
                "stream": True,
                "messages": [{"role": "user", "content": f"test {sid}"}],
            }

            # Complete session
            with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
                lines = read_sse_lines(resp)

            chunks = extract_content_chunks(lines)

            sessions_data[sid] = chunks

        # Skip if no chunks received
        if all(len(chunks) == 0 for chunks in sessions_data.values()):
            pytest.skip("No content chunks received for any session")

        # Test resume for each session independently
        for sid in session_ids:
            if len(sessions_data[sid]) < 2:
                continue  # Skip sessions with too few chunks

            resume_point = 0  # Resume from beginning
            headers = {"Last-Event-ID": f"{sid}:{resume_point}"}
            payload = {
                "session_id": sid,
                "stream": True,
                "messages": [{"role": "user", "content": f"test {sid}"}],
            }

            with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
                resume_lines = read_sse_lines(resp)

            resume_chunks = extract_content_chunks(resume_lines)

            # Should get all chunks again (resume from 0)
            assert resume_chunks == sessions_data[sid], (
                f"Session {sid} resume failed: {resume_chunks} != {sessions_data[sid]}"
            )


def test_stream_resume_with_sqlite_persistence():
    """Test resume works correctly with SQLite persistence enabled"""
    # This test requires setting up SQLite persistence
    original_persist = main.PERSIST_SESSIONS
    original_db_path = main.SESSIONS_DB_PATH

    try:
        # Enable persistence for this test
        main.PERSIST_SESSIONS = True
        main.SESSIONS_DB_PATH = ":memory:"  # Use in-memory SQLite for test

        # Reinitialize the SQLite store
        main._DB_STORE = main._SQLiteStore(main.SESSIONS_DB_PATH)

        with patched_engine():
            client = get_client()
            session_id = "persistent-test"
            payload = {
                "session_id": session_id,
                "stream": True,
                "messages": [{"role": "user", "content": "test persistence"}],
            }

            # Complete session to populate SQLite
            with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
                complete_lines = read_sse_lines(resp)

            # Extract chunks from the completed session
            complete_chunks = extract_content_chunks(complete_lines)

            if len(complete_chunks) == 0:
                pytest.skip("No content chunks received for persistence test")

            # Verify session was persisted
            assert main._DB_STORE.session_meta(session_id)[0] is True  # should be marked finished

            # Test resume from SQLite
            resume_point = min(1, len(complete_chunks) - 1)
            headers = {"Last-Event-ID": f"{session_id}:{resume_point}"}

            with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
                resume_lines = read_sse_lines(resp)

            resume_chunks = extract_content_chunks(resume_lines)

            expected_chunks = complete_chunks[resume_point:]
            assert resume_chunks == expected_chunks, (
                f"SQLite resume failed: {resume_chunks} != {expected_chunks}"
            )

    finally:
        # Restore original settings
        main.PERSIST_SESSIONS = original_persist
        main.SESSIONS_DB_PATH = original_db_path
        main._DB_STORE = main._SQLiteStore(main.SESSIONS_DB_PATH) if main.PERSIST_SESSIONS else None


def test_stream_resume_data_integrity_with_unicode():
    """Test resume preserves Unicode characters correctly"""
    with patched_engine():
        client = get_client()
        session_id = "unicode-test"
        payload = {
            "session_id": session_id,
            "stream": True,
            "messages": [{"role": "user", "content": "test unicode"}],
        }

        # Complete session
        with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
            complete_lines = read_sse_lines(resp)

        complete_chunks = extract_content_chunks(complete_lines)

        if len(complete_chunks) == 0:
            pytest.skip("No content chunks received for unicode test")

        # Test resume from middle
        resume_point = min(1, len(complete_chunks) - 1)
        headers = {"Last-Event-ID": f"{session_id}:{resume_point}"}

        with client.stream("POST", "/v1/chat/completions", headers=headers, json=payload) as resp:
            resume_lines = read_sse_lines(resp)

        resume_chunks = extract_content_chunks(resume_lines)

        expected_resume_chunks = complete_chunks[resume_point:]
        assert resume_chunks == expected_resume_chunks

        # Verify Unicode integrity (basic check)
        for actual, expected in zip(resume_chunks, expected_resume_chunks):
            assert actual == expected, f"Content mismatch: '{actual}' != '{expected}'"


def test_ktp_ocr_success():
    # Mock RapidOCR to return test text lines that should parse to expected KTP data
    test_ocr_texts = [
        "NIK : 1234567890123456",
        "Nama : JOHN DOE",
        "Tempat/Tgl Lahir : JAKARTA, 01-01-1990",
        "Jenis Kelamin : LAKI-LAKI",
        "Alamat : JL. JEND. SUDIRMAN KAV. 52-53",
        "RT/RW : 001/001",
        "Kel/Desa : SENAYAN",
        "Kecamatan : KEBAYORAN BARU",
        "Agama : ISLAM",
        "Status Perkawinan : KAWIN",
        "Pekerjaan : PEGAWAI SWASTA",
        "Kewarganegaraan : WNI",
        "Berlaku Hingga : SEUMUR HIDUP"
    ]

    # Mock the OCR result format: [[(bbox, text, confidence), ...]]
    mock_ocr_result = [[(None, text, 0.9) for text in test_ocr_texts]]

    # Patch get_ocr_engine to return a mock OCR engine
    original_get_ocr_engine = main.get_ocr_engine
    def mock_engine(img):
        return mock_ocr_result

    main.get_ocr_engine = lambda: mock_engine

    try:
        client = get_client()
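        # Requires a sample image.jpg next to this test file; the parsed OCR
        # text itself comes from the mock above, not from the image.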
        with open("image.jpg", "rb") as f:
            files = {"image": ("image.jpg", f, "image/jpeg")}
            r = client.post("/ktp-ocr/", files=files)

        assert r.status_code == 200
        body = r.json()
        assert body["nik"] == "1234567890123456"
        assert body["nama"] == "John Doe"
        assert body["tempat_lahir"] == "Jakarta"
        assert body["tgl_lahir"] == "01-01-1990"
        assert body["jenis_kelamin"] == "LAKI-LAKI"
        assert body["alamat"]["name"] == "JL. JEND. SUDIRMAN KAV. 52-53"
        assert body["alamat"]["rt_rw"] == "001/001"
        assert body["alamat"]["kel_desa"] == "Senayan"
        assert body["alamat"]["kecamatan"] == "Kebayoran Baru"
        assert body["agama"] == "Islam"
        assert body["status_perkawinan"] == "Kawin"
        assert body["pekerjaan"] == "Pegawai Swasta"
        assert body["kewarganegaraan"] == "Wni"
        assert body["berlaku_hingga"] == "Seumur Hidup"
    finally:
        # Restore original function
        main.get_ocr_engine = original_get_ocr_engine