📦 EqualifyEverything / integration-crawler

📄 scrape.py · 134 lines
import os
import time
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from utils.watch import logger
from utils.auth import rabbit
from utils.health import test_proxy


def clean_url(url):
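    """Drop the fragment and query string so variants of a page collapse to one URL."""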
    url = url.split('#')[0]
    url = url.split('?')[0]
    return url


def is_valid_url(url):
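    """Return False for mailto: and tel: links, which should not be crawled."""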
    return not (url.startswith('mailto:') or url.startswith('tel:'))


def scrape_url(url_id, url):
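    """Fetch the page (through the configured proxy when USE_PROXY is set) and
    return the deduplicated list of absolute links found on it.
    url_id is currently unused."""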
    logger.debug(f'🌟 Starting to process: {url}')

    # Set the proxy settings using environment variables
    use_proxy = os.environ.get('USE_PROXY', 'false').lower() == 'true'
    logger.debug(f'USE_PROXY: {use_proxy} ')
    proxy_http = os.environ.get('PROXY_HTTP')
    if proxy_http:
        proxy_http = f'http://{proxy_http}'
    logger.debug(f'PROXY_HTTP: {proxy_http}')
    proxy_https = os.environ.get('PROXY_HTTPS')
    if proxy_https:
        proxy_https = f'http://{proxy_https}'
    logger.debug(f'PROXY_HTTPS: {proxy_https} ')
    proxies = {'http': proxy_http, 'https': proxy_https} if use_proxy else None
    logger.debug(f'Proxies: {proxies} ')

    response = requests.get(url, proxies=proxies, verify=False, timeout=10)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract and clean all URLs from the web page
    raw_links = [a['href'] for a in soup.find_all('a', href=True)]
    cleaned_links = [
        clean_url(urljoin(url, raw_link))
        for raw_link in raw_links
        if is_valid_url(raw_link)
    ]

    # Deduplicate URLs
    deduplicated_links = list(set(cleaned_links))

    # TODO: Process the urls?

    return deduplicated_links


def send_to_queue(queue_name, message):
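    """Publish a message to the named queue via the rabbit helper and log it."""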
    rabbit(queue_name, message)
    logger.info(f'📤 Sent to {queue_name} queue: {message}')


def process_message(channel, method, properties, body):
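    """Consumer callback: expects a JSON body with 'url' and 'url_id',
    scrapes the page, and forwards the discovered links to the
    landing_crawler queue; failures are reported to error_crawler."""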
    url = None
    url_id = None
    try:
        payload = json.loads(body)
        url = payload.get('url')
        url_id = payload.get('url_id')
        logger.debug(f'🔍 Payload received: {payload}')

        deduplicated_links = scrape_url(url_id, url)
        logger.debug(f'🔗 Deduplicated links: {deduplicated_links}')

        # Create a list of dictionaries with source_url_id and url
        deduplicated_links_list = [
            {"source_url_id": url_id, "url": deduplicated_url}
            for deduplicated_url in deduplicated_links
        ]

        # Convert the list to a JSON string
        message = json.dumps(deduplicated_links_list)

        # Send the JSON array as a single message to the landing_crawler queue
        send_to_queue("landing_crawler", message)

        channel.basic_ack(delivery_tag=method.delivery_tag)
        logger.debug(f'Successfully processed: {url}')
    except requests.exceptions.Timeout as e:
        error_message = f"โŒ Failed to process {url}: Request timed out. {e}"
        logger.error(error_message)
        time.sleep(1)  # Pause for 1 second
        channel.basic_nack(delivery_tag=method.delivery_tag)

        # Send a message to the error_crawler queue
        error_payload = json.dumps({
            "url_id": url_id,
            "url": url,
            "error_message": error_message
        })
        send_to_queue("error_crawler", error_payload)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    # Proxy Exceptions
    except requests.exceptions.ProxyError as e:
        error_message = f"โŒ Failed to process {url}: Proxy error. {e}"
        logger.error(error_message)

        if not test_proxy():
            error_payload = json.dumps({
                "url_id": url_id,
                "url": url,
                "error_message": error_message
            })
            send_to_queue("error_crawler", error_payload)

        time.sleep(1)  # Pause for 15 seconds
        channel.basic_ack(delivery_tag=method.delivery_tag)
    # Other exceptions
    except Exception as e:
        error_message = f"โŒ Failed to process {url}: {e}"
        logger.error(error_message)
        time.sleep(1)  # Pause for 15 seconds
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # Send a message to the error_crawler queue
        error_payload = json.dumps({
            "url_id": url_id,
            "url": url,
            "error_message": error_message
        })
        send_to_queue("error_crawler", error_payload)