from html.parser import HTMLParser
from random import sample
from requests import get as request_get
from sys import stderr
from urllib.parse import urlparse


class _HTMLExternalLinkFinder(HTMLParser):
    """Collects absolute http(s) links from the href attributes of <a> tags."""

    def __init__(self):
        super().__init__()
        # Instance attribute: a class-level list would be shared between parser instances.
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for attr, val in attrs:
                if attr == "href" and val is not None:
                    if val.startswith("https://") or val.startswith("http://"):
                        if val not in self.links:
                            self.links.append(val)

    def get_links(self, input_html: str):
        self.feed(input_html)
        return self.links


class LinkMap:
    """Stores the discovered links and the connections between them."""

    def __init__(self):
        self.links = []
        self.link_connections = []  # list of (link1, link2) tuples

    def add_link_connection(self, link1, link2):
        if link1 not in self.links:
            self.links.append(link1)
        if link2 not in self.links:
            self.links.append(link2)
        self.link_connections.append((link1, link2))

    def add_link(self, link):
        if link not in self.links:
            self.links.append(link)


class LinkMapFromSitelinksGenerator:
    """Builds a LinkMap by recursively following a sample of the links found on each site."""

    site_request_max_len = 10000000  # bytes
    site_request_timeout = 10  # seconds
    max_links_per_site = 3
    max_depth = 3
    enable_log = False

    def __init__(self):
        self.already_visited = []
        self.generated_linkmap = LinkMap()

    def log(self, something):
        if self.enable_log:
            print(something, file=stderr)

    def _get_html(self, url: str) -> str:
        # Receive up to self.site_request_max_len bytes, waiting at most
        # self.site_request_timeout seconds for the server to respond.
        self.log("-----" + url)
        response = request_get(url, stream=True, timeout=self.site_request_timeout)
        response.raise_for_status()
        content_size = 0
        content_chunks = []
        for chunk in response.iter_content(1024, decode_unicode=True):
            content_size += len(chunk)
            if content_size > self.site_request_max_len:
                self.log(f"Maximum content length exceeded! received: {content_size} "
                         f"(maximum: {self.site_request_max_len})")
                break
            content_chunks.append(chunk)
        return "".join(content_chunks)

    def _get_linked_sites(self, url: str):
        # Return up to self.max_links_per_site randomly sampled links found on the page,
        # skipping URLs that have already been visited.
        sites = []
        if url not in self.already_visited:
            try:
                html = self._get_html(url)
                found_links = _HTMLExternalLinkFinder().get_links(html)
                found_links = sample(found_links, min(self.max_links_per_site, len(found_links)))
                self.log("\n".join(found_links))
                for link in found_links:
                    if link is not None:
                        sites.append(link)
                self.already_visited.append(url)
            except KeyboardInterrupt:
                raise SystemExit("KeyboardInterrupt")
            except Exception as e:
                self.log("An exception occurred while trying to get links from '" + url + "': ")
                self.log(e)
        return sites

    def _generate_linkmap(self, start_urls: list, _current_depth: int):
        # Record each start URL, connect it to the sites it links to, then recurse
        # into those sites until max_depth is reached.
        linkdict = {}
        for url in start_urls:
            linkdict[url] = {}
            self.generated_linkmap.add_link(url)
            linked_sites = self._get_linked_sites(url)
            for link in linked_sites:
                if link != url:
                    linkdict[url][link] = {}
                    self.generated_linkmap.add_link_connection(url, link)
        if _current_depth < self.max_depth:
            for url in linkdict:
                linkdict[url] = self._generate_linkmap(list(linkdict[url]), _current_depth + 1)
        return linkdict

    def generate(self, start_url: str, max_depth: int = 3, max_links_per_site: int = 3):
        self.already_visited = []
        self.generated_linkmap = LinkMap()
        self.max_links_per_site = max_links_per_site
        self.max_depth = max_depth
        self._generate_linkmap([start_url], 1)

    def get_linkmap(self) -> LinkMap:
        return self.generated_linkmap
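

# Minimal usage sketch, assuming the module is run directly: the start URL below is a
# placeholder (any reachable http(s) page works), and the small depth/per-site limits
# just keep the crawl short for demonstration purposes.
if __name__ == "__main__":
    generator = LinkMapFromSitelinksGenerator()
    generator.enable_log = True
    # Crawl two levels deep, following at most two sampled links per page.
    generator.generate("https://example.com", max_depth=2, max_links_per_site=2)
    linkmap = generator.get_linkmap()
    print(f"{len(linkmap.links)} links, {len(linkmap.link_connections)} connections")
    for source, target in linkmap.link_connections:
        print(source, "->", target)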