import binascii
from datetime import datetime, timezone

from gmssl import sm2
from gmssl.sm2 import CryptSM2
from pyasn1.codec.der import decoder, encoder
from pyasn1.codec.der.decoder import decode
from pyasn1.codec.der.encoder import encode
from pyasn1.type import namedtype, univ, useful
from pyasn1.type.univ import Sequence
from pyasn1.type.useful import GeneralizedTime
from pyasn1_modules import rfc2459
class ECPrimeFieldConfig(univ.Sequence):
    """ASN.1 SEQUENCE holding the finite-field description of a curve.

    Components, in order:
        fieldType: OBJECT IDENTIFIER, declared with the value
            ``1.2.840.10045.1.1``.
        prime: INTEGER.
    """

    componentType = namedtype.NamedTypes(
        *(
            namedtype.NamedType(label, spec)
            for label, spec in (
                ('fieldType', univ.ObjectIdentifier('1.2.840.10045.1.1')),
                ('prime', univ.Integer()),
            )
        )
    )
class ECCurveParameters(univ.Sequence):
    """ASN.1 SEQUENCE holding the two curve coefficients.

    Components, in order:
        coefficientA: OCTET STRING.
        coefficientB: OCTET STRING.
    """

    componentType = namedtype.NamedTypes(
        *(
            namedtype.NamedType(label, univ.OctetString())
            for label in ('coefficientA', 'coefficientB')
        )
    )
class ECDomainParameters(univ.Sequence):
    """ASN.1 SEQUENCE bundling the elliptic-curve domain parameters.

    Components, in order:
        version: INTEGER, declared with the value 1.
        fieldParameters: ``ECPrimeFieldConfig``.
        curveParameters: ``ECCurveParameters``.
        basePoint: OCTET STRING.
        order: INTEGER.
        cofactor: INTEGER, declared with the value 1.
    """

    componentType = namedtype.NamedTypes(
        *(
            namedtype.NamedType(label, spec)
            for label, spec in (
                ('version', univ.Integer(1)),
                ('fieldParameters', ECPrimeFieldConfig()),
                ('curveParameters', ECCurveParameters()),
                ('basePoint', univ.OctetString()),
                ('order', univ.Integer()),
                ('cofactor', univ.Integer(1)),
            )
        )
    )
class SM2SignatureValue(univ.Sequence):
    """ASN.1 SEQUENCE carrying the two signature integers ``r`` and ``s``."""

    componentType = namedtype.NamedTypes(
        *(
            namedtype.NamedType(label, univ.Integer())
            for label in ('r', 's')
        )
    )
class SM2SignedData(univ.Sequence):
    """Top-level ASN.1 SEQUENCE for a signed-data package.

    Components, in order:
        version: INTEGER.
        digestAlgorithms: OBJECT IDENTIFIER.
        sm2Signature: ``SM2SignatureValue``.
        ecDomainParameters: ``ECDomainParameters``.
        certificate: OCTET STRING.
        timestamp: GeneralizedTime.
    """

    componentType = namedtype.NamedTypes(
        *(
            namedtype.NamedType(label, spec)
            for label, spec in (
                ('version', univ.Integer()),
                ('digestAlgorithms', univ.ObjectIdentifier()),
                ('sm2Signature', SM2SignatureValue()),
                ('ecDomainParameters', ECDomainParameters()),
                ('certificate', univ.OctetString()),
                ('timestamp', GeneralizedTime()),
            )
        )
    )
def asn1_package(version, oid, signature, curve_params, cert_hex, time_stamp):
    """Build an ``SM2SignedData`` structure and return its DER encoding as hex.

    Args:
        version: Value stored in the ``version`` field.
        oid: Dotted OID string stored in ``digestAlgorithms``.
        signature: 128 hex characters; the first 64 are ``r`` and the last
            64 are ``s``.
        curve_params: Mapping with hex-string entries ``'p'``, ``'a'``,
            ``'b'``, ``'g'`` and ``'n'``. ``'g'`` is given without a leading
            ``04``; that prefix is prepended here.
        cert_hex: Hex encoding of the certificate bytes.
        time_stamp: Time formatted as ``"%Y-%m-%d %H:%M:%S"``. It is written
            out with a trailing ``Z`` and no timezone conversion.

    Returns:
        The DER encoding of the structure as a hex string.

    Raises:
        ValueError: If ``signature`` is not exactly 128 characters long, or
            if any hex field or the timestamp cannot be parsed.
    """
    # r and s are sliced at a fixed offset; a signature of any other length
    # would silently produce a wrong (or oversized) ``s``.
    if len(signature) != 128:
        raise ValueError(
            f"signature must be 128 hex characters (r || s), got {len(signature)}"
        )

    signed = SM2SignedData()
    signed['version'] = version
    signed['digestAlgorithms'] = oid

    signed['sm2Signature'] = SM2SignatureValue()
    sig_value = signed['sm2Signature']
    sig_value['r'] = int(signature[:64], 16)
    sig_value['s'] = int(signature[64:], 16)

    signed['ecDomainParameters'] = ECDomainParameters()
    domain = signed['ecDomainParameters']

    domain['fieldParameters'] = ECPrimeFieldConfig()
    domain['fieldParameters']['prime'] = int(curve_params['p'], 16)

    domain['curveParameters'] = ECCurveParameters()
    coefficients = domain['curveParameters']
    coefficients['coefficientA'] = univ.OctetString(bytes.fromhex(curve_params['a']))
    coefficients['coefficientB'] = univ.OctetString(bytes.fromhex(curve_params['b']))

    domain['basePoint'] = univ.OctetString(bytes.fromhex('04' + curve_params['g']))
    domain['order'] = int(curve_params['n'], 16)

    signed['certificate'] = univ.OctetString(bytes.fromhex(cert_hex))

    parsed_time = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S")
    signed['timestamp'] = GeneralizedTime(parsed_time.strftime("%Y%m%d%H%M%SZ"))

    return encode(signed).hex()
