📦 EqualifyEverything / scan

📄 update.py · 97 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import psycopg2
import json
from datetime import datetime, timezone
from utils.watch import logger
from data.access import connection

# Log Emoji: ๐Ÿ—„๏ธ_๐Ÿ”ง

def execute_update(query, params=None, fetchone=True):
    """Run a write query on a fresh connection, commit it, and return any rows.

    Args:
        query: SQL text handed to ``cursor.execute``.
        params: Optional values bound to the query's placeholders.
        fetchone: When true, return a single row; otherwise return all rows.

    Returns:
        With ``fetchone`` true, the first row, or ``()`` when there is none.
        Otherwise the list of rows, or ``[]`` when there are none. A statement
        that produces no result set (for example an UPDATE without RETURNING)
        yields the same empty value. ``None`` means the query failed; the
        error is logged instead of being raised.

    Raises:
        Whatever ``connection()``, ``conn.open()`` or cursor creation raise.
        Those happen before the query runs and are not caught here.
    """
    conn = connection()
    conn.open()
    logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Database connection opened')

    cur = None
    result = None
    try:
        # Cursor creation stays outside the except below so its errors still
        # propagate, but the outer finally now closes the connection.
        cur = conn.conn.cursor()
        try:
            cur.execute(query, params)
            conn.conn.commit()
            logger.info(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Query executed and committed')

            if cur.description is None:
                # No result set: fetching would raise, which previously made
                # a successful write look exactly like a failure (None).
                result = () if fetchone else []
            elif fetchone:
                result = cur.fetchone() or ()
            else:
                result = cur.fetchall() or []
                logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Fetched results: {result}')
        except Exception as e:
            # Best-effort contract: report the failure and signal it with None.
            logger.error(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Error executing update query: {e}')
            try:
                conn.conn.rollback()
            except Exception as rollback_error:
                logger.error(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Error rolling back update query: {rollback_error}')
            result = None
    finally:
        # Release the cursor and connection on every exit path.
        try:
            if cur is not None:
                cur.close()
        finally:
            conn.close()
            logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Cursor and connection closed')

    return result




def tech_check_failure(url_id):
   """Record a failed tech check for one row of ``targets.urls``.

   Sets ``active_scan_tech`` to False and stamps ``scanned_at_tech`` with the
   current UTC time for the row whose ``id`` equals ``url_id``.

   Returns True when ``execute_update`` returns without raising, and False
   (after logging the error) when an exception reaches this function.
   """
   logger.info('๐Ÿ—„๏ธ   ๐Ÿ”ง Logging Tech Check Failure')
   sql = """
       UPDATE targets.urls
       SET active_scan_tech = False,
         scanned_at_tech = %s
       WHERE id = %s
   """
   marked = False
   try:
      stamp = datetime.now(timezone.utc)
      execute_update(sql, (stamp, url_id))
      logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Marked Failure: {url_id}')
      marked = True
   except Exception as err:
      # Anything raised while updating is reported and turned into False.
      logger.error(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Failed to Mark Failure: {url_id} - Error: {err}')
   return marked

def mark_url_axe_scanned(url_id):
    """Stamp ``scanned_at_axe`` with the current UTC time for one URL row.

    The row is selected from ``targets.urls`` by ``id = url_id``.

    Returns True when ``execute_update`` returns without raising, and False
    (after logging the error) when an exception reaches this function.
    """
    sql = """
        UPDATE targets.urls
        SET scanned_at_axe = %s
        WHERE id = %s;
   """
    marked = False
    try:
        stamp = datetime.now(timezone.utc)
        execute_update(sql, (stamp, url_id))
        logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Marked {url_id} as Scanned')
        marked = True
    except Exception as err:
        # Anything raised while updating is reported and turned into False.
        logger.error(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Failed to mark as Scanned: {url_id} - Error: {err}')
    return marked


def tech_mark_url(url_id):
   """Stamp ``scanned_at_tech`` with the current UTC time for one URL row.

   The row is selected from ``targets.urls`` by ``id = url_id``.

   Returns True when ``execute_update`` returns without raising, and False
   (after logging the error) when an exception reaches this function.
   """
   sql = """
      UPDATE targets.urls
         SET scanned_at_tech = %s
         WHERE id = %s;
   """
   marked = False
   try:
      stamp = datetime.now(timezone.utc)
      execute_update(sql, (stamp, url_id))
      logger.debug(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Marked {url_id} as Teched')
      marked = True
   except Exception as err:
      # Anything raised while updating is reported and turned into False.
      logger.error(f'๐Ÿ—„๏ธ   ๐Ÿ”ง Failed to mark as Teched: {url_id} - Error: {err}')
   return marked