Source code for worldline.acquiring.sdk.authentication.oauth2_authenticator

from datetime import datetime, timedelta
from json import loads
from threading import Lock
from typing import Iterable, Optional, Sequence, Tuple
from urllib.parse import ParseResult

from .authenticator import Authenticator
from .oauth2_exception import OAuth2Exception

from worldline.acquiring.sdk.communicator_configuration import CommunicatorConfiguration
from worldline.acquiring.sdk.communication.default_connection import DefaultConnection
from worldline.acquiring.sdk.communication.request_header import RequestHeader


[docs] class OAuth2Authenticator(Authenticator): """ OAuth2 Authenticator implementation. """
[docs] def __init__(self, communicator_configuration: CommunicatorConfiguration): """ Constructs a new OAuth2Authenticator instance using the provided CommunicatorConfiguration. :param communicator_configuration: The configuration object containing the OAuth2 client id, client secret and token URI, connection timeout, and socket timeout. None of these can be None or empty, and the timeout values must be positive. """ Authenticator.__init__(self) if not communicator_configuration.oauth2_client_id: raise ValueError("oauth2_client_id is required") if not communicator_configuration.oauth2_client_secret: raise ValueError("oauth2_client_secret is required") if not communicator_configuration.oauth2_token_uri: raise ValueError("oauth2_client_token_uri is required") if communicator_configuration.connect_timeout <= 0: raise ValueError("connect_timeout must be positive") if communicator_configuration.socket_timeout <= 0: raise ValueError("socket_timeout must be positive") self.__client_id = communicator_configuration.oauth2_client_id self.__client_secret = communicator_configuration.oauth2_client_secret self.__token_uri = communicator_configuration.oauth2_token_uri self.__connect_timeout = communicator_configuration.connect_timeout self.__socket_timeout = communicator_configuration.socket_timeout self.__proxy_configuration = communicator_configuration.proxy_configuration # Only a limited amount of scopes may be sent in one request. # While at the moment all scopes fit in one request, keep this code so we can easily add more token types if necessary. # The empty path will ensure that all paths will match, as each full path ends with an empty string. self.__access_tokens = [ self.__TokenType("", "processing_payment", "processing_refund", "processing_credittransfer", "processing_accountverification", "processing_operation_reverse", "processing_dcc_rate", "services_ping"), ]
[docs] def get_authorization(self, http_method: Optional[str], resource_uri: Optional[ParseResult], request_headers: Optional[Sequence[RequestHeader]]) -> str: token_type = self.get_token_type(resource_uri.path) with token_type.lock: if not token_type.access_token or token_type.access_token_expiration < datetime.now(): token_type.access_token, token_type.access_token_expiration = self.__get_access_token(token_type.scopes) return "Bearer " + token_type.access_token
def __get_access_token(self, scopes: str) -> Tuple[str, datetime]: with DefaultConnection(connect_timeout=self.__connect_timeout, socket_timeout=self.__socket_timeout, max_connections=1, proxy_configuration=self.__proxy_configuration) as connection: request_headers = [RequestHeader("Content-Type", "application/x-www-form-urlencoded")] body = "grant_type=client_credentials&client_id=%s&client_secret=%s&scope=%s" \ % (self.__client_id, self.__client_secret, scopes) start_time = datetime.now() status, _, chunks = connection.post(self.__token_uri, request_headers, body) response_body = OAuth2Authenticator.__collect_chunks(chunks) access_token_response = loads(response_body) if status != 200: error_description = access_token_response["error_description"] if "error_description" in access_token_response else None raise OAuth2Exception("There was an error while retrieving the OAuth2 access token: %s - %s" % (access_token_response["error"], error_description)) expiration_time = start_time + timedelta(seconds=access_token_response["expires_in"]) return access_token_response["access_token"], expiration_time
[docs] def get_token_type(self, path: str): for token_type_entry in self.__access_tokens: path_with_trailing_slash = token_type_entry.path + "/" if path.endswith(token_type_entry.path) or path_with_trailing_slash in path: return token_type_entry raise OAuth2Exception("Scope could not be found for path " + path)
@staticmethod def __collect_chunks(chunks: Iterable[bytes]) -> str: collected_body = b"" for chunk in chunks: collected_body += chunk return collected_body.decode('utf-8') class __TokenType: def __init__(self, path, *scopes): self.scopes = str.join(" ", scopes) self.path = path self.access_token = None self.access_token_expiration = None # Python does not provide a read-write lock implementation out-of-the-box. # Use a simple Lock instead. That does mean that multiple reads have to wait on each other, # but the read-only part is limited to checking the access token and its expiration timestamp, # which should take only a very short time self.lock = Lock()