mirror of
https://github.com/noDRM/DeDRM_tools.git
synced 2026-03-20 04:58:56 +00:00
Ton of PDF DeDRM updates
- Support "Standard" and "Adobe.APS" encryptions - Support decrypting with owner password instead of user password - New function to return encryption filter name - Support for V=5, R=5 and R=6 PDF files - Support for AES256-encrypted PDF files - Disable broken cross-reference streams in output
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
|
||||
# ineptpdf.py
|
||||
# Copyright © 2009-2020 by i♥cabbages, Apprentice Harper et al.
|
||||
# Copyright © 2021 by noDRM
|
||||
|
||||
# Released under the terms of the GNU General Public Licence, version 3
|
||||
# <http://www.gnu.org/licenses/>
|
||||
@@ -46,13 +47,14 @@
|
||||
# 8.0.5 - Do not process DRM-free documents
|
||||
# 8.0.6 - Replace use of float by Decimal for greater precision, and import tkFileDialog
|
||||
# 9.0.0 - Add Python 3 compatibility for calibre 5
|
||||
# 9.1.0 - Support for decrypting with owner password, support for V=5, R=5 and R=6 PDF files, support for AES256-encrypted PDFs.
|
||||
|
||||
"""
|
||||
Decrypts Adobe ADEPT-encrypted PDF files.
|
||||
"""
|
||||
|
||||
__license__ = 'GPL v3'
|
||||
__version__ = "9.0.0"
|
||||
__version__ = "9.1.0"
|
||||
|
||||
import codecs
|
||||
import sys
|
||||
@@ -131,6 +133,9 @@ def unicode_argv():
|
||||
class ADEPTError(Exception):
|
||||
pass
|
||||
|
||||
class ADEPTInvalidPasswordError(Exception):
|
||||
pass
|
||||
|
||||
class ADEPTNewVersionError(Exception):
|
||||
pass
|
||||
|
||||
@@ -184,6 +189,7 @@ def _load_crypto_libcrypto():
|
||||
|
||||
AES_cbc_encrypt = F(None, 'AES_cbc_encrypt',[c_char_p, c_char_p, c_ulong, AES_KEY_p, c_char_p,c_int])
|
||||
AES_set_decrypt_key = F(c_int, 'AES_set_decrypt_key',[c_char_p, c_int, AES_KEY_p])
|
||||
AES_set_encrypt_key = F(c_int, 'AES_set_encrypt_key',[c_char_p, c_int, AES_KEY_p])
|
||||
|
||||
RC4_set_key = F(None,'RC4_set_key',[RC4_KEY_p, c_int, c_char_p])
|
||||
RC4_crypt = F(None,'RC4',[RC4_KEY_p, c_int, c_char_p, c_char_p])
|
||||
@@ -236,7 +242,7 @@ def _load_crypto_libcrypto():
|
||||
class AES(object):
|
||||
MODE_CBC = 0
|
||||
@classmethod
|
||||
def new(cls, userkey, mode, iv):
|
||||
def new(cls, userkey, mode, iv, decrypt=True):
|
||||
self = AES()
|
||||
self._blocksize = len(userkey)
|
||||
# mode is ignored since CBCMODE is only thing supported/used so far
|
||||
@@ -246,7 +252,11 @@ def _load_crypto_libcrypto():
|
||||
return
|
||||
keyctx = self._keyctx = AES_KEY()
|
||||
self._iv = iv
|
||||
rv = AES_set_decrypt_key(userkey, len(userkey) * 8, keyctx)
|
||||
self._isDecrypt = decrypt
|
||||
if decrypt:
|
||||
rv = AES_set_decrypt_key(userkey, len(userkey) * 8, keyctx)
|
||||
else:
|
||||
rv = AES_set_encrypt_key(userkey, len(userkey) * 8, keyctx)
|
||||
if rv < 0:
|
||||
raise ADEPTError('Failed to initialize AES key')
|
||||
return self
|
||||
@@ -255,12 +265,23 @@ def _load_crypto_libcrypto():
|
||||
self._keyctx = None
|
||||
self._iv = 0
|
||||
self._mode = 0
|
||||
self._isDecrypt = None
|
||||
def decrypt(self, data):
|
||||
if not self._isDecrypt:
|
||||
raise ADEPTError("AES not ready for decryption")
|
||||
out = create_string_buffer(len(data))
|
||||
rv = AES_cbc_encrypt(data, out, len(data), self._keyctx, self._iv, 0)
|
||||
if rv == 0:
|
||||
raise ADEPTError('AES decryption failed')
|
||||
return out.raw
|
||||
def encrypt(self, data):
|
||||
if self._isDecrypt:
|
||||
raise ADEPTError("AES not ready for encryption")
|
||||
out = create_string_buffer(len(data))
|
||||
rv = AES_cbc_encrypt(data, out, len(data), self._keyctx, self._iv, 1)
|
||||
if rv == 0:
|
||||
raise ADEPTError('AES decryption failed')
|
||||
return out.raw
|
||||
|
||||
return (ARC4, RSA, AES)
|
||||
|
||||
@@ -373,14 +394,23 @@ def _load_crypto_pycrypto():
|
||||
class AES(object):
|
||||
MODE_CBC = _AES.MODE_CBC
|
||||
@classmethod
|
||||
def new(cls, userkey, mode, iv):
|
||||
def new(cls, userkey, mode, iv, decrypt=True):
|
||||
self = AES()
|
||||
self._aes = _AES.new(userkey, mode, iv)
|
||||
self._decrypt = decrypt
|
||||
return self
|
||||
def __init__(self):
|
||||
self._aes = None
|
||||
self._decrypt = None
|
||||
def decrypt(self, data):
|
||||
if not self._decrypt:
|
||||
raise ADEPTError("AES not ready for decrypt.")
|
||||
|
||||
return self._aes.decrypt(data)
|
||||
def encrypt(self, data):
|
||||
if self._decrypt:
|
||||
raise ADEPTError("AES not ready for encrypt.")
|
||||
return self._aes.encrypt(data)
|
||||
|
||||
class RSA(object):
|
||||
def __init__(self, der):
|
||||
@@ -422,7 +452,7 @@ ARC4, RSA, AES = _load_crypto()
|
||||
# 1 = only if present in input
|
||||
# 2 = always
|
||||
|
||||
GEN_XREF_STM = 1
|
||||
GEN_XREF_STM = 0
|
||||
|
||||
# This is the value for the current document
|
||||
gen_xref_stm = False # will be set in PDFSerializer
|
||||
@@ -1507,6 +1537,16 @@ class PDFDocument(object):
|
||||
|
||||
raise PDFEncryptionError('Unknown filter: param=%r' % param)
|
||||
|
||||
def initialize_and_return_filter(self):
|
||||
if not self.encryption:
|
||||
self.is_printable = self.is_modifiable = self.is_extractable = True
|
||||
self.ready = True
|
||||
return None
|
||||
|
||||
(docid, param) = self.encryption
|
||||
type = literal_name(param['Filter'])
|
||||
return type
|
||||
|
||||
def initialize_adobe_ps(self, password, docid, param):
|
||||
global KEYFILEPATH
|
||||
self.decrypt_key = self.genkey_adobe_ps(param)
|
||||
@@ -1549,30 +1589,178 @@ class PDFDocument(object):
|
||||
PASSWORD_PADDING = b'(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..' \
|
||||
b'\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz'
|
||||
# experimental aes pw support
|
||||
def initialize_standard(self, password, docid, param):
|
||||
# copy from a global variable
|
||||
|
||||
def check_user_password(self, password, docid, param):
|
||||
V = int_value(param.get('V', 0))
|
||||
if V < 5:
|
||||
return self.check_user_password_V4(password, docid, param)
|
||||
else:
|
||||
return self.check_user_password_V5(password, param)
|
||||
|
||||
def check_owner_password(self, password, docid, param):
|
||||
V = int_value(param.get('V', 0))
|
||||
if V < 5:
|
||||
return self.check_owner_password_V4(password, docid, param)
|
||||
else:
|
||||
return self.check_owner_password_V5(password, param)
|
||||
|
||||
def check_user_password_V5(self, password, param):
|
||||
U = str_value(param['U'])
|
||||
userdata = U[:32]
|
||||
salt = U[32:32+8]
|
||||
# Truncate password:
|
||||
password = password[:min(127, len(password))]
|
||||
if self.hash_V5(password, salt, b"", param) == userdata:
|
||||
return True
|
||||
return None
|
||||
|
||||
def check_owner_password_V5(self, password, param):
|
||||
U = str_value(param['U'])
|
||||
O = str_value(param['O'])
|
||||
userdata = U[:48]
|
||||
ownerdata = O[:32]
|
||||
salt = O[32:32+8]
|
||||
# Truncate password:
|
||||
password = password[:min(127, len(password))]
|
||||
if self.hash_V5(password, salt, userdata, param) == ownerdata:
|
||||
return True
|
||||
return None
|
||||
|
||||
def recover_encryption_key_with_password(self, password, docid, param):
|
||||
# Truncate password:
|
||||
key_password = password[:min(127, len(password))]
|
||||
|
||||
if self.check_owner_password_V5(key_password, param):
|
||||
O = str_value(param['O'])
|
||||
U = str_value(param['U'])
|
||||
OE = str_value(param['OE'])
|
||||
key_salt = O[40:40+8]
|
||||
user_data = U[:48]
|
||||
encrypted_file_key = OE[:32]
|
||||
elif self.check_user_password_V5(key_password, param):
|
||||
U = str_value(param['U'])
|
||||
UE = str_value(param['UE'])
|
||||
key_salt = U[40:40+8]
|
||||
user_data = b""
|
||||
encrypted_file_key = UE[:32]
|
||||
else:
|
||||
raise Exception("Trying to recover key, but neither user nor owner pass is correct.")
|
||||
|
||||
intermediate_key = self.hash_V5(key_password, key_salt, user_data, param)
|
||||
|
||||
file_key = self.process_with_aes(intermediate_key, False, encrypted_file_key)
|
||||
|
||||
return file_key
|
||||
|
||||
|
||||
def process_with_aes(self, key: bytes, encrypt: bool, data: bytes, repetitions: int = 1, iv: bytes = None):
|
||||
if iv is None:
|
||||
keylen = len(key)
|
||||
iv = bytes([0x00]*keylen)
|
||||
|
||||
if not encrypt:
|
||||
plaintext = AES.new(key,AES.MODE_CBC,iv, True).decrypt(data)
|
||||
return plaintext
|
||||
else:
|
||||
aes = AES.new(key, AES.MODE_CBC, iv, False)
|
||||
new_data = bytes(data * repetitions)
|
||||
crypt = aes.encrypt(new_data)
|
||||
return crypt
|
||||
|
||||
|
||||
def hash_V5(self, password, salt, userdata, param):
|
||||
R = int_value(param['R'])
|
||||
K = SHA256(password + salt + userdata)
|
||||
if R < 6:
|
||||
return K
|
||||
elif R == 6:
|
||||
round_number = 0
|
||||
done = False
|
||||
while (not done):
|
||||
round_number = round_number + 1
|
||||
K1 = password + K + userdata
|
||||
if len(K1) < 32:
|
||||
raise Exception("K1 < 32 ...")
|
||||
#def process_with_aes(self, key: bytes, encrypt: bool, data: bytes, repetitions: int = 1, iv: bytes = None):
|
||||
E = self.process_with_aes(K[:16], True, K1, 64, K[16:32])
|
||||
|
||||
E_mod_3 = 0
|
||||
for i in range(16):
|
||||
E_mod_3 += E[i]
|
||||
E_mod_3 = E_mod_3 % 3
|
||||
|
||||
if E_mod_3 == 0:
|
||||
ctx = hashlib.sha256()
|
||||
ctx.update(E)
|
||||
K = ctx.digest()
|
||||
elif E_mod_3 == 1:
|
||||
ctx = hashlib.sha384()
|
||||
ctx.update(E)
|
||||
K = ctx.digest()
|
||||
else:
|
||||
ctx = hashlib.sha512()
|
||||
ctx.update(E)
|
||||
K = ctx.digest()
|
||||
|
||||
if round_number >= 64:
|
||||
ch = int.from_bytes(E[-1:], "big", signed=False)
|
||||
if ch <= round_number - 32:
|
||||
done = True
|
||||
|
||||
result = K[0:32]
|
||||
return result
|
||||
else:
|
||||
raise NotImplementedError("Revision > 6 not supported.")
|
||||
|
||||
|
||||
def check_owner_password_V4(self, password, docid, param):
|
||||
|
||||
# compute_O_rc4_key:
|
||||
V = int_value(param.get('V', 0))
|
||||
if V >= 5:
|
||||
raise Exception("compute_O_rc4_key not possible with V>= 5")
|
||||
|
||||
R = int_value(param.get('R', 0))
|
||||
|
||||
length = int_value(param.get('Length', 40)) # Key length (bits)
|
||||
password = (password+self.PASSWORD_PADDING)[:32]
|
||||
hash = hashlib.md5(password)
|
||||
if R >= 3:
|
||||
for _ in range(50):
|
||||
hash = hashlib.md5(hash.digest()[:length//8])
|
||||
hash = hash.digest()[:length//8]
|
||||
|
||||
# "hash" is the return value of compute_O_rc4_key
|
||||
|
||||
Odata = str_value(param.get('O'))
|
||||
# now call iterate_rc4 ...
|
||||
x = ARC4.new(hash).decrypt(Odata) # 4
|
||||
if R >= 3:
|
||||
for i in range(1,19+1):
|
||||
k = b''.join(bytes([c ^ i]) for c in hash )
|
||||
x = ARC4.new(k).decrypt(x)
|
||||
|
||||
# TODO: remove the padding string from the end of the data!
|
||||
for ct in range(1, len(x)):
|
||||
new_x = x[:ct]
|
||||
enc_key = self.check_user_password(new_x, docid, param)
|
||||
if enc_key is not None:
|
||||
return enc_key
|
||||
|
||||
return False
|
||||
|
||||
|
||||
|
||||
|
||||
def check_user_password_V4(self, password, docid, param):
|
||||
|
||||
V = int_value(param.get('V', 0))
|
||||
if (V <=0 or V > 4):
|
||||
raise PDFEncryptionError('Unknown algorithm: param=%r' % param)
|
||||
length = int_value(param.get('Length', 40)) # Key length (bits)
|
||||
O = str_value(param['O'])
|
||||
R = int_value(param['R']) # Revision
|
||||
if 5 <= R:
|
||||
raise PDFEncryptionError('Unknown revision: %r' % R)
|
||||
U = str_value(param['U'])
|
||||
P = int_value(param['P'])
|
||||
try:
|
||||
EncMetadata = str_value(param['EncryptMetadata'])
|
||||
except:
|
||||
EncMetadata = b'True'
|
||||
self.is_printable = bool(P & 4)
|
||||
self.is_modifiable = bool(P & 8)
|
||||
self.is_extractable = bool(P & 16)
|
||||
self.is_annotationable = bool(P & 32)
|
||||
self.is_formsenabled = bool(P & 256)
|
||||
self.is_textextractable = bool(P & 512)
|
||||
self.is_assemblable = bool(P & 1024)
|
||||
self.is_formprintable = bool(P & 2048)
|
||||
|
||||
# Algorithm 3.2
|
||||
password = (password+self.PASSWORD_PADDING)[:32] # 1
|
||||
hash = hashlib.md5(password) # 2
|
||||
@@ -1580,9 +1768,13 @@ class PDFDocument(object):
|
||||
hash.update(struct.pack('<l', P)) # 4
|
||||
hash.update(docid[0]) # 5
|
||||
# aes special handling if metadata isn't encrypted
|
||||
if EncMetadata == ('False' or 'false'):
|
||||
try:
|
||||
EncMetadata = str_value(param['EncryptMetadata'])
|
||||
except:
|
||||
EncMetadata = b'True'
|
||||
if (EncMetadata == ('False' or 'false') or V < 4) and R >= 4:
|
||||
hash.update(codecs.decode(b'ffffffff','hex'))
|
||||
if 5 <= R:
|
||||
if R >= 3:
|
||||
# 8
|
||||
for _ in range(50):
|
||||
hash = hashlib.md5(hash.digest()[:length//8])
|
||||
@@ -1603,25 +1795,100 @@ class PDFDocument(object):
|
||||
is_authenticated = (u1 == U)
|
||||
else:
|
||||
is_authenticated = (u1[:16] == U[:16])
|
||||
if not is_authenticated:
|
||||
raise ADEPTError('Password is not correct.')
|
||||
self.decrypt_key = key
|
||||
|
||||
if is_authenticated:
|
||||
return key
|
||||
|
||||
return None
|
||||
|
||||
def initialize_standard(self, password, docid, param):
|
||||
|
||||
self.decrypt_key = None
|
||||
|
||||
|
||||
# copy from a global variable
|
||||
V = int_value(param.get('V', 0))
|
||||
if (V <=0 or V > 5):
|
||||
raise PDFEncryptionError('Unknown algorithm: %r' % V)
|
||||
R = int_value(param['R']) # Revision
|
||||
if R >= 7:
|
||||
raise PDFEncryptionError('Unknown revision: %r' % R)
|
||||
|
||||
# check owner pass:
|
||||
retval = self.check_owner_password(password, docid, param)
|
||||
if retval is True or retval is not None:
|
||||
#print("Owner pass is valid - " + str(retval))
|
||||
if retval is True:
|
||||
self.decrypt_key = self.recover_encryption_key_with_password(password, docid, param)
|
||||
else:
|
||||
self.decrypt_key = retval
|
||||
|
||||
if self.decrypt_key is None or self.decrypt_key is True or self.decrypt_key is False:
|
||||
# That's not the owner password. Check if it's the user password.
|
||||
retval = self.check_user_password(password, docid, param)
|
||||
if retval is True or retval is not None:
|
||||
#print("User pass is valid")
|
||||
if retval is True:
|
||||
self.decrypt_key = self.recover_encryption_key_with_password(password, docid, param)
|
||||
else:
|
||||
self.decrypt_key = retval
|
||||
|
||||
if self.decrypt_key is None or self.decrypt_key is True or self.decrypt_key is False:
|
||||
raise ADEPTInvalidPasswordError("Password invalid.")
|
||||
|
||||
|
||||
P = int_value(param['P'])
|
||||
|
||||
self.is_printable = bool(P & 4)
|
||||
self.is_modifiable = bool(P & 8)
|
||||
self.is_extractable = bool(P & 16)
|
||||
self.is_annotationable = bool(P & 32)
|
||||
self.is_formsenabled = bool(P & 256)
|
||||
self.is_textextractable = bool(P & 512)
|
||||
self.is_assemblable = bool(P & 1024)
|
||||
self.is_formprintable = bool(P & 2048)
|
||||
|
||||
|
||||
# genkey method
|
||||
if V == 1 or V == 2:
|
||||
if V == 1 or V == 2 or V == 4:
|
||||
self.genkey = self.genkey_v2
|
||||
elif V == 3:
|
||||
self.genkey = self.genkey_v3
|
||||
elif V == 4:
|
||||
self.genkey = self.genkey_v2
|
||||
#self.genkey = self.genkey_v3 if V == 3 else self.genkey_v2
|
||||
elif V >= 5:
|
||||
self.genkey = self.genkey_v5
|
||||
|
||||
set_decipher = False
|
||||
|
||||
if V >= 4:
|
||||
# Check if we need new genkey_v4 - only if we're using AES.
|
||||
try:
|
||||
for key in param['CF']:
|
||||
algo = str(param["CF"][key]["CFM"])
|
||||
if algo == "/AESV2":
|
||||
if V == 4:
|
||||
self.genkey = self.genkey_v4
|
||||
set_decipher = True
|
||||
self.decipher = self.decrypt_aes
|
||||
elif algo == "/AESV3":
|
||||
if V == 4:
|
||||
self.genkey = self.genkey_v4
|
||||
set_decipher = True
|
||||
self.decipher = self.decrypt_aes
|
||||
elif algo == "/V2":
|
||||
set_decipher = True
|
||||
self.decipher = self.decrypt_rc4
|
||||
except:
|
||||
pass
|
||||
|
||||
# rc4
|
||||
if V != 4:
|
||||
self.decipher = self.decipher_rc4 # XXX may be AES
|
||||
if V < 4:
|
||||
self.decipher = self.decrypt_rc4 # XXX may be AES
|
||||
# aes
|
||||
elif V == 4 and length == 128:
|
||||
self.decipher = self.decipher_aes
|
||||
elif V == 4 and length == 256:
|
||||
raise PDFNotImplementedError('AES256 encryption is currently unsupported')
|
||||
if not set_decipher:
|
||||
# This should usually already be set by now.
|
||||
# If it's not, assume that V4 and newer are using AES
|
||||
if V >= 4:
|
||||
self.decipher = self.decrypt_aes
|
||||
self.ready = True
|
||||
return
|
||||
|
||||
@@ -1776,17 +2043,11 @@ class PDFDocument(object):
|
||||
key = hash.digest()[:min(len(self.decrypt_key) + 5, 16)]
|
||||
return key
|
||||
|
||||
def decrypt_aes(self, objid, genno, data):
|
||||
key = self.genkey(objid, genno)
|
||||
ivector = data[:16]
|
||||
data = data[16:]
|
||||
plaintext = AES.new(key,AES.MODE_CBC,ivector).decrypt(data)
|
||||
# remove pkcs#5 aes padding
|
||||
cutter = -1 * plaintext[-1]
|
||||
plaintext = plaintext[:cutter]
|
||||
return plaintext
|
||||
def genkey_v5(self, objid, genno):
|
||||
# Looks like they stopped this useless obfuscation.
|
||||
return self.decrypt_key
|
||||
|
||||
def decrypt_aes256(self, objid, genno, data):
|
||||
def decrypt_aes(self, objid, genno, data):
|
||||
key = self.genkey(objid, genno)
|
||||
ivector = data[:16]
|
||||
data = data[16:]
|
||||
@@ -2330,6 +2591,17 @@ def decryptBook(userkey, inpath, outpath, inept=True):
|
||||
return 0
|
||||
|
||||
|
||||
def getPDFencryptionType(inpath):
|
||||
if RSA is None:
|
||||
raise ADEPTError("PyCryptodome or OpenSSL must be installed.")
|
||||
with open(inpath, 'rb') as inf:
|
||||
doc = doc = PDFDocument()
|
||||
parser = PDFParser(doc, inf)
|
||||
filter = doc.initialize_and_return_filter()
|
||||
return filter
|
||||
|
||||
|
||||
|
||||
def cli_main():
|
||||
sys.stdout=SafeUnbuffered(sys.stdout)
|
||||
sys.stderr=SafeUnbuffered(sys.stderr)
|
||||
|
||||
Reference in New Issue
Block a user