mirror of
https://github.com/mozilla/gecko-dev.git
synced 2024-12-13 18:27:35 +00:00
520 lines
16 KiB
Python
520 lines
16 KiB
Python
#!/usr/bin/env python2.7
|
|
#
|
|
# Copyright 2014 Adobe Systems Incorporated. All Rights Reserved.
|
|
#
|
|
# Adobe permits you to use, modify, and distribute this file in accordance
|
|
# with the terms of the Mozilla Public License, v 2.0 accompanying it. If
|
|
# a copy of the MPL was not distributed with this file, You can obtain one
|
|
# at http://mozilla.org/MPL/2.0/.
|
|
#
|
|
# Creates an Adobe Access signed voucher for x32/x64 windows executables
|
|
# Notes: This is currently python2.7 due to mozilla build system requirements
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse, bitstring, pprint, hashlib, os, subprocess, sys, tempfile
|
|
from pyasn1.codec.der import encoder as der_encoder
|
|
from pyasn1.type import univ, namedtype, namedval, constraint
|
|
|
|
# Defined in WinNT.h from the Windows SDK
# Section 'Characteristics' flag: main() hashes every section that has it set.
IMAGE_SCN_MEM_EXECUTE = 0x20000000
# Base relocation types accepted by COFFFileHeader.process_relocs():
# HIGHLOW entries are read back as 32-bit values, DIR64 entries as 64-bit.
IMAGE_REL_BASED_HIGHLOW = 3
IMAGE_REL_BASED_DIR64 = 10
|
|
|
|
|
|
# CodeSectionDigest ::= SEQUENCE {
|
|
# offset INTEGER -- section's file offset in the signed binary
|
|
# digestAlgorithm OBJECT IDENTIFIER -- algorithm identifier for the hash value below. For now only supports SHA256.
|
|
# digestValue OCTET STRING -- hash value of the TEXT segment.
|
|
# }
|
|
class CodeSectionDigest(univ.Sequence):
    # pyasn1 model of one hashed section. main() fills 'offset' with the
    # section header's PointerToRawData, 'digestAlgorithm' with the OID
    # 2.16.840.1.101.3.4.2.1 and 'digest' with the raw hashlib.sha256 bytes.
    # NOTE: the third component is named 'digest' here, whereas the ASN.1
    # sketch preceding this class calls it digestValue.
    componentType = namedtype.NamedTypes(
        namedtype.NamedType('offset', univ.Integer()),
        namedtype.NamedType('digestAlgorithm', univ.ObjectIdentifier()),
        namedtype.NamedType('digest', univ.OctetString()))
|
|
|
|
|
|
# CodeSegmentDigest ::= SEQUENCE {
|
|
# offset INTEGER -- TEXT segment's file offset in the signed binary
|
|
# codeSectionDigests SET OF CodeSectionDigests
|
|
# }
|
|
|
|
class SetOfCodeSectionDigest(univ.SetOf):
    # SET OF CodeSectionDigest. main() stores exactly one entry (position 0)
    # in each instance it creates.
    componentType = CodeSectionDigest()
|
|
|
|
|
|
class CodeSegmentDigest(univ.Sequence):
    # pyasn1 model of the CodeSegmentDigest SEQUENCE. main() creates one per
    # hashed section and sets 'offset' to the same PointerToRawData value it
    # stores in the contained CodeSectionDigest.
    componentType = namedtype.NamedTypes(
        namedtype.NamedType('offset', univ.Integer()),
        namedtype.NamedType('codeSectionDigests', SetOfCodeSectionDigest()))
|
|
|
|
|
|
# ArchitectureDigest ::= SEQUENCE {
|
|
# cpuType ENUMERATED CpuType
|
|
# cpuSubType ENUMERATED CpuSubType
|
|
# CodeSegmentDigests SET OF CodeSegmentDigests
|
|
# }
|
|
class SetOfCodeSegmentDigest(univ.SetOf):
    # SET OF CodeSegmentDigest. main() appends one entry for every section
    # whose Characteristics include IMAGE_SCN_MEM_EXECUTE.
    componentType = CodeSegmentDigest()
