// prepd.cpp
#include <pqxx/pqxx>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>

#include <assert.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

using namespace pqxx;
using namespace std;

// Database name.  Kept as a string-literal macro because it is pasted into
// connection-string literals by adjacent-literal concatenation.
#define TURBODB "turbodb"
// Option text spliced into the zafl.sh command line that do_zafl() builds.
#define ZAFL_OPTIONS string(" -g -d -m 0x10000 --auto-zafl-libraries ")

// Connection string handed to pqxx::connection in do_work().
string db_connect_options = string("dbname=" TURBODB " host=localhost");

// One job to process: a version id together with the bytes of the original
// binary.  A unit that carries no binary bytes is considered "not valid",
// which is how callers recognise that there was nothing to do.
class WorkUnit_t
{
	public:
		// Empty unit: version id 0, no binary, isValid() == false.
		WorkUnit_t() = default;

		// Unit for version `_ver_id` holding a copy of `_orig_binary`.
		WorkUnit_t(int32_t _ver_id, string  _orig_binary)
			: ver_id(_ver_id)
			, orig_binary(_orig_binary)
		{
		}

		// Version id this unit was created with (0 for an empty unit).
		int32_t getVersionID() const
		{
			return ver_id;
		}

		// Bytes of the original binary; the reference lives as long as this object.
		const string& getOrigBinary() const
		{
			return orig_binary;
		}

		// True when the unit holds at least one byte of binary data.
		bool isValid() const
		{
			return !orig_binary.empty();
		}

	private:
		int32_t ver_id = 0;
		string  orig_binary;
};


namespace
{

// Writes all `len` bytes starting at `data` to descriptor `fd`, continuing
// after short writes and retrying when interrupted by a signal.
// Returns true only if every byte was written.
bool write_fully(const int fd, const char* data, const size_t len)
{
	auto done = size_t(0);
	while(done < len)
	{
		const auto res = write(fd, data + done, len - done);
		if(res < 0 && errno == EINTR)
			continue; // interrupted before anything was written; try again.
		if(res <= 0)
			return false;
		done += static_cast<size_t>(res);
	}
	return true;
}

// Removes `path` when it goes out of scope, so temporaries are cleaned up on
// every exit path, including when an exception propagates.  On POSIX,
// remove() deletes an empty directory as well as a regular file.
struct ScopedPathRemover_t
{
	explicit ScopedPathRemover_t(string _path) : path(_path) { }
	~ScopedPathRemover_t() { if(!path.empty()) ::remove(path.c_str()); }

	ScopedPathRemover_t(const ScopedPathRemover_t&)            = delete;
	ScopedPathRemover_t& operator=(const ScopedPathRemover_t&) = delete;

	string path;
};

}

