warc2zim/tests/test_warc_to_zim.py
2025-02-03 14:50:58 +00:00

833 lines
26 KiB
Python

#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4 nu
import io
import json
import pathlib
import re
import time
from urllib.parse import unquote
import pytest
import requests
from zimscraperlib.image.conversion import convert_image, convert_svg2png
from zimscraperlib.image.probing import format_for
from zimscraperlib.image.transformation import resize_image
from zimscraperlib.zim import Archive
from warc2zim.__about__ import __version__
from warc2zim.converter import iter_warc_records
from warc2zim.main import main
from warc2zim.utils import get_record_url
# Edge length, in pixels, of the square illustration rebuilt by the favicon checks.
ZIM_ILLUSTRATION_SIZE = 48

# Directory holding the sample WARC files, next to this test module.
TEST_DATA_DIR = pathlib.Path(__file__).parent.joinpath("data")

# Special data dir for WARC files which are not supposed to be run in the
# `test_all_warcs_root_dir` test.
TEST_DATA_SPECIAL_DIR = pathlib.Path(__file__).parent.joinpath("data-special")

# Value passed to `--scraper-suffix` by the metadata test.
SCRAPER_SUFFIX = "zimit x.y.z-devw"

# ============================================================================
# Command lines used to parametrize the `cmdline` fixture. The first item of
# each list is a WARC file name; the remaining items are extra CLI arguments.
CMDLINES = [
    ["example-response.warc"],
    ["example-response.warc", "--progress-file", "progress.json"],
    ["example-revisit.warc.gz"],
    ["example-revisit.warc.gz", "-u", "http://example.iana.org/", "--lang", "eng"],
    [
        "example-utf8.warc",
        "-u",
        "https://httpbin.org/anything/utf8=%E2%9C%93?query=test&a=b&1=%E2%9C%93",
    ],
    ["single-page-test.warc"],
]
@pytest.fixture(params=CMDLINES, ids=[" ".join(cmds) for cmds in CMDLINES])
def cmdline(request):
    """Provide one command line of ``CMDLINES`` per parametrized test run.

    A shallow copy of the parameter is returned rather than the list stored in
    ``CMDLINES`` itself, so that a test may edit its arguments in place
    (replace the file name, append options) without altering the module-level
    data seen by other tests or by a rerun.
    """
    return list(request.param)
# ============================================================================
# Parameters of the `fuzzycheck` fixture: for each WARC file name, the ZIM
# paths that `test_fuzzy_urls` looks up in the converted ZIM.
FUZZYCHECKS = [
    {
        "filename": "video-yt.warc.gz",
        "entries": [
            "youtube.fuzzy.replayweb.page/get_video_info?video_id=aT-Up5Y4uRI",
            "youtube.fuzzy.replayweb.page/videoplayback?id=o-AE3bg3qVNY-gAWwYgL52vgpHKJe9ijdbu2eciNi5Uo_w",
        ],
    },
    {
        "filename": "video-yt-2.warc.gz",
        "entries": [
            "youtube.fuzzy.replayweb.page/youtubei/v1/player?videoId=aT-Up5Y4uRI",
            "youtube.fuzzy.replayweb.page/videoplayback?id=o-AGDtIqpFRmvgVVZk96wgGyFxL_SFSdpBxs0iBHatQpRD",
        ],
    },
    {
        "filename": "video-vimeo.warc.gz",
        "entries": [
            "vimeo.fuzzy.replayweb.page/video/347119375",
            "vimeo-cdn.fuzzy.replayweb.page/01/4423/13/347119375/1398505169.mp4",
        ],
    },
]
@pytest.fixture(
    params=FUZZYCHECKS,
    ids=[check["filename"] for check in FUZZYCHECKS],
)
def fuzzycheck(request):
    """Provide one item of ``FUZZYCHECKS`` per parametrized test run."""
    return request.param
