From 54d71bcab54619ef5cba15e6828c3a9508398f05 Mon Sep 17 00:00:00 2001 From: PadishahIII <350717997@qq.com> Date: Sat, 25 May 2024 23:27:34 +0800 Subject: [PATCH] 1.4 --- .gitignore | 1 + README.md | 20 +++- pyproject.toml | 2 +- src/secretscraper/__init__.py | 2 +- src/secretscraper/cmdline.py | 19 +++- src/secretscraper/crawler.py | 9 +- src/secretscraper/entity.py | 3 + src/secretscraper/facade.py | 116 +++++++++++++----------- src/secretscraper/output_formatter.py | 44 +++++++-- src/secretscraper/util.py | 38 ++++++++ tests/local_tests/local_test_crawler.py | 1 + tests/test_facade.py | 19 ++-- tests/test_util.py | 16 +++- 13 files changed, 209 insertions(+), 81 deletions(-) diff --git a/.gitignore b/.gitignore index 369e8e6..ebe127e 100644 --- a/.gitignore +++ b/.gitignore @@ -186,3 +186,4 @@ venv.bak/ ### Dynaconf config **/*.local.yml **/.secrets.yml +*.csv diff --git a/README.md b/README.md index 15bffdb..910b0b0 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Options: 2(thorough) for max_depth=2, default 1 --max-page INTEGER Max page number to crawl, default 100000 --max-depth INTEGER Max depth to crawl, default 1 - -o, --outfile FILE Output result to specified file + -o, --outfile FILE Output result to specified file in csv format -s, --status TEXT Filter response status to display, seperated by commas, e.g. 200,300-400 -x, --proxy TEXT Set proxy, e.g. http://127.0.0.1:8080, @@ -122,9 +122,9 @@ to `--max-depth 2`. By default the normal mode `-m 1` is adopted with max depth secretscraper -u https://scrapeme.live/shop/ -m 2 ``` -#### Write Results to File +#### Write Results to Csv File ```bash -secretscraper -u https://scrapeme.live/shop/ -o result.log +secretscraper -u https://scrapeme.live/shop/ -o result.csv ``` #### Hide Regex Result @@ -138,6 +138,15 @@ secretscraper -u https://scrapeme.live/shop/ -H secretscraper -l ``` +#### Switch to hyperscan +I have implemented the regex matching functionality with both `hyperscan` and `re` module, `re` module is used as default, if you purse higher performance, you can switch to `hyperscan` by changing the `handler_type` to `hyperscan` in `settings.yml`. + +There are some pitfalls of `hyperscan` which you have to take caution to use it: +1. not support regex group: you can not extract content by parentheses. +2. different syntax from `re` + +You'd better write regex separately for the two regex engine. + #### Customize Configuration The built-in config is shown as below. You can assign custom configuration via `-i settings.yml`. ```yaml @@ -145,6 +154,7 @@ verbose: false debug: false loglevel: critical logpath: log +handler_type: re proxy: "" # http://127.0.0.1:7890 max_depth: 1 # 0 for no limit @@ -231,6 +241,10 @@ rules: --- # Change Log +## 2024.5.25 Version 1.4 +- Support csv output +- Set `re` module as regex engine by default +- Support to select regex engine by configuration `handler_type` ## 2024.4.30 Version 1.3.9 - Add `--validate` option: Validate urls after the crawler finish, which helps reduce useless links - Optimize url collector diff --git a/pyproject.toml b/pyproject.toml index ab203a5..1fbf384 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "secretscraper" -version = "1.3.9.4" +version = "1.4" description = "SecretScraper is a web scraper tool that can scrape the content through target websites and extract secret information via regular expression." readme = "README.md" authors = ["Padishah "] diff --git a/src/secretscraper/__init__.py b/src/secretscraper/__init__.py index d2f2795..91d3a4c 100644 --- a/src/secretscraper/__init__.py +++ b/src/secretscraper/__init__.py @@ -1,3 +1,3 @@ """SecretScraper""" -__version__ = "1.3.9.4" +__version__ = "1.4" diff --git a/src/secretscraper/cmdline.py b/src/secretscraper/cmdline.py index f68898e..8f3af55 100644 --- a/src/secretscraper/cmdline.py +++ b/src/secretscraper/cmdline.py @@ -1,10 +1,11 @@ """Command line""" - +import dataclasses import functools import logging import pathlib import click +import dynaconf from click import Context from dynaconf.base import Settings @@ -17,6 +18,13 @@ facade_obj = None +@dataclasses.dataclass +class ExternalEntry: + """Expose objects for external library""" + facade_obj: CrawlerFacade + facade_settings: dynaconf.Dynaconf + + # @click.group(invoke_without_command=True) # @click.pass_context @click.command() @@ -78,7 +86,7 @@ @click.option( "-o", "--outfile", - help="Output result to specified file", + help="Output result to specified file in csv format", type=click.Path( exists=False, file_okay=True, dir_okay=False, path_type=pathlib.Path ), @@ -105,6 +113,10 @@ type=click.Path(exists=True, file_okay=True, dir_okay=True, path_type=pathlib.Path)) def main(**options): """Main commands""" + start(options) + + +def start(options: dict): if options["version"]: click.echo(__version__) exit(0) @@ -138,7 +150,9 @@ def main(**options): else: facade = CrawlerFacade(settings, options_dict, print_func=print_func) facade_obj = facade + ExternalEntry.facade_obj = facade facade_settings = facade.settings + ExternalEntry.facade_settings = facade_settings except FacadeException as e: click.echo(f"Error: {e}") exit(1) @@ -157,6 +171,7 @@ def generate_configuration(file: pathlib.Path): debug: false loglevel: critical logpath: log +handler_type: re proxy: "" # http://127.0.0.1:7890 max_depth: 1 # 0 for no limit diff --git a/src/secretscraper/crawler.py b/src/secretscraper/crawler.py index 4a2d465..eba2300 100644 --- a/src/secretscraper/crawler.py +++ b/src/secretscraper/crawler.py @@ -28,7 +28,7 @@ from .config import settings from .exception import CrawlerException -from .util import Range +from .util import Range, get_response_title logger = logging.getLogger(__name__) @@ -244,7 +244,12 @@ async def process_one(self, url_node: URLNode): response = await self.fetch(url_node.url) if response is not None: # and response.status == 200 url_node.response_status = str(response.status_code) - + url_node.title = get_response_title(response) + try: + url_node.content_length = int(response.headers.get('content-length')) + except Exception: + pass + url_node.content_type = response.headers.get('content-type') response_text: str = response.text # try: # response_text: str = await response.text( diff --git a/src/secretscraper/entity.py b/src/secretscraper/entity.py index 3912fae..4f5d68d 100644 --- a/src/secretscraper/entity.py +++ b/src/secretscraper/entity.py @@ -23,6 +23,9 @@ class URLNode: response_status: str = field(default="Unknown", hash=False, compare=False) depth: int = field(default=0, hash=False, compare=False) parent: typing.Optional["URLNode"] = field(default=None, hash=False, compare=False) + content_length: int = field(hash=False, compare=False, default=-1) + content_type: str = field(hash=False, compare=False, default="") + title: str = field(hash=False, compare=False, default="") def __post_init__(self): if self.parent is not None and self.depth <= self.parent.depth: diff --git a/src/secretscraper/facade.py b/src/secretscraper/facade.py index b1ef02a..e22ec4c 100644 --- a/src/secretscraper/facade.py +++ b/src/secretscraper/facade.py @@ -30,11 +30,12 @@ def print_func(f: typing.IO, func: typing.Callable, content: str, **kwargs) -> None: func(content, **kwargs) - func(content, file=f, **kwargs) + if f is not None: + func(content, file=f, **kwargs) def print_func_colorful( - f: typing.IO, + f: typing.Optional[typing.IO], func: typing.Callable, content: str, fg: str = None, @@ -70,7 +71,7 @@ def __init__( self.custom_settings = custom_settings self.formatter = Formatter() self.hide_regex: bool = False - self.outfile = pathlib.Path(__file__).parent / "crawler.log" + self.outfile: typing.Optional[pathlib.Path] = None # pathlib.Path(__file__).parent / "crawler.log" self.print_func = print_func self.debug: bool = False self.follow_redirects: bool = False @@ -79,57 +80,62 @@ def __init__( def start(self): """Start the crawler and output""" - with self.outfile.open("w") as f: - try: - - # print_func(f"Starting crawler...") - print_func_colorful(f, - self.print_func, - f"Target URLs: {', '.join(self.crawler.start_urls)}", - bold=True, - blink=True, - ) - self.crawler.start() - self.crawler.start_validate() - if self.detail_output: - # print_func_colorful(self.print_func,f"Total page: {self.crawler.total_page}") - f.write(self.formatter.output_url_hierarchy(self.crawler.url_dict, True)) - - if not self.hide_regex: - print_func_colorful(f, self.print_func, - f"{self.formatter.output_secrets(self.crawler.url_secrets)}" - ) - print_func_colorful(f, self.print_func, f"{self.formatter.output_js(self.crawler.js_dict)}") - f.write(self.formatter.output_found_domains(list(self.crawler.found_urls), True)) - else: - # tidy output - # URLs per domain - domains = set() - for url in self.crawler.start_urls: - try: - obj = urlparse(url) - domain, _ = to_host_port(obj.netloc) - if len(domain) > 0: - domains.add(domain.strip()) - except: - pass - f.write(self.formatter.output_url_per_domain(domains, self.crawler.url_dict)) - # JS per domain - f.write(self.formatter.output_url_per_domain(domains, self.crawler.js_dict, "JS")) - # Domains - f.write(self.formatter.output_found_domains(list(self.crawler.found_urls), True)) - # Secrets - if not self.hide_regex: - print_func_colorful(f, self.print_func, - f"{self.formatter.output_secrets(self.crawler.url_secrets)}" - ) - except KeyboardInterrupt: - self.print_func("\nExiting...") - self.crawler.close_all() - except Exception as e: - self.print_func(f"Unexpected error: {e}.\nExiting...") - self.crawler.close_all() - # raise FacadeException from e + # with self.outfile.open("w") as f: + f = None + try: + + # print_func(f"Starting crawler...") + print_func_colorful(f, + self.print_func, + f"Target URLs: {', '.join(self.crawler.start_urls)}", + bold=True, + blink=True, + ) + self.crawler.start() + self.crawler.start_validate() + if self.detail_output: + # print_func_colorful(self.print_func,f"Total page: {self.crawler.total_page}") + self.formatter.output_url_hierarchy(self.crawler.url_dict, True) + + if not self.hide_regex: + print_func_colorful(f, self.print_func, + f"{self.formatter.output_secrets(self.crawler.url_secrets)}" + ) + print_func_colorful(f, self.print_func, f"{self.formatter.output_js(self.crawler.js_dict)}") + self.formatter.output_found_domains(list(self.crawler.found_urls), True) + else: + # tidy output + # URLs per domain + domains = set() + for url in self.crawler.start_urls: + try: + obj = urlparse(url) + domain, _ = to_host_port(obj.netloc) + if len(domain) > 0: + domains.add(domain.strip()) + except: + pass + self.formatter.output_url_per_domain(domains, self.crawler.url_dict) + # JS per domain + self.formatter.output_url_per_domain(domains, self.crawler.js_dict, "JS") + # Domains + self.formatter.output_found_domains(list(self.crawler.found_urls), True) + # Secrets + if not self.hide_regex: + print_func_colorful(f, self.print_func, + f"{self.formatter.output_secrets(self.crawler.url_secrets)}" + ) + if self.outfile is not None: + self.formatter.output_csv(self.outfile, self.crawler.url_dict, self.crawler.url_secrets) + print_func_colorful(None, self.print_func, f"Save result to csv file {self.outfile.name}", fg="green", + bold=True) + except KeyboardInterrupt: + self.print_func("\nExiting...") + self.crawler.close_all() + except Exception as e: + self.print_func(f"Unexpected error: {e}.\nExiting...") + self.crawler.close_all() + # raise FacadeException from e def create_crawler(self) -> Crawler: """Create a Crawler""" @@ -260,7 +266,7 @@ def create_crawler(self) -> Crawler: # Read rules from config file rules: typing.Dict[str, str] = read_rules_from_setting(self.settings) - handler_type = self.settings.get("handler_type", "regex") + handler_type = self.settings.get("handler_type", "re") if handler_type == "hyperscan": handler = get_regex_handler(rules) print_config(f"Using regex handler: Hyperscan") diff --git a/src/secretscraper/output_formatter.py b/src/secretscraper/output_formatter.py index 4862836..ac9f88f 100644 --- a/src/secretscraper/output_formatter.py +++ b/src/secretscraper/output_formatter.py @@ -46,6 +46,8 @@ def format_colorful_status(self, status: str) -> str: return click.style(status, fg="red") def format_normal_result(self, content: str) -> str: + if content == "": + return "" return click.style(content, fg="bright_blue") def filter(self, url: URLNode) -> bool: @@ -67,6 +69,13 @@ def filter(self, url: URLNode) -> bool: return False # default discard return True + def format_single_url(self, url: URLNode) -> str: + return self.format_normal_result(f"{str(url.url)}") \ + + " [" \ + + self.format_colorful_status(url.response_status) \ + + "]" \ + + f" [Content-Length: {self.format_normal_result(str(url.content_length)) if url.content_length > 0 else ''}] [Content-Type: {self.format_normal_result(url.content_type)}] [Title: {self.format_normal_result(url.title)}]" + def output_found_domains( self, found_urls: typing.Iterable[URLNode], is_print: bool = False ) -> str: @@ -93,7 +102,7 @@ def output_url_hierarchy( url_hierarchy = "" for base, urls in url_dict.items(): url_set = { - f"{str(url.url)} [{str(url.response_status)}]" + self.format_single_url(url) for url in urls if self.filter(url) } @@ -104,10 +113,7 @@ def output_url_hierarchy( url_hierarchy = "" for base, urls in url_dict.items(): url_set = { - self.format_normal_result(f"{str(url.url)}") - + " [" - + self.format_colorful_status(url.response_status) - + "]" + self.format_single_url(url) for url in urls if self.filter(url) } @@ -148,10 +154,7 @@ def output_url_per_domain( if urls is None or len(urls) == 0: continue url_set = { - self.format_normal_result(f"{str(url.url)}") - + " [" - + self.format_colorful_status(url.response_status) - + "]" + self.format_single_url(url) for url in urls if self.filter(url) } @@ -227,3 +230,26 @@ def output_local_scan_secrets(self, path_secrets: typing.Dict[pathlib.Path, typi result += s click.echo(s) return result + + def output_csv( + self, + outfile: pathlib.Path, + url_dict: typing.Dict[URLNode, typing.Iterable[URLNode]], + url_secrets: typing.Dict[URLNode, typing.Iterable[Secret]], + + ) -> None: + import csv + with outfile.open("w", encoding='utf-8', errors='replace') as f: + writer = csv.writer(f) + writer.writerow(("URL", "Title", "Response Code", "Content Length", "Content Type", "Secrets")) + url_nodes: typing.Set[URLNode] = set() + for key, urls in url_dict.items(): + url_nodes.add(key) + for url in urls: + url_nodes.add(url) + for url in url_nodes: + row = [url.url, url.title, url.response_status, url.content_length, url.content_type] + if url in url_secrets: + secrets = [f"{secret.type}: {secret.data}" for secret in url_secrets[url]] + row += ['\n'.join(secrets)] + writer.writerow(row) diff --git a/src/secretscraper/util.py b/src/secretscraper/util.py index d92d688..37a766f 100644 --- a/src/secretscraper/util.py +++ b/src/secretscraper/util.py @@ -3,8 +3,12 @@ import re import typing from collections import namedtuple +from pathlib import Path from urllib.parse import urlparse +from threading import Thread +import requests import tldextract +from bs4 import BeautifulSoup # from dynaconf import LazySettings @@ -99,3 +103,37 @@ def is_hyperscan() -> bool: return True except ImportError: return False + + +def get_response_title(response: requests.Response) -> str: + """Get the response title""" + bs = BeautifulSoup(response.text, "html.parser") + titles = list() + for t in bs.find_all('title'): + text = t.get_text() + titles.append(text.replace("\n", " ").replace("\r", " ").strip()) + return "|".join(titles) + + +import http.server + + +def start_local_test_http_server(host: str, port: int, server_dir: Path = None) -> tuple[ + Thread, http.server.HTTPServer]: + """Start local test server""" + + if server_dir is None: + DIR = str( + Path(__file__).parent.parent.parent.joinpath("tests").joinpath("resources").joinpath( + "local_server").absolute()) + else: + DIR = str(server_dir.absolute()) + + class Handler(http.server.SimpleHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=DIR, **kwargs) + + httpd = http.server.HTTPServer((host, port), Handler) + thread = Thread(target=httpd.serve_forever) + thread.start() + return thread, httpd diff --git a/tests/local_tests/local_test_crawler.py b/tests/local_tests/local_test_crawler.py index 96815b7..d307539 100644 --- a/tests/local_tests/local_test_crawler.py +++ b/tests/local_tests/local_test_crawler.py @@ -2,6 +2,7 @@ import functools import logging +import typing import aiohttp import pytest diff --git a/tests/test_facade.py b/tests/test_facade.py index 511b53a..ee7f782 100644 --- a/tests/test_facade.py +++ b/tests/test_facade.py @@ -231,13 +231,18 @@ def test_crawler_facade_update_crawler( ) def test_normal_run(clicker: CliRunner, invoke_args: typing.List[str]): - result = clicker.invoke(main, invoke_args) - if result.exception is not None: - logger.exception(result.exception) - raise result.exception - with click.open_file("1.log", "w") as f: - click.echo(result.output, file=f) - print(result) + from secretscraper.util import start_local_test_http_server + thread, httpd = start_local_test_http_server("127.0.0.1", 8888) + try: + result = clicker.invoke(main, invoke_args) + if result.exception is not None: + logger.exception(result.exception) + raise result.exception + with click.open_file("1.log", "w") as f: + click.echo(result.output, file=f) + print(result) + finally: + httpd.shutdown() # @pytest.mark.parametrize( # TODO: cannot copy file in github actions diff --git a/tests/test_util.py b/tests/test_util.py index 8646799..554d68f 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,4 +1,6 @@ -from secretscraper.util import read_rules_from_setting +from secretscraper.util import read_rules_from_setting, start_local_test_http_server +import requests +from signal import pthread_kill, SIGKILL from . import settings @@ -6,3 +8,15 @@ def test_read_rules_from_setting(): d = read_rules_from_setting(settings) assert len(d) > 0 + + +def test_start_local_test_http_server(): + thread, httpd = start_local_test_http_server("127.0.0.1", 8888) + res = requests.get(f"http://127.0.0.1:8888/index.html") + try: + assert res.status_code == 200 + except AssertionError as e: + raise e + finally: + httpd.shutdown() + print(1)