from datetime import datetime, timedelta from flask import current_app import requests, time, subprocess, re # How can we update coder admin_token??? What is the maximum expiration days? # coder server --max-token-lifetime. Default 2540400 hours = 290 days # The maximum lifetime duration users can specify when creating an API token. # coder tokens create --lifetime. Default 720 hours = 30 days # Specify a duration for the lifetime of the token. # Create token API key - curl -X POST http://coder-server:8080/api/v2/users/{user}/keys/tokens # { # "lifetime": 0, # "scope": "all", # "token_name": "string" # } admin_token = 'gNmzL1TLeN-gXfs7q10uWPINqtlpt02Pj' template_id = 'a5577a86-e700-41be-997b-6001d78061d4' user_email = 'learn@akamai.com' user_pwd = 'Qodnwk=$s8' token_header = {'Coder-Session-Token': admin_token, 'Accept': 'application/json'} token_param = {'email': user_email, 'password': user_pwd} # unused hours of workspace unused_hours = 2 # workspace ttl ms ttl = 86400000 # 24 hours ######################################################################################### # functions for end-users start # functions below are invoked when end-user click the link/button to get a workspace # list_workspaces(), search_workspaces(), create_workspace_name, update_workspace(), is_workspace_running(), is_agent_connected(), is_app_ready() ########################################################################################## # this function get the all workspaces under the specified coder owner def list_workspaces(owner): current_app.logger.debug('workspace_owner= %s', owner) url = 'http://localhost:3000/api/v2/workspaces?owner=' + owner response = requests.get(url, headers=token_header) s_code = response.status_code body = response.json() current_app.logger.debug('list_workspace status code: '+str(s_code)) workspaces = [] if s_code == 200: for workspace in body['workspaces']: if workspace['owner_name'] == owner: workspaces.append(workspace) else: current_app.logger.info('cannot get workspaces list') workspaces = None return workspaces # this function is main logic which finds an available workspace and return it def search_workspaces(workspaces, hours): pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1-6}Z$' selectedWorkspace = None email = None current_app.logger.debug('hours= '+str(hours)) # current_app.logger.debug('workspaces= '+str(workspaces)) for workspace in workspaces: current_app.logger.debug('workspace= '+str(workspace)) is_selected = False workspace_name = workspace['name'] current_app.logger.debug('workspace_name= '+workspace_name) # 'last_used_at' looked useful. But it is updated too late. 5-10 minutes gap. # so, we gave up using it. # two code lines below remains just to show the debugging information. last_used_at = workspace['last_used_at'] current_app.logger.debug('last_used_at= %s', last_used_at) # Each time a workspace is assigned to an end-user, we rename the workspace to the current time. # so, we use workspace name as 'last_assigned_at' variable last_assigned_at = datetime.strptime(workspace['name'], '%Y%m%d%H%M%S') current_app.logger.debug('last_assigned_at= %s', last_assigned_at) currentDate = datetime.now() current_app.logger.debug('now= %s', currentDate) # we calculate how many hours has past since the last time the workspace was assigned. current_app.logger.debug(currentDate - last_assigned_at) difference = (currentDate - last_assigned_at).total_seconds() / 3600 current_app.logger.debug('hoursSinceLastAssigned= '+str(difference)+' hour(s)') # if the workspace has been unassigned status longer than unused_hours, we select the workspace. if difference > hours: is_selected = True # if workspace is selected, we update its name to the current timestamp if is_selected: selectedWorkspace =update_workspace(workspace) current_app.logger.debug('selected workspace= '+str(selectedWorkspace)) break if selectedWorkspace == None: current_app.logger.info('cannot find an available workspace') # we need to call create_workspaces() asynchronously return selectedWorkspace # this function creates a new workspace name based on current time # How can I make sure this function is thread-safe, to avoid duplicate workspace_names??? def create_workspace_name(): # past = timedelta(hours=-unused_hours) # new_name = datetime.now() + past new_name = str(datetime.now().strftime('%Y%m%d%H%M%S')) current_app.logger.debug('new_name= %s'+new_name) return new_name # this function updates a selected workspace with a new name created from create_workspace_name() def update_workspace(workspace): selected_workspace = None workspace_id = workspace['id'] workspace_name = workspace['name'] url = 'http://localhost:3000/api/v2/workspaces/' + workspace_id new_name = create_workspace_name() current_app.logger.debug('new_name= %s', new_name) req_body = {"username": "a", "name": new_name} current_app.logger.debug('update_workspace req_body= %s', req_body) response = requests.patch(url, json=req_body, headers=token_header) s_code = response.status_code current_app.logger.debug('update_workspace status code: %s', str(s_code)) # body = response.json() current_app.logger.debug('update_workspace response body: %s', response.content) if s_code == 204: current_app.logger.debug('workspace %s is updated to %s', workspace_name, new_name) selected_workspace = requests.get('http://localhost:3000/api/v2/users/a/workspace/'+ new_name, headers=token_header).json() current_app.logger.info('selected_workspace= %s', selected_workspace) # following code lines double-check whether the workspace is successfully created or not. # we commented them out. we might want to use them again for debugging later. # time.sleep(0.5) # response = requests.get( # 'http://localhost:3000/api/v2/users/a/workspace/'+new_name, headers=token_header) # s_code = response.status_code # current_app.logger.debug(response.content) #if s_code != 200: # current_app.logger.info('cannot check update status of workspace ' + workspace_name) # workspace = response.json() else: current_app.logger.info('cannot update workspace %s', workspace_name) return selected_workspace # first, check workspace status def is_workspace_running(user_id): workspace_status = 'unknown' status_url = 'http://localhost:3000/api/v2/users/a/workspace/'+user_id status_response = requests.get(status_url, headers=token_header) current_app.logger.debug(user_id + ": is_workspace_running status code: " + str(status_response.status_code)) status_json = status_response.json() if status_json['latest_build']['status'] == 'running': current_app.logger.info(user_id + ': workspace is running') return True else: current_app.logger.info(user_id + ': workspace is NOT running') return False # if workspace is not running, start it def start_workspace(user_id): command = 'coder start a/'+user_id result = subprocess.run([command], shell=True, capture_output=True, text=True) current_app.logger.debug(user_id + ': ' + result.stdout) s_code = result.returncode if s_code == 0: return True else: return False # second, check the agent of the workspace # if it is not ready, wait def is_agent_connected(user_id): agent_status = 'unknown' count = 0 while agent_status != 'connected': status_url = 'http://localhost:3000/api/v2/users/a/workspace/'+user_id status_response = requests.get(status_url, headers=token_header) current_app.logger.debug( user_id + ": is_agent_connected status code: " + str(status_response.status_code)) status_json = status_response.json() # app.logger.debug(user_id + ': '+str(status_json)) if len(status_json['latest_build']['resources']) > 1: for resource in status_json['latest_build']['resources']: if resource['type'] == 'docker_container': for agent in resource['agents']: if agent['name'] == 'main': agent_status = agent['status'] current_app.logger.info( user_id + ": agent status: " + agent_status) if agent_status == 'timeout' or agent_status == 'connected': break count += 1 # will check coder is ready for the new url time.sleep(1) current_app.logger.debug(user_id + ": api call count: "+str(count)) if agent_status == 'connected': return True else: return False # third, check the app running inside the workspace # there are 2 apps. coder-server and OWASP juice-shop def is_app_ready(url): is_ready = False while True: response = requests.get(url) s_code = response.status_code current_app.logger.debug('%s status code= %s', url, s_code) if s_code == 200: is_ready = True break else: time.sleep(1) return is_ready ######################################################################################## # functions for end-users end ######################################################################################## ########################################################## # The following functions are for administrator only!!! # ######################################################### # create a workspace def create_workspace(): url = 'http://localhost:3000/api/v2/organizations/1a453979-dbd0-49f5-8d4c-5188c9027466/members/a/workspaces' workspace_name = create_workspace_name() param = {"name": workspace_name, "template_id": "a5577a86-e700-41be-997b-6001d78061d4", "ttl_ms": ttl} response = requests.post(url, json=param, headers=token_header) s_code = response.status_code current_app.logger.info(workspace_name + ": create_workspace status: " + str(s_code)) body = response.json() current_app.logger.debug(workspace_name + ': create_workspace body: ' + str(body)) if s_code == 201: current_app.logger.info(workspace_name +" is created") else: current_app.logger.info(workspace_name +" cannot create a new workspace") body = None return body # create workspaces by the input numbers def create_workspaces(number_of_workspaces): workspaces = [] for i in range(number_of_workspaces): workspace = create_workspace() workspaces.append(workspace) time.sleep(number_of_workspaces) if len(workspaces) > 0: current_app.logger.info("workspaces are created") else: current_app.logger.info("cannot create any workspace") workspaces = None return workspaces # find workspaces by status. pending, starting, running, stopping, failed, canceling, canceled, deleting, deleted def find_by_status(target_status): workspaces = list_workspaces('a') new_workspaces = [] for workspace in workspaces: status = workspace['latest_build']['status'] if status == target_status: new_workspaces.append(workspace) return new_workspaces # workspace deletion related functions start def delete_workspace(workspace): workspace_name = workspace['name'] owner_name = workspace['owner_name'] command = 'coder delete ' + owner_name + '/' + workspace_name + ' -y' result = subprocess.run([command], shell=True, capture_output=True, text=True) current_app.logger.debug(workspace_name + ': ' + result.stdout) r_code = result.returncode if r_code == 0: return True else: return False def delete_workspaces(workspaces): new_workspaces = [] for workspace in workspaces: workspace_name = workspace['name'] if delete_workspace(workspace): current_app.logger.info('deleted workspace %s', workspace_name) new_workspaces.append(workspace) else: current_app.logger.info('failed to delete workspace %s', workspace_name) if len(new_workspaces) == len(workspaces): current_app.logger.info('deleted all workspaces') return True else: return False # update workspace' name, if it is older than workspace_age (hours) def update_old_workspaces(workspace_age): if isinstance(workspace_age, int) == False: workspace_age = int(workspace_age) names = {'old_names': [], 'new_names': []} workspaces = list_workspaces('a') regex = r'^\d{14}' for workspace in workspaces: need_update = False old_name = workspace['name'] if re.match(regex, old_name): difference = datetime.now() - datetime.strptime(old_name, '%Y%m%d%H%M%S') current_app.logger.debug('difference= %s', difference) if difference.total_seconds()/3600 > workspace_age: need_update = True else: need_update = True if need_update: current_app.logger.debug('workspace= %s', workspace) current_app.logger.debug('old_name= %s', old_name) names['old_names'].append(old_name) workspace = update_workspace(workspace) current_app.logger.debug('new_name= %s', workspace['name']) names['new_names'].append(workspace['name']) # time.sleep(0.5) return names # this function is now used. def create_token(user_id): # The token expiry duration for browser sessions. Default 24 hours # Sessions may last longer if they are actively making requests, # but this functionality can be disabled via --disable-session-expiry-refresh. token_url = 'http://localhost:3000/api/v2/users/a/keys' token_response = requests.post(token_url, headers=token_header) s_code = token_response.status_code app.logger.debug('token response status code:' + str(token_response.status_code)) if s_code == 201: token_json = token_response.json() app.logger.info(user_id + ': a new token is created') app.logger.debug(user_id + ': '+token_json['key']) return token_json['key'] else: return ''