"""Run axe accessibility scans against URLs stored in Postgres.

The script repeatedly picks a random active URL from ``staging.urls``, runs
the ``axe`` command-line tool against it, and stores the top-level values of
the JSON result in ``results.scans``.  URLs whose scan fails for any reason
are recorded in ``results.problem_urls``.
"""
import json
import logging
import os
import subprocess
import sys

import psycopg2

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Create console handler and set level to debug
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)

# Create formatter
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')

# Add formatter to console handler
ch.setFormatter(formatter)

# Add console handler to logger
logger.addHandler(ch)

# Postgres connection info
# NOTE(review): credentials are hard-coded in source; consider moving them
# to environment variables or a config file.
db_host = "localhost"
db_port = 8432
db_user = "a11yPython"
db_password = "SnakeInTheWeb"
db_name = "a11y"

# Connect to database
try:
    conn = psycopg2.connect(
        host=db_host,
        port=db_port,
        user=db_user,
        password=db_password,
        database=db_name,
    )
    cur = conn.cursor()
    logger.info("Connected to database.")
except psycopg2.Error as e:
    logger.error("Could not connect to database: %s", e)
    sys.exit(1)

# Query that selects one random active URL to scan.
_URL_QUERY = (
    "SELECT url, id as \"url_id\" FROM staging.urls "
    "WHERE active=true AND gsa_site_scan_url_id IS NOT NULL "
    "ORDER BY Random() LIMIT 1"
)

# Keys read from the axe response, in the column order of the INSERT below.
_SCAN_COLUMNS = (
    'engine_name',
    'engine_version',
    'env_orientation_angle',
    'env_orientation_type',
    'env_user_agent',
    'env_window_height',
    'env_window_width',
    'reporter',
    'runner_name',
    'scanned_at',
)


def _run_axe(url):
    """Run the axe CLI against *url* and return its decoded JSON output.

    The command is passed as an argument list (no shell), so characters such
    as ``&`` or ``;`` inside the URL are never interpreted by a shell.

    Raises:
        subprocess.CalledProcessError: if axe exits with a non-zero status.
        OSError: if the ``axe`` executable cannot be started.
        ValueError: if the output is not valid JSON.
    """
    cmd = [
        'axe',
        url,
        '--chromedriver-path',
        '/usr/local/bin/chromedriver',
        '--chrome-options=no-sandbox',
        '--stdout',
    ]
    output = subprocess.check_output(cmd)
    return json.loads(output.decode('utf-8'))


def _insert_scan(response, url):
    """Insert the top-level values of *response* into ``results.scans``.

    Missing keys are stored as NULL.  The caller is responsible for
    committing the transaction.
    """
    # NOTE(review): the values are read from the raw response, while the
    # output of fix_axe() is computed afterwards and not used -- confirm
    # whether the mapped dict was meant to be the source here.
    values = tuple(response.get(column) for column in _SCAN_COLUMNS)
    cur.execute(
        """
        INSERT INTO results.scans (
            engine_name, engine_version, env_orientation_angle,
            env_orientation_type, env_user_agent, env_window_height,
            env_window_width, reporter, runner_name, scanned_at, url
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )
        """,
        values + (url,),
    )


def _record_problem_url(url, url_id, error):
    """Store a failed scan of *url* in ``results.problem_urls`` and commit."""
    # If the failure came from the database, the current transaction is
    # aborted and would reject any further statement; reset it first.
    conn.rollback()
    cur.execute(
        """
        INSERT INTO results.problem_urls (
            url, scan_type, response, url_id
        ) VALUES (
            %s, 'axe', %s, %s
        )
        """,
        (url, str(error), url_id),
    )
    conn.commit()


def axe_scan():
    """Scan random active URLs with axe until the URL query returns no row.

    For each URL the axe result is inserted into ``results.scans`` and
    committed.  Any failure (axe exit status, invalid JSON, database error)
    is recorded in ``results.problem_urls`` and the loop moves on to the
    next URL.
    """
    # Define header mapping
    with open('../maps/axe.json', encoding='utf-8') as f:
        header_mapping = json.load(f)

    while True:
        # Query the database for a URL to test
        cur.execute(_URL_QUERY)
        result = cur.fetchone()
        if not result:
            # Stop instead of re-running the query in a tight loop.
            logger.info("No more URLs to test")
            return

        url, url_id = result[0], result[1]
        try:
            response = _run_axe(url)
            _insert_scan(response, url)
            # Persist the scan row right away so it survives later failures.
            conn.commit()
            logger.info("COMPLETE: %s scan of %s ", response.get('runner_name'), url)
        except Exception as e:
            # Loop boundary: record the failure and continue to the next URL.
            _record_problem_url(url, url_id, e)
            logger.error("%s: axe scan of %s ", e, url)
            continue

        # Process response (the result is currently not used further).
        fix_axe(response, header_mapping)


def fix_axe(response_dict, mapping):
    """Flatten *response_dict* one level and rename its keys via *mapping*.

    Values that are dicts are expanded into ``"outer.inner"`` keys; all other
    values keep their key.  Each resulting key is replaced by
    ``mapping[key]`` when that entry exists and is truthy.

    Args:
        response_dict: dict to flatten.
        mapping: dict from original (or ``outer.inner``) key to new key.

    Returns:
        The processed dict.  If an error occurs while processing, the error
        is logged and the entries collected so far are returned.
    """
    processed = {}
    try:
        for key, value in response_dict.items():
            if isinstance(value, dict):
                for inner_key, inner_value in value.items():
                    combined_key = f"{key}.{inner_key}"
                    processed[mapping.get(combined_key) or combined_key] = inner_value
            else:
                processed[mapping.get(key) or key] = value
        logger.info("Axe Fixed ")
    except Exception as e:
        logger.error("Error fixing Axe: %s", e)
    return processed


if __name__ == '__main__':
    # The module logger already has its own stdout handler; also configuring
    # the root logger here would emit every record twice.
    try:
        axe_scan()
    finally:
        cur.close()
        conn.close()