# Source code for scylla.scheduler

import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue, Process
from threading import Thread

import pyppeteer
import schedule

from scylla.database import ProxyIP
from scylla.jobs import validate_proxy_ip
from scylla.loggings import logger
from scylla.providers import *
from scylla.worker import Worker


def fetch_ips(q: Queue, validator_queue: Queue):
    """Worker-process loop: pull providers from *q*, crawl their pages and
    feed every parsed candidate proxy into *validator_queue*.

    Runs until interrupted. After any unhandled exception the current
    :class:`Worker` is discarded and replaced with a fresh one.

    :param q: queue of ``BaseProvider`` instances to crawl
    :param validator_queue: queue receiving parsed proxy objects
    """
    logger.debug('fetch_ips...')
    worker = Worker()
    while True:
        try:
            provider: BaseProvider = q.get()
            provider_name = provider.__class__.__name__
            logger.debug('Get a provider from the provider queue: ' + provider_name)
            for url in provider.urls():
                # Some providers need a JS-capable (pyppeteer) fetch.
                html = worker.get_html(url, render_js=provider.should_render_js())
                if html:
                    proxies = provider.parse(html)
                    for p in proxies:
                        validator_queue.put(p)
                        # format() calls str(p) implicitly — no explicit __str__()
                        logger.debug('Put new proxy ip into queue: {}'.format(p))
                    logger.info(
                        ' {}: feed {} potential proxies into the validator queue'.format(provider_name, len(proxies))
                    )
        except (KeyboardInterrupt, InterruptedError, SystemExit):
            logger.info('worker_process exited.')
            break
        except pyppeteer.errors.PyppeteerError as e:
            logger.debug('pyppeteer.errors.PyppeteerError detected: {}\n'.format(e) +
                         'Please make sure you have installed all the dependencies for chromium correctly')
        except Exception as e:
            # The headless browser may be left in a bad state, so start over
            # with a fresh worker rather than reusing the broken one.
            worker = Worker()  # reset worker
            logger.warning('Unhandled exception is detected: {}'.format(e))
def validate_ips(q: Queue, validator_pool: ThreadPoolExecutor):
    """Validator-thread loop: pull candidate proxies from *q* and schedule
    each for validation on the shared thread pool.

    :param q: queue of ``ProxyIP`` candidates produced by ``fetch_ips``
    :param validator_pool: executor that runs ``validate_proxy_ip``
    """
    while True:
        try:
            proxy: ProxyIP = q.get()
            validator_pool.submit(validate_proxy_ip, p=proxy)
        except (KeyboardInterrupt, InterruptedError, SystemExit):
            # Mirror the interrupt set handled by fetch_ips so both
            # consumer loops shut down on the same signals.
            break
def cron_schedule(scheduler, only_once=False):
    """Run the periodic provider-refill loop.

    :param scheduler: the Scheduler instance
    :param only_once: flag for testing
    """

    def refill():
        scheduler.feed_providers()

    # Prime the provider queue right away instead of waiting ten minutes
    # for the first scheduled run.
    scheduler.feed_providers()
    schedule.every(10).minutes.do(refill)

    logger.info('Start python scheduler')

    while True:
        try:
            schedule.run_pending()
            if only_once:
                break
            time.sleep(60)
        except (KeyboardInterrupt, InterruptedError):
            logger.info('Stopping python scheduler')
            break
class Scheduler(object):
    """Coordinates the crawler process, the validator thread and the cron
    thread, wiring them together through two multiprocessing queues."""

    def __init__(self):
        # Providers waiting to be crawled (consumed by the worker process).
        self.worker_queue = Queue()
        # Candidate proxies waiting to be validated.
        self.validator_queue = Queue()
        self.worker_process = None
        self.validator_thread = None
        self.cron_thread = None
        # Shared pool on which individual proxy validations run.
        self.validator_pool = ThreadPoolExecutor(max_workers=20)

    def start(self):
        """
        Start the scheduler with processes for worker (fetching candidate proxies from different providers),
        and validator threads for checking whether the fetched proxies are able to use.
        """
        logger.info('Scheduler starts...')

        # daemon=True set once in the constructors — the original also
        # re-assigned .daemon afterwards, which was redundant.
        self.cron_thread = Thread(target=cron_schedule, args=(self,), daemon=True)
        self.worker_process = Process(target=fetch_ips,
                                      args=(self.worker_queue, self.validator_queue),
                                      daemon=True)
        self.validator_thread = Thread(target=validate_ips,
                                       args=(self.validator_queue, self.validator_pool),
                                       daemon=True)

        self.cron_thread.start()
        self.worker_process.start()  # Python will wait for all process finished
        logger.info('worker_process started')
        self.validator_thread.start()
        logger.info('validator_thread started')

    def join(self):
        """
        Wait for worker processes and validator threads
        """
        while (self.worker_process and self.worker_process.is_alive()) or (
                self.validator_thread and self.validator_thread.is_alive()):
            # Guard each join individually: one side may be None (never
            # started) while the other is alive — the unguarded joins in the
            # original raised AttributeError in that case.
            if self.worker_process:
                self.worker_process.join()
            if self.validator_thread:
                self.validator_thread.join()

    def feed_providers(self):
        """Instantiate every registered provider and enqueue it for crawling."""
        logger.debug('feed {} providers...'.format(len(all_providers)))
        for provider in all_providers:
            self.worker_queue.put(provider())

    def stop(self):
        """Stop the worker process and refuse new validation jobs.

        The validator thread itself is not stopped yet (see TODO); it is a
        daemon thread, so it dies with the interpreter.
        """
        self.worker_queue.close()
        self.worker_process.terminate()
        # self.validator_thread.terminate()
        # TODO: 'terminate' the thread using a flag
        self.validator_pool.shutdown(wait=False)