Source code for config_yourself.provider.kms
# Copyright 2018 Blink Health LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
import re
import boto3
import botocore
from config_yourself.exceptions import InvalidConfig, ConfigException
from .base import CryptoService
[docs]class Service(CryptoService):
"""The original KMS CryptoService
This provider does not need ``secrets``, and will use the AWS default credential chain to get credentials.
"""
def __init__(self, config, _secrets):
# secrets are ignored by the kms provider
self._client = None
self._region = None
self._key = None
self._setup_client(config)
def _setup_client(self, crypto):
# Setup the region for kms by looking at `crypto.region` and fallback on parsing
# the key ARN and extracting the region from it
try:
region = crypto.get("region", None)
except AttributeError as e:
# When the crypto property exists, but is not a dict, we raise this error.
raise InvalidConfig(original=e)
try:
if not region and "key" in crypto and ":" in crypto["key"]:
# try and fetch the region name from the key's arn if specified
# arn:aws:kms:{region}:{account}:{key_name}
arn = crypto["key"].split(":")
if len(arn) == 6:
region = crypto["key"].split(":")[3]
else:
raise IndexError
except TypeError:
# crypto.key is not a string, which is likely a pebkac error!
msg = (
"crypto.key is not a string, "
"please remove the crypto property if no encryption is needed"
)
raise InvalidConfig(message=msg)
except IndexError:
# crypto.key is likely in the non fq ARN alias/form
msg = (
"Could not parse region name from crypto.key. Please ensure you have "
"specified a fully-qualifed KSM key ARN to replace `{}` before continuing".format(
crypto["key"]
)
)
raise InvalidConfig(message=msg)
if not region or region == "":
# region not set or empty
msg = (
"Unable to resolve AWS region to query when decrypting. "
"Please use specify a full KMS key ARN in `crypto.key` or set a specific region "
"by adding a `crypto.region`"
)
raise InvalidConfig(message=msg)
self._region = region
self._key = crypto["key"]
self._client = boto3.client("kms", region_name=self._region)
[docs] def decrypt(self, ciphertext):
try:
res = self._client.decrypt(CiphertextBlob=ciphertext)
return res["Plaintext"]
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "AccessDeniedException":
# when we get this error, the most likely case is region mismatch or bonafide
# permission error, let's make this error helpful by showing which credentials
# we tried to use during decryption
try:
creds = boto3.client(
"sts", region_name=self._region
).get_caller_identity()["Arn"]
except Exception:
creds = None
raise KMSDecryptError(
str(self._region), self._key, creds=creds, original=e
)
else:
raise
[docs]class KMSDecryptError(ConfigException):
def __init__(self, region, key, creds=None, original=None):
msg = ""
if original:
msg = "{}. ".format(original)
msg += "Failed to decrypt in region <{}> with key <{}>".format(region, key)
if creds:
msg += " using credentials <{}>".format(creds)
else:
msg += " with no known credentials"
self.message = msg
super(KMSDecryptError, self).__init__(msg)