|
|
|
|
|
|
class CPUType(univ.Enumerated):
    """ENUMERATED holding the COFF 'Machine' value of the hashed binary.

    Only the two values named below are accepted by the subtype constraint.
    """

    namedValues = namedval.NamedValues(
        ('IMAGE_FILE_MACHINE_I386', 0x14c),
        ('IMAGE_FILE_MACHINE_AMD64', 0x8664),
    )
    subtypeSpec = (
        univ.Enumerated.subtypeSpec
        + constraint.SingleValueConstraint(0x14c, 0x8664)
    )
|
|
|
|
|
|
class CPUSubType(univ.Enumerated):
    """ENUMERATED for the CPU sub-type; the only permitted value is 0."""

    namedValues = namedval.NamedValues(
        ('IMAGE_UNUSED', 0x0),
    )
    subtypeSpec = (
        univ.Enumerated.subtypeSpec
        + constraint.SingleValueConstraint(0)
    )
|
|
|
|
|
|
class ArchitectureDigest(univ.Sequence):
    # pyasn1 model of the ArchitectureDigest SEQUENCE. main() sets 'cpuType'
    # from the COFF header's Machine field, 'cpuSubType' to IMAGE_UNUSED and
    # 'CodeSegmentDigests' to the set built from the executable sections.
    componentType = namedtype.NamedTypes(
        namedtype.NamedType('cpuType', CPUType()),
        namedtype.NamedType('cpuSubType', CPUSubType()),
        namedtype.NamedType('CodeSegmentDigests', SetOfCodeSegmentDigest())
    )
|
|
|
|
|
|
# ApplicationDigest ::= SEQUENCE {
|
|
# version INTEGER
|
|
# digests SET OF ArchitectureDigest
|
|
# }
|
|
class SetOfArchitectureDigest(univ.SetOf):
    # SET OF ArchitectureDigest. main() stores a single entry at position 0.
    componentType = ArchitectureDigest()
|
|
|
|
|
|
class ApplicationDigest(univ.Sequence):
    # Top-level structure that main() DER-encodes and writes to the output
    # file; main() sets 'version' to 1.
    componentType = namedtype.NamedTypes(
        namedtype.NamedType('version', univ.Integer()),
        namedtype.NamedType('digests', SetOfArchitectureDigest())
    )
|
|
|
|
|
|
def meets_requirements(items, requirements):
    """Tell whether already-parsed values satisfy every requirement.

    items is a mapping of parsed field names to values; requirements is an
    iterable of dicts. Returns True only if each name/value pair of each
    requirement dict is present in items with an equal value. An empty
    requirements iterable is always satisfied.
    """
    unmet = (
        key not in items or items[key] != wanted
        for requirement in requirements
        for key, wanted in requirement.items()
    )
    return not any(unmet)
|
|
|
|
|
|
# Returns a (bits_read, total_bits_read) tuple. Both counts are in bits; only total_bits_read includes bits consumed by nested sub-items.
|
|
# TODO: research replacing this with the python built-in struct module
|
|
def parse_items(stream, items_in, items_out):
    """Read the fields described by items_in from stream into items_out.

    Each entry of items_in is a tuple in one of three shapes:

    * (name, fmt) -- a plain field.
    * (name, fmt, spec) -- a field that is read only when the requirement
      dicts found in spec are satisfied by the values parsed so far
      (see meets_requirements); otherwise the entry is skipped.
    * (name, fmt, spec, remainder_name) -- as above, and the value read is
      treated as a byte count: the tuples found in spec are parsed as
      sub-items, and whatever is left of that count is read as raw bits and
      stored under remainder_name.

    fmt is handed unchanged to stream.read(). The field width is the number
    after ':' (multiplied by 8 when fmt contains "bytes"); a format without
    ':' counts as one bit.

    Returns (bits_read, total_bits_read). bits_read covers the fields read
    at this level plus any remainder; total_bits_read additionally includes
    the bits consumed by nested sub-items. Raises Exception for an entry
    with more than four elements.
    """
    level_bits = 0
    overall_bits = 0

    for entry in items_in:
        name = entry[0]
        fmt = entry[1]

        _, colon, width_text = fmt.partition(":")
        width = int(width_text) if colon else 1
        if colon and "bytes" in fmt:
            width *= 8

        arity = len(entry)

        if arity == 2:
            items_out[name] = stream.read(fmt)
            level_bits += width
            overall_bits += width
            continue

        if arity not in (3, 4):
            raise Exception("unrecognized item" + pprint.pformat(entry))

        requirements = [part for part in entry[2] if isinstance(part, dict)]
        sub_items = [part for part in entry[2] if isinstance(part, tuple)]

        if not meets_requirements(items_out, requirements):
            continue

        # has sub-items based on length
        items_out[name] = stream.read(fmt)
        level_bits += width
        overall_bits += width

        if arity != 4:
            continue

        remaining = items_out[name] * 8
        if remaining <= 0:
            continue

        nested_bits, nested_total = parse_items(stream, sub_items, items_out)
        remaining -= nested_bits
        overall_bits += nested_total

        if remaining > 0:
            # keep the unparsed tail of the length-prefixed region
            items_out[entry[3]] = stream.read('bits:' + str(remaining))
            level_bits += remaining
            overall_bits += remaining

    return level_bits, overall_bits
