Overview

Sneakpeek is a platform for authoring, scheduling and monitoring scrapers in an easy, fast and extensible way. It’s a good choice when you have complex, specific scraping logic that needs to run on a regular basis.

Key features

  • Horizontally scalable

  • Robust scraper scheduler and priority task queue

  • Multiple storage implementations to persist scrapers’ configs, tasks, logs, etc.

  • JSON RPC API to manage the platform programmatically

  • Useful UI to manage all of your scrapers

  • Scraper IDE that lets you develop scrapers right in your browser

  • Easily extendable via middleware

Demo

[Here’s a demo project](https://github.com/flulemon/sneakpeek-demo) which uses Sneakpeek framework.

You can also run the demo using Docker:

docker run -it --rm -p 8080:8080 flulemon/sneakpeek-demo

Once it has started, head over to http://localhost:8080 to play around with it.

Quick start

To create a new scraper, first make sure you have installed Sneakpeek:

pip install sneakpeek-py

The next step is to implement the scraper logic (the so-called scraper handler):

# file: demo_scraper.py

import json
import logging

from pydantic import BaseModel

from sneakpeek.scraper.model import ScraperContextABC, ScraperHandler


# This defines model of handler parameters that are defined
# in the scraper config and then passed to the handler
class DemoScraperParams(BaseModel):
    url: str

# This is the class which actually implements the scraping logic.
# Note that the implementation must inherit from
# `sneakpeek.scraper.model.ScraperHandler`
class DemoScraper(ScraperHandler):
    # You can have any dependencies you want and pass them
    # in the server configuration
    def __init__(self) -> None:
        self._logger = logging.getLogger(__name__)

    # Each handler must define its name so it later
    # can be referenced in scrapers' configuration
    @property
    def name(self) -> str:
        return "demo_scraper"

    # Some example function that processes the response
    # and extracts valuable information
    async def process_page(self, response: str):
        ...

    # This function is called by the worker to execute the logic.
    # The only argument passed is a `sneakpeek.scraper.model.ScraperContextABC`.
    # It implements a basic async HTTP client and also provides the parameters
    # that are defined in the scraper config
    async def run(self, context: ScraperContextABC) -> str:
        params = DemoScraperParams.parse_obj(context.params)
        # Perform GET request to the URL defined in the scraper config
        response = await context.get(params.url)
        response_body = await response.text()

        # Perform some business logic on a response
        result = await self.process_page(response_body)

        # Return meaningful job summary - must return a string
        return json.dumps({
            "processed_urls": 1,
            "found_results": len(result),
        })

Now that we have some scraper logic, let’s make it run periodically. To do so, let’s configure the SneakpeekServer:

# file: main.py

from uuid import uuid4

from demo_scraper import DemoScraper
from sneakpeek.logging import configure_logging
from sneakpeek.middleware.parser import ParserMiddleware
from sneakpeek.middleware.rate_limiter_middleware import (
    RateLimiterMiddleware,
    RateLimiterMiddlewareConfig,
)
from sneakpeek.middleware.requests_logging_middleware import RequestsLoggingMiddleware
from sneakpeek.middleware.robots_txt_middleware import RobotsTxtMiddleware
from sneakpeek.middleware.user_agent_injecter_middleware import (
    UserAgentInjecterMiddleware,
    UserAgentInjecterMiddlewareConfig,
)
from sneakpeek.queue.in_memory_storage import InMemoryQueueStorage
from sneakpeek.queue.model import TaskPriority
from sneakpeek.scheduler.in_memory_lease_storage import InMemoryLeaseStorage
from sneakpeek.scheduler.model import TaskSchedule
from sneakpeek.scraper.in_memory_storage import InMemoryScraperStorage
from sneakpeek.scraper.model import Scraper, ScraperConfig
from sneakpeek.server import SneakpeekServer


def get_server() -> SneakpeekServer:
    handler = DemoScraper()
    return SneakpeekServer.create(
        handlers=[handler],
        scraper_storage=InMemoryScraperStorage([
            Scraper(
                id=str(uuid4()),
                name=f"Demo Scraper",
                schedule=TaskSchedule.EVERY_MINUTE,
                handler=handler.name,
                config=ScraperConfig(params={"url": "http://example.com"}),
                schedule_priority=TaskPriority.NORMAL,
            )
        ]),
        queue_storage=InMemoryQueueStorage(),
        lease_storage=InMemoryLeaseStorage(),
        middlewares=[
            RequestsLoggingMiddleware(),
            RobotsTxtMiddleware(),
            RateLimiterMiddleware(RateLimiterMiddlewareConfig(max_rpm=60)),
            UserAgentInjecterMiddleware(
                UserAgentInjecterMiddlewareConfig(use_external_data=False)
            ),
            ParserMiddleware(),
        ],
    )


def main():
    server = get_server()
    configure_logging()
    server.serve()


if __name__ == "__main__":
    main()

Now the only thing left is to actually run the server:

python3 main.py

That’s it! Now you can open http://localhost:8080 and explore the UI to see how your scraper is being automatically scheduled and executed.

Local handler debugging

You can easily test a handler without running the full-featured server. Here’s how to do that for the DemoScraper that we developed in the tutorial.

Add the following imports at the beginning of the file (the debug snippet below also needs asyncio, ScraperConfig and the requests logging middleware):

import asyncio

from sneakpeek.middleware.requests_logging_middleware import RequestsLoggingMiddleware
from sneakpeek.scraper.model import ScraperConfig
from sneakpeek.scraper.runner import ScraperRunner

And add the following lines to the end of the file:

async def main():
    result = await ScraperRunner.debug_handler(
        DemoScraper(),
        config=ScraperConfig(
            params=DemoScraperParams(
                url="https://www.ycombinator.com/",
            ).dict(),
        ),
        middlewares=[
            RequestsLoggingMiddleware(),
        ],
    )
    logging.info(f"Finished scraper with result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

ScraperRunner.debug_handler takes the following arguments:

  1. An instance of your scraper handler

  2. Scraper config

  3. [Optional] Middleware that will be used in the handler (see the full list of middleware in the Middleware section below)

Now you can run your handler as an ordinary Python script. Given it’s in the demo_scraper.py file, you can use:

python3 demo_scraper.py

Design

Sneakpeek has 6 core components:

  • Scrapers storage - stores the list of scrapers and their metadata

  • Tasks queue - populated by the scheduler or the user and consumed by the queue consumers

  • Lease storage - stores the lease (a global lock) for the scheduler, to make sure there’s only 1 active scheduler at all times

  • Scheduler - schedules periodic tasks for the scrapers in the storage

  • Consumer - consumes the tasks queue and executes task logic (e.g. scraper logic)

  • API - provides a JsonRPC API for interacting with the system

All of the components are run by the SneakpeekServer.

Scrapers Storage

Scraper storage interface is defined in sneakpeek.scraper.model.ScraperStorageABC.

  • InMemoryScraperStorage - in-memory storage. Should only be used in a development environment, or when the list of scrapers is static and won’t change.

  • RedisScraperStorage - redis storage.

Tasks queue

The tasks queue consists of three components:

  • Storage - tasks storage

  • Queue - queue implementation

  • Consumer - queue consumer implementation

Currently there are 2 storage implementations:

  • InMemoryQueueStorage - in-memory storage. Should only be used in development environment.

  • RedisQueueStorage - redis storage.

Lease storage

Lease storage is used by the scheduler to ensure that at any point in time there’s no more than 1 active scheduler instance that can enqueue scraper jobs. This prevents concurrent execution of the same scraper (a conceptual sketch of such a lease follows at the end of this section).

Lease storage interface is defined in LeaseStorageABC.

Currently there are 2 storage implementations:

  • InMemoryLeaseStorage - in-memory storage. Should only be used in development environment.

  • RedisLeaseStorage - redis storage.
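
To make the idea concrete, here’s a generic sketch of how such a lease could be implemented on top of Redis. This only illustrates the concept, it is not Sneakpeek’s actual RedisLeaseStorage implementation:

# Generic lease sketch using a Redis key with NX + TTL (concept illustration only,
# not Sneakpeek's actual implementation).
from redis.asyncio import Redis


async def try_acquire_lease(redis: Redis, owner_id: str, ttl_seconds: int = 60) -> bool:
    # SET ... NX succeeds only if nobody currently holds the lease;
    # the TTL guarantees the lock is eventually released if the owner dies.
    return bool(await redis.set("scheduler:lease", owner_id, nx=True, ex=ttl_seconds))


async def refresh_lease(redis: Redis, owner_id: str, ttl_seconds: int = 60) -> bool:
    # Only the current owner is allowed to extend the lease.
    if await redis.get("scheduler:lease") == owner_id.encode():
        await redis.expire("scheduler:lease", ttl_seconds)
        return True
    return False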

Scheduler

Scheduler is responsible for:

  • scheduling scrapers based on their configuration.

  • finding scraper jobs that haven’t sent a heartbeat for a while and marking them as dead

  • cleaning up jobs queue from old historical scraper jobs

  • exporting metrics on number of pending jobs in the queue

For now there’s only one implementation, Scheduler, which uses APScheduler.

Queue consumer

The consumer constantly tries to dequeue jobs and executes them. For now there’s only one implementation, Consumer.

API

Sneakpeek implements:

  • JsonRPC API to programmatically interact with the system, available at /api/v1/jsonrpc; it exposes the following methods (see the example after this list):

      ◦ CRUD methods to add, modify and delete scrapers

      ◦ Get the list of a scraper’s jobs

      ◦ Enqueue scraper jobs

  • UI that allows you to interact with the system

  • Swagger documentation (available at /api)

  • Copy of this documentation (available at /docs)
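
For illustration, a JSON-RPC call from Python might look like the snippet below. The method name get_scrapers and the parameter layout are hypothetical; consult the Swagger documentation at /api for the actual method names and schemas.

# Hypothetical JSON-RPC call; "get_scrapers" is only an example method name,
# see the Swagger docs at /api for the real method catalogue.
import requests

response = requests.post(
    "http://localhost:8080/api/v1/jsonrpc",
    json={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "get_scrapers",  # hypothetical method name
        "params": {},
    },
)
print(response.json())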

Deployment options

There are multiple options for deploying your scrapers, depending on your requirements:

One replica that does it all

This is a good option if:

  • you can tolerate some downtime

  • you don’t need to host thousands of scrapers that can be dynamically changed by users

  • you don’t care if you lose the information about the scraper jobs

In this case all you need to do is to:

  • define a list of scrapers in the code (just like in the tutorial)

  • use in-memory storage

Using external storage

If you use external storage (e.g. Redis or an RDBMS) for the jobs queue and lease storage, you’ll be able:

  • to scale workers horizontally until queue, storage or scheduler becomes a bottleneck

  • to have secondary replicas of the scheduler, so that when the primary dies for some reason there are fallback options

If you also use external storage as the scrapers storage, you’ll be able to dynamically add, delete and update scrapers via the UI or the JsonRPC API.

Note that each Sneakpeek server by default runs the worker, scheduler and API services, but it’s possible to run only one role at a time, so you can scale the services independently.
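
As a rough sketch, wiring Redis-backed storages into the server could look like the snippet below. The SneakpeekServer.create arguments are the same as in the tutorial, but the module paths and constructor signatures of the Redis storages are assumptions modelled on their in-memory counterparts, so check the API reference before copying this.

# A minimal sketch, NOT verified against the library: the Redis storage module
# paths and constructors below are assumptions modelled on the in-memory ones.
from redis.asyncio import Redis

from demo_scraper import DemoScraper
from sneakpeek.queue.redis_storage import RedisQueueStorage            # assumed path
from sneakpeek.scheduler.redis_lease_storage import RedisLeaseStorage  # assumed path
from sneakpeek.scraper.redis_storage import RedisScraperStorage        # assumed path
from sneakpeek.server import SneakpeekServer

redis_client = Redis(host="localhost", port=6379)

server = SneakpeekServer.create(
    handlers=[DemoScraper()],
    scraper_storage=RedisScraperStorage(redis_client),  # assumed constructor
    queue_storage=RedisQueueStorage(redis_client),      # assumed constructor
    lease_storage=RedisLeaseStorage(redis_client),      # assumed constructor
)
server.serve()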

Middleware

Sneakpeek allows you to run arbitrary code before a request is made and after the response has been received. This can be helpful if you have some common logic you want to reuse across your scrapers.

The following middlewares are already implemented:

Rate limiter

The rate limiter implements the leaky bucket algorithm to limit the number of requests made to each host. If a request is rate limited, the middleware can either raise an exception or wait until the request is no longer limited.
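
To make the behaviour concrete, here’s a minimal, generic sketch of the leaky bucket idea (this is not Sneakpeek’s internal implementation): each host gets a bucket that drains at a fixed rate, and a request is allowed only if the bucket has room for one more drop.

# Generic leaky-bucket illustration (not Sneakpeek's internal implementation).
import time
from collections import defaultdict


class LeakyBucket:
    """At most `capacity` requests per `period` seconds, tracked per host."""

    def __init__(self, capacity: int, period: float) -> None:
        self.capacity = capacity                 # e.g. 60 requests
        self.leak_rate = capacity / period       # drops leaked per second
        self.levels = defaultdict(float)         # current bucket level per host
        self.last_checked = defaultdict(time.monotonic)

    def allow(self, host: str) -> bool:
        now = time.monotonic()
        # Leak: lower the bucket level according to the time elapsed
        elapsed = now - self.last_checked[host]
        self.levels[host] = max(0.0, self.levels[host] - elapsed * self.leak_rate)
        self.last_checked[host] = now
        # Allow the request only if the bucket has room for one more drop
        if self.levels[host] + 1 <= self.capacity:
            self.levels[host] += 1
            return True
        return False


limiter = LeakyBucket(capacity=60, period=60.0)  # roughly 60 requests per minute per host
print(limiter.allow("example.com"))              # True until the bucket fills up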

Configuration of the middleware is defined in RateLimiterMiddlewareConfig.

How to configure middleware for the SneakpeekServer (will be used globally for all requests):

from datetime import timedelta

from sneakpeek.middleware.rate_limiter_middleware import (
    RateLimitedStrategy,
    RateLimiterMiddleware,
    RateLimiterMiddlewareConfig,
)

server = SneakpeekServer.create(
    ...
    middleware=[
        RateLimiterMiddleware(
            RateLimiterMiddlewareConfig(
                # maximum number of requests in a given time window
                max_requests = 60,
                # wait until request won't be rate limited
                rate_limited_strategy = RateLimitedStrategy.WAIT,
                # only 60 requests per host are allowed within 1 minute
                time_window = timedelta(minutes=1),
            )
        )
    ],
)

How to override middleware settings for a given scraper:

from datetime import timedelta

from sneakpeek.middleware.rate_limiter_middleware import (
    RateLimitedStrategy,
    RateLimiterMiddlewareConfig,
)

scraper = Scraper(
    ...
    config=ScraperConfig(
        ...
        middleware={
            "rate_limiter": RateLimiterMiddlewareConfig(
                # maximum number of requests in a given time window
                max_requests = 120,
                # throw RateLimiterException if request is rate limited
                rate_limited_strategy = RateLimitedStrategy.THROW,
                # only 120 requests per host are allowed within 1 minute
                time_window = timedelta(minutes=1),
            )
        }
    ),
)

Robots.txt

The Robots.txt middleware can log and optionally block requests that are disallowed by the website’s robots.txt. If robots.txt is unavailable (e.g. the request returns a 5xx code), all requests will be allowed.

Configuration of the middleware is defined in RobotsTxtMiddlewareConfig.

How to configure middleware for the SneakpeekServer (will be used globally for all requests):

from sneakpeek.middleware.robots_txt_middleware import (
    RobotsTxtMiddleware,
    RobotsTxtMiddlewareConfig,
    RobotsTxtViolationStrategy,
)

server = SneakpeekServer.create(
    ...
    middleware=[
        RobotsTxtMiddleware(
            RobotsTxtMiddlewareConfig(
                violation_strategy = RobotsTxtViolationStrategy.THROW,
            )
        )
    ],
)

How to override middleware settings for a given scraper:

from sneakpeek.middleware.robots_txt_middleware import (
    RobotsTxtMiddlewareConfig,
    RobotsTxtViolationStrategy,
)

scraper = Scraper(
    ...
    config=ScraperConfig(
        ...
        middleware={
            "robots_txt": ProxyMiddlewareConfig(
                violation_strategy = RobotsTxtViolationStrategy.LOG,
            )
        }
    ),
)

User Agent injector

This middleware automatically adds a User-Agent header if it’s not present. It uses fake-useragent to generate realistic real-world user agents.
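
For context, this is roughly what the fake-useragent library does (a standalone illustration, not Sneakpeek code):

# Standalone illustration of the fake-useragent library this middleware relies on.
from fake_useragent import UserAgent

ua = UserAgent()
print(ua.random)  # a realistic User-Agent string, e.g. a recent Chrome or Firefox UA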

Configuration of the middleware is defined in UserAgentInjecterMiddlewareConfig.

How to configure middleware for the SneakpeekServer (will be used globally for all requests):

from sneakpeek.middleware.user_agent_injecter_middleware import UserAgentInjecterMiddleware, UserAgentInjecterMiddlewareConfig

server = SneakpeekServer.create(
    ...
    middleware=[
        UserAgentInjecterMiddleware(
            UserAgentInjecterMiddlewareConfig(
                use_external_data = True,
                browsers = ["chrome", "firefox"],
            )
        )
    ],
)

How to override middleware settings for a given scraper:

from sneakpeek.middleware.user_agent_injecter_middleware import UserAgentInjecterMiddlewareConfig

scraper = Scraper(
    ...
    config=ScraperConfig(
        ...
        middleware={
            "user_agent_injecter": UserAgentInjecterMiddlewareConfig(
                use_external_data = False,
                browsers = ["chrome", "firefox"],
            )
        }
    ),
)

Proxy middleware

Proxy middleware automatically sets proxy arguments for all HTTP requests. Configuration of the middleware is defined in ProxyMiddlewareConfig.

How to configure middleware for the SneakpeekServer (will be used globally for all requests):

from aiohttp import BasicAuth
from sneakpeek.middleware.proxy_middleware import ProxyMiddleware, ProxyMiddlewareConfig

server = SneakpeekServer.create(
    ...
    middleware=[
        ProxyMiddleware(
            ProxyMiddlewareConfig(
                proxy = "http://example.proxy.com:3128",
                proxy_auth = BasicAuth(login="mylogin", password="securepassword"),
            )
        )
    ],
)

How to override middleware settings for a given scraper:

from aiohttp import BasicAuth
from sneakpeek.middleware.proxy_middleware import ProxyMiddlewareConfig

scraper = Scraper(
    ...
    config=ScraperConfig(
        ...
        middleware={
            "proxy": ProxyMiddlewareConfig(
                proxy = "http://example.proxy.com:3128",
                proxy_auth = BasicAuth(login="mylogin", password="securepassword"),
            )
        }
    ),
)

Requests logging middleware

Requests logging middleware logs all requests being made and received responses.

Configuration of the middleware is defined in RequestsLoggingMiddlewareConfig.

How to configure middleware for the SneakpeekServer (will be used globally for all requests):

from sneakpeek.middleware.requests_logging_middleware import RequestsLoggingMiddleware, RequestsLoggingMiddlewareConfig

server = SneakpeekServer.create(
    ...
    middleware=[
        RequestsLoggingMiddleware(
            RequestsLoggingMiddlewareConfig(
                log_request=True,
                log_response=True,
            )
        )
    ],
)

How to override middleware settings for a given scraper:

from sneakpeek.middleware.requests_logging_middleware import RequestsLoggingMiddlewareConfig

scraper = Scraper(
    ...
    config=ScraperConfig(
        ...
        middleware={
            "requests_logging": RequestsLoggingMiddlewareConfig(
                log_request=True,
                log_response=False,
            )
        }
    ),
)

Implementing your own middleware

The interface for middleware is defined in Middleware. There are 3 ways middleware can be used:

  1. Perform custom logic before a request is processed (implement the on_request method)

  2. Perform custom logic before the response is returned to the scraper logic (implement the on_response method)

  3. Provide additional functionality for the scraper implementation - a scraper can call any middleware method via the ScraperContext. Each middleware is added as an attribute to the passed context, so you can call it like context.<middleware_name>.<middleware_method>(...)

Middleware implementation example
On request middleware

Each request is wrapped in the Request class, and you can modify its parameters before it’s dispatched. Here’s the schema:

@dataclass
class Request:
    method: HttpMethod
    url: str
    headers: HttpHeaders | None = None
    kwargs: dict[str, Any] | None = None

Here’s an example of a middleware which logs each request URL:

import logging
from typing import Any

from pydantic import BaseModel

from sneakpeek.middleware.base import parse_config_from_obj
from sneakpeek.scraper.model import Middleware, Request


# Each middleware can be configured, its configuration can be
# set globally for all requests or it can be overridden for
# specific scrapers
class MyLoggingMiddlewareConfig(BaseModel):
    some_param: str = "defaul value"

class MyMiddleware(Middleware):
  """Middleware description"""

  def __init__(self, default_config: MyLoggingMiddlewareConfig | None = None) -> None:
      self._default_config = default_config or MyLoggingMiddlewareConfig()
      self._logger = logging.getLogger(__name__)

  # The name property is mandatory, it's used in scraper config to override
  # middleware configuration for the given scraper
  @property
  def name(self) -> str:
      return "my_middleware"

  async def on_request(self, request: Request, config: Any | None) -> Request:
      # This converts freeform dictionary into a typed config (it's optional)
      config = parse_config_from_obj(
          config,
          self.name,
          MyLoggingMiddlewareConfig,
          self._default_config,
      )
      self._logger.info(f"Making {request.method.upper()} to {request.url}. {config.some_param}")
      return request

On response middleware

The on_response method receives both the request and the response objects. The response is an aiohttp.ClientResponse object.

Here’s an example of a middleware which logs each response body:

import logging
from typing import Any

import aiohttp
from pydantic import BaseModel

from sneakpeek.middleware.base import parse_config_from_obj
from sneakpeek.scraper.model import Middleware, Request


# Each middleware can be configured, its configuration can be
# set globally for all requests or it can be overridden for
# specific scrapers
class MyLoggingMiddlewareConfig(BaseModel):
    some_param: str = "default value"


class MyOnResponseMiddleware(Middleware):
  """Middleware description"""

  def __init__(self, default_config: MyLoggingMiddlewareConfig | None = None) -> None:
      self._default_config = default_config or MyLoggingMiddlewareConfig()
      self._logger = logging.getLogger(__name__)

  # The name property is mandatory, it's used in scraper config to override
  # middleware configuration for the given scraper
  @property
  def name(self) -> str:
      return "my_middleware"

  async def on_response(
      self,
      request: Request,
      response: aiohttp.ClientResponse,
      config: Any | None,
  ) -> aiohttp.ClientResponse:
      config = parse_config_from_obj(
          config,
          self.name,
          MyLoggingMiddlewareConfig,
          self._default_config,
      )
      response_body = await response.text()
      self._logger.info(f"Made {request.method.upper()} request to {request.url} - received: status={response.status} body={response_body}")
      return response

Functional middleware

If the middleware doesn’t need to interact with the request or response, you can derive it from BaseMiddleware, so that both the on_request and on_response methods are implemented as pass-throughs.

Here’s an example of such an implementation:

import logging
from typing import Any

from sneakpeek.middleware.base import BaseMiddleware


class MyFunctionalMiddleware(BaseMiddleware):
  """Middleware description"""

  def __init__(self) -> None:
      self._logger = logging.getLogger(__name__)

  # The name property is mandatory, it's used in scraper config to override
  # middleware configuration for the given scraper
  @property
  def name(self) -> str:
      return "my_middleware"

  # This function will be available to scrapers via
  # `context.my_middleware.custom_func(some_arg)`
  def custom_func(self, arg1: Any) -> Any:
      # `do_something` is a placeholder for whatever shared logic you need
      return do_something(arg1)
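
A scraper handler can then call this helper through the context attribute described above; here’s a small sketch:

# Sketch of a handler calling the functional middleware registered above.
# `context.my_middleware` is available because the middleware's `name` is "my_middleware".
from sneakpeek.scraper.model import ScraperContextABC, ScraperHandler


class MyScraper(ScraperHandler):
    @property
    def name(self) -> str:
        return "my_scraper"

    async def run(self, context: ScraperContextABC) -> str:
        # Call the helper exposed by MyFunctionalMiddleware
        result = context.my_middleware.custom_func("some value")
        return str(result)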
