acsm-calibre-plugin/calibre-plugin/libadobe.py
Florian Bach eebb523b6b Bunch of ADE-related changes
- Support for emulating multiple ADE versions
- Support for importing existing Win/Mac ADE activation
- Various small fixes and cleanup
2021-11-20 06:55:39 +01:00

521 lines
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Helper library with code needed for Adobe stuff.
'''
from uuid import getnode
import os, hashlib, base64
import urllib.request, ssl
from datetime import datetime, timedelta
from lxml import etree
import rsa
try:
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA
except ImportError:
# Debian (and Ubuntu) ship pycryptodome, but not in its compatible mode with pycrypto
# If `Crypto` can't be found, try under pycryptodome's own namespace
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA
from oscrypto import keys
from oscrypto.asymmetric import dump_certificate, dump_private_key, dump_public_key
VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept"
VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept"
FILE_DEVICEKEY = "devicesalt"
FILE_DEVICEXML = "device.xml"
FILE_ACTIVATIONXML = "activation.xml"
# Lists of different ADE "versions" we know about
VAR_VER_SUPP_CONFIG_NAMES = [ "ADE 1.7.2", "ADE 2.0.1", "ADE 3.0.1", "ADE 4.0.3", "ADE 4.5.10", "ADE 4.5.11" ]
VAR_VER_SUPP_VERSIONS = [ "ADE WIN 9,0,1131,27", "2.0.1.78765", "3.0.1.91394", "4.0.3.123281",
"com.adobe.adobedigitaleditions.exe v4.5.10.186048",
"com.adobe.adobedigitaleditions.exe v4.5.11.187303" ]
VAR_VER_HOBBES_VERSIONS = [ "9.0.1131.27", "9.3.58046", "10.0.85385", "12.0.123217", "12.5.4.186049", "12.5.4.187298" ]
VAR_VER_OS_IDENTIFIERS = [ "Windows Vista", "Windows Vista", "Windows 8", "Windows 8", "Windows 8", "Windows 8" ]
# This is a list of ALL versions we know (and can potentially use if present in a config file).
# Must have the same length / size as the four lists above.
VAR_VER_BUILD_IDS = [ 1131, 78765, 91394, 123281, 186048, 187303 ]
# This is a list of versions that can be used for new authorizations:
VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [ 78765, 91394, 123281, 187303 ]
# This is a list of versions to be displayed in the version changer.
VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [ 1131, 78765, 91394, 123281, 187303 ]
# Versions >= this one are using HTTPS
VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281
# Default build ID to use - ADE 2.0.1
VAR_VER_DEFAULT_BUILD_ID = 78765
def are_ade_version_lists_valid():
# These five lists MUST all have the same amount of elements.
# Otherwise that will cause all kinds of issues.
fail = False
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_SUPP_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_HOBBES_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_OS_IDENTIFIERS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_BUILD_IDS):
fail = True
if fail:
print("Internal error in DeACSM: Mismatched version list lenghts.")
print("This should never happen, please open a bug report.")
return False
return True
devkey_bytes = None
def get_devkey_path():
global FILE_DEVICEKEY
return FILE_DEVICEKEY
def get_device_path():
global FILE_DEVICEXML
return FILE_DEVICEXML
def get_activation_xml_path():
global FILE_ACTIVATIONXML
return FILE_ACTIVATIONXML
def update_account_path(folder_path: str):
global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML
FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt")
FILE_DEVICEXML = os.path.join(folder_path, "device.xml")
FILE_ACTIVATIONXML = os.path.join(folder_path, "activation.xml")
def createDeviceKeyFile():
# Original implementation: Device::createDeviceKeyFile()
DEVICE_KEY_SIZE = 16
global devkey_bytes
devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE)
f = open(FILE_DEVICEKEY, "wb")
f.write(devkey_bytes)
f.close()
def get_mac_address():
mac1 = getnode()
mac2 = getnode()
if (mac1 != mac2) or ((mac1 >> 40) % 2):
return bytes([1, 2, 3, 4, 5, 0])
return mac1.to_bytes(6, byteorder='big')
def makeSerial(random: bool):
# Original implementation: std::string Device::makeSerial(bool random)
sha_out = None
if not random:
try:
# Linux
uid = os.getuid()
import pwd
username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1")
except:
# Windows
uid = 1000
username = os.getlogin().encode("utf-8").decode("latin-1")
mac_address = get_mac_address()
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower()
else:
sha_out = Random.get_random_bytes(20).hex().lower()
return sha_out
def makeFingerprint(serial: str):
# Original implementation: std::string Device::makeFingerprint(const std::string& serial)
# base64(sha1(serial + privateKey))
# Fingerprint must be 20 bytes or less.
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
str_to_hash = serial + devkey_bytes.decode('latin-1')
hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest()
b64str = base64.b64encode(hashed_str)
return b64str
############################################## HTTP stuff:
def sendHTTPRequest_DL2FILE(URL: str, outputfile: str):
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
}
req = urllib.request.Request(url=URL, headers=headers)
handler = urllib.request.urlopen(req)
chunksize = 16 * 1024
ret_code = handler.getcode()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_DL2FILE(loc)
if ret_code != 200:
return ret_code
with open(outputfile, "wb") as f:
while True:
chunk = handler.read(chunksize)
if not chunk:
break
f.write(chunk)
return 200
def sendHTTPRequest_getSimple(URL: str):
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url=URL, headers=headers)
handler = urllib.request.urlopen(req, context=ctx)
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_getSimple(loc)
return content
def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
"Content-Type": type
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url=URL, headers=headers, data=document)
handler = urllib.request.urlopen(req, context=ctx)
ret_code = handler.getcode()
if (ret_code == 204 and returnRC):
return 204, ""
if (ret_code != 200):
print("Post request returned something other than 200 - returned %d" % (ret_code))
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendPOSTHTTPRequest(loc, document, type, returnRC)
if returnRC:
return ret_code, content
return content
def sendHTTPRequest(URL: str):
return sendHTTPRequest_getSimple(URL)
def sendRequestDocu(document: str, URL: str):
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document: str, URL: str):
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
######### Encryption and signing ###################
def encrypt_with_device_key(data):
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
remain = 16
if (len(data) % 16):
remain = 16 - (len(data) % 16)
data += bytes([remain])*remain
iv = Random.get_random_bytes(16)
cip = AES.new(devkey_bytes, AES.MODE_CBC, iv)
encrypted = cip.encrypt(data)
res = iv + encrypted
return res
def decrypt_with_device_key(data):
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16])
decrypted = cip.decrypt(data[16:])
# Remove padding
decrypted = decrypted[:-decrypted[-1]]
return decrypted
def addNonce():
dt = datetime.utcnow()
usec = dt.microsecond
sec = (dt - datetime(1970,1,1)).total_seconds()
Ntime = int(int(sec * 1000) + usec/1000)
# Unixtime to gregorian timestamp
Ntime += 62167219200000
final = bytearray(Ntime.to_bytes(8, 'little'))
tmp = 0
final.extend(tmp.to_bytes(4, 'little'))
ret = ""
ret += "<adept:nonce>%s</adept:nonce>" % (base64.b64encode(final).decode("utf-8"))
m10m = dt + timedelta(minutes=10)
m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ")
ret += "<adept:expiration>%s</adept:expiration>" % (m10m_str)
return ret
def get_cert_from_pkcs12(_pkcs12, _key):
_, cert, _ = keys.parse_pkcs12(_pkcs12, _key)
cert = dump_certificate(cert, encoding="der")
return cert
def sign_node(node):
sha_hash = hash_node(node)
sha_hash = sha_hash.digest()
# print("Hash is " + sha_hash.hex())
global devkey_bytes
global pkcs12
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
try:
activationxml = etree.parse(FILE_ACTIVATIONXML)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
except:
return None
my_pkcs12 = base64.b64decode(pkcs12)
my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes))
my_priv_key = dump_private_key(my_priv_key, None, "der")
key = rsa.PrivateKey.load_pkcs1(RSA.importKey(my_priv_key).exportKey())
keylen = rsa.pkcs1.common.byte_size(key.n)
padded = rsa.pkcs1._pad_for_signing(sha_hash, keylen)
payload = rsa.pkcs1.transform.bytes2int(padded)
encrypted = key.blinded_encrypt(payload)
block = rsa.pkcs1.transform.int2bytes(encrypted, keylen)
signature = base64.b64encode(block).decode()
# print("sig is %s\n" % block.hex())
return signature
def hash_node(node):
hash_ctx = SHA.new()
hash_node_ctx(node, hash_ctx)
return hash_ctx
ASN_NONE = 0
ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT"
ASN_CHILD = 2 # aka "END_ATTRIBUTES"
ASN_END_TAG = 3 # aka "END_ELEMENT"
ASN_TEXT = 4 # aka "TEXT_NODE"
ASN_ATTRIBUTE = 5 # aka "ATTRIBUTE"
debug = False
def hash_node_ctx(node, hash_ctx):
qtag = etree.QName(node.tag)
if (qtag.localname == "hmac"):
return
hash_do_append_tag(hash_ctx, ASN_NS_TAG)
hash_do_append_string(hash_ctx, qtag.namespace)
hash_do_append_string(hash_ctx, qtag.localname)
attrKeys = node.keys()
# Attributes need to be sorted
attrKeys.sort()
# TODO Implement UTF-8 bytewise sorting:
# "Attributes are sorted first by their namespaces and
# then by their names; sorting is done bytewise on UTF-8
# representations."
for attribute in attrKeys:
# Hash all the attributes
hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE)
hash_do_append_string(hash_ctx, "") # TODO: "Element namespace"? Whatever that means...
hash_do_append_string(hash_ctx, attribute)
hash_do_append_string(hash_ctx, node.get(attribute))
hash_do_append_tag(hash_ctx, ASN_CHILD)
if (node.text is not None):
# If there's raw text, hash that.
# This code block used to just be the following:
# hash_do_append_tag(hash_ctx, ASN_TEXT)
# hash_do_append_string(hash_ctx, node.text.strip())
# though that only works with text nodes < 0x7fff.
# While I doubt we'll ever encounter text nodes larger than 32k in
# this application, I want to implement the spec correctly.
# So there's a loop going over the text, hashing 32k chunks.
text = node.text.strip()
textlen = len(text)
if textlen > 0:
done = 0
remaining = 0
while True:
remaining = textlen - done
if remaining > 0x7fff:
remaining = 0x7fff
hash_do_append_tag(hash_ctx, ASN_TEXT)
hash_do_append_string(hash_ctx, text[done:done+remaining])
done += remaining
if done >= textlen:
break
for child in node:
# If there's child nodes, hash these as well.
hash_node_ctx(child, hash_ctx)
hash_do_append_tag(hash_ctx, ASN_END_TAG)
def hash_do_append_string(hash_ctx, string: str):
str_bytes = bytes(string, encoding="utf-8")
length = len(str_bytes)
len_upper = int(length / 256)
len_lower = int(length & 0xFF)
hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower])
hash_do_append_raw_bytes(hash_ctx, str_bytes)
def hash_do_append_tag(hash_ctx, tag: int):
if (tag > 5):
return
hash_do_append_raw_bytes(hash_ctx, [tag])
def hash_do_append_raw_bytes(hash_ctx, data: bytes):
hash_ctx.update(bytearray(data))