# -*- coding: utf-8 -*- """ mirrorcheck command Poll every active mirror URL we have in the database, grab the 'lastsync' file, and record details about how long it took and how up to date the mirror is. If we encounter errors, record those as well. Usage: ./manage.py mirrorcheck """ from collections import deque from datetime import datetime from httplib import HTTPException import logging import os from optparse import make_option from pytz import utc import re import socket import subprocess import sys import time import tempfile from threading import Thread import types from Queue import Queue, Empty import urllib2 from django.core.management.base import NoArgsCommand from django.db import transaction from django.utils.timezone import now from mirrors.models import MirrorUrl, MirrorLog, CheckLocation logging.basicConfig( level=logging.WARNING, format='%(asctime)s -> %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stderr) logger = logging.getLogger() class Command(NoArgsCommand): option_list = NoArgsCommand.option_list + ( make_option('-t', '--timeout', dest='timeout', type='float', default=10.0, help='Timeout value for connecting to URL'), make_option('-l', '--location', dest='location', type='int', help='ID of CheckLocation object to use for this run'), ) help = "Runs a check on all known mirror URLs to determine their up-to-date status." def handle_noargs(self, **options): v = int(options.get('verbosity', 0)) if v == 0: logger.level = logging.ERROR elif v == 1: logger.level = logging.WARNING elif v == 2: logger.level = logging.DEBUG timeout = options.get('timeout') urls = MirrorUrl.objects.select_related('protocol').filter( mirror__active=True, mirror__public=True) location = options.get('location', None) if location: location = CheckLocation.objects.get(id=location) family = location.family monkeypatch_getaddrinfo(family) if family == socket.AF_INET6: urls = urls.filter(has_ipv6=True) elif family == socket.AF_INET: urls = urls.filter(has_ipv4=True) pool = MirrorCheckPool(urls, location, timeout) pool.run() return 0 def monkeypatch_getaddrinfo(force_family=socket.AF_INET): '''Force the Python socket module to connect over the designated family; e.g. socket.AF_INET or socket.AF_INET6.''' orig = socket.getaddrinfo def wrapper(host, port, family=0, socktype=0, proto=0, flags=0): return orig(host, port, force_family, socktype, proto, flags) socket.getaddrinfo = wrapper def parse_lastsync(log, data): '''lastsync file should be an epoch value created by us.''' try: parsed_time = datetime.utcfromtimestamp(int(data)) log.last_sync = parsed_time.replace(tzinfo=utc) except ValueError: # it is bad news to try logging the lastsync value; # sometimes we get a crazy-encoded web page. # if we couldn't parse a time, this is a failure. log.last_sync = None log.error = "Could not parse time from lastsync" log.is_success = False def check_mirror_url(mirror_url, location, timeout): if location: if location.family == socket.AF_INET6: ipopt = '--ipv6' elif location.family == socket.AF_INET: ipopt = '--ipv4' url = mirror_url.url + 'lastsync' logger.info("checking URL %s", url) log = MirrorLog(url=mirror_url, check_time=now(), location=location) headers = {'User-Agent': 'archweb/1.0'} req = urllib2.Request(url, None, headers) try: start = time.time() result = urllib2.urlopen(req, timeout=timeout) data = result.read() result.close() end = time.time() parse_lastsync(log, data) log.duration = end - start logger.debug("success: %s, %.2f", url, log.duration) except urllib2.HTTPError as e: if e.code == 404: # we have a duration, just not a success end = time.time() log.duration = end - start log.is_success = False log.error = str(e) logger.debug("failed: %s, %s", url, log.error) except urllib2.URLError as e: log.is_success = False log.error = e.reason if isinstance(e.reason, types.StringTypes) and \ re.search(r'550.*No such file', e.reason): # similar to 404 case above, still record duration end = time.time() log.duration = end - start if isinstance(e.reason, socket.timeout): log.error = "Connection timed out." elif isinstance(e.reason, socket.error): log.error = e.reason.args[1] logger.debug("failed: %s, %s", url, log.error) except HTTPException as e: # e.g., BadStatusLine log.is_success = False log.error = "Exception in processing HTTP request." logger.debug("failed: %s, %s", url, log.error) except socket.timeout as e: log.is_success = False log.error = "Connection timed out." logger.debug("failed: %s, %s", url, log.error) except socket.error as e: log.is_success = False log.error = str(e) logger.debug("failed: %s, %s", url, log.error) return log def check_rsync_url(mirror_url, location, timeout): url = mirror_url.url + 'lastsync' logger.info("checking URL %s", url) log = MirrorLog(url=mirror_url, check_time=now(), location=location) tempdir = tempfile.mkdtemp() ipopt = '' if location: if location.family == socket.AF_INET6: ipopt = '--ipv6' elif location.family == socket.AF_INET: ipopt = '--ipv4' lastsync_path = os.path.join(tempdir, 'lastsync') rsync_cmd = ["rsync", "--quiet", "--contimeout=%d" % timeout, "--timeout=%d" % timeout] if ipopt: rsync_cmd.append(ipopt) rsync_cmd.append(url) rsync_cmd.append(lastsync_path) try: with open(os.devnull, 'w') as devnull: logger.debug("rsync cmd: %s", ' '.join(rsync_cmd)) proc = subprocess.Popen(rsync_cmd, stdout=devnull, stderr=subprocess.PIPE) start = time.time() _, errdata = proc.communicate() end = time.time() log.duration = end - start if proc.returncode != 0: logger.debug("error: %s, %s", url, errdata) log.is_success = False log.error = errdata.strip() # look at rsync error code- if we had a command error or timed out, # don't record a duration as it is misleading if proc.returncode in (1, 30, 35): log.duration = None else: logger.debug("success: %s, %.2f", url, log.duration) with open(lastsync_path, 'r') as lastsync: parse_lastsync(log, lastsync.read()) finally: if os.path.exists(lastsync_path): os.unlink(lastsync_path) os.rmdir(tempdir) return log def mirror_url_worker(work, output, location, timeout): while True: try: url = work.get(block=False) try: if url.protocol.protocol == 'rsync': log = check_rsync_url(url, location, timeout) elif (url.protocol.protocol == 'ftp' and location and location.family == socket.AF_INET6): # IPv6 + FTP don't work; skip checking completely log = None else: log = check_mirror_url(url, location, timeout) if log: output.append(log) finally: work.task_done() except Empty: return 0 class MirrorCheckPool(object): def __init__(self, urls, location, timeout=10, num_threads=10): self.tasks = Queue() self.logs = deque() for url in list(urls): self.tasks.put(url) self.threads = [] for i in range(num_threads): thread = Thread(target=mirror_url_worker, args=(self.tasks, self.logs, location, timeout)) thread.daemon = True self.threads.append(thread) @transaction.commit_on_success def run(self): logger.debug("starting threads") for thread in self.threads: thread.start() logger.debug("joining on all threads") self.tasks.join() logger.debug("processing %d log entries", len(self.logs)) MirrorLog.objects.bulk_create(self.logs) logger.debug("log entries saved") # vim: set ts=4 sw=4 et: