Source code for scylla.scheduler
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from multiprocessing import Queue, Process
from threading import Thread
import pyppeteer
import schedule
from scylla.config import get_config
from scylla.database import ProxyIP
from scylla.jobs import validate_proxy_ip
from scylla.loggings import logger
from scylla.providers import *
from scylla.worker import Worker
FEED_FROM_DB_INTERVAL_MINUTES = 30
[docs]def fetch_ips(q: Queue, validator_queue: Queue):
logger.debug('fetch_ips...')
worker = Worker()
while True:
try:
provider: BaseProvider = q.get()
provider_name = provider.__class__.__name__
logger.debug('Get a provider from the provider queue: ' + provider_name)
for url in provider.urls():
html = worker.get_html(url, render_js=provider.should_render_js())
if html:
proxies = provider.parse(html)
for p in proxies:
validator_queue.put(p)
# logger.debug('Put new proxy ip into queue: {}'.format(p.__str__()))
logger.info(
' {}: feed {} potential proxies into the validator queue'.format(provider_name, len(proxies))
)
except (KeyboardInterrupt, InterruptedError, SystemExit):
worker.stop()
logger.info('worker_process exited.')
break
except pyppeteer.errors.PyppeteerError as e:
logger.debug("""pyppeteer.errors.PyppeteerError detected: %s\n
'Please make sure you have installed all the dependencies for chromium correctly""", e)
[docs]def validate_ips(validator_queue: Queue, validator_pool: ThreadPoolExecutor):
while True:
try:
proxy: ProxyIP = validator_queue.get()
validator_pool.submit(validate_proxy_ip, p=proxy)
except (KeyboardInterrupt, SystemExit):
break
[docs]def cron_schedule(scheduler, only_once=False):
"""
:param scheduler: the Scheduler instance
:param only_once: flag for testing
"""
def feed():
scheduler.feed_providers()
def feed_from_db():
# TODO: better query (order by attempts)
proxies = ProxyIP.select().where(ProxyIP.updated_at > datetime.now() - timedelta(days=14))
for p in proxies:
scheduler.validator_queue.put(p)
logger.debug('Feed {} proxies from the database for a second time validation'.format(len(proxies)))
# feed providers at the very beginning
scheduler.feed_providers()
schedule.every(10).minutes.do(feed)
schedule.every(FEED_FROM_DB_INTERVAL_MINUTES).minutes.do(feed_from_db)
logger.info('Start python scheduler')
flag = True
# After 1 minute, try feed_from_db() for the first time
wait_time_for_feed_from_db = 1 if only_once else 60
time.sleep(wait_time_for_feed_from_db)
feed_from_db()
while flag:
try:
schedule.run_pending()
if only_once:
flag = False
else:
time.sleep(60)
except (KeyboardInterrupt, InterruptedError):
logger.info('Stopping python scheduler')
break
[docs]class Scheduler(object):
def __init__(self):
self.worker_queue = Queue()
self.validator_queue = Queue()
self.worker_process = None
self.validator_thread = None
self.cron_thread = None
self.validator_pool = ThreadPoolExecutor(max_workers=int(get_config('validation_pool', default='31')))
[docs] def start(self):
"""
Start the scheduler with processes for worker (fetching candidate proxies from different providers),
and validator threads for checking whether the fetched proxies are able to use.
"""
logger.info('Scheduler starts...')
self.cron_thread = Thread(target=cron_schedule, args=(self,), daemon=True)
self.worker_process = Process(target=fetch_ips, args=(self.worker_queue, self.validator_queue))
self.validator_thread = Thread(target=validate_ips, args=(self.validator_queue, self.validator_pool))
self.cron_thread.daemon = True
self.worker_process.daemon = True
self.validator_thread.daemon = True
self.cron_thread.start()
self.worker_process.start() # Python will wait for all process finished
logger.info('worker_process started')
self.validator_thread.start()
logger.info('validator_thread started')
[docs] def join(self):
"""
Wait for worker processes and validator threads
"""
while (self.worker_process and self.worker_process.is_alive()) or (
self.validator_thread and self.validator_thread.is_alive()):
try:
self.worker_process.join()
self.validator_thread.join()
except (KeyboardInterrupt, SystemExit):
break
[docs] def feed_providers(self):
logger.debug('feed {} providers...'.format(len(all_providers)))
for provider in all_providers:
self.worker_queue.put(provider())
[docs] def stop(self):
self.worker_queue.close()
self.worker_process.terminate()
# self.validator_thread.terminate() # TODO: 'terminate' the thread using a flag
self.validator_pool.shutdown(wait=False)