class Sm2CertVerifier:
    """Validate an X.509 certificate against a fixed SM2 CA public key.

    The constructor decodes the DER certificate; :meth:`verify` then checks
    the validity period and the CA signature over the TBSCertificate.

    Attributes:
        sm2_crypt: ``CryptSM2`` instance loaded with the CA public key.
        cert_tbs: The decoded ``tbsCertificate`` component.
        signature_bytes: Raw bytes of the certificate's ``signatureValue``.
        cert: The full decoded certificate.
    """

    # Hex public key of the issuing CA, passed to gmssl's ``CryptSM2``.
    _CA_PUBLIC_KEY = (
        "8E1860588D9900C16BD19A0FE0A5ACC600224DBD794FFD34179E03698D52421F"
        "46E6D8C6E8AADE512C7B543395AC39C76384726C7F8BA537ABCA0C129ECD9882"
    )

    def __init__(self, cert_hex: str):
        """Decode ``cert_hex`` (hex-encoded DER certificate) and set up the CA key."""
        self.sm2_crypt = sm2.CryptSM2(public_key=self._CA_PUBLIC_KEY, private_key=None)
        self.cert_tbs, self.signature_bytes, self.cert = self.parse_cert(
            bytes.fromhex(cert_hex)
        )

    @staticmethod
    def parse_cert(cert_der_bytes: bytes):
        """Decode a DER certificate.

        Returns:
            A tuple ``(tbs, signature_bytes, cert)``: the ``tbsCertificate``
            component, the ``signatureValue`` as bytes, and the whole
            decoded certificate.
        """
        cert, _ = decoder.decode(cert_der_bytes, asn1Spec=rfc2459.Certificate())
        tbs = cert.getComponentByName('tbsCertificate')
        signature_bytes = cert.getComponentByName('signatureValue').asOctets()
        return tbs, signature_bytes, cert

    def decode_rs_from_der(self, signature: bytes) -> bytes:
        """Convert a DER ``SEQUENCE { r, s }`` into 64 raw bytes (r || s).

        Each integer is written as a 32-byte big-endian value.
        """
        seq, _ = decode(signature, asn1Spec=Sequence())
        r_bytes = int(seq[0]).to_bytes(32, byteorder='big')
        s_bytes = int(seq[1]).to_bytes(32, byteorder='big')
        return r_bytes + s_bytes

    def verify_signature(self, signature: bytes, tbs):
        """Check ``signature`` over the DER re-encoding of ``tbs`` with the CA key.

        Args:
            signature: DER-encoded ``SEQUENCE { r, s }``.
            tbs: Decoded ``tbsCertificate`` ASN.1 object (not a string).

        Returns:
            The result of ``CryptSM2.verify_with_sm3``.
        """
        tbs_der = encoder.encode(tbs)
        raw_signature = self.decode_rs_from_der(signature)
        return self.sm2_crypt.verify_with_sm3(raw_signature.hex(), tbs_der)

    @staticmethod
    def _parse_utc_timestamp(text: str, two_digit_year: bool) -> datetime:
        """Parse a ``...Z`` timestamp into a timezone-aware UTC datetime.

        Args:
            text: ``YYMMDDHHMMSSZ`` when ``two_digit_year`` is true,
                otherwise ``YYYYMMDDHHMMSSZ``.
            two_digit_year: Whether ``text`` uses the two-digit-year form.
        """
        fmt = "%y%m%d%H%M%SZ" if two_digit_year else "%Y%m%d%H%M%SZ"
        moment = datetime.strptime(text, fmt)
        # strptime maps two-digit years 00-68 to 20xx, but RFC 5280 puts the
        # pivot at 50: YY >= 50 means 19YY. Move 2050-2068 back a century.
        if two_digit_year and moment.year >= 2050:
            moment = moment.replace(year=moment.year - 100)
        # The trailing "Z" marks the value as UTC; keep that information.
        return moment.replace(tzinfo=timezone.utc)

    @classmethod
    def _validity_time(cls, validity, field_name):
        """Return the ``field_name`` entry of ``validity`` as an aware datetime."""
        value = validity.getComponentByName(field_name).getComponent()
        if isinstance(value, useful.UTCTime):
            return cls._parse_utc_timestamp(str(value), two_digit_year=True)
        if isinstance(value, useful.GeneralizedTime):
            return cls._parse_utc_timestamp(str(value), two_digit_year=False)
        raise ValueError(f"Unsupported {field_name} time format")

    def verify_certificate_expiration_date(self, tbs):
        """Return True when the current UTC time is inside the validity period.

        Args:
            tbs: Decoded ``tbsCertificate`` ASN.1 object.

        Raises:
            ValueError: If notBefore or notAfter is neither UTCTime nor
                GeneralizedTime, or does not match the expected format.
        """
        validity = tbs.getComponentByName('validity')
        not_before_time = self._validity_time(validity, 'notBefore')
        not_after_time = self._validity_time(validity, 'notAfter')
        # Compare in UTC: the certificate times are UTC, so local wall-clock
        # time would shift the window by the machine's UTC offset.
        now = datetime.now(timezone.utc)
        return not_before_time <= now <= not_after_time

    def verify(self):
        """Run the validity-period check, then the signature check.

        Prints a message and returns False on the first failing check;
        returns True when both pass.
        """
        if not self.verify_certificate_expiration_date(self.cert_tbs):
            print("证书已过期或尚未生效")
            return False
        if not self.verify_signature(self.signature_bytes, self.cert_tbs):
            print("证书验证未通过")
            return False
        return True