|
|
|
|
|
|
# TODO: perhaps switch to pefile module when it officially supports python3
|
|
class SectionHeader:
    """One COFF section-table entry, parsed from the stream's current position.

    Attributes set by the constructor:
      items       -- dict of the raw header fields keyed by field name
      relocs      -- empty dict (not populated by the constructor)
      bits_read   -- number of bits consumed from the stream
      sectionName -- the 8-byte 'Name' field decoded as UTF-8
      offset      -- the 'PointerToRawData' field
    """

    def __init__(self, stream):
        layout = [
            ('Name', 'bytes:8'),
            ('VirtualSize', 'uintle:32'),
            ('VirtualAddress', 'uintle:32'),
            ('SizeOfRawData', 'uintle:32'),
            ('PointerToRawData', 'uintle:32'),
            ('PointerToRelocations', 'uintle:32'),
            ('PointerToLineNumber', 'uintle:32'),
            ('NumberOfRelocations', 'uintle:16'),
            ('NumberOfLineNumbers', 'uintle:16'),
            ('Characteristics', 'uintle:32'),
        ]
        self.items = {}
        self.relocs = {}

        counts = parse_items(stream, layout, self.items)
        self.bits_read = counts[1]

        raw_name = self.items['Name']
        self.sectionName = raw_name.decode('utf-8')
        self.offset = self.items['PointerToRawData']
|
|
|
|
# Names of the optional-header data directories, in on-disk order.
# OptionalHeader uses the entry at each index as the key under which that
# directory's VirtualAddress/Size pair is stored.
COFF_DATA_DIRECTORY_TYPES = [
    "Export Table",
    "Import Table",
    "Resource Table",
    "Exception Table",
    "Certificate Table",
    "Base Relocation Table",
    "Debug",
    "Architecture",
    "Global Ptr",
    "TLS Table",
    "Load Config Table",
    "Bound Import",
    "IAT",
    "Delay Import Descriptor",
    "CLR Runtime Header",
    "Reserved",
]
|
|
|
|
|
|
def chained_safe_get(obj, names, default=None):
    """Follow a chain of keys through nested containers.

    Starting at obj, look up each entry of names in turn. Returns the value
    reached after the last key, or default as soon as obj is None or a key
    is missing at some level. With an empty names sequence obj itself is
    returned.
    """
    if obj is None:
        return default

    current = obj
    for key in names:
        if key not in current:
            return default
        current = current[key]

    return current
