๐Ÿ“ฆ EqualifyEverything / integration-axe

๐Ÿ“„ process.py ยท 108 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108import os
import time
import json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from axe_selenium_python import Axe
from utils.watch import logger
from flask import jsonify
from utils.yeet_back import axe_catcher
from utils.auth import rabbit


def axe_scan(app, body, channel=None, delivery_tag=None):
    with app.app_context():
        url = None
        url_id = None
        try:
            if isinstance(body, str):  # Check if 'body' is a string
                payload = json.loads(body)
            else:
                payload = json.loads(body)
            url = payload.get('url')
            url_id = payload.get('url_id')
            logger.debug(f'๐ŸŒŸ Starting to process: {payload}')

            # Set the proxy settings using environment variables
            use_proxy = os.environ.get('USE_PROXY', 'false').lower() == 'true'
            proxy_http = os.environ.get('PROXY_HTTP')
            proxy_https = os.environ.get('PROXY_HTTPS')
            options = webdriver.ChromeOptions()
            if use_proxy:
                if proxy_http:
                    options.add_argument(f'--proxy-server={proxy_http}')
                if proxy_https:
                    options.add_argument(f'--proxy-server={proxy_https}')
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--stdout')
            options.add_argument('--chromedriver-path /usr/local/bin/chromedriver')

            # Disable file downloads
            prefs = {"download_restrictions": 3}  # Disable all downloads
            options.add_experimental_option("prefs", prefs)

            driver = webdriver.Chrome(options=options)

            logger.debug(f'Testing URL: {url}')
            driver.get(url)

            axe = Axe(driver)
            axe.inject()

            # Debug Results
            results = axe.run()

            # Convert the results dictionary to a JSON string
            results_json = json.dumps(results)

            # Convert the JSON string back to a dictionary
            results_dict = json.loads(results_json)

            # Get Axe Driver Info
            user_agent = driver.execute_script("return navigator.userAgent;")
            window_width = driver.execute_script("return window.innerWidth;")
            window_height = driver.execute_script("return window.innerHeight;")
            orientation_angle = driver.execute_script("return window.orientation;")
            orientation_type = driver.execute_script("return screen.orientation && screen.orientation.type;")

            axe_driver_specs = {
                'engine_name': 'Axe',
                'orientation_angle': orientation_angle,
                'orientation_type': orientation_type,
                'user_agent': user_agent,
                'window_height': window_height,
                'window_width': window_width,
            }
            # logger.debug(f'Axe Driver Specs: {axe_driver_specs}')
            # Axe Shut Down
            driver.quit()

            # Yeet to Yeet Back
            axe_catcher(results_dict, url_id, axe_driver_specs)

            if channel and delivery_tag:
                channel.basic_ack(delivery_tag)

        except Exception as e:
            logger.error(f'โŒ Error processing URL {url}: {e}', exc_info=True)

            # Send the URL to the axe_scan_error queue
            if url and url_id:
                error_message = json.dumps({'url': url, 'url_id': url_id})
                rabbit('axe_scan_error', error_message)

            if channel and delivery_tag:
                channel.basic_nack(delivery_tag)


# Tests
def test_axe_scan():
    body = json.dumps({'url': 'https://example.com'})
    response = axe_scan(body)
    assert response.status_code == 200
    data = response.json
    assert 'url' in data
    assert 'results' in data