class SM2Config:
    """Decode a hex ``SM2SignedData`` package and verify signatures with it.

    Construction decodes the package, validates the embedded certificate
    with ``Sm2CertVerifier`` and prepares a ``CryptSM2`` instance from the
    certificate's public key and the curve parameters found in the package.

    Attributes:
        sm2_signed_data: The decoded ``SM2SignedData`` object.
        ecc_table: Dict of hex strings with keys ``n``, ``p``, ``g``, ``a``, ``b``.
        sm2_crypt: ``CryptSM2`` instance used by :meth:`verify`.
        sign: Upper-case hex of ``r || s`` (32 bytes each).
    """

    def __init__(self, asn1_str):
        """Decode and validate the package.

        Args:
            asn1_str: Hex string of the DER-encoded ``SM2SignedData``.

        Raises:
            ValueError: If bytes remain after the decoded structure.
            TypeError: If the embedded certificate fails verification.
        """
        self.sm2_signed_data, trailing = self.hex_to_asn1(asn1_str, SM2SignedData())
        if len(trailing) != 0:
            raise ValueError("asn1长度有问题")

        cert_hex = self.get_hex_value(self.sm2_signed_data['certificate'])
        sm2_cert_verifier = Sm2CertVerifier(cert_hex)
        if not sm2_cert_verifier.verify():
            raise TypeError("证书验证不通过")

        domain = self.sm2_signed_data['ecDomainParameters']
        curve = domain['curveParameters']
        # NOTE(review): these curve parameters are read from the same decoded
        # package as the signature and are used together with the
        # certificate's public key without being compared to any expected
        # curve. Confirm that accepting caller-supplied parameters is intended.
        self.ecc_table = {
            'n': self.get_hex_value(domain['order']),
            'p': self.get_hex_value(domain['fieldParameters']['prime']),
            'g': self._strip_uncompressed_prefix(self.get_hex_value(domain['basePoint'])),
            'a': self.get_hex_value(curve['coefficientA']),
            'b': self.get_hex_value(curve['coefficientB']),
        }

        public_key = self.extract_public_key(sm2_cert_verifier.cert_tbs)
        self.sm2_crypt = CryptSM2(
            private_key="",
            public_key=public_key,
            ecc_table=self.ecc_table,
        )

        signature = self.sm2_signed_data['sm2Signature']
        self.sign = (
            int(signature['r']).to_bytes(32, 'big').hex().upper()
            + int(signature['s']).to_bytes(32, 'big').hex().upper()
        )

    @staticmethod
    def _strip_uncompressed_prefix(point_hex):
        """Drop the leading ``04`` marker from an uncompressed point, if present.

        An uncompressed point is ``04 || X || Y`` and therefore has an odd
        number of bytes, while a bare ``X || Y`` has an even number. Checking
        the length keeps a bare value whose first byte happens to be 0x04
        from being truncated.
        """
        if len(point_hex) % 4 == 2 and point_hex.startswith("04"):
            return point_hex[2:]
        return point_hex

    @staticmethod
    def hex_to_asn1(hex_str, asn1_spec):
        """Decode a hex string into an ASN.1 object.

        Args:
            hex_str: Hex string of the DER bytes.
            asn1_spec: ASN.1 structure definition to decode against.

        Returns:
            A tuple ``(asn1_object, excess)`` where ``excess`` is whatever
            the decoder left unconsumed.
        """
        der_bytes = binascii.unhexlify(hex_str)
        asn1_object, excess = decode(der_bytes, asn1Spec=asn1_spec)
        return asn1_object, excess

    @staticmethod
    def get_hex_value(value):
        """Convert an ASN.1 value to an upper-case hex string without prefix.

        Args:
            value: A ``univ.Integer`` or ``univ.OctetString``.

        Raises:
            TypeError: For any other type.
        """
        if isinstance(value, univ.Integer):
            return format(int(value), 'X')
        if isinstance(value, univ.OctetString):
            return value.asOctets().hex().upper()
        raise TypeError(f"Unsupported type: {type(value)}")

    @staticmethod
    def extract_public_key(tbs):
        """Return the ``subjectPublicKey`` bits of ``tbs`` as a hex string."""
        spki = tbs.getComponentByName('subjectPublicKeyInfo')
        public_key_bitstring = spki.getComponentByName('subjectPublicKey')
        return bytes(public_key_bitstring.asOctets()).hex()

    def verify_misc(self):
        """Check the fixed metadata fields of the package.

        Returns:
            True only when version is 1, the digest OID is
            ``1.2.156.10197.1.401.1`` and the timestamp is
            ``20250520101000Z``.
        """
        package = self.sm2_signed_data
        return (
            int(package['version']) == 1
            and str(package['digestAlgorithms']) == '1.2.156.10197.1.401.1'
            and str(package['timestamp']) == "20250520101000Z"
        )

    def verify(self, data):
        """Verify ``data`` against the package's signature.

        The metadata check runs first; when it fails the signature is not
        evaluated and False is returned.
        """
        if not self.verify_misc():
            return False
        return self.sm2_crypt.verify_with_sm3(self.sign, data)
