from akamai.edgegrid import EdgeGridAuth from urllib.parse import urljoin from datetime import datetime, timedelta import requests, json import re import time import names from flask import current_app # Control Center Account is 'Marketplace Test01' contract_id = 'ctr_V-41DUHPB' # admin user API credential client_secret = 'HP299+k2YnnyRKtbzKYtGOrM4ve1tXlrsn6o3ZZ/Mdw=' host = 'akab-hkgndwdao42uuh4y-q4itpussnn3gck4x.luna.akamaiapis.net' access_token = 'akab-oaav5wopp546rowg-gjkaonxqkb6suzo7' client_token = 'akab-4daj4uu4qpqiukly-5kbvowcutmvydqk2' baseurl = 'https://' + host # unused hours of user unused_hours = 5 sess = requests.Session() sess.auth = EdgeGridAuth( client_token=client_token, client_secret=client_secret, access_token=access_token ) def get_all_users(): response = sess.get(urljoin( baseurl, '/identity-management/v3/user-admin/ui-identities?authGrants=true')) s_code = response.status_code body = response.json() users = body return users def verify_user_assigned_ever(user, firstName, lastName): is_user_assigned_ever = False if user['firstName'] == firstName and user['lastName'] == lastName: is_user_assigned_ever = True return is_user_assigned_ever def verify_user_email_domain(user): pattern = r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$' is_correct_domain = False email = user['email'] if re.fullmatch(pattern, email): is_correct_domain = True return is_correct_domain def verify_user_lastLoginDate(user, hours): is_valid = False if 'lastLoginDate' in user: # if user has ever logged in to Control Center date = user['lastLoginDate'] loginDate = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ") # current_app.logger.debug('lastLoginDate='+date) currentDate = datetime.now() # current_app.logger.debug('now='+str(currentDate)) difference = (currentDate - loginDate).total_seconds() / 3600 difference = int(difference) # current_app.logger.debug('hoursSinceLastLogin='+str(difference)+' hours') if difference > hours: # if user has not logged in longer than unusedDays is_valid = True else: # user has never logged in to Control Center yet is_valid = True return is_valid def select_user(users, hours, firstName, lastName): selectedUser = None current_app.logger.debug('hours='+str(hours)) # To avoid user pool exhaustion, we reuse existing users if they remain in the pool. for user in users: if verify_user_assigned_ever(user, firstName, lastName): if verify_user_lastLoginDate(user, hours) is False: selectedUser = user break # if there is no existing user in the pool, we search for a random available user if selectedUser == None: for user in users: if verify_user_email_domain(user) and verify_user_lastLoginDate(user, hours): uiIdentityId = user['uiIdentityId'] selectedUser = update_user(uiIdentityId, firstName, lastName) break if selectedUser != None: current_app.logger.debug('selectedUser= %s', selectedUser) else: raise Exception('cannot find an available user') # we might want to create a new group and a user here??? return selectedUser def reset_user_pwd(u_id): response = sess.post(urljoin(baseurl, '/identity-management/v3/user-admin/ui-identities/' + u_id+'/reset-password?sendEmail=false')) pwd = response.json()['newPassword'] return pwd def update_user(uiIdentityId, firstName, lastName): requestBody = { "firstName": "John", "lastName": "Doe", "country": "USA", "phone": "3456788765", "contactType": "Billing", "preferredLanguage": "English", "sessionTimeOut": 64800, "timeZone": "GMT" } current_app.logger.debug('uiIdentityId= %s', uiIdentityId) requestBody['firstName'] = firstName requestBody['lastName'] = lastName current_app.logger.debug('requestBody= %s', requestBody) response = sess.put(urljoin(baseurl, '/identity-management/v3/user-admin/ui-identities/' + uiIdentityId + '/basic-info'), json=requestBody) s_code = response.status_code current_app.logger.debug('update_user status code= %s', s_code) body = response.json() current_app.logger.debug('update_user response body= %s', body) return body def find_credential(authorizedUser, file_path): credential =[] file = open(file_path) clients = json.load(file) current_app.logger.debug(f'clients= {clients}') for client in clients: user = client["authorizedUser"] if user == authorizedUser: credential = client["credentials"] break if len(credential) > 0 : print(f'credentials= {credential}') return credential else: return None def create_credential(credential): # this credential belongs to the selected user client_secret = credential["client_secret"] host = credential["host"] access_token = credential["access_token"] client_token = credential["client_token"] baseurl = "https://" + host new_credential={ "client_secret": "", "host": "", "access_token": "", "client_token": "" } new_credential["host"] = host new_credential["access_token"] = access_token sess = requests.Session() sess.auth = EdgeGridAuth( client_token=client_token, client_secret=client_secret, access_token=access_token ) headers = { "Accept":"application/json"} # create a new credential for the selected user response = sess.post(urljoin(baseurl, "/identity-management/v3/api-clients/self/credentials"), headers=headers) s_code = response.status_code current_app.logger.debug(f"status code= {s_code}") if s_code == 201: body = response.json() current_app.logger.debug(f"response body= {body}") new_credential["client_secret"] = body["clientSecret"] new_credential["client_token"] = body["clientToken"] credentialId = body["credentialId"] current_app.logger.info(f"new credentialId= {credentialId}") # if a new credential is created, start updating its expiration payload = { "expiresOn": "2020-10-11T23:06:59.000Z", "status": "ACTIVE", "description": "Expiration 4 hours" } current = datetime.now() current_app.logger.debug (f"current= {current}") expiration = current + timedelta(hours = 4) current_app.logger.debug(f"expiration= {expiration}") payload["expiresOn"] = expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ") current_app.logger.debug(f"payload= {payload}") path = "/identity-management/v3/api-clients/self/credentials/{}".format(credentialId) headers = { "Content-Type": "application/json", "Accept": "application/json"} # update the new credential expiration response = sess.put(urljoin(baseurl, path), headers=headers, json=payload) s_code = response.status_code current_app.logger.debug(f"status code= {s_code}") if s_code == 200: body = response.json() current_app.logger.debug(f"update credential response= {body}") return new_credential else: return None else: return None ######################################################### # The following functions are for administrator only!!! # ######################################################### def is_user_in_users(users, uiUserName): is_user_in_users = False users = get_all_users() for user in users: if user['uiUserName'] == uiUserName: is_user_in_users = True current_app.logger.debug('user %s exists in all users', uiUserName) break if is_user_in_users == False: current_app.logger.debug( 'cannot find user %s in all users', uiUserName) return is_user_in_users ######################################################### # functions for deleting old properties # ######################################################### def get_properties_by_group(group_id): if isinstance(group_id, str) == False: group_id = str(group_id) response = sess.get(urljoin( baseurl, '/papi/v1/properties?contractId='+contract_id+'&groupId='+group_id)) properties = response.json() return properties # this function is used by delete_properties(properties) def get_property_details(property_id, contract_id, group_id): response = sess.get(urljoin(baseurl, '/papi/v1/properties/' + property_id+'?contractId='+contract_id+'&groupId='+group_id)) s_code = response.status_code property_details = response.json() if s_code != 200: current_app.logger.debug( 'failed to check status of property '+property_id) current_app.logger.debug(property_details) else: current_app.logger.debug('checked status of property '+property_id) return property_details def deactivate_property(property_id, network, property_version): deactivate_payload = { "acknowledgeAllWarnings": False, "activationType": "DEACTIVATE", "fastPush": True, "ignoreHttpErrors": True, "notifyEmails": ["learn@akamai.com"], "useFastFallback": False, "network": "STAGING", "propertyVersion": 0 } activation_link = None if network == 'STAGING': deactivate_payload['network'] = 'STAGING' elif network == 'PRODUCTION': deactivate_payload['network'] = 'PRODUCTION' else: current_app.abort( 500, description="deactivation target network value is invalid >> network= "+network) current_app.logger.debug('deactivation target network is ' + str(deactivate_payload['network'])) deactivate_payload['propertyVersion'] = property_version current_app.logger.debug('property_version= '+str(deactivate_payload['propertyVersion']) ) response = sess.post(urljoin(baseurl, '/papi/v1/properties/' + property_id+'/activations'), json=deactivate_payload) s_code = response.status_code body = response.json() current_app.logger.debug('deactivation response= '+str(body)) activation_link = str(body['activationLink']) if s_code != 201: current_app.logger.debug('failed to deactivate property '+property_id) current_app.logger.debug(body) else: current_app.logger.debug('started deactivation of property ' + property_id + ' with activationLink '+activation_link) return activation_link def delete_properties(properties): deleted = False staging_version = None production_version = None if len(properties['properties']['items']) > 0: for property in properties['properties']['items']: current_app.logger.debug('property= ' + str(property)) property_id = property['propertyId'] group_id = property['groupId'] current_app.logger.debug('property_id= '+property_id) current_app.logger.debug('contract_id= '+contract_id) current_app.logger.debug('group_id= '+group_id) # Check property activation status property_details = get_property_details( property_id, contract_id, group_id) current_app.logger.debug( 'property_details= '+str(property_details)) # Deactivate property if it is active in staging or network staging_version = property_details['properties']['items'][0]['stagingVersion'] if staging_version != 'None' and isinstance(staging_version, int): current_app.logger.debug('property '+property_id+' version ' + str(staging_version)+' is active in staging network') activation_link = deactivate_property( property_id, 'STAGING', staging_version) if activation_link != None: current_app.logger.info( 'deactivation started in staging network. we will wait until it finishes') while True: time.sleep(1) response = sess.get(urljoin(baseurl, activation_link)) s_code = response.status_code body = response.json() current_app.logger.debug( 'get activation response code= '+str(s_code)) current_app.logger.debug( 'get activation response body= '+str(body)) status = str(body['activations']['items'][0]['status']) activation_id = str( response['activations']['items'][0]['activationId']) current_app.logger.debug('activation_id ' + activation_id+' is '+status) if status == 'DEACTIVATED': current_app.logger.info( 'finished deactivation of property '+property_id + ' in staging network') break else: current_app.logger.debug( 'deactivation is in progress. We will check activation status 1 second later') else: current_app.logger.debug('property '+property_id + ' is not active in staging network') # Deactivate property if it is active in production or network production_version = property_details['properties']['items'][0]['productionVersion'] if production_version != 'None' and isinstance(production_version, int): current_app.logger.debug('property '+property_id+' version ' + str(production_version)+' is active in production network') activation_link = deactivate_property( property_id, 'PRODUCTION', production_version) if activation_link != None: current_app.logger.info( 'deactivation started in production network. we will wait until it finishes') while True: time.sleep(1) response = sess.get(urljoin(baseurl, activation_link)) body = response.json() current_app.logger.debug( 'get activation response code= '+str(s_code)) current_app.logger.debug( 'get activation response body= '+str(body)) status = str(body['activations']['items'][0]['status']) activation_id = str( body['activations']['items'][0]['activationId']) current_app.logger.debug('activation_id ' + activation_id+' is '+status) if status == 'DEACTIVATED': current_app.logger.info( 'finished deactivation of property '+property_id) break else: current_app.logger.debug( 'deactivation is in progress. We will check activation status again 1 second later') else: current_app.logger.debug('property '+property_id + ' is not active in production network') # Delete properties response = sess.delete( urljoin(baseurl, '/papi/v1/properties/'+property_id)) s_code = response.status_code body = response.json() current_app.logger.debug('delete status code= '+str(s_code)) if s_code != 200: current_app.logger.debug( 'failed to delete property '+property_id) current_app.logger.debug(body) deleted = False break else: deleted = True current_app.logger.debug('deleted property '+property_id) else: current_app.logger.debug('cannot find any property') return deleted ###################################################### # functions for creating groups and users # ###################################################### def create_users(num): new_users = [] for i in range(num): group = create_group() groupId = group['groupId'] groupName = group['groupName'] uiUserName = groupName + '@akamai-lab.com' user = create_user(groupId, groupName) if user != None: current_app.logger.debug('user '+uiUserName+' is created') new_users.append(user) else: current_app.logger.debug('cannot create a new user') if len(new_users) > 0: return new_users else: return None def create_group(): parent_groupId = '232397' groupName = str(datetime.now().strftime('%Y%m%d%H%M%S')) current_app.logger.debug('groupName= %s', groupName) requestBody = {"groupName": groupName} response = sess.post(urljoin( baseurl, '/identity-management/v3/user-admin/groups/'+parent_groupId), json=requestBody) s_code = response.status_code current_app.logger.debug( 'create_group status code= '+str(s_code)) body = response.json() current_app.logger.debug('create_group response body= %s', body) groupId = body['groupId'] if s_code == 201: return body else: return 'cannot create a new group ' + str(groupName) def create_user(groupId, groupName): headers = {"accept": "application/json"} requestBody = { "authGrants": [ { # 3 = admin, 928 = editor. refer to 'list_roles.json' for more information. "roleId": 928, "groupId": 12345 } ], "firstName": "John", "lastName": "Doe", "email": "@akamai-lab.com", "phone": "(123) 321-1234", "additionalAuthentication": "NONE", "country": "USA" } # firstName = names.get_first_name() # lastName = names.get_last_name() firstName = 'Not' lastName = 'Assigned' email = groupName + '@akamai-lab.com' current_app.logger.debug('email= '+email) requestBody['firstName'] = firstName requestBody['lastName'] = lastName requestBody['email'] = email requestBody['authGrants'][0]['groupId'] = groupId current_app.logger.debug('create_user request body= %s', requestBody) response = sess.post(urljoin( baseurl, '/identity-management/v3/user-admin/ui-identities?sendEmail=false'), json=requestBody, headers=headers) s_code = response.status_code current_app.logger.debug('create_user status code= %s', s_code) body = response.json() current_app.logger.debug('create_user response body= %s', body) if s_code == 201: return body else: return None ####################################################### def list_roles(): response = sess.get(urljoin( baseurl, '/identity-management/v3/user-admin/roles')) s_code = response.status_code current_app.logger.debug('list_roles status code= %s', s_code) body = response.json() current_app.logger.debug('list_roles response body= %s', body) if s_code == 200: return body else: return 'cannot list roles' def get_groups(unused_hours): pattern = r'^[a-zA-Z0-9_.+-]+@akamai-lab\.com$' current_app.logger.debug('unused_hours='+str(unused_hours)) groups = [] users = get_all_users() filtered_users = [{}] for user in users: # current_app.logger.debug('user= ' + str(user)) email = user['email'] current_app.logger.debug('email= '+email) is_correct_user = False if re.fullmatch(pattern, email): # if email domain is akamai-lab.com if 'lastLoginDate' in user: # if the user has ever logged in to Control Center date = user['lastLoginDate'] loginDate = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ") current_app.logger.debug('lastLoginDate='+date) currentDate = datetime.now() current_app.logger.debug('now='+str(currentDate)) difference = (currentDate - loginDate).total_seconds() / 3600 current_app.logger.debug( 'hoursSinceLastLogin=' + str(difference)+'hour(s)') if difference > unused_hours: # if the user has not logged in longer than unused hours is_correct_user = True else: # if the user has never logged in is_correct_user = True if is_correct_user: groupId = user['authGrants'][0]['groupId'] groupName = user['authGrants'][0]['groupName'] group = {"groupName": "", "groupId": ""} group['groupName'] = groupName group['groupId'] = groupId groups.append(group) return groups