1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128import os
import time
import json
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from axe_selenium_python import Axe
from utils.watch import logger
from flask import jsonify
from utils.yeet_back import axe_catcher
from utils.auth import rabbit
def axe_scan(app, body, channel=None, delivery_tag=None):
"""
Description:
This function performs an accessibility audit using Axe-Core for a given URL.
Args:
app: The Flask application context.
body: The HTTP POST request body containing the URL to be audited.
channel: The RabbitMQ channel for the message. Default is None.
delivery_tag: The RabbitMQ message delivery tag. Default is None.
Returns:
None
Raises:
Exception: If an error occurs during the accessibility audit.
"""
with app.app_context():
url = None
url_id = None
try:
if isinstance(body, str): # Check if 'body' is a string
payload = json.loads(body)
else:
payload = json.loads(body)
url = payload.get('url')
url_id = payload.get('url_id')
logger.debug(f'๐ Starting to process: {payload}')
# Set the proxy settings using environment variables
use_proxy = os.environ.get('USE_PROXY', 'false').lower() == 'true'
proxy_http = os.environ.get('PROXY_HTTP')
proxy_https = os.environ.get('PROXY_HTTPS')
options = webdriver.ChromeOptions()
if use_proxy:
if proxy_http:
options.add_argument(f'--proxy-server={proxy_http}')
if proxy_https:
options.add_argument(f'--proxy-server={proxy_https}')
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--stdout')
options.add_argument('--chromedriver-path /usr/local/bin/chromedriver')
# Disable file downloads
prefs = {"download_restrictions": 3} # Disable all downloads
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
logger.debug(f'Testing URL: {url}')
driver.get(url)
axe = Axe(driver)
axe.inject()
# Debug Results
results = axe.run()
# Convert the results dictionary to a JSON string
results_json = json.dumps(results)
# Convert the JSON string back to a dictionary
results_dict = json.loads(results_json)
# Get Axe Driver Info
user_agent = driver.execute_script("return navigator.userAgent;")
window_width = driver.execute_script("return window.innerWidth;")
window_height = driver.execute_script("return window.innerHeight;")
orientation_angle = driver.execute_script("return window.orientation;")
orientation_type = driver.execute_script("return screen.orientation && screen.orientation.type;")
axe_driver_specs = {
'engine_name': 'Axe',
'orientation_angle': orientation_angle,
'orientation_type': orientation_type,
'user_agent': user_agent,
'window_height': window_height,
'window_width': window_width,
}
# logger.debug(f'Axe Driver Specs: {axe_driver_specs}')
# Axe Shut Down
driver.quit()
# Yeet to Yeet Back
axe_catcher(results_dict, url_id, axe_driver_specs)
if channel and delivery_tag:
channel.basic_ack(delivery_tag)
except Exception as e:
logger.error(f'โ Error processing URL {url}: {e}', exc_info=True)
# Send the URL to the axe_scan_error queue
if url and url_id:
# Send a message to the error_crawler queue
error_payload = json.dumps({
"url_id": url_id,
"url": url,
"error_message": {e}
})
rabbit('error_axe', error_payload)
if channel and delivery_tag:
channel.basic_nack(delivery_tag)
# Send a message to the error_crawler queue
error_payload = json.dumps({
"url_id": url_id,
"url": url,
"error_message": error_message
})
send_to_queue("error_crawler", error_payload)