linkmapy/lib/linkmap_from_sitelinks.py


import asyncio
from html.parser import HTMLParser
from random import sample
from sys import stderr
from urllib.parse import urlparse
from httpx import AsyncClient
from .linkmap import LinkMap


class HTMLLinkFinder(HTMLParser):
    """Collects unique absolute http(s) links from <a href="..."> tags."""

    def __init__(self):
        super().__init__()
        # instance attribute instead of a shared class-level list,
        # so every parser instance starts with an empty result set
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        for attr, val in attrs:
            if attr != "href":
                continue
            if val.startswith(("https://", "http://")) and val not in self.links:
                self.links.append(val)

    def get_links(self, input_html: str) -> list:
        self.feed(input_html)
        return self.links
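
# Example (a sketch, not part of the original module): feeding
# '<a href="https://example.org">x</a><a href="/local">y</a>' to
# HTMLLinkFinder().get_links() yields ["https://example.org"]; relative
# and non-http(s) hrefs are skipped by handle_starttag above.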


class LinkMapFromSitelinksGenerator:
    site_request_max_len = 10000000  # bytes
    site_request_timeout = 10  # seconds
    generated_linkmap = LinkMap()
    max_links_per_site = 3
    max_depth = 3
    enable_log = False

    def log(self, something):
        if self.enable_log:
            print(something, file=stderr)

    async def _get_html(self, url: str, client: AsyncClient) -> str:
        content = bytearray()
        content_size = 0
        # receive up to self.site_request_max_len bytes, aborting the
        # request after self.site_request_timeout seconds
        self.log(f"Request: {url}")
        async with client.stream(
            "GET",
            url,
            timeout=self.site_request_timeout,
            follow_redirects=True
        ) as stream:
            async for chunk in stream.aiter_bytes(1024):
                content_size += len(chunk)
                if content_size > self.site_request_max_len:
                    self.log(
                        f"Maximum content length exceeded! received: {content_size}"
                        f" (maximum: {self.site_request_max_len})"
                    )
                    break
                else:
                    content.extend(chunk)
        # decode the collected bytes; non-UTF-8 responses are treated as empty
        try:
            html_content = content.decode()
        except UnicodeDecodeError:
            self.log(f"Couldn't decode {url}")
            html_content = ""
        return html_content

    async def _get_linked_sites_coro(self, url, client: AsyncClient):
        linked_sites = []
        try:
            html = await self._get_html(url, client)
            found_links = HTMLLinkFinder().get_links(html)
            # keep at most max_links_per_site randomly chosen links
            found_links = sample(found_links, min(self.max_links_per_site, len(found_links)))
            for l in found_links:
                self.log(f"Found {l}")
                if l is not None:
                    linked_sites.append(l)
        except KeyboardInterrupt:
            exit("KeyboardInterrupt")
        except Exception as e:
            self.log(f"An exception occurred while trying to get links from '{url}':")
            self.log(e)
        return url, linked_sites

    async def _get_linked_sites(self, urls: list, client: AsyncClient):
        # fetch all URLs concurrently and map each URL to its extracted links
        results = await asyncio.gather(*[self._get_linked_sites_coro(url, client) for url in urls])
        return {url: links for url, links in results}

    async def _generate_linkmap(self, start_urls: list, _current_depth: int, client: AsyncClient):
        linkdict = {}
        linked_sites = await self._get_linked_sites(start_urls, client)
        for url in linked_sites:
            linkdict[url] = {}
            self.generated_linkmap.add_link(url)
            for l in linked_sites[url]:
                if l != url:
                    linkdict[url][l] = {}
                    self.generated_linkmap.add_link_connection(url, l)
        # recurse into the discovered links until max_depth is reached
        if _current_depth < self.max_depth:
            for url in linkdict:
                linkdict[url] = await self._generate_linkmap(list(linkdict[url]), _current_depth + 1, client)
        return linkdict

    async def generate(self, start_url: str, max_depth: int = 3, max_links_per_site: int = 3):
        self.generated_linkmap = LinkMap()
        self.max_links_per_site = max_links_per_site
        self.max_depth = max_depth
        async with AsyncClient() as client:
            await self._generate_linkmap([start_url], 1, client)

    def get_linkmap(self) -> LinkMap:
        return self.generated_linkmap
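

# Minimal usage sketch (assumption: "https://example.org" is a placeholder start
# URL and this module is run directly; the LinkMap returned by get_linkmap()
# comes from .linkmap, whose inspection API is not shown here):
if __name__ == "__main__":
    generator = LinkMapFromSitelinksGenerator()
    generator.enable_log = True
    asyncio.run(generator.generate("https://example.org", max_depth=2, max_links_per_site=3))
    linkmap = generator.get_linkmap()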