def generateSM2SignedDataExample(cert_hex=None):
    """Build an example ``SM2SignedData`` package and return it as hex.

    Version, digest OID and timestamp are the values that
    ``SM2Config.verify_misc`` accepts.

    Args:
        cert_hex: Hex encoding of the DER certificate to embed. The
            certificate literal is not present in this file, so it has to
            be supplied by the caller.

    Returns:
        The hex string produced by ``asn1_package``.

    Raises:
        ValueError: If ``cert_hex`` is missing or empty.
    """
    if not cert_hex:
        raise ValueError(
            "cert_hex is required: pass the hex-encoded DER certificate to embed"
        )

    version = 1
    oid = '1.2.156.10197.1.401.1'
    signature = '6f8eaff551d0f3fa6de74b75b33e1e58f9fdb4dc58e61c82e11e717ffcf168c4db3d5a90ff3625d12b8b658f8dbab34340c278b412b3aff25489e7feb1c75598'
    curve_params = {
        "n": 'FFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFF7203DF6B21C6052B53BBF40939D54123',
        "p": 'FFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00000000FFFFFFFFFFFFFFFF',
        "g": '32c4ae2c1f1981195f9904466a39c9948fe30bbff2660be1715a4589334c74c7bc3736a2f4f6779c59bdcee36b692153d0a9877cc62a474002df32e52139f0a0',
        "a": 'FFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00000000FFFFFFFFFFFFFFFC',
        "b": '28E9FA9E9D9F5E344D5A9E4BCF6509A7F39789F515AB8F92DDBCBD414D940E93',
    }
    time_stamp = '2025-05-20 10:10:00'

    return asn1_package(version, oid, signature, curve_params, cert_hex, time_stamp)
if __name__ == '__main__':
    # Demo: build the example package, load it and verify a message with it.
    # NOTE(review): the example generator needs a certificate hex string that
    # is not present in this file; supply one before running.
    data = b"Hello, CryptoCup!"
    asn1_package_hex = generateSM2SignedDataExample()
    sm2_config = SM2Config(asn1_package_hex)
    result = sm2_config.verify(data)
    print(result)
# Message to be signed (string): TYTNGLNVKYDLVCGTUYUARFLEFKLAKAZO