!22 设备安全等级凭据生成工具提交

Merge pull request !22 from Zen知仁/xzr
This commit is contained in:
openharmony_ci 2022-01-30 09:49:34 +00:00 committed by Gitee
commit f19ce04d4e
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
4 changed files with 635 additions and 4 deletions

View File

@ -13,6 +13,27 @@
import("//build/ohos.gni")
# generate dlsm's cred
action("gen_ohos_dslm_cred_file") {
file_name = "dslm_finger.cfg"
script = rebase_path("dslm_cred_tool.py")
outputs = [ "$target_gen_dir/$file_name" ]
out_dir = rebase_path("$target_gen_dir")
args = [
"create",
"--artifacts-dir=$out_dir/artifacts",
"--field-manufacture=ohos",
"--field-brand=$device_name",
"--field-model=$product_name",
"--field-software-version=3.0.0",
"--field-security-level=SL1",
"--cred-file=$out_dir/$file_name",
"--strict=true",
]
}
# sa lib
ohos_shared_library("dslm_service") {
sources = [ "dslm_ohos_credential.c" ]
@ -23,11 +44,14 @@ ohos_shared_library("dslm_service") {
]
deps = [
":dslm_ohos_cread_obj",
":dslm_ohos_cred_obj",
"//base/security/device_security_level/baselib/utils:utils_static",
"//base/security/device_security_level/services/sa:service_sa_static",
]
# generate the cred file
deps += [ ":dslm_ohos_cred_file" ]
external_deps = [
"hilog_native:libhilog",
"utils_base:utils",
@ -37,7 +61,7 @@ ohos_shared_library("dslm_service") {
subsystem_name = "security"
}
ohos_source_set("dslm_ohos_cread_obj") {
ohos_source_set("dslm_ohos_cred_obj") {
sources = [
"impl/dslm_ohos_request.c",
"impl/dslm_ohos_verify.c",
@ -58,3 +82,13 @@ ohos_source_set("dslm_ohos_cread_obj") {
part_name = "device_security_level"
subsystem_name = "security"
}
ohos_prebuilt_etc("dslm_ohos_cred_file") {
out = get_target_outputs(":gen_ohos_dslm_cred_file")
source = out[0]
deps = [ ":gen_ohos_dslm_cred_file" ]
part_name = "device_security_level"
subsystem_name = "security"
}

View File

@ -0,0 +1,594 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import ast
import base64
import errno
import json
import os
import sys
import shutil
import subprocess
import tempfile
from datetime import datetime
CRED_VERSION = '1.0.1'
ATTESTATION_KEY_USERPUBLICKEY = 'userPublicKey'
ATTESTATION_KEY_SIGNATURE = 'signature'
def _message(message, file):
if message and file:
file.write(str(message) + os.linesep)
def _error_message(message, file=sys.stderr):
_message(message, file)
def _info_message(message, file=sys.stdout):
_message(message, file)
def run_command(command, input_data=None):
"""
input:
command: str tuple/list of command and options to compose full cmd
input_data: bytes or None that will be redirect to command
output:
recode: int, non-zero means failure
out: bytes or None, stdout info
err: bytes or None, stderr info
"""
TIMEOUT_SECONDS = 30
try:
proc = subprocess.Popen(
command,
stdin=None if input_data is None else subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
except FileNotFoundError:
return errno.ENOENT, b'', b'executable not found'
try:
if input_data is None:
proc.wait(timeout=TIMEOUT_SECONDS)
out = proc.stdout.read()
err = proc.stderr.read()
else:
out, err = proc.communicate(
input=input_data, timeout=TIMEOUT_SECONDS)
except subprocess.TimeoutExpired:
proc.kill()
return errno.ETIMEDOUT, b'', b'timeout'
return proc.returncode, out, err
class OpenSslWrapperException(Exception):
"""raised when openssl execution failed"""
pass
class OpenSslWrapper:
"""wrapper for openssl:"""
PEM_PERFIX = '-----BEGIN PUBLIC KEY-----'
PEM_SUFFIX = '-----END PUBLIC KEY-----'
def __init__(self, store_dir: str = 'artifacts'):
self.exe = shutil.which('openssl')
if self.exe is None or not os.path.exists(self.exe):
raise OpenSslWrapperException('openssl binary not found')
store_dir = os.path.join(os.getcwd(), store_dir)
if not os.path.exists(store_dir):
os.makedirs(store_dir, 0o700)
self.store_dir = store_dir
def generate_ecc_key_pair(self, key_alias: str, curve: str = 'brainpoolP384r1') -> bool:
sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
try:
if not os.path.isfile(sk_file):
code, res, err = run_command([self.exe, 'ecparam', '-genkey', '-name', curve, '-out', sk_file])
if code != 0:
raise OpenSslWrapperException('ecparam failed, err: {}'.format(err.decode('utf8')))
if not os.path.isfile(pk_file):
code, res, err = run_command(
[self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
if code != 0:
raise OpenSslWrapperException(
'ec failed, err: {}'.format(err.decode('utf8')))
except OpenSslWrapperException:
if os.path.exists(sk_file):
os.remove(sk_file)
if os.path.exists(pk_file):
os.remove(pk_file)
return False
return True
def get_ecc_public_key(self, key_alias: str) -> bytes:
sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
try:
if not os.path.isfile(pk_file):
code, res, err = run_command([self.exe, 'ec', '-in', sk_file, '-pubout', '-out', pk_file])
if code != 0:
raise OpenSslWrapperException('ec failed, err: {}'.format(err.decode('utf8')))
with open(pk_file) as fp:
pem_str = fp.read().replace(OpenSslWrapper.PEM_PERFIX, '')
pem_str = pem_str.replace(OpenSslWrapper.PEM_SUFFIX, '')
pem_str = pem_str.replace(os.linesep, '')
return base64.b64decode(pem_str)
except OpenSslWrapperException as ex:
if os.path.exists(sk_file):
os.remove(sk_file)
if os.path.exists(pk_file):
os.remove(pk_file)
raise ex
def digest_sign(self, key_alias: str, content: bytes, digest: str = '-sha384') -> bytes:
sk_file = os.path.join(self.store_dir, '{}.key.pem'.format(key_alias))
if not os.path.isfile(sk_file):
raise OpenSslWrapperException('path not exists')
code, res, err = run_command(
[self.exe, 'dgst', digest, '-sign', sk_file, ], content
)
if code != 0:
raise OpenSslWrapperException('dgst failed, err: {}'.format(err.decode('utf8')))
return res
def digest_verify_with_key_alias(self, key_alias: str, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
pk_file = os.path.join(self.store_dir, '{}.pub.pem'.format(key_alias))
if not os.path.isfile(pk_file):
raise OpenSslWrapperException('path not exists')
try:
with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
signature_file.write(sig)
signature_file.close()
code, res, err = run_command(
[self.exe, 'dgst', digest, '-verify', pk_file, '-signature', signature_file.name, ], content
)
finally:
os.remove(signature_file.name)
return True if code == 0 else False
def digest_verify_with_pub_key(self, pub_key: bytes, content: bytes, sig: bytes, digest: str = '-sha384') -> bool:
try:
with tempfile.NamedTemporaryFile('wb+', dir=self.store_dir, delete=False) as pubkey_file:
# create an temp pubkey pem file
pub_key_base64 = base64.b64encode(pub_key).decode('utf8')
pub_key = '{}{}{}{}{}{}'.format(
OpenSslWrapper.PEM_PERFIX, os.linesep,
pub_key_base64, os.linesep,
OpenSslWrapper.PEM_SUFFIX, os.linesep
)
pubkey_file.write(pub_key.encode('utf8'))
pubkey_file.close()
with tempfile.NamedTemporaryFile('wb', dir=self.store_dir, delete=False) as signature_file:
# create an temp signature file
signature_file.write(sig)
signature_file.close()
code, res, err = run_command(
[self.exe, 'dgst', digest, '-verify', pubkey_file.name, '-signature', signature_file.name, ], content
)
finally:
os.remove(pubkey_file.name)
os.remove(signature_file.name)
return True if code == 0 else False
class CredInitializationException(Exception):
"""raised when CredInitialization execution failed"""
pass
class CredInitialization:
KEY_ALIAS_ROOT = 'root'
KEY_ALIAS_OEM = 'oem'
KEY_ALIAS_DEVICE = 'device'
def __init__(self, store_dir: str):
self.ssl = OpenSslWrapper(store_dir)
pass
def process(self):
ssl = self.ssl
try:
success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_ROOT)
if not success:
raise CredInitializationException('generate_ecc_key_pair root error')
success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_OEM)
if not success:
raise CredInitializationException('generate_ecc_key_pair oem error')
success = ssl.generate_ecc_key_pair(CredInitialization.KEY_ALIAS_DEVICE)
if not success:
raise CredInitializationException('generate_ecc_key_pair device error')
except CredInitializationException:
_error_message('cred init failed')
return
class CredCreatationException(Exception):
"""raised when CredInitialization execution failed"""
pass
class CredCreatation:
def __init__(self, store_dir: str, file: str, payload: dict):
self.ssl = OpenSslWrapper(store_dir)
self.payload = payload
self.file = file
pass
def _gene_head(self):
head = {"typ": "DSL"}
return base64.b64encode(json.dumps(head, ensure_ascii=True).encode('utf8')).decode('utf8')
def _gene_atteastation(self, root_pk: str, root_sign: str,
oem_pk: str, oem_sign: str,
device_pk: str, device_sign: str):
data = [
{
ATTESTATION_KEY_USERPUBLICKEY: device_pk,
ATTESTATION_KEY_SIGNATURE: device_sign
},
{
ATTESTATION_KEY_USERPUBLICKEY: oem_pk,
ATTESTATION_KEY_SIGNATURE: oem_sign
},
{
ATTESTATION_KEY_USERPUBLICKEY: root_pk,
ATTESTATION_KEY_SIGNATURE: root_sign
},
]
return base64.b64encode(json.dumps(data, ensure_ascii=True).encode('utf8')).decode('utf8')
def _gene_payload(self):
# add sign time
self.payload['signTime'] = datetime.now().strftime('%Y%m%d%H%M%S')
self.payload['version'] = CRED_VERSION
return base64.b64encode(json.dumps(self.payload, ensure_ascii=True).encode('utf8')).decode('utf8')
def process(self):
ssl = self.ssl
try:
# root self signed
root_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_ROOT)
root_pub_self_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, root_pub_bytes)
root_pub_str = base64.b64encode(root_pub_bytes).decode('utf8')
root_pub_self_sign_str = base64.b64encode(root_pub_self_signed_bytes).decode('utf8')
# oem signed by root
oem_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_OEM)
oem_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_ROOT, oem_pub_bytes)
oem_pub_signed_str = base64.b64encode(oem_pub_signed_bytes).decode('utf8')
oem_pub_str = base64.b64encode(oem_pub_bytes).decode('utf8')
# device signed by oem
device_pub_bytes = ssl.get_ecc_public_key(CredInitialization.KEY_ALIAS_DEVICE)
device_pub_signed_bytes = ssl.digest_sign(CredInitialization.KEY_ALIAS_OEM, device_pub_bytes)
device_pub_signed_str = base64.b64encode(device_pub_signed_bytes).decode('utf8')
device_pub_str = base64.b64encode(device_pub_bytes).decode('utf8')
attestation = self._gene_atteastation(root_pub_str, root_pub_self_sign_str,
oem_pub_str, oem_pub_signed_str,
device_pub_str, device_pub_signed_str)
head = self._gene_head()
payload = self._gene_payload()
head_payload = '{}.{}'.format(head, payload)
head_payload_signed_bytes = ssl.digest_sign(
CredInitialization.KEY_ALIAS_DEVICE, head_payload.encode('utf8'))
head_payload_signed_string = base64.b64encode(head_payload_signed_bytes).decode('utf8')
cred = '{}.{}.{}'.format(head_payload, head_payload_signed_string, attestation)
except (CredCreatationException, OpenSslWrapperException):
_error_message('cred create failed, please init first')
return
with open(self.file, 'w') as fp:
fp.write(cred)
class CredVerificationException(Exception):
"""raised when CredVerification execution failed"""
pass
class CredVerification:
def __init__(self, store_dir: str, file: str):
self.ssl = OpenSslWrapper(store_dir)
self.file = file
pass
def process(self):
try:
head, payload, signature, attestation = self._split_file(self.file)
self._check_head(head)
self._check_payload(payload)
self._check_signature(signature)
self._check_attestation(attestation, '{}.{}'.format(head, payload), signature)
except CredVerificationException as ex:
_error_message(ex)
return
_info_message('verify success!')
def _check_head(self, header: str):
header_str = self._base64decode(header).decode('utf8')
header_obj = ast.literal_eval(header_str)
if header_obj['typ'] != 'DSL':
raise CredVerificationException('head error')
_info_message('head:')
_info_message(json.dumps(header_obj, indent=2))
def _check_payload(self, payload: str):
payload_str = self._base64decode(payload).decode('utf8')
_info_message('payload:')
_info_message(json.dumps(json.loads(payload_str), indent=2))
def _check_signature(self, signature: str):
self._base64decode(signature)
def _check_attestation(self, attestation, payload, payload_sign):
ATTES_PARA_LEN = 3
atts_str = self._base64decode(attestation).decode('utf8')
attes_obj = json.loads(atts_str)
if (len(attes_obj) != ATTES_PARA_LEN):
raise CredVerificationException('attes para error')
ssl = self.ssl
device, oem, root = attes_obj
root_pk_bytes = self._base64decode(root[ATTESTATION_KEY_USERPUBLICKEY])
root_self_sign_bytes = self._base64decode(root[ATTESTATION_KEY_SIGNATURE])
verify = ssl.digest_verify_with_pub_key(root_pk_bytes, root_pk_bytes, root_self_sign_bytes, '-sha384')
if not verify:
raise CredVerificationException('root_self_sign verify error')
oem_pk_bytes = self._base64decode(oem[ATTESTATION_KEY_USERPUBLICKEY])
oem_sign_bytes = self._base64decode(oem[ATTESTATION_KEY_SIGNATURE])
verify = ssl.digest_verify_with_pub_key(root_pk_bytes, oem_pk_bytes, oem_sign_bytes, '-sha384')
if not verify:
raise CredVerificationException('oem_sign verify error')
device_pk_bytes = self._base64decode(device[ATTESTATION_KEY_USERPUBLICKEY])
device_sign_bytes = self._base64decode(device[ATTESTATION_KEY_SIGNATURE])
verify = ssl.digest_verify_with_pub_key(oem_pk_bytes, device_pk_bytes, device_sign_bytes, '-sha384')
if not verify:
raise CredVerificationException('device_sign verify error')
payload_sign_bytes = self._base64decode(payload_sign)
verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
payload_sign_bytes, '-sha384')
if not verify:
verify = ssl.digest_verify_with_pub_key(device_pk_bytes, payload.encode('utf8'),
payload_sign_bytes, '-sha256')
if not verify:
raise CredVerificationException('payload verify error')
def _split_file(self, cred_file_name: str):
out = self._get_file_content(cred_file_name).split('.')
CRED_PARA_LEN = 4
if (len(out) != CRED_PARA_LEN):
raise CredVerificationException("cred para error")
return out
def _get_file_content(self, file_path: str):
if not os.path.isfile(file_path):
raise CredVerificationException('file {} is not existed'.format(file_path))
with open(file_path, 'r') as fp:
return fp.read().strip()
def _base64decode(self, content: str):
return base64.urlsafe_b64decode(content + '=' * (4 - len(content) % 4))
def init_creds(input_args: argparse.Namespace):
aciton = CredInitialization(input_args.dir)
aciton.process()
def create_creds(input_args):
payload = {k: v for k, v in input_args.__dict__.items() if k not in ['dir', 'cred', 'process', 'strict'] and v}
if input_args.strict:
init_creds(input_args)
aciton = CredCreatation(input_args.dir, input_args.cred, payload)
aciton.process()
if input_args.strict:
shutil.rmtree(input_args.dir)
def verify_creds(input_args):
aciton = CredVerification(input_args.dir, input_args.cred)
aciton.process()
class CredCommand:
def _setup_arguments(self, subparsers, config: dict):
action = config.get('action')
arguments = config.get('arguments')
parser = subparsers.add_parser(action.get('name'), help=action.get('help'))
parser.set_defaults(process=action.get('process'))
for item, arg in arguments.items():
parser.add_argument(*arg.get('name'), dest=item,
metavar=arg.get('metavar'), help=arg.get('help'),
required=arg.get('required'), type=arg.get('type'),
choices=arg.get('choices'), default=arg.get('default'))
def _setup_init_cmd_parses(self, subparsers):
cmd_args_def = {
'action': {
'name': 'init',
'help': 'initialization tool for device security level credential',
'process': init_creds
},
'arguments': {
'dir': {
'name': ['-d', '--artifacts-dir'],
'metavar': 'dir',
'type': str,
'default': 'artifacts',
'help': 'output artifacts dir',
}
}
}
self._setup_arguments(subparsers, cmd_args_def)
def _setup_create_cmd_parses(self, subparsers):
cmd_args_def = {
'action': {
'name': 'create',
'help': 'creatation tool for device security level credential',
'process': create_creds,
},
'arguments': {
'dir': {
'name': ['-d', '--artifacts-dir'],
'metavar': 'dir',
'type': str,
'default': 'artifacts',
'help': 'input artifacts dir',
},
'type': {
'name': ['-t', '--field-type'],
'type': str,
'choices': ['debug', 'release'],
'default': 'debug',
'help': 'debug or release',
},
'manufacture': {
'name': ['-M', '--field-manufacture'],
'type': str,
'help': 'device manufacture info',
'required': True
},
'brand': {
'name': ['-b', '--field-brand'],
'type': str,
'help': 'device brand info',
'required': True
},
'model': {
'name': ['-m', '--field-model'],
'type': str,
'help': 'device model info',
'required': True
},
'udid': {
'name': ['-u', '--field-udid'],
'type': str,
'help': 'device udid info',
'required': False
},
'sn': {
'name': ['-n', '--field-sn'],
'type': str,
'help': 'device sn info',
'required': False
},
'softwareVersion': {
'name': ['-s', '--field-software-version'],
'type': str,
'help': 'device software version info',
'required': True
},
'securityLevel': {
'name': ['-l', '--field-security-level'],
'type': str,
'choices': ['SL1', 'SL2', 'SL3', 'SL4', 'SL5'],
'default': 'SL1',
'help': 'device security security info',
},
'cred': {
'name': ['-f', '--cred-file'],
'metavar': 'file',
'type': str,
'help': 'the device security level credential file to output',
'required': True
},
'strict': {
'name': ['--strict'],
'metavar': 'strict',
'type': bool,
'choices': [True, False],
'default': False,
'help': 'clean up the artifacts after process',
},
}
}
self._setup_arguments(subparsers, cmd_args_def)
def _setup_verify_cmd_parses(self, subparsers):
cmd_args_def = {
'action': {
'name': 'verify',
'help': 'verification tool for device security level credential',
'process': verify_creds
},
'arguments': {
'cred': {
'name': ['-f', '--cred-file'],
'metavar': 'file',
'type': str,
'help': 'the device security level credential file to verify',
'required': True
},
'dir': {
'name': ['-d', '--artifacts-dir'],
'metavar': 'dir',
'type': str,
'default': 'artifacts',
'help': 'input artifacts dir',
}
}}
self._setup_arguments(subparsers, cmd_args_def)
def parse_args(self):
parser = argparse.ArgumentParser(description='A collection of device security level credential tools')
subparsers = parser.add_subparsers(required=True, metavar='action')
self._setup_init_cmd_parses(subparsers)
self._setup_create_cmd_parses(subparsers)
self._setup_verify_cmd_parses(subparsers)
return parser.parse_args()
if __name__ == '__main__':
cmd = CredCommand()
args = cmd.parse_args()
args.process(args)

View File

@ -20,8 +20,11 @@ ohos_sa_profile("device_security_level_profile") {
ohos_prebuilt_etc("dslm_service.rc") {
source = "dslm_service.cfg"
deps = [ ":device_security_level_profile" ]
relative_install_dir = "init"
deps = [ ":device_security_level_profile" ]
subsystem_name = "security"
part_name = "device_security_level"
}

View File

@ -41,7 +41,7 @@ ohos_unittest("dslm_test") {
deps = [
"//base/security/device_security_level/baselib/utils:utils_static",
"//base/security/device_security_level/oem_property/ohos:dslm_ohos_cread_obj",
"//base/security/device_security_level/oem_property/ohos:dslm_ohos_cred_obj",
"//base/security/device_security_level/services/bigdata:service_bigdata_obj",
"//base/security/device_security_level/services/common:service_common_obj",
"//base/security/device_security_level/services/dslm:service_dslm_test_obj",