# ============================================================================
class TestWarc2Zim:
def list_articles(self, zimfile):
zim_fh = Archive(zimfile)
for x in range(zim_fh.entry_count):
yield zim_fh.get_entry_by_id(x)
def get_main_entry_with_redirect(self, zimfile):
zim_fh = Archive(zimfile)
if zim_fh.main_entry.is_redirect:
return zim_fh.main_entry.get_redirect_entry()
return zim_fh.main_entry
def get_metadata(self, zimfile, name):
zim_fh = Archive(zimfile)
return zim_fh.get_metadata(name)
def get_article(self, zimfile, path):
zim_fh = Archive(zimfile)
return zim_fh.get_content(path)
def get_article_raw(self, zimfile, path):
zim_fh = Archive(zimfile)
return zim_fh.get_item(path)
def assert_item_exist(self, zimfile, path):
zim_fh = Archive(zimfile)
assert zim_fh.get_item(path)
def assert_item_does_not_exist(self, zimfile, path):
zim_fh = Archive(zimfile)
try:
payload = zim_fh.get_item(path)
except KeyError:
payload = None
assert payload is None
def verify_warc_and_zim(self, warcfile, zimfile):
assert pathlib.Path(warcfile).is_file()
assert pathlib.Path(zimfile).is_file()
# [TOFIX]
head_insert = b""
# track to avoid checking duplicates, which are not written to ZIM
warc_urls = set()
zim_fh = Archive(zimfile)
assert zim_fh.get_text_metadata("Scraper").startswith(f"warc2zim {__version__}")
assert zim_fh.get_text_metadata("X-ContentDate")
for record in iter_warc_records([warcfile]):
url = get_record_url(record)
if not url:
continue
if url in warc_urls:
continue
if record.rec_type not in (("response", "resource", "revisit")):
continue
# ignore revisit records that are to the same url
if (
record.rec_type == "revisit"
and record.rec_headers["WARC-Refers-To-Target-URI"] == url
):
continue
# parse headers as record, ensure headers match
url_no_scheme = url.split("//", 2)[1]
if "www.youtube.com/embed" in url_no_scheme:
# We know that those url are rewritten in zim. Don't check for them.
break
url_no_scheme = re.sub(r"\?\d+$", "?", url_no_scheme)
# remove user/password
if "@" in url_no_scheme:
at_index = url_no_scheme.index("@")
if at_index >= 0:
if "/" in url_no_scheme:
slash_index = url_no_scheme.index("/")
if at_index < slash_index:
url_no_scheme = url_no_scheme[at_index + 1 :]
else:
url_no_scheme = url_no_scheme[at_index + 1 :]
# remove trailing ?
if url_no_scheme.endswith("?"):
url_no_scheme = url_no_scheme[:-1]
# unquote url since everything is not encoded in ZIM
url_no_scheme = unquote(url_no_scheme)
# ensure payloads match
try:
payload = zim_fh.get_item(url_no_scheme)
except KeyError:
payload = None
if record.http_headers and record.http_headers.get("Content-Length") == "0":
if record.http_headers.get("Location"):
assert payload # this is a redirect, it must be handled
else:
assert not payload
elif record.rec_type == "revisit":
# We must have a payload
# We should check with the content of the targeted record...
# But difficult to test as we don't have it
assert payload
elif record.rec_type == "response":
# We must have a payload
assert payload
payload_content = (
payload.content.tobytes() # pyright:ignore[reportAttributeAccessIssue]
)
# if HTML, still need to account for the head insert, otherwise should
# have exact match
if payload.mimetype.startswith( # pyright:ignore[reportAttributeAccessIssue]
"text/html"
):
assert head_insert in payload_content
elif record.rec_type == "resource":
# we do not want to embed resources "as-is"
assert not payload
warc_urls.add(url)
def rebuild_favicon_bytes(self, zim, favicon_path) -> bytes:
favicon_bytes = self.get_article(
zim,
favicon_path,
)
assert favicon_bytes
dst = io.BytesIO()
if format_for(io.BytesIO(favicon_bytes), from_suffix=False) == "SVG":
convert_svg2png(
io.BytesIO(favicon_bytes),
dst,
ZIM_ILLUSTRATION_SIZE,
ZIM_ILLUSTRATION_SIZE,
)
else:
convert_image(io.BytesIO(favicon_bytes), dst, fmt="PNG")
resize_image(
dst,
width=ZIM_ILLUSTRATION_SIZE,
height=ZIM_ILLUSTRATION_SIZE,
method="cover",
)
return dst.getvalue()
def test_warc_to_zim_specify_params_and_metadata(self, tmp_path):
zim_output = "zim-out-filename.zim"
main(
[
"-v",
str(TEST_DATA_DIR / "example-response.warc"),
"--name",
"example-response",
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--tags",
" foo ;bar; ; some;_foo:bar;_foo_,_bar_",
"--desc",
"test zim",
"--title",
"Some Title",
"--scraper-suffix",
SCRAPER_SUFFIX,
]
)
zim_output = tmp_path / zim_output
assert pathlib.Path(zim_output).is_file()
all_articles = {
article.path: article.title for article in self.list_articles(zim_output)
}
assert all_articles == {
# entries from WARC
"example.com/": "Example Domain",
"_zim_static/__wb_module_decl.js": "_zim_static/__wb_module_decl.js",
"_zim_static/wombat.js": "_zim_static/wombat.js",
"_zim_static/wombatSetup.js": "_zim_static/wombatSetup.js",
}
zim_fh = Archive(zim_output)
# ZIM metadata
assert list(zim_fh.metadata.keys()) == [
"Counter",
"Creator",
"Date",
"Description",
"Language",
"Name",
"Publisher",
"Scraper",
"Tags",
"Title",
"X-ContentDate",
]
assert zim_fh.has_fulltext_index
assert zim_fh.has_title_index
assert self.get_metadata(zim_output, "Description") == b"test zim"
# we compare sets of tags since tags ordering has no meaning
assert set(
self.get_metadata(zim_output, "Tags").decode("utf-8").split(";")
) == {
"_ftindex:yes",
"_category:other",
"some",
"foo",
"bar",
"_foo:bar",
"_foo_,_bar_",
}
assert self.get_metadata(zim_output, "Title") == b"Some Title"
assert (
zim_fh.get_text_metadata("Scraper") == f"warc2zim {__version__},"
"webrecorder.io 2.0 (warcprox 1.4-20151022181819-1a48f12),zimit x.y.z-devw"
)
assert zim_fh.get_text_metadata("X-ContentDate") == "2016-02-25"
def test_warc_to_zim_main(self, cmdline, tmp_path):
# intput filename
filename = cmdline[0]
# set intput filename (first arg) to absolute path from test dir
warcfile = TEST_DATA_DIR / filename
cmdline[0] = str(warcfile)
cmdline.extend(["--output", str(tmp_path), "--name", filename])
main(cmdline)
zimfile = filename + "_" + time.strftime("%Y-%m") + ".zim"
if "--progress-file" in cmdline:
with open(tmp_path / "progress.json") as fh:
progress = json.load(fh)
assert (
progress["written"] > 0
and progress["total"] > 0
and progress["written"] <= progress["total"]
)
self.verify_warc_and_zim(warcfile, tmp_path / zimfile)
def test_same_domain_only(self, tmp_path):
zim_output = "same-domain.zim"
main(
[
str(TEST_DATA_DIR / "example-revisit.warc.gz"),
"--favicon",
"http://example.com/favicon.ico",
"--include-domains",
"example.com/",
"--lang",
"eng",
"--zim-file",
zim_output,
"--name",
"same-domain",
"--output",
str(tmp_path),
]
)
zim_output = tmp_path / zim_output
for article in self.list_articles(zim_output):
url = article.path
# ignore the replay files, which have only one path segment
if not url.startswith("_zim_static/"):
assert url.startswith("example.com/")
def test_skip_self_redirect(self, tmp_path):
zim_output = "self-redir.zim"
main(
[
str(TEST_DATA_DIR / "self-redirect.warc"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"self-redir",
]
)
zim_output = tmp_path / zim_output
def test_include_domains_favicon_and_language(self, tmp_path):
zim_output = "spt.zim"
main(
[
str(TEST_DATA_DIR / "single-page-test.warc"),
"-i",
"reseau-canope.fr",
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"spt",
]
)
zim_output = tmp_path / zim_output
for article in self.list_articles(zim_output):
url = article.path
# ignore the replay files, which have only one path segment
if not url.startswith("_zim_static/"):
assert "reseau-canope.fr/" in url
# test detected language
assert self.get_metadata(zim_output, "Language") == b"fra"
# test detected favicon
zim_favicon = self.get_metadata(zim_output, "Illustration_48x48@1")
assert zim_favicon
assert (
self.rebuild_favicon_bytes(
zim_output,
"lesfondamentaux.reseau-canope.fr/fileadmin/template/img/favicon.ico",
)
== zim_favicon
)
# test default tags added ; we compare sets of tags since tags ordering has no
# meaning
assert set(
self.get_metadata(zim_output, "Tags").decode("utf-8").split(";")
) == {
"_ftindex:yes",
"_category:other",
}
def test_website_with_redirect(self, tmp_path):
zim_output = "kiwix.zim"
main(
[
str(TEST_DATA_DIR / "kiwix-with-redirects.warc.gz"),
"-u",
"http://www.kiwix.org",
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"kiwix",
]
)
zim_output = tmp_path / zim_output
# check that redirections have been followed
assert self.get_main_entry_with_redirect(zim_output).path == "kiwix.org/en/"
# test detected language
assert self.get_metadata(zim_output, "Language") == b"eng"
# test detected favicon
zim_favicon = self.get_metadata(zim_output, "Illustration_48x48@1")
assert zim_favicon
assert (
self.rebuild_favicon_bytes(
zim_output,
"kiwix.org/favicon.ico",
)
== zim_favicon
)
def test_all_warcs_root_dir(self, tmp_path):
zim_output = "test-all.zim"
main(
[
str(TEST_DATA_DIR),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-all",
"--url",
"http://example.com",
]
)
zim_output = tmp_path / zim_output
# check articles from different warc records in tests/data dir
# from example.warc.gz
assert self.get_article(zim_output, "example.com/") != b""
# from single-page-test.warc
assert (
self.get_article(
zim_output, "lesfondamentaux.reseau-canope.fr/accueil.html"
)
!= b""
)
# timestamp fuzzy match from example-with-timestamp.warc
assert self.get_article(zim_output, "example.com/path.txt") != b""
def test_fuzzy_urls(self, tmp_path, fuzzycheck):
zim_output = fuzzycheck["filename"] + ".zim"
main(
[
str(TEST_DATA_DIR / fuzzycheck["filename"]),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-fuzzy",
]
)
zim_output = tmp_path / zim_output
for entry in fuzzycheck["entries"]:
# This should be item and get_article_raw is eq to getItem and it will fail
# if it is not a item
self.get_article_raw(zim_output, entry)
def test_error_bad_main_page(self, tmp_path):
zim_output_not_created = "zim-out-not-created.zim"
assert (
main(
[
"-v",
str(TEST_DATA_DIR / "example-response.warc"),
"-u",
"https://no-such-url.example.com",
"--output",
str(tmp_path),
"--name",
"bad",
"--zim-file",
zim_output_not_created,
]
)
== 4
)
def test_error_main_page_unprocessable(self, tmp_path):
zim_output_not_created = "zim-out-not-created.zim"
assert (
main(
[
"-v",
str(TEST_DATA_DIR / "main-entry-403.warc.gz"),
"-u",
"https://wikizilla.org/wiki/Doug",
"--output",
str(tmp_path),
"--name",
"bad",
"--zim-file",
zim_output_not_created,
]
)
== 4
)
assert not (pathlib.Path(tmp_path) / zim_output_not_created).exists()
def test_args_only(self):
# error, name required
with pytest.raises(SystemExit) as e:
main([])
assert e.value.code == 2
# error, no such output directory
with pytest.raises(SystemExit) as e:
main(["--name", "test", "--output", "/no-such-dir"])
assert e.value.code == 1
# error, name has invalid characters for Linux filesystem
with pytest.raises(SystemExit) as e:
main(["--name", "te/st", "--output", "./"])
assert e.value.code == 3
# success, special return code for no output files
assert main(["--name", "test", "--output", "./"]) == 100
def test_custom_css(self, tmp_path):
custom_css = b"* { background-color: red; }"
custom_css_path = tmp_path / "custom.css"
with open(custom_css_path, "wb") as fh:
fh.write(custom_css)
zim_output = "test-css.zim"
main(
[
str(TEST_DATA_DIR / "example-response.warc"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-css",
"--custom-css",
str(custom_css_path),
]
)
zim_output = tmp_path / zim_output
res = self.get_article(zim_output, "example.com/")
assert b"static_prefix" not in res
assert b"../_zim_static/custom.css" in res
res = self.get_article(zim_output, "_zim_static/custom.css")
assert custom_css == res
def test_custom_css_remote(self, tmp_path):
zim_output = "test-css.zim"
url = (
"https://cdn.jsdelivr.net/npm/bootstrap@4.5.3/dist/css/bootstrap-reboot.css"
)
main(
[
str(TEST_DATA_DIR / "example-response.warc"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-css",
"--custom-css",
url,
]
)
zim_output = tmp_path / zim_output
res = self.get_article(zim_output, "example.com/")
assert b"static_prefix" not in res
assert b"../_zim_static/custom.css" in res
res = self.get_article(zim_output, "_zim_static/custom.css")
assert res == requests.get(url, timeout=10).content
def test_http_return_codes(self, tmp_path):
zim_output = "test-http-return-codes.zim"
main(
[
str(TEST_DATA_DIR / "http-return-codes.warc.gz"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-http-return-codes",
]
)
zim_output = tmp_path / zim_output
for exising_website_items in [
"200-response",
"201-response",
"202-response",
"301-internal-redirect-ok",
"301-external-redirect-ok",
"302-internal-redirect-ok",
"302-external-redirect-ok",
"307-internal-redirect-ok",
"307-external-redirect-ok",
"308-internal-redirect-ok",
"308-external-redirect-ok",
]:
self.assert_item_exist(
zim_output, f"website.test.openzim.org/{exising_website_items}"
)
self.assert_item_exist(zim_output, "www.example.com/")
for ignored_website_items in [
"204-response",
"206-response",
"300-response",
"303-response",
"304-response",
"305-response",
"306-response",
"400-response",
"401-response",
"402-response",
"403-response",
"404-response",
"500-response",
"501-response",
"502-response",
"301-internal-redirect-ko",
"301-external-redirect-ko",
"302-internal-redirect-ko",
"302-external-redirect-ko",
"307-internal-redirect-ko",
"307-external-redirect-ko",
"308-internal-redirect-ko",
"308-external-redirect-ko",
]:
self.assert_item_does_not_exist(
zim_output, f"website.test.openzim.org/{ignored_website_items}"
)
def test_bad_redirections(self, tmp_path):
zim_output = "test-bad-redirections.zim"
main(
[
str(TEST_DATA_DIR / "bad-redirections.warc.gz"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"test-bad-redirections",
]
)
zim_output = tmp_path / zim_output
for exising_website_items in [
"bad-redirections.html",
]:
self.assert_item_exist(
zim_output, f"website.test.openzim.org/{exising_website_items}"
)
for ignored_website_items in [
"/bad-redir-loop-A",
"/bad-redir-loop-B",
"/bad-redir-loop-C",
"/bad-redir-loop-D",
"/bad-redir-target-A",
"/bad-redir-target-B",
]:
self.assert_item_does_not_exist(
zim_output, f"website.test.openzim.org/{ignored_website_items}"
)
def test_content_resource_types(self, tmp_path):
zim_output = "tests_en_content-resource-types.zim"
main(
[
str(TEST_DATA_DIR / "content-resource-types.warc.gz"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"tests_en_content-resource-types",
]
)
zim_output = tmp_path / zim_output
res = self.get_article(
zim_output, "website.test.openzim.org/content-types/index.html"
)
assert b"<!-- WB Insert -->" in res # simple check that rewriting has been done
for js_file in [
"website.test.openzim.org/content-types/script1.js",
"website.test.openzim.org/content-types/script2.js",
]:
res = self.get_article(zim_output, js_file)
assert b"wombat" in res # simple check that rewriting has been done
def test_content_encoding_aliases(self, tmp_path):
zim_output = "tests_en_qsl.net-encoding-alias.zim"
main(
[
# cannot be processed like other TEST_DATA_DIR warcs since it needs
# special encoding aliases to be used in --encoding-aliases
str(TEST_DATA_SPECIAL_DIR / "qsl.net-encoding-alias.warc.gz"),
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--encoding-aliases",
"foo=bar,iso-8559-1=iso-8859-1,fii=bor",
"--name",
"tests_en_qsl.net-encoding-alias",
]
)
zim_output = tmp_path / zim_output
res = self.get_article(zim_output, "www.qsl.net/vk2jem/swlogs.htm")
assert b"<!-- WB Insert -->" in res # simple check that rewriting has been done
def test_solidaritenum(self, tmp_path):
zim_output = "solidaritenum.zim"
main(
[
str(TEST_DATA_DIR / "solidaritenum.warc.gz"),
"--url",
"https://www.solidarite-numerique.fr/tutoriels/comprendre-les-cookies/"
"?thematique=internet",
"--output",
str(tmp_path),
"--zim-file",
zim_output,
"--name",
"spt",
]
)
zim_output = tmp_path / zim_output
# test detected language
assert self.get_metadata(zim_output, "Language") == b"fra"
# test detected favicon
zim_favicon = self.get_metadata(zim_output, "Illustration_48x48@1")
assert zim_favicon
# test favicon is the correct one
assert (
self.rebuild_favicon_bytes(
zim_output,
"www.solidarite-numerique.fr/wp-content/themes/snum-v2/images/ico/"
"favicon.svg",
)
== zim_favicon
)