// (C) Copyright 2015 by Autodesk, Inc.
//=============================================================================
//
// namespace COMISO::DOcloud IMPLEMENTATION
//
//=============================================================================
#include "DOCloudJob.hh"
#if COMISO_DOCLOUD_AVAILABLE
#include "DOCloudConfig.hh"
#include "CoMISo/Utils/CoMISoError.hh"
#include
#include
#include
#include
#include
DEB_module("DOcloud")
namespace COMISO {
namespace DOcloud {
//////////////////////////////////////////////////////////////////////////
// Config
static const char* json_app_type__ = "Content-Type: application/json";
static const char* gen_app_type__ = "Content-Type: application/octet-stream";
Config::Config()
: root_url_("https://api-oaas.docloud.ibmcloud.com/job_manager/rest/v1/jobs"),
infs_time_(300), fsbl_time_(15)
{
const char* env_cache_dir = getenv("ReFormCacheDir");
if (env_cache_dir != nullptr && env_cache_dir[0] != 0)
{
cache_loc_ = env_cache_dir;
if (cache_loc_.back() != '\\')
cache_loc_ += '\\'; // Eventually add '\' to the directory string.
}
}
Config& Config::object()
{
// TODO: implement MT-lock
static Config config;
return config;
}
void Config::set_root_url(const char* const _root_url)
{
COMISO_THROW_if(_root_url == nullptr, DOCLOUD_CONFIG_SET_VALUE_INVALID);
root_url_ = _root_url;
}
void Config::set_api_key(const char* _api_key)
{
COMISO_THROW_if(_api_key == nullptr, DOCLOUD_CONFIG_SET_VALUE_INVALID);
api_key_ = std::string("X-IBM-Client-Id: ") + _api_key;
}
void Config::set_infeasible_timeout(const int _infs_time)
{
COMISO_THROW_if(_infs_time < 1, DOCLOUD_CONFIG_SET_VALUE_INVALID);
infs_time_ = _infs_time;
}
void Config::set_feasible_timeout(const int _fsbl_time)
{
COMISO_THROW_if(_fsbl_time < 0, DOCLOUD_CONFIG_SET_VALUE_INVALID);
fsbl_time_ = _fsbl_time;
}
void Config::set_cache_location(const char* const _cache_loc)
{
if (_cache_loc == nullptr)
cache_loc_.clear();
else
cache_loc_ = _cache_loc;
}
//////////////////////////////////////////////////////////////////////////
// Config
const Config& Config::query() { return object(); }
Config& Config::modify() { return object(); }
class HeaderTokens
{
public:
HeaderTokens(const std::string& _hdr)
{
// TODO: Performance can be improved by indexing, strtok_r(), etc ...
// ... but probably not worth the effort
std::istringstream strm(_hdr);
typedef std::istream_iterator Iter;
std::copy(Iter(strm), Iter(), std::back_inserter(tkns_));
}
const std::string& operator[](const size_t _idx) const
{
return tkns_[_idx];
}
size_t number() const { return tkns_.size(); }
// Find a token equal to the label and return its value (next token)
bool find_value(const std::string& _lbl, std::string& _val) const
{
auto it = std::find(tkns_.begin(), tkns_.end(), _lbl);
if (it == tkns_.end() || ++it == tkns_.end())
return false;
_val = *it;
return true;
}
typedef std::vector::const_iterator const_iterator;
const_iterator begin() const { return tkns_.begin(); }
const_iterator end() const { return tkns_.end(); }
private:
std::vector tkns_;
};
class JsonTokens
{
public:
JsonTokens() {}
JsonTokens(const std::string& _bdy) { set(_bdy); }
void set(const std::string& _bdy)
{
ptree_.clear();
if (_bdy.empty())
return;
std::istringstream strm(_bdy);
boost::property_tree::json_parser::read_json(strm, ptree_);
}
//size_t number() const { return tkns_.size(); }
// Find a token equal to the label and return its value
bool find_value(const std::string& _lbl, std::string& _val) const
{
auto it = ptree_.find(_lbl);
if (it == ptree_.not_found())
return false;
_val = it->second.get_value();
return true;
}
typedef boost::property_tree::ptree PTree;
const PTree& ptree() const { return ptree_; }
private:
PTree ptree_;
};
Base::IOutputStream& operator<<(Base::IOutputStream& _ds, const JsonTokens::PTree& _ptree)
{
std::stringstream os;
boost::property_tree::json_parser::write_json(os, _ptree);
_ds << os.str();
return _ds;
}
Base::IOutputStream& operator<<(Base::IOutputStream& _ds, const JsonTokens& _json_tkns)
{
return _ds << _json_tkns.ptree();
}
void throw_http_error(const int _err_code, const std::string& _bdy)
{
DEB_enter_func;
std::string err_msg;
JsonTokens bdy_tkns(_bdy);
bdy_tkns.find_value("message", err_msg);
DEB_warning(1, "HTTP Status Code: " << _err_code << "; Message: " << err_msg);
switch (_err_code)
{
case 400 : COMISO_THROW(DOCLOUD_JOB_DATA_INVALID);
case 403 : COMISO_THROW(DOCLOUD_SUBSCRIPTION_LIMIT);
case 404 : COMISO_THROW(DOCLOUD_JOB_NOT_FOUND);
default : COMISO_THROW(DOCLOUD_JOB_UNRECOGNIZED_FAILURE);
}
}
class HttpStatus
{
public:
HttpStatus(const cURLpp::Request& _rqst)
: rqst_(_rqst), hdr_tkns_(_rqst.header()), code_(0)
{
const std::string http_lbl = "HTTP/1.1";
const int code_cntn = 100; // continue code, ignore
for (auto it = hdr_tkns_.begin(), it_end = hdr_tkns_.end(); it != it_end; ++it)
{
if (*it != http_lbl) // search for the http label token
continue;
COMISO_THROW_if(++it == it_end, DOCLOUD_JOB_HTTP_CODE_NOT_FOUND);
code_ = atoi(it->data());
if (code_ != code_cntn)
return;
}
COMISO_THROW(DOCLOUD_JOB_HTTP_CODE_NOT_FOUND); // final http code not found
}
void check(int _code_ok) const
{
if (code_ != _code_ok) // another code found, throw an error
throw_http_error(code_, rqst_.body());
}
const int& code() const { return code_; }
const HeaderTokens& header_tokens() const { return hdr_tkns_; }
private:
const cURLpp::Request& rqst_;
HeaderTokens hdr_tkns_;
int code_;
};
Job::~Job()
{
DEB_enter_func;
delete stts_;
if (url_.empty()) // not setup
return;
cURLpp::Delete del;
del.set_url(url_.data());
del.add_http_header(Config::query().api_key());
del.perform();
// no point in checking the return value, we can't do much if the
// delete request has failed
}
void Job::make()
{
DEB_enter_func;
const auto post_loc = "{\"attachments\" : [{\"name\" :\"" +
std::string(filename_) + "\"}]}";
cURLpp::Post post(post_loc);
post.set_url(Config::query().root_url());
post.add_http_header(Config::query().api_key());
post.add_http_header(json_app_type__);
post.perform();
HttpStatus http_stat(post);
http_stat.check(201);
// TODO: DOcloud header is successful but no location value
COMISO_THROW_if(!http_stat.header_tokens().find_value("Location:", url_),
DOCLOUD_JOB_LOCATION_NOT_FOUND);
if (stts_ == nullptr)
stts_ = new JsonTokens;
}
void Job::upload(cURLpp::Upload& _upld)
{
auto url = url_ + "/attachments/" + filename_ + "/blob";
_upld.set_url(url.data());
_upld.add_http_header(Config::query().api_key());
_upld.add_http_header(gen_app_type__);
_upld.perform();
HttpStatus http_stat(_upld);
http_stat.check(204);
}
void Job::upload()
{
if (file_buf_.empty())
{// file is not buffered into memory
cURLpp::UploadFile upld(filename_);
upload(upld);
}
else
{
cURLpp::UploadData upld(file_buf_);
upload(upld);
}
}
void Job::start()
{
cURLpp::Post post("");
const auto url = url_ + "/execute";
post.set_url(url.data());
post.add_http_header(Config::query().api_key());
post.add_http_header(json_app_type__);
post.perform();
HttpStatus http_stat(post);
http_stat.check(204);
log_seq_idx_ = sol_nmbr_ = sol_sec_nmbr_ = stld_sec_nmbr_ = 0;
stop_wtch_.start();
}
void Job::sync_status()
{
DEB_enter_func;
cURLpp::Get get;
get.set_url(url_.data());
get.add_http_header(Config::query().api_key());
get.perform();
HttpStatus http_stat(get);
http_stat.check(200);
stts_->set(get.body());
/*
// The code below attempted to analyse the status data to find out the
// progress of the solver. This is an undocumented use and does not seem to
// work so far. Achieved here for potential use in the future.
DEB_line(2, stts_);
const auto& details = stts_->ptree().get_child_optional("details");
if (!details)
return;
DEB_line(2, details.get());
const auto& prg_gap =
details.get().get_child("PROGRESS_GAP").get_value();
std::string mip_gap;
const auto mip_gap_it = details.get().find("cplex.mipabsgap");
if (mip_gap_it != details.get().not_found())
mip_gap = mip_gap_it->second.get_value();
DEB_line(2, "Status, MIP gap: " << mip_gap << "; Progress gap: " << prg_gap);
*/
}
void Job::sync_log()
{
DEB_enter_func;
cURLpp::Get get;
const auto url = url_ + "/log/items?start=" + std::to_string(log_seq_idx_) +
"&continuous=true";
get.set_url(url.data());
get.add_http_header(Config::query().api_key());
get.perform();
HttpStatus http_stat(get);
http_stat.check(200);
JsonTokens log(get.body());
bool got_time_data = false;
// iterate the log items, deb_out messages and analyze for solutions #
for (const auto& log_item : log.ptree())
{
DEB_line_if(log_seq_idx_ == 0, 2, "**** DOcloud log ****");
const auto& records = log_item.second.get_child("records");
for (const auto& record : records)
{// the message ends with \n
const std::string msg = record.second.get_child("message").
get_value();
DEB_out(2, record.second.get_child("level").get_value() <<
": " << msg);
const int time_str_len = 15;
const char time_str[time_str_len + 1] = "Elapsed time = ";
const auto time_str_idx = msg.find(time_str);
if (time_str_idx == std::string::npos)
continue;
const int sec_nmbr = atoi(msg.data() + time_str_idx + time_str_len);
//DEB_line(1, "# seconds elapsed : " << sec_nmbr);
const int sol_str_len = 12;
const char sol_str[sol_str_len + 1] = "solutions = ";
const auto sol_str_idx = msg.find(sol_str);
if (sol_str_idx == std::string::npos)
continue;
got_time_data = true;
const int sol_nmbr = atoi(msg.data() + sol_str_idx + sol_str_len);
//DEB_line(1, "# solutions found so far: " << sol_nmbr);
if (sol_nmbr > sol_nmbr_) // new solution(s) found
{// update the number of solutions and the time of the last solution found
sol_nmbr_ = sol_nmbr;
sol_sec_nmbr_ = sec_nmbr;
}
stld_sec_nmbr_ = sec_nmbr - sol_sec_nmbr_;
stop_wtch_.restart();
}
log_seq_idx_ = log_item.second.get_child("seqid").get_value() + 1;
}
if (!got_time_data)
{
DEB_warning(2, "DOCloud did not provide time and solutions number-"
"using internal time counter.");
stld_sec_nmbr_ += stop_wtch_.restart() / 1000;
}
}
bool Job::active() const
{
std::string exct_stts;
stts_->find_value("executionStatus", exct_stts);
// assume the job is not active if the status is not recognized
return exct_stts == "CREATED" || exct_stts == "NOT_STARTED" ||
exct_stts == "RUNNING" || exct_stts == "INTERRUPTING";
/*
Backup of old code converting execution status strings to enum value
enum StatusType { ST_CREATED, ST_NOT_STARTED, ST_RUNNING, ST_INTERRUPTING,
ST_INTERRUPTED, ST_FAILED, ST_PROCESSED, ST_UNKNOWN };
const int n_stts = (int)ST_UNKNOWN;
const char stts_tbl[n_stts][16] = { "CREATED", "NOT_STARTED", "RUNNING",
"INTERRUPTING", "INTERRUPTED", "FAILED", "PROCESSED" };
for (int i = 0; i < n_stts; ++i)
{
if (stts == stts_tbl[i])
return (StatusType)i;
}
return ST_UNKNOWN;
while (stts == ST_CREATED || stts == ST_NOT_STARTED || stts == ST_RUNNING ||
stts == ST_INTERRUPTING);
*/
}
bool Job::stalled() const
{
// exit quick if we have a solution, or wait if we don't have one
const auto& config = Config::query();
return (sol_nmbr_ > 0 && stld_sec_nmbr_ >= config.feasible_timeout())
|| (sol_nmbr_ == 0 && stld_sec_nmbr_ >= config.infeasible_timeout());
}
void Job::abort()
{
std::string exct_stts;
stts_->find_value("executionStatus", exct_stts);
if (exct_stts != "RUNNING" && exct_stts != "NOT_STARTED")
return; // already aborted or aborting
cURLpp::Delete del;
const auto url = url_ + "/execute";
del.set_url(url.data());
del.add_http_header(Config::query().api_key());
del.perform();
HttpStatus http_stat(del);
http_stat.check(204);
}
void Job::wait()
{
do
{
std::this_thread::sleep_for(std::chrono::seconds(1));
sync_status();
sync_log();
if (stalled())
abort(); // The waiting loop must continue until the timer request has been
// processed by the server.
} while (active());
}
double Job::solution(std::vector& _x) const
{
DEB_enter_func;
// check the solution status (assume it's synchronized already)
// What are the possible values for this??
std::string slv_stts;
stts_->find_value("solveStatus", slv_stts);
DEB_line(2, "solveStatus=" << slv_stts);
cURLpp::Get get;
const auto url = url_ + "/attachments/solution.json/blob";
get.set_url(url.data());
get.add_http_header(Config::query().api_key());
get.perform();
HttpStatus http_stat(get);
if (http_stat.code() == 404 &&
get.body().find("CIVOC5102E") != std::string::npos)
{
// The mixed integer optimization has not found any solution.
_x.clear();
return 0;
}
http_stat.check(200);
JsonTokens bdy_tkns(get.body());
DEB_line(7, bdy_tkns);
const auto& vrbls = bdy_tkns.ptree().get_child("CPLEXSolution.variables");
const auto n_vrbls = vrbls.size();
COMISO_THROW_if(n_vrbls != _x.size(), DOCLOUD_CPLEX_SOLUTION_MISMATCH);
size_t i = 0;
for (const auto& v : vrbls)
{
// TODO: this way of conversion is rather hacky
const std::string name =
v.second.get_child("name").get_value(); // this is x#IDX
const int idx = atoi(name.data() + 1);
COMISO_THROW_if(idx < 0 || idx > n_vrbls, DOCLOUD_CPLEX_SOLUTION_MISMATCH);
_x[idx] = v.second.get_child("value").get_value();
DEB_out(7, "#" << idx << "=" <<
v.second.get_child("value").get_value() << "; ");
}
const auto obj_val =
bdy_tkns.ptree().get_child("CPLEXSolution.header.objectiveValue").get_value();
DEB_line(3, "X=" << _x);
return obj_val;
}
} // namespace DOcloud
} // namespace COMISO
#endif // COMISO_DOCLOUD_AVAILABLE