"tools/zax/zax.cpp" did not exist on "04d7ea157cf286f0700517eb14d114fa85e606e9"
Newer
Older
#include <assert.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>

#include <pqxx/pqxx>
using namespace pqxx;
using namespace std;

// Name of the database to connect to. Kept as a macro so the preprocessor can
// splice it into adjacent string literals.
#define TURBODB "turbodb"

// Options placed between "zafl.sh" and the input/output file names when the
// zafl command line is assembled.
#define ZAFL_OPTIONS string(" -g -d -m 0x10000 --auto-zafl-libraries ")

// Connection string handed to pqxx::connection.
string db_connect_options = string("dbname=" TURBODB " host=localhost");
// One unit of work: a version id plus the raw bytes of the binary to process.
// A default-constructed unit carries no binary and reports itself as invalid.
class WorkUnit_t
{
    public:
        // Builds the "nothing to do" unit: version id 0, empty binary.
        WorkUnit_t()
            : m_version_id(0)
        {
        }

        // Builds a unit for version `id` whose original binary is `binary`.
        WorkUnit_t(int32_t id, std::string binary)
            : m_version_id(id)
            , m_binary(binary)
        {
        }

        // Version id this unit refers to.
        int32_t getVersionID() const
        {
            return m_version_id;
        }

        // Raw bytes of the original binary (empty for an invalid unit).
        const std::string& getOrigBinary() const
        {
            return m_binary;
        }

        // True when the unit carries at least one byte of binary.
        bool isValid() const
        {
            return !m_binary.empty();
        }

