This commit is contained in:
sakim 2024-03-28 05:43:56 +00:00
parent 68913cedd8
commit 97b7a2ba47
5 changed files with 482 additions and 290 deletions

.gitignore vendored
View File

@ -1,2 +1,5 @@
.vscode .vscode
venv venv

build/Dockerfile Normal file
View File

@ -0,0 +1,18 @@
FROM python:3.10-alpine
apk update
apk add --update curl
RUN pip3 install flask randomname requests
# COPY /python /home/python
WORKDIR /home/python
# RUN python3 -m venv /home/python/venv
# RUN source /home/python/venv/bin/activate && pip3 install flask edgegrid-python randomname names
ENTRYPOINT ["python3"]
CMD [""]
# CMD ["/bin/sh"]
# CMD ["/home/python/venv/bin/python" ""]

build/python/app Normal file
View File

@ -0,0 +1,337 @@
from flask import Flask, request, make_response, abort, jsonify, render_template, send_file
from akamai.edgegrid import EdgeGridAuth
from urllib.parse import urljoin
from datetime import datetime, timedelta
import requests
import time
import subprocess
import logging
import re
import json
import akamai_functions, docebo_functions, coder_functions
from threading import Thread
import randomname
from io import StringIO
import os
app = Flask(__name__)
# How can we update coder admin_token??? What is the maximum expiration days?
# coder server --max-token-lifetime. Default 2540400 hours = 290 days
# The maximum lifetime duration users can specify when creating an API token.
# coder tokens create --lifetime. Default 720 hours = 30 days
# Specify a duration for the lifetime of the token.
# Create token API key - curl -X POST http://coder-server:8080/api/v2/users/{user}/keys/tokens
# {
# "lifetime": 0,
# "scope": "all",
# "token_name": "string"
# }
admin_token = 'gNmzL1TLeN-gXfs7q10uWPINqtlpt02Pj'
template_id = 'a5577a86-e700-41be-997b-6001d78061d4'
user_email = ''
user_pwd = 'Qodnwk=$s8'
token_header = {'Coder-Session-Token': admin_token,
'Accept': 'application/json'}
token_param = {'email': user_email, 'password': user_pwd}
# unused hours of workspace
unused_hours = 2
# workspace ttl ms
ttl = 86400000 # 24 hours
# Control Center API functions start #
def cc_download():
firstName = randomname.get_name()
lastName = randomname.get_name()
app.logger.debug('firstName= %s, lastName= %s', firstName, lastName)
app.logger.debug('get_users starts')
users = akamai_functions.get_all_users()
app.logger.debug('get_users ends')
app.logger.debug('select_user starts')
user = akamai_functions.select_user(users, unused_hours, firstName, lastName)
uiIdentityId = user['uiIdentityId']
app.logger.debug('select_user ends. uiIdentityId= %s', uiIdentityId)
app.logger.debug('reset_user_pwd starts')
pwd = akamai_functions.reset_user_pwd(uiIdentityId)
app.logger.debug('reset_user_pwd ends')
authorizedUser = user['uiUserName']
app.logger.debug(f'authorizedUser= {authorizedUser}')
credential = akamai_functions.find_credential(authorizedUser, '/home/akamai/dev/learnakamai/json/api_clients.json')
if credential is not None:
new_credential = akamai_functions.create_credential(credential)
if new_credential is not None:
credential = new_credential
abort(500, description="cannot create new credential")
abort(500, description="cannot find credential")
email = user['email']
if email != None and pwd != None:
authorizedUser = user['uiUserName']
app.logger.debug(f'credential= {credential}')
result = '[Control Center]\nLogin URL =\n' + 'email = '+ email + '\npassword = '+ pwd + '\n\n'
result = result + '[API Credential]\nclient_secret = ' + credential["client_secret"] + '\nhost = '+ credential["host"] + '\naccess_token = ' + credential["access_token"] + '\nclient_token = ' + credential["client_token"]
app.logger.debug(f'result = '+result)
buffer = StringIO()
response = make_response(buffer.getvalue())
response.headers['Content-Disposition'] = 'attachment; filename=credential.txt'
response.mimetype = 'application/octet-stream'
# return send_file(buffer, as_attachment=True, download_name="credential.txt", mimetype="html/text")
# response = make_response('<ul><li><a href="" target="_blank">Akamai Control Center</a></li><li>email:' +
# email+'</li><li>password:'+pwd+'</li></ul><b>-- API credential --</b><br/>client_secret = '+ credential["client_secret"] + '<br/>host = '+ credential["host"] + '<br/>access_token = ' + credential["access_token"] + '</br>client_token = ' + credential["client_token"])
return response
abort(500, description="cannot find an available user")
return user
# this function is the target url of iframe widget
def credential():
return render_template('playground.html')
# this function is called by playground.html javascript
def a():
docebo_user_id = request.args.get('user_id')
docebo_access_token = request.args.get('access_token')
app.logger.debug('get_users starts')
users = akamai_functions.get_all_users()
app.logger.debug('get_users ends')
app.logger.debug('docebo_get_users stars')
if docebo_user_id != None and docebo_access_token != None:
docebo_user = docebo_functions.docebo_get_user(docebo_user_id, docebo_access_token)
if docebo_user == None:
return 'cannot get Learn Akamai userdata of '+ docebo_user_id
return 'docebo userdata is required'
firstName = docebo_user['first_name']
lastName = docebo_user['last_name']
app.logger.debug('docebo_get_users ends. firstName= %s, lastName= %s', firstName, lastName)
app.logger.debug('select_user starts')
user = akamai_functions.select_user(users, unused_hours, firstName, lastName)
uiIdentityId = user['uiIdentityId']
app.logger.debug('select_user ends. uiIdentityId= %s', uiIdentityId)
app.logger.debug('reset_user_pwd starts')
pwd = akamai_functions.reset_user_pwd(uiIdentityId)
app.logger.debug('reset_user_pwd ends')
authorizedUser = user['uiUserName']
app.logger.debug(f'authorizedUser= {authorizedUser}')
credential = akamai_functions.find_credential(authorizedUser, '/home/akamai/dev/learnakamai/json/api_clients.json')
if credential is not None:
new_credential = akamai_functions.create_credential(credential)
if new_credential is not None:
credential = new_credential
abort(500, description="cannot create new credential")
abort(500, description="cannot find credential")
email = user['email']
if email != None and pwd != None:
authorizedUser = user['uiUserName']
app.logger.debug(f'credential= {credential}')
response = make_response('<ul><li><a href="" target="_blank">Akamai Control Center</a></li><li>email:' +
email+'</li><li>password:'+pwd+'</li></ul><b>-- API credential --</b><br/>client_secret = '+ credential["client_secret"] + '<br/>host = '+ credential["host"] + '<br/>access_token = ' + credential["access_token"] + '</br>client_token = ' + credential["client_token"])
return response
abort(500, description="cannot find an available user")
return user
# this function deletes properties in groups that have not been used for unused_hours-1 #
# this function will be updated to run repeatedly #
def purge_groups():
groups = akamai_functions.get_groups(unused_hours-1)
app.logger.debug('groups= '+str(groups))
messages = []
msg = None
for group in groups:
groupId = group['groupId']
groupName = group['groupName']
properties = akamai_functions.get_properties_by_group(groupId)
# if there is any existing properties in group
if properties['properties']['items']:
msg = 'found properties in group '+groupName
if akamai_functions.delete_properties(properties):
msg = 'deleted existing properties in group '+groupName
abort(500, description="cannot delete properties in group "+groupName)
msg = 'cannot find any property in group '+groupName
if bool(messages):
return messages
abort(500, description="cannot purge groups")
# test creating users
def c():
num = int(request.args.get('num'))
if num == None: num = 1
app.logger.debug('we will create %s user(s)', num)
new_users = akamai_functions.create_users(num)
if new_users != None:
return new_users
return 'cannot create users'
# Check list roles to assign users to right roles
def d():
result = akamai_functions.list_roles()
return result
# Control Center API functions end #
# coder API functions start #
# this function is the target URL of iframe widget
@app.route('/flask/lab', methods=['GET'])
def lab():
# return render_template('image_render.html', image=file)
return render_template('workspace.html')
#@app.route('/flask/lab', methods=['GET'])
#def lab():
# user_id = request.args.get('user_id')
# access_token = request.args.get('access_token')
# html_body = '<a href="/flask/init?user_id=' + user_id + \
# '&access_token=' + access_token + '">Click To Launch Lab</a>'
# response = make_response(html_body)
# return response
# this function is called by workspace.html javascript
@app.route('/flask/init', methods=['GET'])
def init_workspace():
workspaces = coder_functions.list_workspaces('a')
workspace = coder_functions.search_workspaces(workspaces, unused_hours)
workspace_name = workspace['name']
workspace_status = workspace['latest_build']['status']
code_url = 'https://code--main--' + workspace_name + \
origin_url = 'https://origin--main--' + workspace_name + ''
app.logger.debug('code_url= '+code_url)
app.logger.debug('origin_url= '+origin_url)
# 1st validation.
# if the selected_workspace is not running, we start it.
if workspace_status != 'running':
if coder_functions.start_workspace(workspace_name):
app.logger.debug('started workspace %s', workspace_name)
abort(500, description="cannot start workspace " + workspace_name)
# 2nd validation.
# if the selected_workspace agent is not connected, we wait for it to become connected status
# if we started the selected_workspace above, this takes around 30-40 seconds
if coder_functions.is_agent_connected(workspace_name):
# if validations are successful, we create a session token for end-user to connect to code server and origin server.
# session_token = create_token(workspace_name)
# if session_token != '':
# 3rd validation.
# two apps in the workspace should be ready
if coder_functions.is_app_ready(code_url) and coder_functions.is_app_ready(origin_url): + ': code and origin are ready')
response = make_response('<ul><li>Developer Tools: <a href="'+code_url+'" target="_blank">' + code_url +
'</a></li><li>Test Origin: <a href="'+origin_url+'" target="_blank">'+origin_url+'</a></li></ul>')
# response.set_cookie("coder_session_token", session_token, domain='')
return response
# abort(500, description="cannot create session_token for " + workspace_name)
abort(500, description="cannot connect to the workspace " + workspace_name)
# delete workspaces by status. we can use this to delete all 'failed' workspaces, for example.
# pending, starting, running, stopping, failed, canceling, canceled, deleting, deleted
@app.route('/flask/delete_workspaces_by_status', methods=['GET'])
def delete_workspaces_by_status():
target_status = request.args.get('status')
workspaces = coder_functions.find_by_status(target_status)
if coder_functions.delete_workspaces(workspaces):
return 'deleted all '+target_status+' workspaces'
return 'failed to delete all '+target_status+' workspaces'
# update old workspaces
@app.route('/flask/update_old_workspaces', methods=['GET'])
def update_all():
workspace_age = request.args.get('age')
names = coder_functions.update_old_workspaces(workspace_age)
return names
@app.route('/flask/update_workspace', methods=['GET'])
def test():
workspaces = coder_functions.list_workspaces('a')
workspace = coder_functions.search_workspaces(workspaces, unused_hours)
app.logger.debug('[Before] workspace_name= %s', workspace['name'])
workspace = coder_functions.update_workspace(workspace)
app.logger.debug('[After] workspace_name= %s', workspace['name'])
workspace_name = workspace['name']
workspace_status = workspace['latest_build']['status']
app.logger.debug('workspace_status= %s', workspace_status)
code_url = 'https://code--main--' + workspace_name + \
origin_url = 'https://origin--main--' + workspace_name + ''
app.logger.debug('code_url= '+code_url)
app.logger.debug('origin_url= '+origin_url)
return workspace_name
@app.route('/flask/upgrade_workspaces', methods=['GET'])
def upgrade_workspaces():
return 'under construction'
def internal_error(e):
return jsonify(error=str(e)), 500

