๐Ÿ“ฆ EqualifyEverything / crawler

๐Ÿ“„ insert.py ยท 169 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import psycopg2
import json
import traceback
from database.access import connection
from utils.watch import logger
from psycopg2.pool import SimpleConnectionPool

# Set use_pooling to True to enable connection pooling
use_pooling = True

# Module-level connection pool; stays None when pooling is disabled.
pool = None

if use_pooling:
    # Build the pool from the project's connection parameters.
    # NOTE(review): this runs at import time — assumes the database is
    # reachable when the module is first imported; verify against deploy order.
    conn_params = connection().get_connection_params()
    pool = SimpleConnectionPool(
        minconn=1,
        maxconn=10,
        **conn_params
    )



def connection_pooling():
    """Check a connection out of the shared module-level pool."""
    conn = pool.getconn()
    return conn

def release_pooling(conn):
    """Return a checked-out connection to the shared pool for reuse."""
    pool.putconn(conn)

# Normal Insert

def execute_insert(query, params=None, fetchone=True):
    """Execute a single insert/update statement and return its fetched rows.

    Args:
        query: SQL statement with %s placeholders.
        params: Optional tuple bound to the placeholders.
        fetchone: When True, return cursor.fetchone() (or an empty tuple
            when nothing was returned); otherwise cursor.fetchall()
            (or an empty list).

    Returns:
        The fetched row/rows, or None when execution failed.
    """
    # Acquire a connection: shared pool, or a fresh one-off connection.
    if use_pooling:
        conn = connection_pooling()
    else:
        conn = connection()
        conn.open()
        logger.debug("🗄️✍️ Database connection opened")

    cur = conn.cursor()
    result = None
    try:
        cur.execute(query, params)
        conn.commit()
        logger.info("🗄️✍️🟢 Query executed and committed")

        # Fetch the results if requested
        if fetchone:
            result = cur.fetchone() or ()  # empty tuple if nothing returned
        else:
            result = cur.fetchall() or []  # empty list if nothing returned
            logger.debug(f'🗄️✍️ Fetched results: {result}')
    except Exception as e:
        # Roll back so an aborted transaction is not left open on the
        # connection — critical when the connection goes back to the pool.
        conn.rollback()
        logger.error(f"🗄️✍️ Error executing insert query: {e}\n{traceback.format_exc()}")
        result = None
    finally:
        # Always release resources, even if the fetch or logging raised.
        cur.close()
        if use_pooling:
            release_pooling(conn)
        else:
            conn.close()
            logger.debug("🗄️✍️ Cursor and connection closed")

    return result
# # # # # # # # # #

# Bulk Inserts

def execute_bulk_insert(query, params_list):
    """Execute one statement per parameter tuple in *params_list*.

    The connection is used as a context manager, so the whole batch is
    committed on success and rolled back as a unit on error.
    """
    # Acquire a connection: shared pool, or a fresh one-off connection.
    if use_pooling:
        conn = connection_pooling()
    else:
        conn = connection()
        conn.open()

    cur = conn.cursor()
    try:
        # `with conn:` commits on clean exit and rolls back on exception.
        with conn:
            cur.executemany(query, params_list)
            logger.info("🗄️✍️🟢 Query executed and committed")
    except Exception as e:
        logger.error(f"🗄️✍️ Error executing bulk insert query: {e}\n{traceback.format_exc()}")
    finally:
        # Always release resources, even if executemany/logging raised.
        cur.close()
        if use_pooling:
            release_pooling(conn)
        else:
            conn.close()



#########################################################
## Queries

def record_new_url(discovered_url, source_url, crawl_id, sitemapped, domain_id):
    """Insert a discovered URL; on conflict, refresh its recent_crawl_id.

    Returns:
        The RETURNING row from execute_insert (empty tuple when nothing
        was returned), or None when the insert failed.
    """
    query = """
        INSERT INTO targets.urls (
            url,
            source_url,
            recent_crawl_id,
            sitemapped,
            domain_id
        )
        VALUES (
            %s, %s, %s, %s, %s
        )
        ON CONFLICT (url) DO UPDATE SET recent_crawl_id = %s
        RETURNING url;
    """
    logger.debug(f'🗄️ ✍️ Recording new url: {discovered_url} ')
    # crawl_id is bound twice: once for VALUES, once for the ON CONFLICT update.
    result = execute_insert(query, (discovered_url, source_url, crawl_id, sitemapped, domain_id, crawl_id))
    if result is None:
        # execute_insert returns None only on failure (empty fetch is ()).
        logger.error(f'🗄️ ✍️ Failed to record url: {discovered_url}')
    return result

def record_new_urls(url_records):
    """Bulk-insert discovered URLs; on conflict, refresh recent_crawl_id.

    *url_records* is an iterable of 5-tuples:
    (url, source_url, crawl_id, sitemapped, domain_id).
    """
    query = """
        INSERT INTO targets.urls (
            url,
            source_url,
            recent_crawl_id,
            sitemapped,
            domain_id
        )
        VALUES (
            %s, %s, %s, %s, %s
        )
        ON CONFLICT (url) DO UPDATE SET recent_crawl_id = %s;
    """
    # The crawl id is bound twice per record: once in VALUES and once
    # for the ON CONFLICT update clause.
    params_list = [
        (url, src, cid, mapped, dom, cid)
        for url, src, cid, mapped, dom in url_records
    ]
    execute_bulk_insert(query, params_list)



def create_new_crawl(crawl_type, sitemap_id, agent, domain_id):
    """Create a crawl event row and return its new id.

    Note: domain_id is accepted for interface compatibility but is not
    stored by this statement.

    Returns:
        The new crawl id (scalar), or None when the insert failed.
    """
    query = """
        INSERT INTO events.crawls (
            crawl_type,
            sitemap_id,
            agent
        )
        VALUES (
            %s, %s, %s
        )
        RETURNING id as crawl_id;
        """
    logger.debug('🗄️ ✍️ Recording New Crawl')
    result = execute_insert(query, (crawl_type, sitemap_id, agent))
    if result:
        # fetchone() returns a row tuple; extract the id scalar rather
        # than returning the whole tuple.
        crawl_id = result[0]
        logger.info(f'🗄️🔍 Created new crawl: {crawl_id} ')
        return crawl_id
    else:
        logger.error('🗄️🔍 Unable create new crawl')
        return None