import os
import time
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

from utils.watch import logger
from utils.auth import rabbit


def clean_url(url):
    # Strip fragments and query strings so duplicate links collapse to one URL
    url = url.split('#')[0]
    url = url.split('?')[0]
    return url


def is_valid_url(url):
    # Skip mailto: and tel: links, which are not crawlable pages
    return not (url.startswith('mailto:') or url.startswith('tel:'))


def scrape_url(url_id, url):
    logger.debug(f'Starting to process: {url}')

    # Read the proxy settings from environment variables
    use_proxy = os.environ.get('USE_PROXY', 'false').lower() == 'true'
    logger.debug(f'USE_PROXY: {use_proxy}')

    proxy_http = os.environ.get('PROXY_HTTP')
    if proxy_http:
        proxy_http = f'http://{proxy_http}'
    logger.debug(f'PROXY_HTTP: {proxy_http}')

    proxy_https = os.environ.get('PROXY_HTTPS')
    if proxy_https:
        proxy_https = f'http://{proxy_https}'
    logger.debug(f'PROXY_HTTPS: {proxy_https}')

    proxies = {'http': proxy_http, 'https': proxy_https} if use_proxy else None
    logger.debug(f'Proxies: {proxies}')

    # Fetch the page (certificate verification is disabled)
    response = requests.get(url, proxies=proxies, verify=False)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract and clean all URLs from the web page
    raw_links = [a['href'] for a in soup.find_all('a', href=True)]
    cleaned_links = [
        clean_url(urljoin(url, raw_link))
        for raw_link in raw_links
        if is_valid_url(raw_link)
    ]

    # Deduplicate URLs
    deduplicated_links = list(set(cleaned_links))

    # TODO: Process the urls?
    return deduplicated_links


def send_to_queue(queue_name, message):
    rabbit(queue_name, message)
    logger.info(f'Sent to {queue_name} queue: {message}')


def process_message(channel, method, properties, body):
    url = None
    url_id = None
    try:
        payload = json.loads(body)
        url = payload.get('url')
        url_id = payload.get('url_id')
        logger.debug(f'Payload received: {payload}')

        deduplicated_links = scrape_url(url_id, url)
        logger.debug(f'Deduplicated links: {deduplicated_links}')

        # Create a list of dictionaries with source_url_id and url
        deduplicated_links_list = [
            {"source_url_id": url_id, "url": deduplicated_url}
            for deduplicated_url in deduplicated_links
        ]

        # Convert the list to a JSON string
        message = json.dumps(deduplicated_links_list)

        # Send the JSON array as a single message to the landing_crawler queue
        send_to_queue("landing_crawler", message)

        channel.basic_ack(delivery_tag=method.delivery_tag)
        logger.debug(f'Successfully processed: {url}')
    except Exception as e:
        error_message = f"Failed to process {url}: {e}"
        logger.error(error_message)
        time.sleep(15)  # Pause for 15 seconds before handing the message back
        channel.basic_nack(delivery_tag=method.delivery_tag)

        # Send a message to the error_crawler queue
        error_payload = json.dumps({
            "url_id": url_id,
            "url": url,
            "error_message": error_message
        })
        send_to_queue("error_crawler", error_payload)
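

# A minimal sketch (not part of the original file) of how process_message might be
# wired up as a RabbitMQ consumer. It assumes the plain pika API; the queue name
# "landing_scraper" and the RABBIT_HOST environment variable are hypothetical.
if __name__ == '__main__':
    import pika  # assumed dependency for consuming; publishing goes through utils.auth.rabbit

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=os.environ.get('RABBIT_HOST', 'localhost'))
    )
    channel = connection.channel()
    channel.queue_declare(queue='landing_scraper', durable=True)
    # Deliver one message at a time so a slow scrape does not pile up unacked messages
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='landing_scraper', on_message_callback=process_message)
    logger.info('Waiting for messages on landing_scraper')
    channel.start_consuming()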