|
|
|
|
|
|
class OptionalHeader:
    """The COFF optional header, parsed from the stream's current position.

    size is the SizeOfOptionalHeader value from the COFF file header, in
    bytes. When it is 0 nothing is read and items stays empty.

    Attributes set by the constructor:
      items     -- dict of parsed fields; when the Windows-specific fields
                   are present it also holds a 'Data Directories' dict that
                   maps a COFF_DATA_DIRECTORY_TYPES name to a dict with
                   'VirtualAddress' and 'Size'
      bits_read -- number of bits consumed from the stream
    """

    def __init__(self, stream, size):
        self.items = {}
        items = []

        if size:
            items += [
                ('Magic', 'uintle:16'),
                ('MajorLinkerVersion', 'uintle:8'),
                ('MinorLinkerVersion', 'uintle:8'),
                ('SizeOfCode', 'uintle:32'),
                ('SizeOfInitializedData', 'uintle:32'),
                ('SizeOfUninitializedData', 'uintle:32'),
                ('AddressOfEntryPoint', 'uintle:32'),
                ('BaseOfCode', 'uintle:32'),
            ]

        _, self.bits_read = parse_items(stream, items, self.items)

        # 'Magic' is absent when size is 0; look it up without raising so an
        # empty optional header yields an empty items dict instead of a
        # KeyError.
        magic = self.items.get('Magic')

        items = []
        if magic == 0x10b:  # PE32
            items += [('BaseOfData', 'uintle:32')]

        # address-sized fields are 64 bits wide only for magic 0x20b
        address_size = 'uintle:64' if magic == 0x20b else 'uintle:32'

        items += [
            ('ImageBase', address_size),
            ('SectionAlignment', 'uintle:32'),
            ('FileAlignment', 'uintle:32'),
            ('MajorOperatingSystemVersion', 'uintle:16'),
            ('MinorOperatingSystemVersion', 'uintle:16'),
            ('MajorImageVersion', 'uintle:16'),
            ('MinorImageVersion', 'uintle:16'),
            ('MajorSubsystemVersion', 'uintle:16'),
            ('MinorSubsystemVersion', 'uintle:16'),
            ('Win32VersionValue', 'uintle:32'),
            ('SizeOfImage', 'uintle:32'),
            ('SizeOfHeaders', 'uintle:32'),
            ('CheckSum', 'uintle:32'),
            ('Subsystem', 'uintle:16'),
            ('DllCharacteristics', 'uintle:16'),
            ('SizeOfStackReserve', address_size),
            ('SizeOfStackCommit', address_size),
            ('SizeOfHeapReserve', address_size),
            ('SizeOfHeapCommit', address_size),
            ('LoaderFlags', 'uintle:32'),
            ('NumberOfRvaAndSizes', 'uintle:32'),
        ]

        # the Windows-specific fields are only read when the header is
        # larger than 28 bytes
        if size > 28:
            _, bits_read = parse_items(stream, items, self.items)
            self.bits_read += bits_read

        if 'NumberOfRvaAndSizes' in self.items:
            index = 0
            self.items['Data Directories'] = {}
            # Everything left in the optional header is read as 8-byte data
            # directory entries.
            # NOTE(review): more than len(COFF_DATA_DIRECTORY_TYPES) entries
            # would raise IndexError here -- confirm whether that can occur.
            while self.bits_read < size * 8:
                d = self.items['Data Directories'][COFF_DATA_DIRECTORY_TYPES[index]] = {}

                _, bits_read = parse_items(stream, [('VirtualAddress', 'uintle:32'), ('Size', 'uintle:32')], d)
                self.bits_read += bits_read
                index += 1
