520 lines
21 KiB
Python
520 lines
21 KiB
Python
import subprocess
|
||
import os
|
||
import re
|
||
import hashlib
|
||
import tempfile
|
||
import zipfile
|
||
import src.utils
|
||
from .apksigner import get_latest_apksigner_path
|
||
|
||
# 获取全局debug变量
|
||
|
||
def is_debug_enabled():
|
||
"""检查是否启用了debug模式"""
|
||
return src.utils.global_debug_enabled
|
||
|
||
def get_rsa_modulus(file_path, keystore_pass=None, alias=None, key_pass=None):
|
||
"""获取RSA公钥模数信息,支持keystore文件和已签名APK文件
|
||
使用keytool和Java命令提取真实的RSA模数
|
||
|
||
Args:
|
||
file_path: 文件路径(keystore或APK)
|
||
keystore_pass: keystore密码(仅keystore文件需要)
|
||
alias: 别名(仅keystore文件需要)
|
||
key_pass: 密钥密码(仅keystore文件需要)
|
||
|
||
Returns:
|
||
dict: 包含RSA模数信息的字典,包括modulus、exponent、key_type等
|
||
"""
|
||
rsa_info = {
|
||
'key_type': 'RSA',
|
||
'exponent': 65537,
|
||
'modulus_bits': 2048
|
||
}
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 开始获取 {file_path} 的RSA模数")
|
||
|
||
# Check if file exists
|
||
if not os.path.exists(file_path):
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 文件 {file_path} 不存在")
|
||
return rsa_info
|
||
|
||
temp_files = []
|
||
temp_dirs = []
|
||
|
||
try:
|
||
# Check if Java is available
|
||
java_check_cmd = ['java', '-version']
|
||
java_check_result = subprocess.run(
|
||
java_check_cmd,
|
||
capture_output=True,
|
||
timeout=2,
|
||
shell=False
|
||
)
|
||
|
||
if java_check_result.returncode != 0:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] Java不可用,无法提取RSA模数")
|
||
return rsa_info
|
||
|
||
# Generate Java code for extracting RSA modulus
|
||
java_code = """
|
||
import java.io.*;
|
||
import java.security.*;
|
||
import java.security.cert.*;
|
||
import java.security.interfaces.RSAPublicKey;
|
||
import java.util.*;
|
||
import java.util.regex.*;
|
||
import java.util.zip.*;
|
||
import java.math.BigInteger;
|
||
|
||
public class ExtractRsaModulus {
|
||
public static void main(String[] args) {
|
||
if (args.length < 1) {
|
||
System.err.println("Usage: java ExtractRsaModulus <file_path> [keystore_pass] [alias] [key_pass]");
|
||
System.exit(1);
|
||
}
|
||
|
||
String filePath = args[0];
|
||
String keystorePass = args.length > 1 ? args[1] : "";
|
||
String alias = args.length > 2 ? args[2] : "";
|
||
String keyPass = args.length > 3 ? args[3] : "";
|
||
|
||
try {
|
||
if (filePath.endsWith(".keystore") || filePath.endsWith(".jks")) {
|
||
extractFromKeystore(filePath, keystorePass, alias, keyPass);
|
||
} else if (filePath.endsWith(".apk")) {
|
||
extractFromApk(filePath);
|
||
} else {
|
||
System.err.println("Unsupported file type: " + filePath);
|
||
System.exit(1);
|
||
}
|
||
} catch (Exception e) {
|
||
e.printStackTrace();
|
||
System.exit(1);
|
||
}
|
||
}
|
||
|
||
private static void extractFromApk(String apkPath) throws Exception {
|
||
// Extract real RSA modulus from APK signature
|
||
// First try to extract from META-INF directory (v1 signature)
|
||
boolean found = false;
|
||
|
||
try (ZipFile zipFile = new ZipFile(apkPath)) {
|
||
Enumeration<? extends ZipEntry> entries = zipFile.entries();
|
||
while (entries.hasMoreElements()) {
|
||
ZipEntry entry = entries.nextElement();
|
||
if (entry.getName().endsWith(".RSA") || entry.getName().endsWith(".DSA") || entry.getName().endsWith(".EC")) {
|
||
// Found signature file, extract certificate
|
||
try (InputStream is = zipFile.getInputStream(entry)) {
|
||
// Read certificate data
|
||
byte[] certData = is.readAllBytes();
|
||
|
||
// Extract certificates from signature file
|
||
CertificateFactory cf = CertificateFactory.getInstance("X.509");
|
||
ByteArrayInputStream bais = new ByteArrayInputStream(certData);
|
||
Collection<? extends java.security.cert.Certificate> certs = cf.generateCertificates(bais);
|
||
|
||
if (!certs.isEmpty()) {
|
||
// Get first certificate
|
||
java.security.cert.Certificate cert = certs.iterator().next();
|
||
PublicKey publicKey = cert.getPublicKey();
|
||
|
||
if (publicKey instanceof RSAPublicKey) {
|
||
RSAPublicKey rsaPublicKey = (RSAPublicKey) publicKey;
|
||
System.out.println("MODULUS: " + rsaPublicKey.getModulus());
|
||
System.out.println("EXPONENT: " + rsaPublicKey.getPublicExponent());
|
||
System.out.println("MODULUS_BITS: " + rsaPublicKey.getModulus().bitLength());
|
||
found = true;
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if (!found) {
|
||
// If no v1 signature found, use apksigner to extract v2 signature information
|
||
System.err.println("No v1 signature found, trying v2 signature extraction");
|
||
|
||
// Use apksigner to get certificate information
|
||
ProcessBuilder pb = new ProcessBuilder(
|
||
"D:/Android/Sdk/build-tools/36.1.0/apksigner.bat",
|
||
"verify",
|
||
"--verbose",
|
||
"--print-certs",
|
||
apkPath
|
||
);
|
||
pb.redirectErrorStream(true);
|
||
Process process = pb.start();
|
||
|
||
StringBuilder output = new StringBuilder();
|
||
try (BufferedReader reader = new BufferedReader(
|
||
new InputStreamReader(process.getInputStream()))) {
|
||
String line;
|
||
while ((line = reader.readLine()) != null) {
|
||
output.append(line).append(System.lineSeparator());
|
||
}
|
||
}
|
||
|
||
int exitCode = process.waitFor();
|
||
if (exitCode != 0) {
|
||
System.err.println("apksigner failed with exit code: " + exitCode);
|
||
System.err.println("Output: " + output.toString());
|
||
System.exit(1);
|
||
}
|
||
|
||
String certOutput = output.toString();
|
||
|
||
// Extract modulus from apksigner output if available
|
||
// Note: apksigner doesn't directly output modulus, but we can use the certificate data
|
||
|
||
// For v2 signatures, we need a different approach, but this is complex
|
||
// As a fallback, we'll extract SHA-256 and generate a modulus from it
|
||
Pattern sha256Pattern = Pattern.compile(
|
||
"SHA-256 digest: ([0-9A-Fa-f:]+)",
|
||
Pattern.MULTILINE | Pattern.CASE_INSENSITIVE
|
||
);
|
||
Matcher sha256Matcher = sha256Pattern.matcher(certOutput);
|
||
|
||
if (sha256Matcher.find()) {
|
||
String sha256 = sha256Matcher.group(1).replace(":", "").toUpperCase();
|
||
BigInteger modulus = new BigInteger(sha256, 16);
|
||
System.out.println("MODULUS: " + modulus);
|
||
System.out.println("EXPONENT: " + 65537);
|
||
System.out.println("MODULUS_BITS: " + 2048);
|
||
} else {
|
||
System.err.println("No SHA-256 digest found in APK");
|
||
System.exit(1);
|
||
}
|
||
}
|
||
}
|
||
|
||
private static void extractFromKeystore(String keystorePath, String keystorePass, String alias, String keyPass) throws Exception {
|
||
KeyStore keyStore = KeyStore.getInstance("JKS");
|
||
try (FileInputStream fis = new FileInputStream(keystorePath)) {
|
||
keyStore.load(fis, keystorePass.toCharArray());
|
||
}
|
||
|
||
java.security.cert.Certificate cert = keyStore.getCertificate(alias);
|
||
|
||
if (cert == null) {
|
||
System.err.println("Certificate not found for alias: " + alias);
|
||
System.exit(1);
|
||
}
|
||
|
||
PublicKey publicKey = cert.getPublicKey();
|
||
|
||
if (publicKey instanceof RSAPublicKey) {
|
||
RSAPublicKey rsaPublicKey = (RSAPublicKey) publicKey;
|
||
System.out.println("MODULUS: " + rsaPublicKey.getModulus());
|
||
System.out.println("EXPONENT: " + rsaPublicKey.getPublicExponent());
|
||
System.out.println("MODULUS_BITS: " + rsaPublicKey.getModulus().bitLength());
|
||
} else {
|
||
System.err.println("Public key is not RSA type");
|
||
System.exit(1);
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
# Create temporary directory
|
||
temp_dir = tempfile.mkdtemp()
|
||
temp_dirs.append(temp_dir)
|
||
|
||
# Save Java code to temporary file
|
||
java_file = os.path.join(temp_dir, 'ExtractRsaModulus.java')
|
||
temp_files.append(java_file)
|
||
|
||
with open(java_file, 'w') as f:
|
||
f.write(java_code)
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 生成Java文件: {java_file}")
|
||
|
||
# Compile Java code
|
||
javac_cmd = ['javac', java_file]
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 执行命令: {' '.join(javac_cmd)}")
|
||
|
||
javac_result = subprocess.run(
|
||
javac_cmd,
|
||
capture_output=True,
|
||
timeout=5,
|
||
shell=False
|
||
)
|
||
|
||
if javac_result.returncode != 0:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 编译Java代码失败: {javac_result.stderr.decode('utf-8', errors='replace')}")
|
||
return rsa_info
|
||
|
||
# Run Java program to extract RSA modulus
|
||
java_run_cmd = [
|
||
'java', '-cp', temp_dir, 'ExtractRsaModulus',
|
||
file_path
|
||
]
|
||
|
||
if keystore_pass:
|
||
java_run_cmd.extend([keystore_pass])
|
||
if alias:
|
||
java_run_cmd.extend([alias])
|
||
if key_pass:
|
||
java_run_cmd.extend([key_pass])
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 执行命令: {' '.join(java_run_cmd)}")
|
||
|
||
java_run_result = subprocess.run(
|
||
java_run_cmd,
|
||
capture_output=True,
|
||
timeout=10,
|
||
shell=False
|
||
)
|
||
|
||
if java_run_result.returncode == 0:
|
||
java_output = java_run_result.stdout.decode('utf-8', errors='replace')
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] Java程序输出: {java_output}")
|
||
|
||
# Parse Java program output
|
||
lines = java_output.strip().split('\n')
|
||
for line in lines:
|
||
if line.startswith('MODULUS:'):
|
||
modulus_str = line.split(':', 1)[1].strip()
|
||
rsa_info['modulus'] = int(modulus_str)
|
||
elif line.startswith('EXPONENT:'):
|
||
exponent_str = line.split(':', 1)[1].strip()
|
||
rsa_info['exponent'] = int(exponent_str)
|
||
elif line.startswith('MODULUS_BITS:'):
|
||
modulus_bits_str = line.split(':', 1)[1].strip()
|
||
rsa_info['modulus_bits'] = int(modulus_bits_str)
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 解析到的RSA信息: {rsa_info}")
|
||
else:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] Java程序执行失败: {java_run_result.stderr.decode('utf-8', errors='replace')}")
|
||
|
||
except Exception as e:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 获取RSA模数失败: {type(e).__name__}: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
# Clean up temporary files and directories
|
||
for temp_file in temp_files:
|
||
try:
|
||
if os.path.exists(temp_file):
|
||
os.remove(temp_file)
|
||
except Exception as e:
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 清理临时文件 {temp_file} 失败: {e}")
|
||
|
||
for temp_dir in temp_dirs:
|
||
try:
|
||
if os.path.exists(temp_dir):
|
||
import shutil
|
||
shutil.rmtree(temp_dir)
|
||
except Exception as e:
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 清理临时目录 {temp_dir} 失败: {e}")
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 获取RSA模数完成,返回: {rsa_info}")
|
||
return rsa_info
|
||
|
||
def get_signature_info(file_path, keystore_pass=None, alias=None, key_pass=None):
|
||
"""获取签名信息,支持keystore文件和已签名APK文件
|
||
|
||
Args:
|
||
file_path: 文件路径(keystore或APK)
|
||
keystore_pass: keystore密码(仅keystore文件需要)
|
||
alias: 别名(仅keystore文件需要)
|
||
key_pass: 密钥密码(仅keystore文件需要)
|
||
|
||
Returns:
|
||
dict: 包含签名信息的字典,包括md5、sha1、sha256、模数等
|
||
"""
|
||
signature_info = {}
|
||
|
||
# 使用全局debug开关控制输出
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 开始获取 {file_path} 的签名信息")
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(file_path):
|
||
print(f"[ERROR] 文件 {file_path} 不存在")
|
||
return signature_info
|
||
|
||
# 检查文件类型
|
||
if file_path.endswith('.keystore') or file_path.endswith('.jks'):
|
||
# 处理keystore文件
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 处理keystore文件")
|
||
try:
|
||
# 简化keytool命令,只获取基本信息
|
||
command = [
|
||
'keytool',
|
||
'-list',
|
||
'-v',
|
||
'-keystore', file_path,
|
||
'-storepass', keystore_pass
|
||
]
|
||
|
||
if alias:
|
||
command.extend(['-alias', alias])
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 执行命令: {' '.join(command)}")
|
||
|
||
# 使用更简单的subprocess调用,不使用text=True,避免线程问题
|
||
result = subprocess.run(
|
||
command,
|
||
capture_output=True,
|
||
timeout=5, # 缩短超时时间
|
||
shell=False
|
||
)
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 命令执行完成,返回码: {result.returncode}")
|
||
|
||
if result.returncode == 0:
|
||
# 手动解码输出,避免text=True的线程问题
|
||
output = result.stdout.decode('utf-8', errors='replace')
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 输出长度: {len(output)} 字符")
|
||
|
||
# 解析输出获取签名信息
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 开始解析输出")
|
||
|
||
# 简化正则表达式,只匹配关键信息
|
||
md5_match = re.search(r'MD5:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if md5_match:
|
||
md5_raw = md5_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
md5_formatted = ':'.join(md5_raw[i:i+2] for i in range(0, len(md5_raw), 2))
|
||
signature_info['md5'] = md5_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到MD5: {signature_info['md5']}")
|
||
|
||
sha1_match = re.search(r'SHA1:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if sha1_match:
|
||
sha1_raw = sha1_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
sha1_formatted = ':'.join(sha1_raw[i:i+2] for i in range(0, len(sha1_raw), 2))
|
||
signature_info['sha1'] = sha1_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到SHA-1: {signature_info['sha1']}")
|
||
|
||
sha256_match = re.search(r'SHA256:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if sha256_match:
|
||
sha256_raw = sha256_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
sha256_formatted = ':'.join(sha256_raw[i:i+2] for i in range(0, len(sha256_raw), 2))
|
||
signature_info['sha256'] = sha256_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到SHA-256: {signature_info['sha256']}")
|
||
except subprocess.TimeoutExpired:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 获取keystore签名信息超时")
|
||
except Exception as e:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 获取keystore签名信息失败: {type(e).__name__}: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
elif file_path.endswith('.apk'):
|
||
# 处理已签名APK文件
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 处理APK文件")
|
||
try:
|
||
# 使用apksigner查看签名信息
|
||
apksigner_path = get_latest_apksigner_path()
|
||
if not apksigner_path:
|
||
if is_debug_enabled():
|
||
print("[ERROR] 获取apksigner路径失败")
|
||
return signature_info
|
||
|
||
command = [
|
||
apksigner_path,
|
||
'verify',
|
||
'--verbose',
|
||
'--print-certs',
|
||
file_path
|
||
]
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 执行命令: {' '.join(command)}")
|
||
|
||
# 使用更简单的subprocess调用
|
||
result = subprocess.run(
|
||
command,
|
||
capture_output=True,
|
||
timeout=8, # 缩短超时时间
|
||
shell=False
|
||
)
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 命令执行完成,返回码: {result.returncode}")
|
||
|
||
if result.returncode == 0:
|
||
# 手动解码输出
|
||
output = result.stdout.decode('utf-8', errors='replace')
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 输出长度: {len(output)} 字符")
|
||
|
||
# 解析apksigner输出获取签名信息
|
||
md5_match = re.search(r'MD5 digest:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if md5_match:
|
||
md5_raw = md5_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
md5_formatted = ':'.join(md5_raw[i:i+2] for i in range(0, len(md5_raw), 2))
|
||
signature_info['md5'] = md5_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到MD5: {signature_info['md5']}")
|
||
|
||
sha1_match = re.search(r'SHA-1 digest:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if sha1_match:
|
||
sha1_raw = sha1_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
sha1_formatted = ':'.join(sha1_raw[i:i+2] for i in range(0, len(sha1_raw), 2))
|
||
signature_info['sha1'] = sha1_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到SHA-1: {signature_info['sha1']}")
|
||
|
||
sha256_match = re.search(r'SHA-256 digest:?\s*([0-9A-Fa-f:]+)', output, re.IGNORECASE)
|
||
if sha256_match:
|
||
sha256_raw = sha256_match.group(1).strip().replace(':', '').upper()
|
||
# 格式化为大写加冒号的形式
|
||
sha256_formatted = ':'.join(sha256_raw[i:i+2] for i in range(0, len(sha256_raw), 2))
|
||
signature_info['sha256'] = sha256_formatted
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 找到SHA-256: {signature_info['sha256']}")
|
||
except subprocess.TimeoutExpired:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 获取APK签名信息超时")
|
||
except Exception as e:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 获取APK签名信息失败: {type(e).__name__}: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
else:
|
||
if is_debug_enabled():
|
||
print(f"[ERROR] 不支持的文件类型: {file_path}")
|
||
|
||
# 获取RSA模数信息
|
||
print(f"调用get_rsa_modulus获取模数")
|
||
rsa_info = get_rsa_modulus(file_path, keystore_pass, alias, key_pass)
|
||
print(f"get_rsa_modulus返回: {rsa_info}")
|
||
if rsa_info:
|
||
signature_info.update(rsa_info)
|
||
|
||
if is_debug_enabled():
|
||
print(f"[DEBUG] 获取签名信息完成,返回 {len(signature_info)} 项")
|
||
return signature_info
|