# -*- coding: utf-8 -*-
"""
pgp_import command

Import keys and signatures from a given GPG keyring.

Usage: ./manage.py pgp_import <keyring_path>
"""

from collections import namedtuple, OrderedDict
from datetime import datetime
import logging
from pytz import utc
import subprocess
import sys

from django.core.management.base import BaseCommand, CommandError
from django.db import transaction

from devel.models import DeveloperKey, PGPSignature
from devel.utils import UserFinder

# All output of this command goes through the root logger onto stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s -> %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stderr)
logger = logging.getLogger()


class Command(BaseCommand):
    '''Management command that imports keys, then signatures, from the
    keyring whose path is given as the first positional argument.'''
    args = "<keyring_path>"
    help = "Import keys and signatures from a given GPG keyring."

    def handle(self, *args, **options):
        '''Map the verbosity option onto a log level and run both imports.

        Raises CommandError when no keyring path was supplied.
        '''
        # Fall back to the normal level when the option is missing instead
        # of crashing on int(None).
        verbosity = options.get('verbosity')
        v = 1 if verbosity is None else int(verbosity)
        if v == 0:
            logger.setLevel(logging.ERROR)
        elif v == 1:
            logger.setLevel(logging.INFO)
        elif v >= 2:
            # anything above 2 is at least as verbose as debug output
            logger.setLevel(logging.DEBUG)

        if not args:
            raise CommandError("keyring_path must be provided")

        import_keys(args[0])
        import_signatures(args[0])


def get_date(epoch_string):
    '''Convert an epoch string into a python 'date' object (not datetime).

    The timestamp is interpreted as UTC. Returns None for an empty or
    missing value.
    '''
    if not epoch_string:
        return None
    return datetime.utcfromtimestamp(int(epoch_string)).date()


def get_datetime(epoch_string):
    '''Convert an epoch string into a timezone-aware (UTC) python
    'datetime' object.

    Returns None for an empty or missing value.
    '''
    if not epoch_string:
        return None
    return datetime.utcfromtimestamp(int(epoch_string)).replace(tzinfo=utc)


