Compare commits
52 Commits
17 changed files with 551 additions and 331 deletions
@@ -1,13 +1,16 @@
-__date__ = "18 Jule 2020"
-__version__ = "0.2.0"
+__date__ = "4 May 2021"
+__version__ = "0.5.1"
 __author__ = "Alexander \"Arav\" Andreev"
 __email__ = "me@arav.top"
-__copyright__ = f"Copyright (c) 2020 {__author__} <{__email__}>"
+__copyright__ = f"Copyright (c) 2020,2021 {__author__} <{__email__}>"
 __license__ = \
 """This program is licensed under the terms of the MIT license.
 For a copy see COPYING file in a directory of the program, or
 see <https://opensource.org/licenses/MIT>"""
 
+USER_AGENT = f"ScrapTheChan/{__version__}"
+
 VERSION = \
-    f"ScrapTheChan ver. {__version__} ({__date__})\n\n{__copyright__}\n"\
+    f"ScrapTheChan ver. {__version__} ({__date__})\n{__copyright__}\n"\
     f"\n{__license__}"
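With the values above, the updated VERSION banner renders as follows (derived directly from the f-strings in this hunk):

    ScrapTheChan ver. 0.5.1 (4 May 2021)
    Copyright (c) 2020,2021 Alexander "Arav" Andreev <me@arav.top>

    This program is licensed under the terms of the MIT license.
    For a copy see COPYING file in a directory of the program, or
    see <https://opensource.org/licenses/MIT>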
@@ -1,23 +1,23 @@
-"""FileInfo object stores all needed information about a file."""
+"""FileInfo object stores information about a file."""
 
+from dataclasses import dataclass
+
 __all__ = ["FileInfo"]
 
 
+@dataclass(frozen=True, order=True)
 class FileInfo:
-    """Stores all needed information about a file.
+    """Stores information about a file.
 
-    Arguments:
-    - `name` -- name of a file;
-    - `size` -- size of a file;
-    - `dlurl` -- full download URL for a file;
-    - `hash_value` -- hash sum of a file;
-    - `hash_algo` -- hash algorithm used (e.g. md5).
-    """
-    def __init__(self, name: str, size: int, dlurl: str,
-            hash_value: str, hash_algo: str) -> None:
-        self.name = name
-        self.size = size
-        self.dlurl = dlurl
-        self.hash_value = hash_value
-        self.hash_algo = hash_algo
+    Fields:
+    - `name` -- name of a file;
+    - `size` -- size of a file;
+    - `download_url` -- full download URL for a file;
+    - `hash_value` -- hash sum of a file;
+    - `hash_algorithm` -- hash algorithm used (e.g. md5).
+    """
+    name: str
+    size: int
+    download_url: str
+    hash_value: str
+    hash_algorithm: str
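A quick usage sketch of the reworked FileInfo (the values are made up for illustration; `frozen=True` makes instances read-only, `order=True` makes lists of them sortable):

    from scrapthechan.fileinfo import FileInfo

    f = FileInfo("cat.jpg", 123456,
        "https://i.4cdn.org/b/1588888888000.jpg",
        "mCPS67GNQv0BAHT8HpZwKw==", "md5")
    print(f.name, f.size)  # cat.jpg 123456
    # f.size = 0           # would raise dataclasses.FrozenInstanceError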
@@ -0,0 +1,25 @@
+from typing import Optional
+
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
+
+__all__ = ["EightKunParser"]
+
+
+class EightKunParser(TinyboardLikeParser):
+    """JSON parser for 8kun.top image board."""
+
+    def __init__(self, board: str, thread: str,
+            skip_posts: Optional[int] = None) -> None:
+        super().__init__(board, thread, skip_posts)
+
+    @property
+    def imageboard(self) -> str:
+        return "8kun.top"
+
+    @property
+    def json_thread_url(self) -> str:
+        return "https://8kun.top/{board}/res/{thread}.json"
+
+    @property
+    def file_base_url(self) -> str:
+        return "https://media.8kun.top/file_dl/{filename}"
@@ -1,51 +1,25 @@
-from re import match
-from typing import List, Optional
+from typing import Optional
 
-from scrapthechan.fileinfo import FileInfo
-from scrapthechan.parser import Parser
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
 
 __all__ = ["FourChanParser"]
 
 
-class FourChanParser(Parser):
+class FourChanParser(TinyboardLikeParser):
     """JSON parser for 4chan.org image board."""
 
-    __url_thread_json = "https://a.4cdn.org/{board}/thread/{thread}.json"
-    __url_file_link = "https://i.4cdn.org/{board}/{filename}"
-
     def __init__(self, board: str, thread: str,
             skip_posts: Optional[int] = None) -> None:
-        posts = self._get_json(self.__url_thread_json.format(board=board, \
-            thread=thread))['posts']
-        super(FourChanParser, self).__init__(board, thread, posts, skip_posts)
+        super().__init__(board, thread, skip_posts)
 
     @property
     def imageboard(self) -> str:
         return "4chan.org"
 
     @property
-    def op(self) -> Optional[str]:
-        op = ""
-        if 'sub' in self._op_post:
-            op = f"{self._op_post['sub']}\n"
-        if 'com' in self._op_post:
-            op += self._op_post['com']
-        return op if not op == "" else None
-
-    def _parse_post(self, post: dict) -> List[FileInfo]:
-        if not 'tim' in post: return None
-
-        dlfname = f"{post['tim']}{post['ext']}"
-
-        if "filename" in post:
-            if match(post['filename'], r"^image\.\w+$") is None:
-                filename = dlfname
-            else:
-                filename = f"{post['filename']}{post['ext']}"
-
-        # Hash algorithm is hardcoded since it is highly unlikely that it will
-        # be changed in foreseeable future. And if it'll change then this line
-        # will be necessarily updated anyway.
-        return [FileInfo(filename, post['fsize'],
-            self.__url_file_link.format(board=self.board, filename=dlfname),
-            post['md5'], 'md5')]
+    def json_thread_url(self) -> str:
+        return "https://a.4cdn.org/{board}/thread/{thread}.json"
+
+    @property
+    def file_base_url(self) -> str:
+        return "https://i.4cdn.org/{board}/{filename}"
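Both URL properties are plain `str.format` templates: the tinyboardlike base class fills `file_base_url` with `board` and `filename` (visible in the new parser further below), and presumably fills `json_thread_url` the same way with `board` and `thread`:

    url = "https://a.4cdn.org/{board}/thread/{thread}.json".format(
        board="g", thread="76759434")  # board and thread are made up
    # -> "https://a.4cdn.org/g/thread/76759434.json"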
@@ -1,66 +1,25 @@
-from re import match
-from typing import List, Optional
+from typing import Optional
 
-from scrapthechan.parser import Parser
-from scrapthechan.fileinfo import FileInfo
+from scrapthechan.parsers.tinyboardlike import TinyboardLikeParser
 
 __all__ = ["LainchanParser"]
 
 
-class LainchanParser(Parser):
-    """JSON parser for lainchan.org image board.
-    JSON structure is identical to 4chan.org's, so this parser is just inherited
-    from 4chan.org's parser and only needed things are redefined.
-    """
-
-    __url_thread_json = "https://lainchan.org/{board}/res/{thread}.json"
-    __url_file_link = "https://lainchan.org/{board}/src/{filename}"
+class LainchanParser(TinyboardLikeParser):
+    """JSON parser for lainchan.org image board."""
 
     def __init__(self, board: str, thread: str,
             skip_posts: Optional[int] = None) -> None:
-        posts = self._get_json(self.__url_thread_json.format(board=board, \
-            thread=thread))['posts']
-        super(LainchanParser, self).__init__(board, thread, posts, skip_posts)
+        super().__init__(board, thread, skip_posts)
 
     @property
     def imageboard(self) -> str:
         return "lainchan.org"
 
     @property
-    def op(self) -> Optional[str]:
-        op = ""
-        if 'sub' in self._op_post:
-            op = f"{self._op_post['sub']}\n"
-        if 'com' in self._op_post:
-            op += self._op_post['com']
-        return op if not op == "" else None
-
-    def _parse_post(self, post) -> List[FileInfo]:
-        if not 'tim' in post: return None
-
-        dlfname = f"{post['tim']}{post['ext']}"
-
-        if "filename" in post:
-            if match(post['filename'], r"^image\.\w+$") is None:
-                filename = dlfname
-            else:
-                filename = f"{post['filename']}{post['ext']}"
-
-        files = []
-        files.append(FileInfo(filename, post['fsize'],
-            self.__url_file_link.format(board=self.board, filename=dlfname),
-            post['md5'], 'md5'))
+    def json_thread_url(self) -> str:
+        return "https://lainchan.org/{board}/res/{thread}.json"
 
-        if "extra_files" in post:
-            for f in post["extra_files"]:
-                dlfname = f"{f['tim']}{f['ext']}"
-                if "filename" in post:
-                    if match(post['filename'], r"^image\.\w+$") is None:
-                        filename = dlfname
-                    else:
-                        filename = f"{post['filename']}{post['ext']}"
-                dlurl = self.__url_file_link.format(board=self.board, \
-                    filename=dlfname)
-                files.append(FileInfo(filename, f['fsize'], \
-                    dlurl, f['md5'], 'md5'))
-        return files
+    @property
+    def file_base_url(self) -> str:
+        return "https://lainchan.org/{board}/src/{filename}"
@@ -0,0 +1,51 @@
+from re import match
+from typing import List, Optional
+
+from scrapthechan.parser import Parser
+from scrapthechan.fileinfo import FileInfo
+
+
+__all__ = ["TinyboardLikeParser"]
+
+
+class TinyboardLikeParser(Parser):
+    """Base parser for imageboards that are based on Tinyboard, or have similar
+    JSON API."""
+    def __init__(self, board: str, thread: str,
+            skip_posts: Optional[int] = None) -> None:
+        super().__init__(board, thread, skip_posts)
+
+    def _extract_posts_list(self, lst: List) -> List[dict]:
+        return lst['posts']
+
+    def _parse_post(self, post: dict) -> Optional[List[FileInfo]]:
+        if not 'tim' in post: return None
+
+        dlfname = f"{post['tim']}{post['ext']}"
+
+        if "filename" in post:
+            if match(r"^image\.\w+$", post['filename']) is None:
+                filename = dlfname
+            else:
+                filename = f"{post['filename']}{post['ext']}"
+
+        files = []
+
+        files.append(FileInfo(filename, post['fsize'],
+            self.file_base_url.format(board=self.board, filename=dlfname),
+            post['md5'], 'md5'))
+
+        if "extra_files" in post:
+            for f in post["extra_files"]:
+                dlfname = f"{f['tim']}{f['ext']}"
+                if "filename" in post:
+                    if match(r"^image\.\w+$", post['filename']) is None:
+                        filename = dlfname
+                    else:
+                        filename = f"{post['filename']}{post['ext']}"
+                dlurl = self.file_base_url.format(board=self.board, \
+                    filename=dlfname)
+                files.append(FileInfo(filename, f['fsize'], \
+                    dlurl, f['md5'], 'md5'))
+
+        return files
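To see what `_parse_post` consumes, here is a made-up post dictionary in the Tinyboard JSON shape, traced through the code above:

    post = {
        "tim": 1588888888000, "ext": ".jpg",  # server-assigned name parts
        "filename": "cat", "fsize": 123456,   # uploader's name and byte size
        "md5": "mCPS67GNQv0BAHT8HpZwKw==",    # base64 MD5 as the API serves it
    }
    # dlfname = "1588888888000.jpg"; "cat" does not match r"^image\.\w+$",
    # so match() is None and filename also falls back to dlfname.

Note that this copy calls `match(pattern, string)` in the correct argument order; the removed 4chan and lainchan copies passed the arguments swapped, so the generic-"image.ext" check could not work as intended.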
@@ -1,96 +1,146 @@
-"""Base Scraper implementation."""
+"""Base class for all scrapers that will actually do the job."""
 
 from base64 import b64encode
 from os import remove, stat
 from os.path import exists, join, getsize
 import re
 from typing import List, Callable
-from urllib.request import urlretrieve, URLopener
+from urllib.request import urlretrieve, URLopener, HTTPError, URLError
 import hashlib
+from http.client import HTTPException
 
-from scrapthechan import __version__
+from scrapthechan import USER_AGENT
 from scrapthechan.fileinfo import FileInfo
 
 __all__ = ["Scraper"]
 
 
 class Scraper:
-    """Base scraper implementation.
+    """Base class for all scrapers that will actually do the job.
 
     Arguments:
         save_directory -- a path to a directory where file will be
             saved;
         files -- a list of FileInfo objects;
         download_progress_callback -- a callback function that will be called
            for each file started downloading.
    """
    def __init__(self, save_directory: str, files: List[FileInfo],
            download_progress_callback: Callable[[int], None] = None) -> None:
        self._save_directory = save_directory
        self._files = files
        self._url_opener = URLopener()
-        self._url_opener.version = f"ScrapTheChan/{__version__}"
+        self._url_opener.addheaders = [('User-Agent', USER_AGENT)]
+        self._url_opener.version = USER_AGENT
        self._progress_callback = download_progress_callback
 
    def run(self):
        raise NotImplementedError
 
    def _same_filename(self, filename: str, path: str) -> str:
        """Check if there is a file with same name. If so then add incremental
        number enclosed in brackets to a name of a new one."""
        newname = filename
        while exists(join(path, newname)):
            has_extension = newname.rfind(".") != -1
            if has_extension:
                l, r = newname.rsplit(".", 1)
                lbracket = l.rfind("(")
                if lbracket == -1:
                    newname = f"{l}(1).{r}"
                else:
                    num = l[lbracket+1:-1]
                    if num.isnumeric():
                        newname = f"{l[:lbracket]}({int(num)+1}).{r}"
                    else:
                        newname = f"{l}(1).{r}"
            else:
                lbracket = l.rfind("(")
                if lbracket == -1:
                    newname = f"{newname}(1)"
                else:
                    num = newname[lbracket+1:-1]
                    if num.isnumeric():
                        newname = f"{newname[:lbracket]}({int(num)+1})"
        return newname
 
-    def _hash_file(self, filename: str, hash_algo: str = "md5",
+    def _hash_file(self, filepath: str, hash_algorithm: str = "md5",
            blocksize: int = 1048576) -> (str, str):
        """Compute hash of a file."""
-        hash_func = hashlib.new(hash_algo)
-        with open(filename, 'rb') as f:
+        if hash_algorithm is None:
+            return None
+        hash_func = hashlib.new(hash_algorithm)
+        with open(filepath, 'rb') as f:
            buf = f.read(blocksize)
            while len(buf) > 0:
                hash_func.update(buf)
                buf = f.read(blocksize)
-        return hash_func.hexdigest(), hash_func.digest()
+        return hash_func.hexdigest(), b64encode(hash_func.digest()).decode()
 
-    def _is_file_ok(self, f: FileInfo, filepath: str) -> bool:
+    def _check_file(self, f: FileInfo, filepath: str) -> bool:
        """Check if a file exist and isn't broken."""
        if not exists(filepath):
            return False
        computed_size = getsize(filepath)
-        is_size_match = f.size == computed_size \
-            or f.size == round(computed_size / 1024)
-        hexdig, dig = self._hash_file(filepath, f.hash_algo)
-        is_hash_match = f.hash_value == hexdig \
-            or f.hash_value == b64encode(dig).decode()
-        return is_size_match and is_hash_match
+        if not (f.size == computed_size \
+            or f.size == round(computed_size / 1024)):
+            return False
+        if not f.hash_algorithm is None:
+            hexdig, dig = self._hash_file(filepath, f.hash_algorithm)
+            return f.hash_value == hexdig or f.hash_value == dig
+        return True
 
    def _download_file(self, f: FileInfo):
        """Download a single file."""
-        filepath = join(self._save_directory, f.name)
-        if self._is_file_ok(f, filepath):
-            return True
-        elif exists(filepath):
-            filepath = join(self._save_directory, \
-                self._same_filename(f.name, self._save_directory))
-        self._url_opener.retrieve(f.dlurl, filepath)
+        is_same_filename = False
+        filepath = join(self._save_directory, f.name)
+        orig_filepath = filepath
+        if self._check_file(f, filepath):
+            return
+        elif exists(filepath):
+            is_same_filename = True
+            filepath = join(self._save_directory, \
+                self._same_filename(f.name, self._save_directory))
+        try:
+            retries = 3
+            while retries > 0:
+                self._url_opener.retrieve(f.download_url, filepath)
+                if not self._check_file(f, filepath):
+                    remove(filepath)
+                    retries -= 1
+                else:
+                    break
+            if retries == 0:
+                print(f"Cannot retrieve {f.download_url}, {filepath}.")
+                return
+            if is_same_filename:
+                _, f1_dig = self._hash_file(orig_filepath, f.hash_algorithm)
+                _, f2_dig = self._hash_file(filepath, f.hash_algorithm)
+                if f1_dig == f2_dig:
+                    remove(filepath)
+        except FileNotFoundError as e:
+            print("File Not Found", filepath)
+        except HTTPError as e:
+            print("HTTP Error", e.code, e.reason, f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except HTTPException:
+            print("HTTP Exception for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except URLError as e:
+            print("URL Error for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionResetError:
+            print("Connection reset for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionRefusedError:
+            print("Connection refused for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
+        except ConnectionAbortedError:
+            print("Connection aborted for", f.download_url)
+            if exists(filepath):
+                remove(filepath)
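The bracket-numbering of `_same_filename` can be checked in isolation; a runnable sketch against the class above:

    import os, tempfile

    from scrapthechan.scraper import Scraper

    d = tempfile.mkdtemp()
    open(os.path.join(d, "cat.jpg"), "w").close()
    open(os.path.join(d, "cat(1).jpg"), "w").close()

    s = Scraper(d, [])
    print(s._same_filename("cat.jpg", d))  # -> cat(2).jpg

Two review notes: the extensionless branch of `_same_filename` reads `l` before it is assigned, so a colliding name without a dot would raise NameError; and URLopener has been deprecated since Python 3.3, though it still works, and its `addheaders`/`version` attributes make setting the User-Agent straightforward.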
@@ -1,15 +0,0 @@
-"""Implementation of basic sequential one-threaded scraper that downloads
-files one by one."""
-
-from scrapthechan.scraper import Scraper
-
-__all__ = ["BasicScraper"]
-
-
-class BasicScraper(Scraper):
-    def run(self):
-        """Download files one by one."""
-        for i, f in enumerate(self._files, start=1):
-            if not self._progress_callback is None:
-                self._progress_callback(i)
-            self._download_file(f)
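End to end, the pieces now compose roughly like this (a sketch: the module paths and the parser's `files` accessor do not appear in these hunks and are assumptions; the removed BasicScraper module presumably lives elsewhere within this 17-file changeset):

    from scrapthechan.parsers.fourchan import FourChanParser  # path assumed
    from scrapthechan.scrapers.basic import BasicScraper      # moved file; path assumed

    parser = FourChanParser("g", "76759434")  # board/thread made up
    scraper = BasicScraper("downloads", parser.files,  # 'files' accessor assumed
        lambda i: print(f"downloading file #{i}"))
    scraper.run()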