Remove OpenSSL support; only support PyCryptodome

This allows us to clean up the code a lot.

On Windows, it isn't installed by default and
most of the time not be found at all.

On M1 Macs, the kernel will kill the process instead.

Closes #33.
This commit is contained in:
a980e066a01
2022-02-22 23:16:03 +00:00
committed by noDRM
parent f4634b5eab
commit a1dd63ae5f
16 changed files with 202 additions and 2211 deletions

View File

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# ineptepub.py
# Copyright © 2009-2021 by i♥cabbages, Apprentice Harper et al.
# Copyright © 2009-2022 by i♥cabbages, Apprentice Harper et al.
# Released under the terms of the GNU General Public Licence, version 3
# <http://www.gnu.org/licenses/>
@@ -31,13 +31,14 @@
# 6.6 - Import tkFileDialog, don't assume something else will import it.
# 7.0 - Add Python 3 compatibility for calibre 5.0
# 7.1 - Add ignoble support, dropping the dedicated ignobleepub.py script
# 7.2 - Only support PyCryptodome; clean up the code
"""
Decrypt Adobe Digital Editions encrypted ePub books.
"""
__license__ = 'GPL v3'
__version__ = "7.1"
__version__ = "7.2"
import sys
import os
@@ -49,6 +50,15 @@ from zipfile import ZipInfo, ZipFile, ZIP_STORED, ZIP_DEFLATED
from contextlib import closing
from lxml import etree
try:
from Cryptodome.Cipher import AES, PKCS1_v1_5
from Cryptodome.PublicKey import RSA
from Cryptodome.Util.Padding import unpad
except ImportError:
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import unpad
# Wrap a stream so that output gets flushed immediately
# and also make sure that any unicode strings get
# encoded using "replace" before writing them.
@@ -120,234 +130,6 @@ class ADEPTError(Exception):
class ADEPTNewVersionError(Exception):
pass
def _load_crypto_libcrypto():
from ctypes import CDLL, POINTER, c_void_p, c_char_p, c_int, c_long, \
Structure, c_ulong, create_string_buffer, cast
from ctypes.util import find_library
if iswindows:
libcrypto = find_library('libeay32')
else:
libcrypto = find_library('crypto')
if libcrypto is None:
raise ADEPTError('libcrypto not found')
libcrypto = CDLL(libcrypto)
RSA_NO_PADDING = 3
AES_MAXNR = 14
c_char_pp = POINTER(c_char_p)
c_int_p = POINTER(c_int)
class RSA(Structure):
pass
RSA_p = POINTER(RSA)
class AES_KEY(Structure):
_fields_ = [('rd_key', c_long * (4 * (AES_MAXNR + 1))),
('rounds', c_int)]
AES_KEY_p = POINTER(AES_KEY)
def F(restype, name, argtypes):
func = getattr(libcrypto, name)
func.restype = restype
func.argtypes = argtypes
return func
d2i_RSAPrivateKey = F(RSA_p, 'd2i_RSAPrivateKey',
[RSA_p, c_char_pp, c_long])
RSA_size = F(c_int, 'RSA_size', [RSA_p])
RSA_private_decrypt = F(c_int, 'RSA_private_decrypt',
[c_int, c_char_p, c_char_p, RSA_p, c_int])
RSA_free = F(None, 'RSA_free', [RSA_p])
AES_set_decrypt_key = F(c_int, 'AES_set_decrypt_key',
[c_char_p, c_int, AES_KEY_p])
AES_cbc_encrypt = F(None, 'AES_cbc_encrypt',
[c_char_p, c_char_p, c_ulong, AES_KEY_p, c_char_p,
c_int])
class RSA(object):
def __init__(self, der):
buf = create_string_buffer(der)
pp = c_char_pp(cast(buf, c_char_p))
rsa = self._rsa = d2i_RSAPrivateKey(None, pp, len(der))
if rsa is None:
raise ADEPTError('Error parsing ADEPT user key DER')
def decrypt(self, from_):
rsa = self._rsa
to = create_string_buffer(RSA_size(rsa))
dlen = RSA_private_decrypt(len(from_), from_, to, rsa,
RSA_NO_PADDING)
if dlen < 0:
raise ADEPTError('RSA decryption failed')
return to[:dlen]
def __del__(self):
if self._rsa is not None:
RSA_free(self._rsa)
self._rsa = None
class AES(object):
def __init__(self, userkey):
self._blocksize = len(userkey)
if (self._blocksize != 16) and (self._blocksize != 24) and (self._blocksize != 32) :
raise ADEPTError('AES improper key used')
return
key = self._key = AES_KEY()
rv = AES_set_decrypt_key(userkey, len(userkey) * 8, key)
if rv < 0:
raise ADEPTError('Failed to initialize AES key')
def decrypt(self, data):
out = create_string_buffer(len(data))
iv = (b"\x00" * self._blocksize)
rv = AES_cbc_encrypt(data, out, len(data), self._key, iv, 0)
if rv == 0:
raise ADEPTError('AES decryption failed')
return out.raw
return (AES, RSA)
def _load_crypto_pycrypto():
try:
from Cryptodome.Cipher import AES as _AES
from Cryptodome.PublicKey import RSA as _RSA
from Cryptodome.Cipher import PKCS1_v1_5 as _PKCS1_v1_5
except:
from Crypto.Cipher import AES as _AES
from Crypto.PublicKey import RSA as _RSA
from Crypto.Cipher import PKCS1_v1_5 as _PKCS1_v1_5
# ASN.1 parsing code from tlslite
class ASN1Error(Exception):
pass
class ASN1Parser(object):
class Parser(object):
def __init__(self, bytes):
self.bytes = bytes
self.index = 0
def get(self, length):
if self.index + length > len(self.bytes):
raise ASN1Error("Error decoding ASN.1")
x = 0
for count in range(length):
x <<= 8
x |= self.bytes[self.index]
self.index += 1
return x
def getFixBytes(self, lengthBytes):
bytes = self.bytes[self.index : self.index+lengthBytes]
self.index += lengthBytes
return bytes
def getVarBytes(self, lengthLength):
lengthBytes = self.get(lengthLength)
return self.getFixBytes(lengthBytes)
def getFixList(self, length, lengthList):
l = [0] * lengthList
for x in range(lengthList):
l[x] = self.get(length)
return l
def getVarList(self, length, lengthLength):
lengthList = self.get(lengthLength)
if lengthList % length != 0:
raise ASN1Error("Error decoding ASN.1")
lengthList = int(lengthList/length)
l = [0] * lengthList
for x in range(lengthList):
l[x] = self.get(length)
return l
def startLengthCheck(self, lengthLength):
self.lengthCheck = self.get(lengthLength)
self.indexCheck = self.index
def setLengthCheck(self, length):
self.lengthCheck = length
self.indexCheck = self.index
def stopLengthCheck(self):
if (self.index - self.indexCheck) != self.lengthCheck:
raise ASN1Error("Error decoding ASN.1")
def atLengthCheck(self):
if (self.index - self.indexCheck) < self.lengthCheck:
return False
elif (self.index - self.indexCheck) == self.lengthCheck:
return True
else:
raise ASN1Error("Error decoding ASN.1")
def __init__(self, bytes):
p = self.Parser(bytes)
p.get(1)
self.length = self._getASN1Length(p)
self.value = p.getFixBytes(self.length)
def getChild(self, which):
p = self.Parser(self.value)
for x in range(which+1):
markIndex = p.index
p.get(1)
length = self._getASN1Length(p)
p.getFixBytes(length)
return ASN1Parser(p.bytes[markIndex:p.index])
def _getASN1Length(self, p):
firstLength = p.get(1)
if firstLength<=127:
return firstLength
else:
lengthLength = firstLength & 0x7F
return p.get(lengthLength)
class AES(object):
def __init__(self, key):
self._aes = _AES.new(key, _AES.MODE_CBC, b'\x00'*16)
def decrypt(self, data):
return self._aes.decrypt(data)
class RSA(object):
def __init__(self, der):
key = ASN1Parser([x for x in der])
key = [key.getChild(x).value for x in range(1, 4)]
key = [self.bytesToNumber(v) for v in key]
self._rsa = _RSA.construct(key)
def bytesToNumber(self, bytes):
total = 0
for byte in bytes:
total = (total << 8) + byte
return total
def decrypt(self, data):
return _PKCS1_v1_5.new(self._rsa).decrypt(data, 172)
return (AES, RSA)
def _load_crypto():
AES = RSA = None
cryptolist = (_load_crypto_libcrypto, _load_crypto_pycrypto)
if sys.platform.startswith('win'):
cryptolist = (_load_crypto_pycrypto, _load_crypto_libcrypto)
for loader in cryptolist:
try:
AES, RSA = loader()
break
except (ImportError, ADEPTError):
pass
return (AES, RSA)
AES, RSA = _load_crypto()
META_NAMES = ('mimetype', 'META-INF/rights.xml')
NSMAP = {'adept': 'http://ns.adobe.com/adept',
'enc': 'http://www.w3.org/2001/04/xmlenc#'}
@@ -355,7 +137,7 @@ NSMAP = {'adept': 'http://ns.adobe.com/adept',
class Decryptor(object):
def __init__(self, bookkey, encryption):
enc = lambda tag: '{%s}%s' % (NSMAP['enc'], tag)
self._aes = AES(bookkey)
self._aes = AES.new(bookkey, AES.MODE_CBC, b'\x00'*16)
encryption = etree.fromstring(encryption)
self._encrypted = encrypted = set()
self._otherData = otherData = set()
@@ -373,11 +155,11 @@ class Decryptor(object):
path = path.encode('utf-8')
encrypted.add(path)
json_elements_to_remove.add(elem.getparent().getparent())
else:
else:
path = path.encode('utf-8')
otherData.add(path)
self._has_remaining_xml = True
for elem in json_elements_to_remove:
elem.getparent().remove(elem)
@@ -398,8 +180,8 @@ class Decryptor(object):
except:
# possibly not compressed by zip - just return bytes
return bytes
return decompressed_bytes
return decompressed_bytes
def decrypt(self, path, data):
if path.encode('utf-8') in self._encrypted:
data = self._aes.decrypt(data)[16:]
@@ -446,49 +228,26 @@ def isPassHashBook(inpath):
return True
except:
pass
return False
# Checks the license file and returns the UUID the book is licensed for.
# Checks the license file and returns the UUID the book is licensed for.
# This is used so that the Calibre plugin can pick the correct decryption key
# first try without having to loop through all possible keys.
def adeptGetUserUUID(inpath):
def adeptGetUserUUID(inpath):
with closing(ZipFile(open(inpath, 'rb'))) as inf:
try:
rights = etree.fromstring(inf.read('META-INF/rights.xml'))
adept = lambda tag: '{%s}%s' % (NSMAP['adept'], tag)
expr = './/%s' % (adept('user'),)
user_uuid = ''.join(rights.findtext(expr))
if user_uuid[:9] != "urn:uuid:":
if user_uuid[:9] != "urn:uuid:":
return None
return user_uuid[9:]
except:
return None
def verify_book_key(bookkey):
if bookkey[-17] != '\x00' and bookkey[-17] != 0:
# Byte not null, invalid result
return False
if ((bookkey[0] != '\x02' and bookkey[0] != 2) and
((bookkey[0] != '\x00' and bookkey[0] != 0) or
(bookkey[1] != '\x02' and bookkey[1] != 2))):
# Key not starting with "00 02" or "02" -> error
return False
keylen = len(bookkey) - 17
for i in range(1, keylen):
if bookkey[i] == 0 or bookkey[i] == '\x00':
# Padding data contains a space - that's not allowed.
# Probably bad decryption.
return False
return True
def decryptBook(userkey, inpath, outpath):
if AES is None:
raise ADEPTError("PyCrypto or OpenSSL must be installed.")
with closing(ZipFile(open(inpath, 'rb'))) as inf:
namelist = inf.namelist()
if 'META-INF/rights.xml' not in namelist or \
@@ -508,7 +267,7 @@ def decryptBook(userkey, inpath, outpath):
print("Try getting your distributor to give you a new ACSM file, then open that in an old version of ADE (2.0).")
print("If your book distributor is not enforcing the new DRM yet, this will give you a copy with the old DRM.")
raise ADEPTNewVersionError("Book uses new ADEPT encryption")
if len(bookkey) == 172:
print("{0:s} is a secure Adobe Adept ePub.".format(os.path.basename(inpath)))
elif len(bookkey) == 64:
@@ -519,28 +278,21 @@ def decryptBook(userkey, inpath, outpath):
if len(bookkey) != 64:
# Normal Adobe ADEPT
rsa = RSA(userkey)
bookkey = rsa.decrypt(base64.b64decode(bookkey.encode('ascii')))
rsakey = RSA.import_key(userkey) # parses the ASN1 structure
bookkey = base64.b64decode(bookkey)
try:
bookkey = PKCS1_v1_5.new(rsakey).decrypt(bookkey, None) # automatically unpads
except ValueError:
bookkey = None
# Verify key:
if len(bookkey) > 16:
# Padded as per RSAES-PKCS1-v1_5
if verify_book_key(bookkey):
bookkey = bookkey[-16:]
else:
print("Could not decrypt {0:s}. Wrong key".format(os.path.basename(inpath)))
return 2
else:
if bookkey is None:
print("Could not decrypt {0:s}. Wrong key".format(os.path.basename(inpath)))
return 2
else:
# Adobe PassHash / B&N
key = base64.b64decode(userkey)[:16]
aes = AES(key)
bookkey = aes.decrypt(base64.b64decode(bookkey))
if type(bookkey[-1]) != int:
pad = ord(bookkey[-1])
else:
pad = bookkey[-1]
bookkey = bookkey[:-pad]
bookkey = base64.b64decode(bookkey)
bookkey = unpad(AES.new(key, AES.MODE_CBC, b'\x00'*16).decrypt(bookkey), 16) # PKCS#7
if len(bookkey) > 16:
bookkey = bookkey[-16:]