def call_gpg(keyring, *args):
    '''Run gpg2 against the given keyring and return its standard output.

    Any extra positional arguments are appended to the gpg2 command line.
    Raises subprocess.CalledProcessError if gpg2 exits with a non-zero
    status; its stderr output is logged first.
    '''
    # GPG is stupid and interprets any filename without path portion as being
    # in ~/.gnupg/. Fake it out if we just get a bare filename.
    if '/' not in keyring:
        keyring = './%s' % keyring
    gpg_cmd = ["gpg2", "--no-default-keyring", "--keyring", keyring,
               "--with-colons", "--fixed-list-mode"]
    gpg_cmd.extend(args)
    logger.info("running command: %s", ' '.join(gpg_cmd))
    # stderr has to be piped as well, otherwise communicate() hands back
    # None for it and the failure log below carries no information.
    proc = subprocess.Popen(gpg_cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    outdata, errdata = proc.communicate()
    if proc.returncode != 0:
        logger.error("gpg2 exited with status %d: %s",
                     proc.returncode, errdata)
        raise subprocess.CalledProcessError(proc.returncode, gpg_cmd)
    if errdata:
        # keep diagnostics gpg2 printed on a successful run visible
        logger.warning("gpg2 stderr output: %s", errdata)
    return outdata


class KeyData(object):
    '''One key parsed from the gpg listing.

    'created' and 'expires' are given as epoch strings and stored as
    datetimes (or None). 'parent', 'revoked' and 'db_id' start out as None
    and are filled in later by the parsing and import steps.
    '''
    def __init__(self, key, created, expires):
        self.key = key
        self.created = get_datetime(created)
        self.expires = get_datetime(expires)
        self.parent = None
        self.revoked = None
        self.db_id = None


def parse_keydata(data):
    '''Parse colon-delimited gpg output into an OrderedDict that maps each
    key (field 5 of 'pub' and 'sub' records) to a KeyData object.

    Records are kept in listing order, so a public key always precedes its
    subkeys; each subkey gets the preceding public key as its parent.
    '''
    keys = OrderedDict()
    current_pubkey = None

    # parse all of the output from our successful GPG command
    logger.info("parsing command output")
    # type of the most recent pub/sub/uid record; a 'rev' record only counts
    # as a key revocation when it directly belongs to a pub or sub record
    node = None
    for line in data.split('\n'):
        parts = line.split(':')
        record = parts[0]
        if record == 'pub':
            key = parts[4]
            current_pubkey = key
            keys[key] = KeyData(key, parts[5], parts[6])
            node = record
        elif record == 'sub':
            key = parts[4]
            keys[key] = KeyData(key, parts[5], parts[6])
            keys[key].parent = current_pubkey
            node = record
        elif record == 'uid':
            node = record
        elif record == 'rev' and node in ('pub', 'sub'):
            # NOTE(review): a revocation that follows a 'sub' record is also
            # stored on the public key, not on the subkey - confirm that
            # this is intended.
            keys[current_pubkey].revoked = get_datetime(parts[5])

    return keys


def find_key_owner(key, keys, finder):
    '''Recurse up the chain, looking for an owner.

    'key' is a KeyData object (or None), 'keys' the mapping produced by
    parse_keydata() and 'finder' an object providing find_by_pgp_key().
    Returns the first owner found for the key or one of its ancestors,
    otherwise None.
    '''
    if key is None:
        return None
    owner = finder.find_by_pgp_key(key.key)
    if owner:
        return owner
    if key.parent:
        return find_key_owner(keys[key.parent], keys, finder)
    return None


def import_keys(keyring):
    '''Create or update a DeveloperKey row for every key in the keyring.

    Existing rows have their expires/revoked/parent values refreshed, and
    rows without an owner get one assigned if a matching user can be found
    for the key or one of its parents. Everything runs in one transaction.
    '''
    outdata = call_gpg(keyring, "--list-sigs")
    keydata = parse_keydata(outdata)

    logger.info("creating or finding %d keys", len(keydata))
    created_ct = updated_ct = 0
    with transaction.commit_on_success():
        finder = UserFinder()
        # we are dependent on parents coming before children; parse_keydata
        # uses an OrderedDict to ensure this is the case.
        for data in keydata.values():
            parent_id = None
            if data.parent:
                parent_data = keydata.get(data.parent, None)
                if parent_data:
                    parent_id = parent_data.db_id
            other = {
                'expires': data.expires,
                'revoked': data.revoked,
                'parent_id': parent_id,
            }
            dkey, created = DeveloperKey.objects.get_or_create(
                    key=data.key, created=data.created, defaults=other)
            # remember the row id so subkeys can point at it as their parent
            data.db_id = dkey.id

            # set or update any additional data we might need to
            needs_save = False
            if created:
                created_ct += 1
            else:
                for k, v in other.items():
                    if getattr(dkey, k) != v:
                        setattr(dkey, k, v)
                        needs_save = True
            if dkey.owner_id is None:
                owner = find_key_owner(data, keydata, finder)
                if owner is not None:
                    dkey.owner = owner
                    needs_save = True
            if needs_save:
                dkey.save()
                updated_ct += 1

    key_ct = DeveloperKey.objects.all().count()
    logger.info("%d total keys in database", key_ct)
    logger.info("created %d, updated %d keys", created_ct, updated_ct)


# One signature edge: 'signer' signed 'signee'; 'created' and 'expires' are
# dates (expires may be None) and 'valid' is a boolean.
SignatureData = namedtuple('SignatureData',
        ('signer', 'signee', 'created', 'expires', 'valid'))


def parse_sigdata(data):
    '''Parse colon-delimited gpg output into signature graph data.

    Returns a (nodes, edges) tuple: 'nodes' maps every public key to the
    first uid listed for it (None if there was none), and 'edges' is a list
    of SignatureData tuples, one per 'sig' record, whose signee is the
    public key the record was listed under.
    '''
    nodes = {}
    edges = []
    current_pubkey = None

    # parse all of the output from our successful GPG command
    logger.info("parsing command output")
    for line in data.split('\n'):
        parts = line.split(':')
        record = parts[0]
        if record == 'pub':
            current_pubkey = parts[4]
            nodes[current_pubkey] = None
        elif record == 'uid':
            # only set uid if this is the first one encountered
            if nodes[current_pubkey] is None:
                nodes[current_pubkey] = parts[9]
        elif record == 'sig':
            # a '-' in the second field marks the signature as not valid;
            # get_date() already maps an empty expiry field to None
            edges.append(SignatureData(
                signer=parts[4],
                signee=current_pubkey,
                created=get_date(parts[5]),
                expires=get_date(parts[6]),
                valid=parts[1] != '-'))

    return nodes, edges


def import_signatures(keyring):
    '''Create or update a PGPSignature row for every signature between two
    different keys of the keyring.

    Existing rows only have their 'valid' flag refreshed. Everything runs
    in one transaction.
    '''
    outdata = call_gpg(keyring, "--list-sigs")
    nodes, edges = parse_sigdata(outdata)

    # now prune the data down to what we actually want.
    # prune edges not in nodes, remove duplicates, and self-sigs
    pruned_edges = {edge for edge in edges
            if edge.signer in nodes and edge.signer != edge.signee}

    logger.info("creating or finding %d signatures", len(pruned_edges))
    created_ct = updated_ct = 0
    with transaction.commit_on_success():
        for edge in pruned_edges:
            sig, created = PGPSignature.objects.get_or_create(
                    signer=edge.signer, signee=edge.signee,
                    created=edge.created, expires=edge.expires,
                    defaults={'valid': edge.valid})
            if sig.valid != edge.valid:
                sig.valid = edge.valid
                sig.save()
                # count every refreshed row, not just the fact that at
                # least one was refreshed
                updated_ct += 1
            if created:
                created_ct += 1

    sig_ct = PGPSignature.objects.all().count()
    logger.info("%d total signatures in database", sig_ct)
    logger.info("created %d, updated %d signatures", created_ct, updated_ct)

# vim: set ts=4 sw=4 et: