import asyncio
from html.parser import HTMLParser
from random import sample
from sys import stderr

from httpx import AsyncClient

from .linkmap import LinkMap


class HTMLLinkFinder(HTMLParser):
    """Collects absolute http(s) links from the href attributes of <a> tags."""

    def __init__(self):
        super().__init__()
        # per-instance list, so links from one document don't leak into the next
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        for attr, val in attrs:
            if attr != "href":
                continue
            if val and val.startswith(("https://", "http://")):
                if val not in self.links:
                    self.links.append(val)

    def get_links(self, input_html: str) -> list:
        self.feed(input_html)
        return self.links


class LinkMapFromSitelinksGenerator:
    """Builds a LinkMap by recursively following a sample of the links found on each page."""

    site_request_max_len = 10_000_000  # bytes
    site_request_timeout = 10  # seconds

    def __init__(self):
        self.generated_linkmap = LinkMap()
        self.max_links_per_site = 3
        self.max_depth = 3
        self.enable_log = False

    def log(self, something):
        if self.enable_log:
            print(something, file=stderr)

    async def _get_html(self, url: str, client: AsyncClient) -> str:
        """Downloads a page and returns its HTML, or an empty string if it can't be decoded."""
        content = bytearray()
        content_size = 0

        # receive up to self.site_request_max_len bytes with
        # a timeout of self.site_request_timeout seconds
        self.log(f"Request: {url}")
        async with client.stream(
            "GET", url, timeout=self.site_request_timeout, follow_redirects=True
        ) as stream:
            async for chunk in stream.aiter_bytes(1024):
                content_size += len(chunk)
                if content_size > self.site_request_max_len:
                    self.log(
                        f"Maximum content length exceeded! "
                        f"received: {content_size} (maximum: {self.site_request_max_len})"
                    )
                    break
                content.extend(chunk)

        # decode
        try:
            html_content = content.decode()
        except UnicodeDecodeError:
            self.log(f"Couldn't decode {url}")
            html_content = ""
        return html_content

    async def _get_linked_sites_coro(self, url: str, client: AsyncClient):
        """Fetches one page and returns (url, sample of the links found on it)."""
        linked_sites = []
        try:
            html = await self._get_html(url, client)
            found_links = HTMLLinkFinder().get_links(html)
            # keep at most max_links_per_site links, chosen at random
            found_links = sample(found_links, min(self.max_links_per_site, len(found_links)))
            for link in found_links:
                self.log(f"Found {link}")
                linked_sites.append(link)
        except KeyboardInterrupt:
            # let Ctrl-C abort the whole crawl
            raise
        except Exception as e:
            self.log(f"An exception occurred while trying to get links from '{url}':")
            self.log(e)
        return url, linked_sites

    async def _get_linked_sites(self, urls: list, client: AsyncClient) -> dict:
        """Fetches all given URLs concurrently and maps each URL to the links found on it."""
        results = await asyncio.gather(
            *[self._get_linked_sites_coro(url, client) for url in urls]
        )
        return {url: links for url, links in results}

    async def _generate_linkmap(self, start_urls: list, _current_depth: int, client: AsyncClient):
        """Adds the given URLs and their outgoing links to the LinkMap, then recurses."""
        linkdict = {}
        linked_sites = await self._get_linked_sites(start_urls, client)
        for url in linked_sites:
            linkdict[url] = {}
            self.generated_linkmap.add_link(url)
            for link in linked_sites[url]:
                if link != url:
                    linkdict[url][link] = {}
                    self.generated_linkmap.add_link_connection(url, link)
        if _current_depth < self.max_depth:
            # follow the newly found links one level deeper;
            # results accumulate in self.generated_linkmap
            for url in linkdict:
                await self._generate_linkmap(list(linkdict[url]), _current_depth + 1, client)

    async def generate(self, start_url: str, max_depth: int = 3, max_links_per_site: int = 3):
        """Crawls outward from start_url and (re)builds the LinkMap."""
        self.generated_linkmap = LinkMap()
        self.max_links_per_site = max_links_per_site
        self.max_depth = max_depth
        async with AsyncClient() as client:
            await self._generate_linkmap([start_url], 1, client)

    def get_linkmap(self) -> LinkMap:
        return self.generated_linkmap
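

# Example usage (a minimal sketch, not part of the original module): because of the
# relative import of LinkMap above, this file is meant to be imported as part of its
# package; `mypackage.sitelinks` below is a hypothetical module path for illustration.
#
#     import asyncio
#     from mypackage.sitelinks import LinkMapFromSitelinksGenerator
#
#     async def main():
#         generator = LinkMapFromSitelinksGenerator()
#         generator.enable_log = True
#         await generator.generate("https://example.com", max_depth=2, max_links_per_site=3)
#         linkmap = generator.get_linkmap()
#
#     asyncio.run(main())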