File size: 3,770 Bytes
a4b70d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import unittest
import time

from g4f.tools.run_tools import ThinkingProcessor, Reasoning

class TestThinkingProcessor(unittest.TestCase):
    def assertEqualReasoning(self, first, second, msg=None):
        if isinstance(first, Reasoning) and isinstance(second, Reasoning):
            if first.status != second.status or first.is_thinking != second.is_thinking or first.token != second.token or first.label != second.label:
                raise self.failureException(msg or f"{first.label} != {second.label}")
        elif first != second:
            raise self.failureException(msg or f"{first} != {second}")

    def test_non_thinking_chunk(self):
        chunk = "This is a regular text."
        expected_time, expected_result = 0, [chunk]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk)
        self.assertEqual(actual_time, expected_time)
        self.assertEqual(actual_result, expected_result)

    def test_thinking_start(self):
        chunk = "Hello <think>World"
        expected_time = time.time()
        expected_result = ["Hello ", Reasoning(status="πŸ€” Is thinking...", is_thinking="<think>"), Reasoning("World")]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk)
        self.assertAlmostEqual(actual_time, expected_time, delta=1)
        self.assertEqual(actual_result[0], expected_result[0])
        self.assertEqualReasoning(actual_result[1], expected_result[1])
        self.assertEqualReasoning(actual_result[2], expected_result[2])

    def test_thinking_end(self):
        start_time = time.time()
        chunk = "token</think> content after"
        expected_result = [Reasoning("token"), Reasoning(status="", is_thinking="</think>"), " content after"]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk, start_time)
        self.assertEqual(actual_time, 0)
        self.assertEqualReasoning(actual_result[0], expected_result[0])
        self.assertEqualReasoning(actual_result[1], expected_result[1])
        self.assertEqual(actual_result[2], expected_result[2])

    def test_thinking_start_and_end(self):
        start_time = time.time()
        chunk = "<think>token</think> content after"
        expected_result = [Reasoning(status="πŸ€” Is thinking...", is_thinking="<think>"), Reasoning("token"), Reasoning(status="", is_thinking="</think>"), " content after"]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk, start_time)
        self.assertEqual(actual_time, 0)
        self.assertEqualReasoning(actual_result[0], expected_result[0])
        self.assertEqualReasoning(actual_result[1], expected_result[1])
        self.assertEqualReasoning(actual_result[2], expected_result[2])
        self.assertEqual(actual_result[3], expected_result[3])

    def test_ongoing_thinking(self):
        start_time = time.time()
        chunk = "Still thinking..."
        expected_result = [Reasoning("Still thinking...")]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk, start_time)
        self.assertEqual(actual_time, start_time)
        self.assertEqualReasoning(actual_result[0], expected_result[0])

    def test_chunk_with_text_after_think(self):
        chunk = "Start <think>Middle</think>End"
        expected_time = 0
        expected_result = ["Start ", Reasoning(status="πŸ€” Is thinking...", is_thinking="<think>"), Reasoning("Middle"), Reasoning(status="", is_thinking="</think>"), "End"]
        actual_time, actual_result = ThinkingProcessor.process_thinking_chunk(chunk)
        self.assertEqual(actual_time, expected_time)
        for i in range(1, len(expected_result)):
            self.assertEqualReasoning(actual_result[i], expected_result[i])