|
|
|
|
|
|
class COFFFileHeader:
    """The COFF file header plus everything that hangs off it.

    The constructor expects the stream to be positioned just after the PE
    signature. It parses the file header, the optional header and all
    section headers, then rewrites every relocated value in the stream (see
    process_relocs).

    Attributes set by the constructor:
      items           -- dict of the raw file-header fields
      OptionalHeader  -- the parsed OptionalHeader instance
      section_headers -- SectionHeader instances sorted by file offset
      bits_read       -- bits consumed by all of the headers above
    """

    def __init__(self, stream):
        self.items = {}
        self.section_headers = []

        items = [
            ('Machine', 'uintle:16'),
            ('NumberOfSections', 'uintle:16'),
            ('TimeDateStamp', 'uintle:32'),
            ('PointerToSymbolTable', 'uintle:32'),
            ('NumberOfSymbols', 'uintle:32'),
            ('SizeOfOptionalHeader', 'uintle:16'),
            ('Characteristics', 'uintle:16')
        ]
        _, self.bits_read = parse_items(stream, items, self.items)

        self.OptionalHeader = OptionalHeader(stream, self.items['SizeOfOptionalHeader'])
        self.bits_read += self.OptionalHeader.bits_read

        # start reading section headers
        for _ in range(self.items['NumberOfSections']):
            section_header = SectionHeader(stream)
            self.bits_read += section_header.bits_read
            self.section_headers.append(section_header)

        self.section_headers.sort(key=lambda header: header.offset)

        # Read Relocations
        self.process_relocs(stream)

    def process_relocs(self, stream):
        """Subtract ImageBase from every relocated value, in place in stream.

        Walks the Base Relocation Table, and for each HIGHLOW (32-bit) or
        DIR64 (64-bit) entry reads the value at the relocated address,
        subtracts the optional header's ImageBase and overwrites the value in
        the stream. The stream position is restored before returning.

        Does nothing when the table is absent or its Size is 0. Raises
        Exception for any other relocation type or for an RVA that lies in
        no section.
        """
        reloc_table = chained_safe_get(self.OptionalHeader.items, ['Data Directories', 'Base Relocation Table'])
        if reloc_table is None:
            return

        # An empty directory entry has nothing to process. Returning here
        # also avoids resolving its VirtualAddress, which need not fall
        # inside any section.
        if reloc_table['Size'] == 0:
            return

        orig_pos = stream.bitpos
        _, stream.bytepos = self.get_rva_section(reloc_table['VirtualAddress'])
        end_pos = stream.bitpos + reloc_table['Size'] * 8

        while stream.bitpos < end_pos:
            # block header: page RVA followed by the block's size in bytes
            # (the size includes this 8-byte header)
            page_rva = stream.read('uintle:32')
            block_size = stream.read('uintle:32')

            for i in range(0, (block_size - 8) // 2):
                # 16-bit entry: high 4 bits are the type, low 12 the offset
                data = stream.read('uintle:16')
                typ = data >> 12
                offset = data & 0xFFF

                # a zero offset anywhere but the first slot is skipped
                # (presumably block padding -- TODO confirm)
                if offset == 0 and i > 0:
                    continue

                # explicit check instead of assert so it survives python -O
                if typ != IMAGE_REL_BASED_HIGHLOW and typ != IMAGE_REL_BASED_DIR64:
                    raise Exception('Unsupported relocation type: ' + str(typ))

                cur_pos = stream.bitpos
                sh, value_bytepos = self.get_rva_section(page_rva + offset)
                stream.bytepos = value_bytepos
                value = stream.read('uintle:32' if typ == IMAGE_REL_BASED_HIGHLOW else 'uintle:64')

                # remove BaseAddress
                value -= self.OptionalHeader.items['ImageBase']

                bit_size = (4 if typ == IMAGE_REL_BASED_HIGHLOW else 8) * 8
                # NOTE(review): the value is read as 'uintle' but written
                # back via BitArray(uint=...). Left unchanged because the
                # section digests depend on the exact bytes written here.
                stream.overwrite(bitstring.BitArray(uint=value, length=bit_size), pos=value_bytepos * 8)
                stream.pos = cur_pos

        stream.bitpos = orig_pos

    def get_rva_section(self, rva):
        """Map a relative virtual address to its section and file position.

        Returns (section_header, file_pointer) for the first section whose
        [VirtualAddress, VirtualAddress + VirtualSize) range contains rva.
        Raises Exception when no section matches.
        """
        for sh in self.section_headers:
            if rva < sh.items['VirtualAddress'] or rva >= sh.items['VirtualAddress'] + sh.items['VirtualSize']:
                continue

            file_pointer = rva - sh.items['VirtualAddress'] + sh.items['PointerToRawData']
            return sh, file_pointer

        raise Exception('Could not match RVA to section')
|
|
|
|
|
|
def create_temp_file(suffix=""):
    """Create an empty temporary file and return its path.

    The file is created with tempfile.mkstemp and its descriptor is closed
    right away; deleting the file is left to the caller.
    """
    handle, temp_path = tempfile.mkstemp(suffix=suffix)
    os.close(handle)
    return temp_path
|
|
|
|
|
|
class ExpandPath(argparse.Action):
    """argparse action that stores its value as an absolute path.

    A leading '~' is expanded with os.path.expanduser before the path is
    made absolute.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        expanded = os.path.expanduser(values)
        setattr(namespace, self.dest, os.path.abspath(expanded))
|
|
|
|
|
|
# this does a naming trick since windows doesn't allow multiple usernames for the same server
|
|
def get_password(service_name, user_name):
    """Fetch the signing password from the system keyring, if possible.

    Returns the stored password, or None when the keyring module is not
    installed or the lookup fails, in which case the caller falls back to
    entering the password manually. Only ordinary exceptions are treated as
    lookup failures; KeyboardInterrupt and SystemExit propagate.
    """
    try:
        import keyring
    except ImportError:
        # This allows for manual testing where you do not wish to cache the password on the system
        print("Missing keyring module...getting password manually")
        return None

    try:
        # windows doesn't allow multiple usernames for the same server, argh
        if sys.platform == "win32":
            return keyring.get_password(service_name + "-" + user_name, user_name)
        return keyring.get_password(service_name, user_name)
    except Exception as err:
        # best effort: a failed lookup must not abort signing
        print("Keyring lookup failed (" + str(err) + ")...getting password manually")
        return None
|
|
|
|
|
|
def openssl_cmd(app_args, args, password_in, password_out):
    """Run OpenSSL with the given arguments and fail if it fails.

    app_args supplies openssl_path plus the password_service/password_user
    used for the keyring lookup. args is the OpenSSL argument list; it is
    not modified. When password_in or password_out is set and a password
    is available, it is handed to OpenSSL through the COFF_PW environment
    variable of the child process and the matching -passin / -passout
    options are appended. Without a password no options are added.

    Raises subprocess.CalledProcessError when OpenSSL exits non-zero.
    """
    password = get_password(app_args.password_service, app_args.password_user) if (password_in or password_out) else None
    env = None
    args = [app_args.openssl_path] + args

    if password is not None:
        # pass the secret via the child's environment, not the command line
        env = os.environ.copy()
        env["COFF_PW"] = password

        if password_in:
            args += ["-passin", "env:COFF_PW"]
        if password_out:
            args += ["-passout", "env:COFF_PW", "-password", "env:COFF_PW"]

    # check_call waits for the process and raises on a non-zero exit status;
    # unlike an assert, this is not removed by python -O.
    subprocess.check_call(args, env=env)
|
|
|
|
|
|
def main():
    """Build a code-digest voucher for a PE file and optionally sign it.

    Parses the command line, reads the input file's COFF headers (which
    also strips ImageBase from relocated values), SHA-256 hashes every
    section flagged IMAGE_SCN_MEM_EXECUTE, and writes the DER-encoded
    ApplicationDigest to -output. When -openssl_path is given, -signer_pfx
    is required and a CMS-signed copy is written next to the output as
    "<base>.signed<ext>".

    Raises Exception for a file without a PE signature or with a Machine
    value other than 0x14c / 0x8664.
    """
    parser = argparse.ArgumentParser(description='PE/COFF Signer')
    parser.add_argument('-input', action=ExpandPath, required=True, help="File to parse.")
    parser.add_argument('-output', action=ExpandPath, required=True, help="File to write to.")
    parser.add_argument('-openssl_path', action=ExpandPath, help="Path to OpenSSL to create signed voucher")
    parser.add_argument('-signer_pfx', action=ExpandPath, help="Path to certificate to use to sign voucher.  Must contain full certificate chain.")
    parser.add_argument('-password_service', help="Name of Keyring/Wallet service/host")
    parser.add_argument('-password_user', help="Name of Keyring/Wallet user name")
    parser.add_argument('-verbose', action='store_true', help="Verbose output.")
    app_args = parser.parse_args()

    # Reject an unusable option combination up front, with a usage message,
    # instead of asserting after the voucher has already been written.
    if app_args.openssl_path is not None and app_args.signer_pfx is None:
        parser.error("-signer_pfx is required when -openssl_path is given")

    # to simplify relocation handling we use a mutable BitStream so we can remove
    # the BaseAddress from each relocation
    stream = bitstring.BitStream(filename=app_args.input)

    # find the COFF header.
    # skip forward past the MSDOS stub header to 0x3c.
    stream.bytepos = 0x3c

    # read 4 bytes, this is the file offset of the PE signature.
    pe_sig_offset = stream.read('uintle:32')
    stream.bytepos = pe_sig_offset

    # read 4 bytes, make sure it's a PE signature.
    signature = stream.read('uintle:32')
    if signature != 0x00004550:
        raise Exception("Invalid File")

    # after signature is the actual COFF file header.
    coff_header = COFFFileHeader(stream)

    arch_digest = ArchitectureDigest()
    machine = coff_header.items['Machine']
    if machine == 0x14c:
        arch_digest.setComponentByName('cpuType', CPUType('IMAGE_FILE_MACHINE_I386'))
    elif machine == 0x8664:
        arch_digest.setComponentByName('cpuType', CPUType('IMAGE_FILE_MACHINE_AMD64'))
    else:
        # cpuType would otherwise stay unset and only fail inside the encoder
        raise Exception("Unsupported machine type: " + hex(machine))

    arch_digest.setComponentByName('cpuSubType', CPUSubType('IMAGE_UNUSED'))

    text_section_headers = [
        header for header in coff_header.section_headers
        if (header.items['Characteristics'] & IMAGE_SCN_MEM_EXECUTE) == IMAGE_SCN_MEM_EXECUTE
    ]

    code_segment_digests = SetOfCodeSegmentDigest()
    for code_segment_idx, code_sect_header in enumerate(text_section_headers):
        # hash VirtualSize bytes starting at the section's file offset
        stream.bytepos = code_sect_header.offset
        code_sect_bytes = stream.read('bytes:' + str(code_sect_header.items['VirtualSize']))

        digest = hashlib.sha256(code_sect_bytes).digest()

        code_section_digest = CodeSectionDigest()
        code_section_digest.setComponentByName('offset', code_sect_header.offset)
        code_section_digest.setComponentByName('digestAlgorithm', univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1'))
        code_section_digest.setComponentByName('digest', univ.OctetString(digest))

        set_of_digest = SetOfCodeSectionDigest()
        set_of_digest.setComponentByPosition(0, code_section_digest)

        code_segment_digest = CodeSegmentDigest()
        code_segment_digest.setComponentByName('offset', code_sect_header.offset)
        code_segment_digest.setComponentByName('codeSectionDigests', set_of_digest)

        code_segment_digests.setComponentByPosition(code_segment_idx, code_segment_digest)

    arch_digest.setComponentByName('CodeSegmentDigests', code_segment_digests)

    set_of_arch_digests = SetOfArchitectureDigest()
    set_of_arch_digests.setComponentByPosition(0, arch_digest)

    app_digest = ApplicationDigest()
    app_digest.setComponentByName('version', 1)
    app_digest.setComponentByName('digests', set_of_arch_digests)

    binary_digest = der_encoder.encode(app_digest)

    with open(app_args.output, 'wb') as f:
        f.write(binary_digest)

    # sign with openssl if specified
    if app_args.openssl_path is not None:
        out_base, out_ext = os.path.splitext(app_args.output)
        signed_path = out_base + ".signed" + out_ext

        # http://stackoverflow.com/questions/12507277/how-to-fix-unable-to-write-random-state-in-openssl
        temp_files = []
        if sys.platform == "win32" and "RANDFILE" not in os.environ:
            temp_file = create_temp_file()
            temp_files += [temp_file]
            os.environ["RANDFILE"] = temp_file

        try:
            # create PEM from PFX
            pfx_pem_path = create_temp_file(".pem")
            temp_files += [pfx_pem_path]
            print("Extracting PEM from PFX to:" + pfx_pem_path)
            openssl_cmd(app_args, ["pkcs12", "-in", app_args.signer_pfx, "-out", pfx_pem_path], True, True)

            # extract CA certs
            pfx_cert_path = create_temp_file(".cert")
            temp_files += [pfx_cert_path]
            print("Extracting cert from PFX to:" + pfx_cert_path)
            openssl_cmd(app_args, ["pkcs12", "-in", app_args.signer_pfx, "-cacerts", "-nokeys", "-out", pfx_cert_path], True, False)

            # we embed the public keychain for client validation
            openssl_cmd(app_args, ["cms", "-sign", "-nodetach", "-md", "sha256", "-binary", "-in", app_args.output, "-outform", "der", "-out", signed_path, "-signer", pfx_pem_path, "-certfile", pfx_cert_path], True, False)
        finally:
            # always remove the temporary key material, and undo the
            # RANDFILE override if this run installed it
            for t in temp_files:
                if "RANDFILE" in os.environ and t == os.environ["RANDFILE"]:
                    del os.environ["RANDFILE"]
                os.unlink(t)
|
|
|
# Run the signer only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|