    private:
        int32_t     m_version_id;
        std::string m_binary;
};
// Runs zafl.sh on the original binary carried by `work`, appends the tool's
// output to the zafl_logs row `log_id`, and stores the produced binary in
// boost_versions.
//
// conn   - open database connection used for every statement issued here.
// log_id - log_id of the zafl_logs row that receives the log text.
// work   - job to process; supplies ver_id and the bytes of the original binary.
//
// Any failure (temp file, pipe, non-zero exit, unreadable output) is written
// to the log, boost_versions.err is set to true for the job, and the function
// returns. Exceptions thrown by pqxx are not caught here. Temporary files are
// removed and the pipe is closed on every exit path.
void do_zafl(pqxx::connection &conn, int32_t log_id, const WorkUnit_t& work)
{
    // Deletes the named file when it goes out of scope.
    struct TmpFileGuard
    {
        explicit TmpFileGuard(const string& p) : path(p) { }
        TmpFileGuard(const TmpFileGuard&) = delete;
        TmpFileGuard& operator=(const TmpFileGuard&) = delete;
        ~TmpFileGuard() { if(!path.empty()) remove(path.c_str()); }
        string path;
    };

    cout << "Doing zafl for job " << work.getVersionID() << endl;

    // Prepared statements are registered on the connection, so registering
    // them once covers every transaction opened below.
    conn.prepare("start_pipe" , "update zafl_logs set log = $1 where log_id = $2");
    conn.prepare("update_log", "update zafl_logs set log = log || $1 where log_id = $2");
    conn.prepare("mark_err"  , "update boost_versions set err = true where ver_id = $1");
    conn.prepare("upload_zafl", "update boost_versions set zafl_binary = $1, end_time=NOW(), err = false where ver_id = $2; ");

    // Runs `body` inside its own transaction and commits it.
    const auto in_txn = [&conn](const auto& body)
    {
        pqxx::work txn{conn};
        body(txn);
        txn.commit();
    };

    // Appends `text` to this job's log row.
    const auto append_log = [&](const string& text)
    {
        in_txn([&](pqxx::work& txn)
        {
            txn.prepared("update_log")(text)(log_id).exec();
        });
    };

    // Appends `text` to the log and flags the job as failed, atomically.
    const auto fail = [&](const string& text)
    {
        in_txn([&](pqxx::work& txn)
        {
            txn.prepared("update_log")(text)(log_id).exec();
            txn.prepared("mark_err")(work.getVersionID()).exec();
        });
    };

    // start with some logging (this overwrites the log column rather than appending).
    in_txn([&](pqxx::work& txn)
    {
        txn.prepared("start_pipe")("Attempting to start zafl.\n")(log_id).exec();
    });

    // create a temp file to write the binary to.
    char zafl_input_tmpname[]="/tmp/predtmpXXXXXX";
    const auto fd = mkstemp(zafl_input_tmpname);
    if(fd == -1)
    {
        fail("Couldn't create tmp output file. Out of disk space?\n");
        return;
    }
    const TmpFileGuard input_guard{string(zafl_input_tmpname)};

    // write() may store fewer bytes than requested, so keep going until the
    // whole binary is on disk or a real error occurs.
    const auto& raw = work.getOrigBinary();
    auto written = size_t{0};
    while(written < raw.size())
    {
        const auto n = write(fd, raw.data() + written, raw.size() - written);
        if(n < 0 && errno == EINTR)
            continue;
        if(n <= 0)
            break;
        written += static_cast<size_t>(n);
    }
    close(fd);
    if(written != raw.size())
    {
        fail("Couldn't create tmp output file. Out of disk space?\n");
        return;
    }

    // figure out an output filename
    // NOTE(review): tmpnam only reserves a name, not a file, so another process
    // could claim it first. It is kept because it is unknown whether zafl.sh
    // accepts an output path that already exists.
    const char* const output_cstr = tmpnam(nullptr);
    if(output_cstr == nullptr)
    {
        fail("Couldn't pick a tmp output filename.\n");
        return;
    }
    const auto output_name = string(output_cstr);
    const TmpFileGuard output_guard{output_name};

    // prepare zafl to run; stderr is folded into stdout so both reach the log.
    const auto zafl_cmd = string() + "zafl.sh " + ZAFL_OPTIONS + " " + zafl_input_tmpname + " " + output_name + " 2>&1 ";
    cout << "Trying command: " << zafl_cmd << endl;
    unique_ptr<FILE, int(*)(FILE*)> pipe{popen(zafl_cmd.c_str(), "r"), &pclose};
    if(!pipe)
    {
        cerr << "Couldn't start command." << endl;
        fail("Couldn't start pipe. Out of mem/processes?\n");
        return;
    }

    // mark that we've started zafl.
    append_log("Pipe started with cmd: "+zafl_cmd+"\n");

    // setup an empty buffer and record the time we last logged as now.
    auto log_buffer = string();
    auto last_update = time(nullptr);

    // fgetc returns an int, which keeps EOF distinguishable from a 0xFF data
    // byte. Each call may block until the tool produces more output.
    for(auto ch = fgetc(pipe.get()); ch != EOF; ch = fgetc(pipe.get()))
    {
        // record the byte in a buffer and echo it locally.
        const auto byte = static_cast<char>(ch);
        log_buffer += byte;
        cout << byte << flush;

        // flush to the db once the buffer is large or more than 2 seconds old.
        const auto log_buff_big = log_buffer.size() > 1024;
        const auto too_long = difftime(time(nullptr), last_update) > 2;
        if(log_buff_big || too_long)
        {
            append_log(log_buffer);
            // reset for next bit of buffer.
            last_update = time(nullptr);
            log_buffer.clear();
        }
    }

    // put end of log buffer in place.
    if(!log_buffer.empty())
        append_log(log_buffer);

    // now deal with return code. pclose yields a wait status (or -1), not an
    // exit code, so it is decoded before being judged or reported.
    const auto status = pclose(pipe.release());
    if(status == -1)
    {
        fail("Couldn't collect zafl's exit status!\n");
        return;
    }
    if(!WIFEXITED(status))
    {
        const auto detail = WIFSIGNALED(status) ? " by signal " + to_string(WTERMSIG(status)) : string();
        fail("Zafl was terminated abnormally" + detail + "!\n");
        return;
    }
    // NOTE(review): the earlier check was `>= 2` on the raw wait status, which
    // rejected every non-zero exit code; that effective behavior is kept.
    // Relax to `exit_code >= 2` if exit code 1 is meant to be tolerated.
    const auto exit_code = WEXITSTATUS(status);
    if(exit_code != 0)
    {
        fail("Zafl failed with exit_code="+to_string(exit_code)+"!\n");
        return;
    }

    // log success.
    append_log("Zafl seems to have worked!\n");

    // read the produced binary; refuse to upload if it cannot be opened.
    ifstream fin(output_name, ios::binary);
    if(!fin)
    {
        fail("Couldn't read zafl output file.\n");
        return;
    }
    ostringstream ostrm;
    ostrm << fin.rdbuf();
    const auto zafl_contents = ostrm.str();
    const auto pqxx_zafl_contents = pqxx::binarystring(zafl_contents.c_str(), zafl_contents.size());

    // store the result; this statement also sets end_time and clears err.
    pqxx::work upload_txn{conn};
    upload_txn.prepared("upload_zafl")(pqxx_zafl_contents)(work.getVersionID()).exec();
    cout << "Zafl'd verison is " << zafl_contents.size() << " bytes " << endl;
    upload_txn.commit();
}
// Opens a database connection for one job, creates the zafl_logs row for it,
// and hands the job to do_zafl together with the new row's log_id.
void do_work(const WorkUnit_t& work)
{
    const auto job_id = work.getVersionID();
    cout << "Doing work for job " << job_id << endl;

    pqxx::connection conn{db_connect_options.c_str()};
    pqxx::work txn{conn};

    // Insert the log row and read back the generated log_id.
    conn.prepare("insert_log", "insert into zafl_logs (ver_id) values ($1) returning log_id");
    const auto inserted_rows = txn.prepared("insert_log")(job_id).exec();
    assert(inserted_rows.size() == 1);
    const auto new_log_id = inserted_rows[0]["log_id"].as<int>();
    txn.commit();

    do_zafl(conn, new_log_id, work);
}
// Claims the next unstarted job from boost_versions.
//
// Selects one row whose start_time is NULL, ordered by submit_time ascending
// (so the oldest submission is picked, despite the statement name
// "get_newest"), stamps its start_time with NOW(), and returns its ver_id and
// original binary. Returns a default-constructed (invalid) WorkUnit_t when no
// such row exists. Exceptions thrown by pqxx are not caught here.
//
// NOTE(review): the select and the update are not guarded by a row lock; if
// several workers run against the same database they may pick the same row.
// Confirm whether that deployment is expected.
WorkUnit_t get_work()
{
    cout << "Looking for work" << endl;

    // Use the file-level connection string, the same one do_work uses.
    pqxx::connection conn{db_connect_options.c_str()};
    pqxx::work txn{conn};

    conn.prepare ("get_newest", "SELECT ver_id,orig_binary FROM ONLY boost_versions where start_time is NULL order by submit_time limit 1 ");
    const auto check_res = txn.prepared("get_newest").exec();

    // no work to do.
    if(check_res.size() == 0)
        return {};

    const auto ver_id = check_res[0]["ver_id"].as<int>();
    const auto orig_binary_contents = pqxx::binarystring(check_res[0]["orig_binary"]).str();

    // Mark the job as started in the same transaction that selected it.
    cout << "Updating starting time " << endl;
    conn.prepare ("update_start", "update boost_versions set start_time=NOW() where ver_id = $1 ");
    txn.prepared("update_start")(ver_id).exec();
    txn.commit();

    return {ver_id,orig_binary_contents};
}
// Polls for work forever: processes a job when one is available, otherwise
// sleeps 5 seconds before asking again. Any std::exception raised while
// fetching or processing a job is reported on stderr, after which the loop
// pauses 5 seconds and moves on to the next attempt. Never returns.
void main_loop()
{
    while(true)
    {
        try
        {
            const auto work = get_work();
            if(work.isValid())
                do_work(work);
            else
                sleep(5);
        }
        catch(const exception& e)
        {
            cerr << "Caution, exception occured when trying to do work." << endl;
            cerr << e.what() << endl;
            cerr << "Sleeping for 5 seconds to wait for db to stabilize." << endl;
            // Actually pause as announced; without this the loop would spin
            // at full speed for as long as the failure persists.
            sleep(5);
            // continue -- try next work item
        }
    }
}
// Program entry point. Command-line arguments are accepted but not used;
// all work happens inside main_loop().
int main(const int argc, const char* argv[])
{
    static_cast<void>(argc);
    static_cast<void>(argv);

    main_loop();
    return 0;
}