File size: 2,376 Bytes
40ac571
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from collections import deque
import shutil

def frame_analysis(prev_frame, curr_frame):
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

    ssim_score = ssim(prev_gray, curr_gray)
    mean_diff = np.mean(np.abs(curr_frame.astype(float) - prev_frame.astype(float)))

    return ssim_score, mean_diff

def is_anomaly(ssim_score, mean_diff, ssim_history, mean_diff_history):
    if len(ssim_history) < 5:
        return False

    ssim_avg = np.mean(ssim_history)
    mean_diff_avg = np.mean(mean_diff_history)

    ssim_threshold = 0.8
    mean_diff_threshold = 7.0

    if (ssim_score < ssim_avg * 0.75 and mean_diff > mean_diff_avg * 1.25) or \
       (ssim_score < ssim_threshold and mean_diff > mean_diff_threshold):
        return True

    return False

def process_frames(input_folder, output_folder):
    frames = sorted([f for f in os.listdir(input_folder) if f.endswith('.png') or f.endswith('.jpg')])

    normal_folder = os.path.join(output_folder, 'normal')
    anomaly_folder = os.path.join(output_folder, 'anomaly')

    os.makedirs(normal_folder, exist_ok=True)
    os.makedirs(anomaly_folder, exist_ok=True)

    prev_frame = None
    ssim_history = deque(maxlen=5)
    mean_diff_history = deque(maxlen=5)

    for idx, frame_name in enumerate(frames):
        frame_path = os.path.join(input_folder, frame_name)
        frame = cv2.imread(frame_path)

        if prev_frame is not None:
            ssim_score, mean_diff = frame_analysis(prev_frame, frame)
            ssim_history.append(ssim_score)
            mean_diff_history.append(mean_diff)

            if is_anomaly(ssim_score, mean_diff, ssim_history, mean_diff_history):
                print(f"Anomaly detected in frame {frame_name}")
                shutil.copy(frame_path, os.path.join(anomaly_folder, frame_name))
            else:
                shutil.copy(frame_path, os.path.join(normal_folder, frame_name))
        else:
            shutil.copy(frame_path, os.path.join(normal_folder, frame_name))

        prev_frame = frame

if __name__ == "__main__":
    input_folder = "anomaly_test"
    output_folder = "test_anomaly_detect"

    process_frames(input_folder, output_folder)
    print("Processing completed.")