View File

@ -1,18 +1,6 @@
from flask import Flask, request, make_response, abort, jsonify, render_template, send_file from flask import Flask, make_response, abort, jsonify, render_template
from akamai.edgegrid import EdgeGridAuth from datetime import datetime
from urllib.parse import urljoin import logging, randomname, requests, time
from datetime import datetime, timedelta
import requests
import time
import subprocess
import logging
import re
import json
import akamai_functions, docebo_functions, coder_functions
from threading import Thread
import randomname
from io import StringIO
import os
app = Flask(__name__) app = Flask(__name__)
app.logger.setLevel(logging.DEBUG) app.logger.setLevel(logging.DEBUG)
@ -29,11 +17,14 @@ app.logger.setLevel(logging.DEBUG)
# "token_name": "string" # "token_name": "string"
# } # }
admin_token = 'gNmzL1TLeN-gXfs7q10uWPINqtlpt02Pj' admin_token = 'mS7rgguZeb-VTyP8Yc3RPWQ1uy2thqsOi'
template_id = 'a5577a86-e700-41be-997b-6001d78061d4' template_id = '80c50f63-0d2c-4a2c-a99f-45b36a9c038a'
organization_id = '9284e3c7-e20c-4736-929e-4f1508920811'
user = 'akamai'
user_email = '' user_email = ''
user_pwd = 'Qodnwk=$s8' user_pwd = 'Qodnwk=$s8'
token_header = {'Coder-Session-Token': admin_token, token_header = {'Coder-Session-Token': admin_token,
'Accept': 'application/json'} 'Accept': 'application/json'}
token_param = {'email': user_email, 'password': user_pwd} token_param = {'email': user_email, 'password': user_pwd}
@ -44,294 +35,124 @@ unused_hours = 2
# workspace ttl ms # workspace ttl ms
ttl = 86400000 # 24 hours ttl = 86400000 # 24 hours
################################################### @app.route('/flask/hello')
# Control Center API functions start # def hello():
################################################### return 'hello'
@app.route('/flask/cc_download') @app.route('/flask/create')
def cc_download(): def create():
firstName = randomname.get_name() start =
lastName = randomname.get_name() is_agent_ready = False
app.logger.debug('firstName= %s, lastName= %s', firstName, lastName) is_code_ready = False
body = create_workspace()
if body is None:
abort(500, description="cannot create new workspace")
app.logger.debug('get_users starts') workspace_name = get_workspace_name(body)
users = akamai_functions.get_all_users()
app.logger.debug('get_users ends')
app.logger.debug('select_user starts') # is_agent_ready = is_agent_connected(workspace_name)
user = akamai_functions.select_user(users, unused_hours, firstName, lastName) is_code_ready = is_code_live(workspace_name)
uiIdentityId = user['uiIdentityId'] if is_code_ready == True:
app.logger.debug('select_user ends. uiIdentityId= %s', uiIdentityId) end =
elapsed = end - start
app.logger.debug('reset_user_pwd starts') app.logger.debug('elapsed= '+str(elapsed))
pwd = akamai_functions.reset_user_pwd(uiIdentityId) return workspace_name + " is ready!"
app.logger.debug('reset_user_pwd ends')
authorizedUser = user['uiUserName']
app.logger.debug(f'authorizedUser= {authorizedUser}')
credential = akamai_functions.find_credential(authorizedUser, '/home/akamai/dev/learnakamai/json/api_clients.json')
if credential is not None:
new_credential = akamai_functions.create_credential(credential)
if new_credential is not None:
credential = new_credential
else: else:
abort(500, description="cannot create new credential") abort(500, description="cannot start new workspace")
# this function creates a new workspace name based on current time
def create_workspace_name():
# past = timedelta(hours=-unused_hours)
# new_name = + past
new_name = str('%Y%m%d%H%M%S'))
name = randomname.get_name()
new_name = new_name + name
app.logger.debug('new_name= '+new_name)
return new_name
# create a workspace
def create_workspace():
url = 'http://coder:7080/api/v2/organizations/'+organization_id+'/members/'+user+'/workspaces'
workspace_name = create_workspace_name()
param = {"name": workspace_name,
"template_id": template_id, "ttl_ms": ttl}
response =, json=param, headers=token_header)
s_code = response.status_code +
": create_workspace status: " + str(s_code))
body = response.json()
app.logger.debug(workspace_name + ': create_workspace body: ' + str(body))
if s_code == 201: +" is created")
else: else:
abort(500, description="cannot find credential") +" cannot create a new workspace")
body = None
return body
email = user['email'] def get_workspace_id(body: dict):
if email != None and pwd != None: workspace_id = body['id']
authorizedUser = user['uiUserName'] app.logger.debug("workspace_id is "+workspace_id)
return workspace_id
app.logger.debug(f'credential= {credential}') def get_workspace_name(body: dict):
result = '[Control Center]\nLogin URL =\n' + 'email = '+ email + '\npassword = '+ pwd + '\n\n' workspace_name = body['latest_build']['workspace_name']
result = result + '[API Credential]\nclient_secret = ' + credential["client_secret"] + '\nhost = '+ credential["host"] + '\naccess_token = ' + credential["access_token"] + '\nclient_token = ' + credential["client_token"] app.logger.debug("workspace_name is "+workspace_name)
app.logger.debug(f'result = '+result)
buffer = StringIO()
response = make_response(buffer.getvalue())
response.headers['Content-Disposition'] = 'attachment; filename=credential.txt'
response.mimetype = 'application/octet-stream'
# return send_file(buffer, as_attachment=True, download_name="credential.txt", mimetype="html/text")
# response = make_response('<ul><li><a href="" target="_blank">Akamai Control Center</a></li><li>email:' +
# email+'</li><li>password:'+pwd+'</li></ul><b>-- API credential --</b><br/>client_secret = '+ credential["client_secret"] + '<br/>host = '+ credential["host"] + '<br/>access_token = ' + credential["access_token"] + '</br>client_token = ' + credential["client_token"])
return response
abort(500, description="cannot find an available user")
return user
# this function is the target url of iframe widget
def credential():
return render_template('playground.html')
# this function is called by playground.html javascript
def a():
docebo_user_id = request.args.get('user_id')
docebo_access_token = request.args.get('access_token')
app.logger.debug('get_users starts')
users = akamai_functions.get_all_users()
app.logger.debug('get_users ends')
app.logger.debug('docebo_get_users stars')
if docebo_user_id != None and docebo_access_token != None:
docebo_user = docebo_functions.docebo_get_user(docebo_user_id, docebo_access_token)
if docebo_user == None:
return 'cannot get Learn Akamai userdata of '+ docebo_user_id
return 'docebo userdata is required'
firstName = docebo_user['first_name']
lastName = docebo_user['last_name']
app.logger.debug('docebo_get_users ends. firstName= %s, lastName= %s', firstName, lastName)
app.logger.debug('select_user starts')
user = akamai_functions.select_user(users, unused_hours, firstName, lastName)
uiIdentityId = user['uiIdentityId']
app.logger.debug('select_user ends. uiIdentityId= %s', uiIdentityId)
app.logger.debug('reset_user_pwd starts')
pwd = akamai_functions.reset_user_pwd(uiIdentityId)
app.logger.debug('reset_user_pwd ends')
authorizedUser = user['uiUserName']
app.logger.debug(f'authorizedUser= {authorizedUser}')
credential = akamai_functions.find_credential(authorizedUser, '/home/akamai/dev/learnakamai/json/api_clients.json')
if credential is not None:
new_credential = akamai_functions.create_credential(credential)
if new_credential is not None:
credential = new_credential
abort(500, description="cannot create new credential")
abort(500, description="cannot find credential")
email = user['email']
if email != None and pwd != None:
authorizedUser = user['uiUserName']
app.logger.debug(f'credential= {credential}')
response = make_response('<ul><li><a href="" target="_blank">Akamai Control Center</a></li><li>email:' +
email+'</li><li>password:'+pwd+'</li></ul><b>-- API credential --</b><br/>client_secret = '+ credential["client_secret"] + '<br/>host = '+ credential["host"] + '<br/>access_token = ' + credential["access_token"] + '</br>client_token = ' + credential["client_token"])
return response
abort(500, description="cannot find an available user")
return user
# this function deletes properties in groups that have not been used for unused_hours-1 #
# this function will be updated to run repeatedly #
def purge_groups():
groups = akamai_functions.get_groups(unused_hours-1)
app.logger.debug('groups= '+str(groups))
messages = []
msg = None
for group in groups:
groupId = group['groupId']
groupName = group['groupName']
properties = akamai_functions.get_properties_by_group(groupId)
# if there is any existing properties in group
if properties['properties']['items']:
msg = 'found properties in group '+groupName
if akamai_functions.delete_properties(properties):
msg = 'deleted existing properties in group '+groupName
abort(500, description="cannot delete properties in group "+groupName)
msg = 'cannot find any property in group '+groupName
if bool(messages):
return messages
abort(500, description="cannot purge groups")
# test creating users
def c():
num = int(request.args.get('num'))
if num == None: num = 1
app.logger.debug('we will create %s user(s)', num)
new_users = akamai_functions.create_users(num)
if new_users != None:
return new_users
return 'cannot create users'
# Check list roles to assign users to right roles
def d():
result = akamai_functions.list_roles()
return result
# Control Center API functions end #
# coder API functions start #
# this function is the target URL of iframe widget
@app.route('/flask/lab', methods=['GET'])
def lab():
# return render_template('image_render.html', image=file)
return render_template('workspace.html')
#@app.route('/flask/lab', methods=['GET'])
#def lab():
# user_id = request.args.get('user_id')
# access_token = request.args.get('access_token')
# html_body = '<a href="/flask/init?user_id=' + user_id + \
# '&access_token=' + access_token + '">Click To Launch Lab</a>'
# response = make_response(html_body)
# return response
# this function is called by workspace.html javascript
@app.route('/flask/init', methods=['GET'])
def init_workspace():
workspaces = coder_functions.list_workspaces('a')
workspace = coder_functions.search_workspaces(workspaces, unused_hours)
workspace_name = workspace['name']
workspace_status = workspace['latest_build']['status']
code_url = 'https://code--main--' + workspace_name + \
origin_url = 'https://origin--main--' + workspace_name + ''
app.logger.debug('code_url= '+code_url)
app.logger.debug('origin_url= '+origin_url)
# 1st validation.
# if the selected_workspace is not running, we start it.
if workspace_status != 'running':
if coder_functions.start_workspace(workspace_name):
app.logger.debug('started workspace %s', workspace_name)
abort(500, description="cannot start workspace " + workspace_name)
# 2nd validation.
# if the selected_workspace agent is not connected, we wait for it to become connected status
# if we started the selected_workspace above, this takes around 30-40 seconds
if coder_functions.is_agent_connected(workspace_name):
# if validations are successful, we create a session token for end-user to connect to code server and origin server.
# session_token = create_token(workspace_name)
# if session_token != '':
# 3rd validation.
# two apps in the workspace should be ready
if coder_functions.is_app_ready(code_url) and coder_functions.is_app_ready(origin_url): + ': code and origin are ready')
response = make_response('<ul><li>Developer Tools: <a href="'+code_url+'" target="_blank">' + code_url +
'</a></li><li>Test Origin: <a href="'+origin_url+'" target="_blank">'+origin_url+'</a></li></ul>')
# response.set_cookie("coder_session_token", session_token, domain='')
return response
# abort(500, description="cannot create session_token for " + workspace_name)
abort(500, description="cannot connect to the workspace " + workspace_name)
# delete workspaces by status. we can use this to delete all 'failed' workspaces, for example.
# pending, starting, running, stopping, failed, canceling, canceled, deleting, deleted
@app.route('/flask/delete_workspaces_by_status', methods=['GET'])
def delete_workspaces_by_status():
target_status = request.args.get('status')
workspaces = coder_functions.find_by_status(target_status)
if coder_functions.delete_workspaces(workspaces):
return 'deleted all '+target_status+' workspaces'
return 'failed to delete all '+target_status+' workspaces'
# update old workspaces
@app.route('/flask/update_old_workspaces', methods=['GET'])
def update_all():
workspace_age = request.args.get('age')
names = coder_functions.update_old_workspaces(workspace_age)
return names
@app.route('/flask/update_workspace', methods=['GET'])
def test():
workspaces = coder_functions.list_workspaces('a')
workspace = coder_functions.search_workspaces(workspaces, unused_hours)
app.logger.debug('[Before] workspace_name= %s', workspace['name'])
workspace = coder_functions.update_workspace(workspace)
app.logger.debug('[After] workspace_name= %s', workspace['name'])
workspace_name = workspace['name']
workspace_status = workspace['latest_build']['status']
app.logger.debug('workspace_status= %s', workspace_status)
code_url = 'https://code--main--' + workspace_name + \
origin_url = 'https://origin--main--' + workspace_name + ''
app.logger.debug('code_url= '+code_url)
app.logger.debug('origin_url= '+origin_url)
return workspace_name return workspace_name
def is_agent_connected(workspace_name):
agent_status = 'unknown'
count = 0
while agent_status != 'connected':
status_url = 'http://coder:7080/api/v2/users/'+user+'/workspace/'+workspace_name
status_response = requests.get(status_url, headers=token_header)
workspace_name + ": is_agent_connected status code: " + str(status_response.status_code))
status_json = status_response.json()
app.logger.debug(workspace_name + ': '+str(status_json))
if len(status_json['latest_build']['resources']) > 1:
for resource in status_json['latest_build']['resources']:
if resource['type'] == 'kubernetes_deployment':
for agent in resource['agents']:
if agent['name'] == 'main':
agent_status = agent['status']
workspace_name + ": agent status: " + agent_status)
if agent_status == 'timeout' or agent_status == 'connected':
count += 1
# will check coder is ready for the new url
app.logger.debug(workspace_name + ": api call count: "+str(count))
return True
# third, check the app running inside the workspace
# there are 2 apps. coder-server and OWASP juice-shop
def is_code_live(workspace_name):
code_url = 'http://coder:7080/@'+user+'/'+workspace_name+'.main/apps/code-server/?folder=/home/coder'
is_ready = False
while True:
response = requests.get(code_url)
s_code = response.status_code
app.logger.debug('%s status code= %s', code_url, s_code)
if s_code == 200:
is_ready = True
return is_ready
@app.route('/flask/upgrade_workspaces', methods=['GET'])
def upgrade_workspaces():
return 'under construction'
@app.errorhandler(500) @app.errorhandler(500)
def internal_error(e): def internal_error(e):
return jsonify(error=str(e)), 500 return jsonify(error=str(e)), 500
if __name__ == "__main__":'', port=5000, debug=True)

View File

@ -1,5 +1,17 @@
version: "3.9" version: "3.9"
services: services:
build: ./build
# flask requires SIGINT to stop gracefully
# (default stop signal from Compose is SIGTERM)
stop_signal: SIGINT
- '5000:5000'
- ./build/python:/home/python
coder: coder:
# This MUST be stable for our documentation and # This MUST be stable for our documentation and
# other automations. # other automations.
@ -21,6 +33,7 @@ services:
# - "998" # docker group on host # - "998" # docker group on host
volumes: volumes:
- /var/run/docker.sock:/var/run/docker.sock - /var/run/docker.sock:/var/run/docker.sock
- ./Dev-kubeconfig.yaml:/home/coder/.kube/config
depends_on: depends_on:
database: database:
condition: service_healthy condition: service_healthy