// Runs zafl.sh over the original binary held by `work` and records the outcome.
//
//   conn   - open database connection; every statement here runs on it.
//   log_id - zafl_logs row that receives the captured output of the command.
//   work   - job to process; its version id selects the boost_versions row.
//
// The binary is written to a temporary file, zafl.sh is started through
// popen() with stderr folded into stdout, and its output is appended to the
// log row in batches while it runs.  On success the produced file is stored in
// boost_versions.zafl_binary with err = false; on any failure a message is
// appended to the log and the version is marked with err = true.
//
// Each step is committed in its own transaction so progress is visible while
// the command is still running.  Database errors propagate to the caller as
// exceptions; temporary files and the child process are still cleaned up.
void do_zafl(pqxx::connection &conn, int32_t log_id, const WorkUnit_t& work)
{
	auto txn = make_unique<pqxx::work>(conn);
	cout << "Doing zafl for job " << work.getVersionID() << endl;

	// register the statements used below once, up front.
	conn.prepare("start_pipe" , "update zafl_logs set log = $1 where log_id = $2");
	conn.prepare("update_log", "update zafl_logs set log = log || $1 where log_id = $2");
	conn.prepare("mark_err"   , "update boost_versions set err = true where ver_id = $1");
	conn.prepare("upload_zafl", "update boost_versions set zafl_binary = $1, end_time=NOW(), err = false where ver_id = $2; ");

	// commit what is pending and open a fresh transaction for the next step.
	const auto checkpoint = [&]()
		{
		txn->commit();
		txn = make_unique<pqxx::work>(conn);
		};

	// append text to this job's log row and make it visible immediately.
	const auto append_log = [&](const string& msg)
		{
		txn->prepared("update_log")(msg)(log_id).exec();
		checkpoint();
		};

	// append a failure message to the log row (keyed by log_id, not by the
	// version id) and flag the version as failed, in a single transaction.
	const auto fail = [&](const string& msg)
		{
		txn->prepared("update_log")(msg)(log_id).exec();
		txn->prepared("mark_err")(work.getVersionID()).exec();
		checkpoint();
		};

	// start with some logging.
	txn->prepared("start_pipe")("Attempting to start zafl.\n")(log_id).exec();
	checkpoint();

	// create a temp file to write the binary to.
	char zafl_input_tmpname[]="/tmp/predtmpXXXXXX";
	const auto fd = mkstemp(zafl_input_tmpname);
	if(fd == -1)
	{
		fail("Couldn't create tmp output file.  Out of disk space?\n");
		return;
	}
	const ScopedPathRemover_t input_cleanup(zafl_input_tmpname);

	const auto& raw       = work.getOrigBinary();
	const auto  wrote_all = write_fully(fd, raw.data(), raw.size());
	const auto  closed_ok = close(fd) == 0; // close() can report a deferred write error.
	if(!wrote_all || !closed_ok)
	{
		fail("Couldn't create tmp output file.  Out of disk space?\n");
		return;
	}

	// figure out an output filename.  A private directory from mkdtemp() gives
	// a path that nobody else can pre-create and that does not exist yet when
	// zafl.sh starts.
	char zafl_output_dir[]="/tmp/predoutXXXXXX";
	if(mkdtemp(zafl_output_dir) == nullptr)
	{
		fail("Couldn't create tmp output directory.\n");
		return;
	}
	// declared directory-first so the file is removed before its directory.
	const ScopedPathRemover_t output_dir_cleanup(zafl_output_dir);
	const auto output_name = string(zafl_output_dir) + "/zafl_output";
	const ScopedPathRemover_t output_cleanup(output_name);

	// prepare zafl to run.
	const auto zafl_cmd = string() + "zafl.sh " + ZAFL_OPTIONS  + " " + zafl_input_tmpname + " " + output_name + " 2>&1 ";
	cout << "Trying command: " << zafl_cmd << endl;
	const auto pipe = popen(zafl_cmd.c_str(), "r");
	if (!pipe)
	{
		cerr << "Couldn't start command." << endl;
		fail("Couldn't start pipe.  Out of mem/processes?\n");
		return;
	}
	// make sure the stream is closed and the child reaped even if a database
	// call below throws.
	auto pipe_guard = unique_ptr<FILE, int(*)(FILE*)>(pipe, pclose);

	// mark that we've started zafl.
	append_log("Pipe started with cmd: "+zafl_cmd+"\n");

	// setup an empty buffer and record the time we last logged as now.
	auto log_buffer  = string();
	auto last_update = time(nullptr);

	// Read one byte at a time so output reaches the log promptly.  The byte
	// is read into a real char and the loop ends only when fread() reports
	// that nothing was read, so no byte value can be mistaken for
	// end-of-file.  This call may block.
	char next_byte = 0;
	while(fread(&next_byte, 1, 1, pipe) == 1)
	{
		// record the byte in a buffer.
		log_buffer += next_byte;

		// check if it's time to flush the buffer to the db.
		const auto log_buff_big = log_buffer.size() > 1024;
		const auto too_long     = difftime(time(nullptr), last_update) > 2;
		if(log_buff_big || too_long)
		{
			// append the new log bytes to the db entry
			append_log(log_buffer);

			// reset for next bit of buffer.
			last_update = time(nullptr);
			log_buffer.clear();
		}
	}

	// put end of log buffer in place.
	if(!log_buffer.empty())
		append_log(log_buffer);

	// now deal with return code.  pclose() yields a wait status, not an exit
	// code: -1 means the status could not be collected, and the exit code is
	// only meaningful when the child exited normally.
	const auto status          = pclose(pipe_guard.release());
	const auto exited_normally = status != -1 && WIFEXITED(status);
	if(!exited_normally)
	{
		fail("Zafl failed with wait_status="+to_string(status)+"!\n");
		return;
	}

	// Exit codes 0 and 1 are accepted; 2 and above are failures.
	// NOTE(review): the threshold of 2 is kept from the earlier check, which
	// compared the undecoded status -- confirm against zafl.sh's exit codes.
	const auto exit_code = WEXITSTATUS(status);
	if(exit_code >= 2)
	{
		// log the failure.
		fail("Zafl failed with exit_code="+to_string(exit_code)+"!\n");
		return;
	}

	// log success.
	append_log("Zafl seems to have worked!\n");

	// read the produced binary; never store a missing or empty result as a success.
	ifstream fin(output_name, ios::binary);
	ostringstream ostrm;
	if(fin)
		ostrm << fin.rdbuf();
	const auto zafl_contents = ostrm.str();
	if(zafl_contents.empty())
	{
		fail("Zafl reported success but produced no output binary.\n");
		return;
	}

	const auto pqxx_zafl_contents = pqxx::binarystring(zafl_contents.c_str(), zafl_contents.size());
	txn->prepared("upload_zafl")(pqxx_zafl_contents)(work.getVersionID()).exec();
	cout << "Zafl'd verison is " << zafl_contents.size() << " bytes " << endl;
	txn->commit();

	// the temporary input, output and output directory are removed by the
	// scoped removers declared above.
}

