Compare commits
49 Commits
@@ -1,13 +1,16 @@
-__date__ = "18 July 2020"
-__version__ = "0.2.1"
+__date__ = "4 May 2021"
+__version__ = "0.5.1"
 __author__ = "Alexander \"Arav\" Andreev"
 __email__ = "me@arav.top"
-__copyright__ = f"Copyright (c) 2020 {__author__} <{__email__}>"
+__copyright__ = f"Copyright (c) 2020,2021 {__author__} <{__email__}>"
 __license__ = \
 """This program is licensed under the terms of the MIT license.
 For a copy see COPYING file in a directory of the program, or
 see <https://opensource.org/licenses/MIT>"""
 
+
+USER_AGENT = f"ScrapTheChan/{__version__}"
+
 VERSION = \
-    f"ScrapTheChan ver. {__version__} ({__date__})\n\n{__copyright__}\n"\
+    f"ScrapTheChan ver. {__version__} ({__date__})\n{__copyright__}\n"\
     f"\n{__license__}"
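With the user agent extracted into a single constant, the banner strings resolve like this (a quick sketch; values taken from the hunk above):

```python
__version__ = "0.5.1"

# USER_AGENT is interpolated once, at import time:
USER_AGENT = f"ScrapTheChan/{__version__}"
print(USER_AGENT)  # ScrapTheChan/0.5.1
```

The VERSION change only drops the blank line that used to separate the version line from the copyright notice.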
@@ -1,23 +1,23 @@
-"""FileInfo object stores all needed information about a file."""
+"""FileInfo object stores information about a file."""
 
+from dataclasses import dataclass
+
 __all__ = ["FileInfo"]
 
 
+@dataclass(frozen=True, order=True)
 class FileInfo:
-    """Stores all needed information about a file.
-
-    Arguments:
-    - `name` -- name of a file;
-    - `size` -- size of a file;
-    - `dlurl` -- full download URL for a file;
-    - `hash_value` -- hash sum of a file;
-    - `hash_algo` -- hash algorithm used (e.g. md5).
-    """
-    def __init__(self, name: str, size: int, dlurl: str,
-            hash_value: str, hash_algo: str) -> None:
-        self.name = name
-        self.size = size
-        self.dlurl = dlurl
-        self.hash_value = hash_value
-        self.hash_algo = hash_algo
+    """Stores information about a file.
+
+    Fields:
+    - `name` -- name of a file;
+    - `size` -- size of a file;
+    - `download_url` -- full download URL for a file;
+    - `hash_value` -- hash sum of a file;
+    - `hash_algorithm` -- hash algorithm used (e.g. md5).
+    """
+    name: str
+    size: int
+    download_url: str
+    hash_value: str
+    hash_algorithm: str
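Since `FileInfo` is now a frozen, ordered dataclass, instances are immutable and get `__init__`/`__repr__`/`__eq__` for free. A minimal usage sketch (field values are made up):

```python
from dataclasses import FrozenInstanceError

from scrapthechan.fileinfo import FileInfo

f = FileInfo("cat.png", 12345,
             "https://i.4cdn.org/b/1620000000000.png",
             "q1w2e3r4t5y6u7i8o9p0aa==", "md5")

assert f == FileInfo("cat.png", 12345, f.download_url,
                     f.hash_value, "md5")   # __eq__ from @dataclass

try:
    f.size = 0                # frozen=True forbids mutation
except FrozenInstanceError:
    pass
```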
@@ -0,0 +1,25 @@
+from typing import Optional
+
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
+
+__all__ = ["EightKunParser"]
+
+
+class EightKunParser(TinyboardLikeParser):
+    """JSON parser for 8kun.top image board."""
+
+    def __init__(self, board: str, thread: str,
+            skip_posts: Optional[int] = None) -> None:
+        super().__init__(board, thread, skip_posts)
+
+    @property
+    def imageboard(self) -> str:
+        return "8kun.top"
+
+    @property
+    def json_thread_url(self) -> str:
+        return "https://8kun.top/{board}/res/{thread}.json"
+
+    @property
+    def file_base_url(self) -> str:
+        return "https://media.8kun.top/file_dl/{filename}"
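The new parser only supplies the three URL-related properties; everything else comes from `TinyboardLikeParser`. Assuming the base class fills `{board}` and `{thread}` into these templates, usage looks roughly like this (module path, board, and thread are assumptions, not shown in this compare):

```python
from scrapthechan.parsers.eightkun import EightKunParser  # assumed module path

parser = EightKunParser("pnd", "123456")
print(parser.imageboard)   # 8kun.top
print(parser.json_thread_url.format(board="pnd", thread="123456"))
# https://8kun.top/pnd/res/123456.json
```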
@@ -1,51 +1,25 @@
-from re import match
-from typing import List, Optional
+from typing import Optional
 
-from scrapthechan.fileinfo import FileInfo
-from scrapthechan.parser import Parser
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
 
 __all__ = ["FourChanParser"]
 
 
-class FourChanParser(Parser):
+class FourChanParser(TinyboardLikeParser):
     """JSON parser for 4chan.org image board."""
 
-    __url_thread_json = "https://a.4cdn.org/{board}/thread/{thread}.json"
-    __url_file_link = "https://i.4cdn.org/{board}/{filename}"
-
     def __init__(self, board: str, thread: str,
             skip_posts: Optional[int] = None) -> None:
-        posts = self._get_json(self.__url_thread_json.format(board=board, \
-            thread=thread))['posts']
-        super(FourChanParser, self).__init__(board, thread, posts, skip_posts)
+        super().__init__(board, thread, skip_posts)
 
     @property
     def imageboard(self) -> str:
         return "4chan.org"
 
     @property
-    def op(self) -> Optional[str]:
-        op = ""
-        if 'sub' in self._op_post:
-            op = f"{self._op_post['sub']}\n"
-        if 'com' in self._op_post:
-            op += self._op_post['com']
-        return op if not op == "" else None
-
-    def _parse_post(self, post: dict) -> List[FileInfo]:
-        if not 'tim' in post: return None
-
-        dlfname = f"{post['tim']}{post['ext']}"
-
-        if "filename" in post:
-            if match(post['filename'], r"^image\.\w+$") is None:
-                filename = dlfname
-            else:
-                filename = f"{post['filename']}{post['ext']}"
-
-        # Hash algorithm is hardcoded since it is highly unlikely that it will
-        # be changed in foreseeable future. And if it'll change then this line
-        # will be necessarily updated anyway.
-        return [FileInfo(filename, post['fsize'],
-            self.__url_file_link.format(board=self.board, filename=dlfname),
-            post['md5'], 'md5')]
+    def json_thread_url(self) -> str:
+        return "https://a.4cdn.org/{board}/thread/{thread}.json"
+
+    @property
+    def file_base_url(self) -> str:
+        return "https://i.4cdn.org/{board}/{filename}"
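Besides collapsing the class onto `TinyboardLikeParser`, this rewrite fixes a latent bug: the removed `_parse_post` called `match(post['filename'], r"^image\.\w+$")`, passing the filename where `re.match` expects the pattern. A quick demonstration of the difference:

```python
from re import match

# Swapped order, as in the removed code: "image.png" is treated as the
# pattern, so the intended check never matches.
print(match("image.png", r"^image\.\w+$"))   # None

# Correct order, as used in the new TinyboardLikeParser._parse_post:
print(match(r"^image\.\w+$", "image.png"))   # <re.Match object; ...>
```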
@@ -1,66 +1,25 @@
-from re import match
-from typing import List, Optional
+from typing import Optional
 
-from scrapthechan.parser import Parser
-from scrapthechan.fileinfo import FileInfo
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
 
 __all__ = ["LainchanParser"]
 
 
-class LainchanParser(Parser):
-    """JSON parser for lainchan.org image board.
-    JSON structure is identical to 4chan.org's, so this parser is just inherited
-    from 4chan.org's parser and only needed things are redefined.
-    """
-
-    __url_thread_json = "https://lainchan.org/{board}/res/{thread}.json"
-    __url_file_link = "https://lainchan.org/{board}/src/{filename}"
+class LainchanParser(TinyboardLikeParser):
+    """JSON parser for lainchan.org image board."""
 
     def __init__(self, board: str, thread: str,
             skip_posts: Optional[int] = None) -> None:
-        posts = self._get_json(self.__url_thread_json.format(board=board, \
-            thread=thread))['posts']
-        super(LainchanParser, self).__init__(board, thread, posts, skip_posts)
+        super().__init__(board, thread, skip_posts)
 
     @property
     def imageboard(self) -> str:
         return "lainchan.org"
 
     @property
-    def op(self) -> Optional[str]:
-        op = ""
-        if 'sub' in self._op_post:
-            op = f"{self._op_post['sub']}\n"
-        if 'com' in self._op_post:
-            op += self._op_post['com']
-        return op if not op == "" else None
-
-    def _parse_post(self, post) -> List[FileInfo]:
-        if not 'tim' in post: return None
-
-        dlfname = f"{post['tim']}{post['ext']}"
-
-        if "filename" in post:
-            if match(post['filename'], r"^image\.\w+$") is None:
-                filename = dlfname
-            else:
-                filename = f"{post['filename']}{post['ext']}"
-
-        files = []
-        files.append(FileInfo(filename, post['fsize'],
-            self.__url_file_link.format(board=self.board, filename=dlfname),
-            post['md5'], 'md5'))
-
-        if "extra_files" in post:
-            for f in post["extra_files"]:
-                dlfname = f"{f['tim']}{f['ext']}"
-                if "filename" in post:
-                    if match(post['filename'], r"^image\.\w+$") is None:
-                        filename = dlfname
-                    else:
-                        filename = f"{post['filename']}{post['ext']}"
-                dlurl = self.__url_file_link.format(board=self.board, \
-                    filename=dlfname)
-                files.append(FileInfo(filename, f['fsize'], \
-                    dlurl, f['md5'], 'md5'))
-        return files
+    def json_thread_url(self) -> str:
+        return "https://lainchan.org/{board}/res/{thread}.json"
+
+    @property
+    def file_base_url(self) -> str:
+        return "https://lainchan.org/{board}/src/{filename}"
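With 4chan and lainchan both reduced to URL templates, supporting another Tinyboard-style board is now a short subclass. A hypothetical example (the name and URLs are illustrative only, not part of this compare):

```python
from typing import Optional

from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser


class ExampleChanParser(TinyboardLikeParser):
    """Hypothetical parser for an imaginary Tinyboard-style board."""

    def __init__(self, board: str, thread: str,
            skip_posts: Optional[int] = None) -> None:
        super().__init__(board, thread, skip_posts)

    @property
    def imageboard(self) -> str:
        return "examplechan.invalid"

    @property
    def json_thread_url(self) -> str:
        return "https://examplechan.invalid/{board}/res/{thread}.json"

    @property
    def file_base_url(self) -> str:
        return "https://examplechan.invalid/{board}/src/{filename}"
```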
@@ -0,0 +1,51 @@
+from re import match
+from typing import List, Optional
+
+from scrapthechan.parser import Parser
+from scrapthechan.fileinfo import FileInfo
+
+
+__all__ = ["TinyboardLikeParser"]
+
+
+class TinyboardLikeParser(Parser):
+    """Base parser for imageboards that are based on Tinyboard, or have similar
+    JSON API."""
+    def __init__(self, board: str, thread: str,
+            skip_posts: Optional[int] = None) -> None:
+        super().__init__(board, thread, skip_posts)
+
+    def _extract_posts_list(self, lst: List) -> List[dict]:
+        return lst['posts']
+
+    def _parse_post(self, post: dict) -> Optional[List[FileInfo]]:
+        if not 'tim' in post: return None
+
+        dlfname = f"{post['tim']}{post['ext']}"
+
+        if "filename" in post:
+            if match(r"^image\.\w+$", post['filename']) is None:
+                filename = dlfname
+            else:
+                filename = f"{post['filename']}{post['ext']}"
+
+        files = []
+
+        files.append(FileInfo(filename, post['fsize'],
+            self.file_base_url.format(board=self.board, filename=dlfname),
+            post['md5'], 'md5'))
+
+        if "extra_files" in post:
+            for f in post["extra_files"]:
+                dlfname = f"{f['tim']}{f['ext']}"
+                if "filename" in post:
+                    if match(r"^image\.\w+$", post['filename']) is None:
+                        filename = dlfname
+                    else:
+                        filename = f"{post['filename']}{post['ext']}"
+                dlurl = self.file_base_url.format(board=self.board, \
+                    filename=dlfname)
+                files.append(FileInfo(filename, f['fsize'], \
+                    dlurl, f['md5'], 'md5'))
+
+        return files
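For reference, this is the post shape `_parse_post` consumes (sample values are made up, shaped after the keys the code reads: `tim`, `ext`, `filename`, `fsize`, `md5`, `extra_files`):

```python
post = {
    "tim": 1620000000000, "ext": ".png", "filename": "diagram",
    "fsize": 48213, "md5": "q1w2e3r4t5y6u7i8o9p0aa==",
    "extra_files": [
        {"tim": 1620000000001, "ext": ".jpg",
         "fsize": 9000, "md5": "a1s2d3f4g5h6j7k8l9aa=="},
    ],
}
# "diagram" does not match ^image\.\w+$, so the served name
# "1620000000000.png" is kept for the first file; the extra_files
# entry is appended the same way, so this post yields two FileInfo
# objects.
```

Note the extra-files loop still consults the parent post's `filename`, mirroring the removed lainchan implementation.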
@@ -1,96 +1,146 @@
-"""Base Scraper implementation."""
+"""Base class for all scrapers that will actually do the job."""
 
 from base64 import b64encode
 from os import remove, stat
 from os.path import exists, join, getsize
 import re
 from typing import List, Callable
-from urllib.request import urlretrieve, URLopener
+from urllib.request import urlretrieve, URLopener, HTTPError, URLError
 import hashlib
+from http.client import HTTPException
 
-from scrapthechan import __version__
+from scrapthechan import USER_AGENT
 from scrapthechan.fileinfo import FileInfo
 
 __all__ = ["Scraper"]
 
 
 class Scraper:
-    """Base scraper implementation.
-
-    Arguments:
-        save_directory -- a path to a directory where file will be
-            saved;
-        files -- a list of FileInfo objects;
-        download_progress_callback -- a callback function that will be called
-            for each file started downloading.
-    """
-    def __init__(self, save_directory: str, files: List[FileInfo],
-            download_progress_callback: Callable[[int], None] = None) -> None:
-        self._save_directory = save_directory
-        self._files = files
-        self._url_opener = URLopener()
-        self._url_opener.version = f"ScrapTheChan/{__version__}"
-        self._progress_callback = download_progress_callback
-
-    def run(self):
-        raise NotImplementedError
-
-    def _same_filename(self, filename: str, path: str) -> str:
-        """Check if there is a file with same name. If so then add incremental
-        number enclosed in brackets to a name of a new one."""
-        newname = filename
-        while exists(join(path, newname)):
-            has_extension = newname.rfind(".") != -1
-            if has_extension:
-                l, r = newname.rsplit(".", 1)
-                lbracket = l.rfind("(")
-                if lbracket == -1:
-                    newname = f"{l}(1).{r}"
-                else:
-                    num = l[lbracket+1:-1]
-                    if num.isnumeric():
-                        newname = f"{l[:lbracket]}({int(num)+1}).{r}"
-                    else:
-                        newname = f"{l}(1).{r}"
-            else:
-                lbracket = l.rfind("(")
-                if lbracket == -1:
-                    newname = f"{newname}(1)"
-                else:
-                    num = newname[lbracket+1:-1]
-                    if num.isnumeric():
-                        newname = f"{newname[:lbracket]}({int(num)+1})"
-        return newname
-
-    def _hash_file(self, filename: str, hash_algo: str = "md5",
-            blocksize: int = 1048576) -> (str, str):
-        """Compute hash of a file."""
-        hash_func = hashlib.new(hash_algo)
-        with open(filename, 'rb') as f:
-            buf = f.read(blocksize)
-            while len(buf) > 0:
-                hash_func.update(buf)
-                buf = f.read(blocksize)
-        return hash_func.hexdigest(), hash_func.digest()
-
-    def _is_file_ok(self, f: FileInfo, filepath: str) -> bool:
-        """Check if a file exist and isn't broken."""
-        if not exists(filepath):
-            return False
-        computed_size = getsize(filepath)
-        is_size_match = f.size == computed_size \
-            or f.size == round(computed_size / 1024)
-        hexdig, dig = self._hash_file(filepath, f.hash_algo)
-        is_hash_match = f.hash_value == hexdig \
-            or f.hash_value == b64encode(dig).decode()
-        return is_size_match and is_hash_match
-
-    def _download_file(self, f: FileInfo):
-        """Download a single file."""
-        filepath = join(self._save_directory, f.name)
-        if self._is_file_ok(f, filepath):
-            return True
-        elif exists(filepath):
-            filepath = join(self._save_directory, \
-                self._same_filename(f.name, self._save_directory))
-        self._url_opener.retrieve(f.dlurl, filepath)
+    """Base class for all scrapers that will actually do the job.
+
+    Arguments:
+        save_directory -- a path to a directory where file will be
+            saved;
+        files -- a list of FileInfo objects;
+        download_progress_callback -- a callback function that will be called
+            for each file started downloading.
+    """
+    def __init__(self, save_directory: str, files: List[FileInfo],
+            download_progress_callback: Callable[[int], None] = None) -> None:
+        self._save_directory = save_directory
+        self._files = files
+        self._url_opener = URLopener()
+        self._url_opener.addheaders = [('User-Agent', USER_AGENT)]
+        self._url_opener.version = USER_AGENT
+        self._progress_callback = download_progress_callback
+
+    def run(self):
+        raise NotImplementedError
+
+    def _same_filename(self, filename: str, path: str) -> str:
+        """Check if there is a file with same name. If so then add incremental
+        number enclosed in brackets to a name of a new one."""
+        newname = filename
+        while exists(join(path, newname)):
+            has_extension = newname.rfind(".") != -1
+            if has_extension:
+                l, r = newname.rsplit(".", 1)
+                lbracket = l.rfind("(")
+                if lbracket == -1:
+                    newname = f"{l}(1).{r}"
+                else:
+                    num = l[lbracket+1:-1]
+                    if num.isnumeric():
+                        newname = f"{l[:lbracket]}({int(num)+1}).{r}"
+                    else:
+                        newname = f"{l}(1).{r}"
+            else:
+                lbracket = l.rfind("(")
+                if lbracket == -1:
+                    newname = f"{newname}(1)"
+                else:
+                    num = newname[lbracket+1:-1]
+                    if num.isnumeric():
+                        newname = f"{newname[:lbracket]}({int(num)+1})"
+        return newname
+
+    def _hash_file(self, filepath: str, hash_algorithm: str = "md5",
+            blocksize: int = 1048576) -> (str, str):
+        """Compute hash of a file."""
+        if hash_algorithm is None:
+            return None
+        hash_func = hashlib.new(hash_algorithm)
+        with open(filepath, 'rb') as f:
+            buf = f.read(blocksize)
+            while len(buf) > 0:
+                hash_func.update(buf)
+                buf = f.read(blocksize)
+        return hash_func.hexdigest(), b64encode(hash_func.digest()).decode()
+
+    def _check_file(self, f: FileInfo, filepath: str) -> bool:
+        """Check if a file exist and isn't broken."""
+        if not exists(filepath):
+            return False
+        computed_size = getsize(filepath)
+        if not (f.size == computed_size \
+                or f.size == round(computed_size / 1024)):
+            return False
+        if not f.hash_algorithm is None:
+            hexdig, dig = self._hash_file(filepath, f.hash_algorithm)
+            return f.hash_value == hexdig or f.hash_value == dig
+        return True
+
+    def _download_file(self, f: FileInfo):
+        """Download a single file."""
+        is_same_filename = False
+        filepath = join(self._save_directory, f.name)
+        orig_filepath = filepath
+        if self._check_file(f, filepath):
+            return
+        elif exists(filepath):
+            is_same_filename = True
+            filepath = join(self._save_directory, \
+                self._same_filename(f.name, self._save_directory))
+        try:
+            retries = 3
+            while retries > 0:
+                self._url_opener.retrieve(f.download_url, filepath)
+                if not self._check_file(f, filepath):
+                    remove(filepath)
+                    retries -= 1
+                else:
+                    break
+            if retries == 0:
+                print(f"Cannot retrieve {f.download_url}, {filepath}.")
+                return
+            if is_same_filename:
+                _, f1_dig = self._hash_file(orig_filepath, f.hash_algorithm)
+                _, f2_dig = self._hash_file(filepath, f.hash_algorithm)
+                if f1_dig == f2_dig:
+                    remove(filepath)
+        except FileNotFoundError as e:
+            print("File Not Found", filepath)
+        except HTTPError as e:
+            print("HTTP Error", e.code, e.reason, f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except HTTPException:
+            print("HTTP Exception for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except URLError as e:
+            print("URL Error for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionResetError:
+            print("Connection reset for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionRefusedError:
+            print("Connection refused for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionAbortedError:
+            print("Connection aborted for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
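The `_same_filename` collision rule is easiest to see with concrete names. A standalone sketch of its extension branch (`next_name` is a hypothetical helper, not in the diff):

```python
def next_name(newname: str) -> str:
    # Mirrors _same_filename's branch for names with an extension:
    # append "(1)" on first collision, bump an existing "(n)" otherwise.
    l, r = newname.rsplit(".", 1)
    lbracket = l.rfind("(")
    if lbracket == -1:
        return f"{l}(1).{r}"
    num = l[lbracket + 1:-1]
    if num.isnumeric():
        return f"{l[:lbracket]}({int(num) + 1}).{r}"
    return f"{l}(1).{r}"

print(next_name("cat.png"))     # cat(1).png
print(next_name("cat(1).png"))  # cat(2).png
print(next_name("a(b).png"))    # a(b)(1).png
```

In the class itself this runs in a loop until the candidate name no longer exists on disk. Note that the no-extension branch references `l` before it is assigned, which looks like a leftover from the extension branch.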
@@ -1,15 +0,0 @@
-"""Implementation of basic sequential one-threaded scraper that downloads
-files one by one."""
-
-from scrapthechan.scraper import Scraper
-
-__all__ = ["BasicScraper"]
-
-
-class BasicScraper(Scraper):
-    def run(self):
-        """Download files one by one."""
-        for i, f in enumerate(self._files, start=1):
-            if not self._progress_callback is None:
-                self._progress_callback(i)
-            self._download_file(f)
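The sequential scraper module is deleted outright here; nothing in this compare shows its replacement. Against the new `Scraper` base above, an equivalent subclass would still be a handful of lines (hypothetical reconstruction, not part of the diff):

```python
from scrapthechan.scraper import Scraper


class SequentialScraper(Scraper):
    """Hypothetical one-by-one scraper equivalent to the deleted BasicScraper."""

    def run(self):
        # Walk the file list in order, reporting 1-based progress first,
        # exactly as the removed BasicScraper.run did.
        for i, f in enumerate(self._files, start=1):
            if self._progress_callback is not None:
                self._progress_callback(i)
            self._download_file(f)
```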