// Processes one job: opens a database connection, creates the zafl_logs row
// for this version, and hands the connection, the new log id and the job to
// do_zafl().
void do_work(const WorkUnit_t& work)
{
	const auto job_id = work.getVersionID();
	cout << "Doing work for job " << job_id << endl;

	pqxx::connection conn{db_connect_options.c_str()};
	pqxx::work txn{conn};

	// insert the log row and get back the id the database assigned to it.
	conn.prepare ("insert_log", "insert into zafl_logs (ver_id) values ($1) returning log_id");
	const auto inserted_rows = txn.prepared("insert_log")(job_id).exec();

	// exactly one row comes back from a single-row insert.
	assert(inserted_rows.size() == 1);
	const auto new_log_id = inserted_rows[0]["log_id"].as<int>();
	txn.commit();

	do_zafl(conn, new_log_id, work);
}

// Looks for one version that has not been started yet (start_time is NULL),
// taking the one with the smallest submit_time.  When one is found its
// start_time is set to NOW() and it is returned together with its original
// binary; otherwise an empty (not valid) WorkUnit_t is returned.
WorkUnit_t get_work()
{
	cout << "Looking for work" << endl;

	const auto connect_options = string("dbname=" TURBODB " host=localhost");
	pqxx::connection conn{connect_options.c_str()};
	pqxx::work txn{conn};

	conn.prepare ("get_newest", "SELECT ver_id,orig_binary FROM ONLY boost_versions where start_time is NULL order by submit_time limit 1 ");
	const auto pending = txn.prepared("get_newest").exec();

	if(pending.size() != 0)
	{
		// pull the id and the binary bytes out of the single row.
		const auto claimed_id = pending[0]["ver_id"].as<int>();
		const auto binary     = pqxx::binarystring(pending[0]["orig_binary"]).str();

		// record that this version has been picked up.
		cout << "Updating starting time " << endl;
		conn.prepare ("update_start", "update boost_versions set start_time=NOW() where ver_id = $1 ");
		txn.prepared("update_start")(claimed_id).exec();
		txn.commit();

		return {claimed_id,binary};
	}

	// no work to do.
	return {};
}


// Polls for work forever.  Each pass asks get_work() for a job; a valid job
// is handed to do_work(), otherwise the loop sleeps for 5 seconds before
// asking again.  Any std::exception raised while fetching or processing a job
// is reported on stderr, after which the loop backs off for 5 seconds and
// carries on.  This function never returns.
void main_loop()
{
	while(1)
	{
		try
		{
			const auto work = get_work();
			if(work.isValid())
				do_work(work);
			else
				sleep(5);
		}
		catch(const exception& e)
		{
			cerr << "Caution, exception occured when trying to do work." << endl;
			cerr << e.what() << endl;
			cerr << "Sleeping for 5 seconds to wait for db to stabilize." << endl;
			// actually perform the back-off announced above; without it a
			// persistent failure makes this loop spin without pause.
			sleep(5);
			// continue -- try next work item
		}
	}
}

// Program entry point: hands control to main_loop().
int main(const int argc, const char* argv[])
{
	// command-line arguments are accepted but not consulted.
	(void)argc;
	(void)argv;

	main_loop();
	return 0;
}