00001
00007
00008
00009
00010 #include <stdio.h>
00011 #include <time.h>
00012 #include <string.h>
00013 #include <sys/types.h>
00014 #include <sys/stat.h>
00015 #include <sys/wait.h>
00016 #include <fcntl.h>
00017 #include <errno.h>
00018 #include <unistd.h>
00019 #include <sys/param.h>
00020
00021 #include "utility.h"
00022 #include "problem.h"
00023 #include "comm_protocol.h"
00024 #include "comm_basics.h"
00025 #include "comm_data.h"
00026 #include "comm_encode.h"
00027 #include "gs_pm_model.h"
00028
00029
00030
00031
00032
00033
00034
/* Size in bytes of the request_id buffer in gs_service_info_t. */
#define REQUEST_ID_LEN 64
/*
 * printf-style template for the request id: the first conversion receives
 * the server component id string and the second the service process id.
 * The trailing run of 'X' characters is handed to gs_create_request_id()
 * -- presumably replaced with a unique suffix, mkstemp-style; confirm
 * against that helper.
 */
#define REQUEST_ID_TEMPLATE "gsrequest_%s_%d_XXXXXXXXXXXX"
00037
00038
00039
00040
/*
 * Per-request state of one service process.  Most fields are filled in by
 * service_template() from its command-line arguments and then handed to the
 * blocking / non-blocking / batch request handlers.
 */
typedef struct {
  gs_problem_t *problem;          /* problem description loaded from the service XML file */
  char *gridsolve_root;           /* argv[5]; base directory used to locate service files */
  char *gridsolve_arch;           /* argv[6] */
  int blocking;                   /* argv[7]; non-zero selects the blocking request path */
  int sock;                       /* argv[4]; descriptor used for all client communication */
  int tag;                        /* argv[2]; protocol tag of the incoming request */
  int my_dsig;                    /* data signature of this host (from pvmgetdsig()) */
  char *srv_cid;                  /* argv[10]; server component id, also part of the request id */
  char srv_job_count[FN_LEN];     /* path of the job count file: infodir/prefix.<parent pid> */
  char *cwd;                      /* working directory at start-up (parent of the request dir) */
  char request_id[REQUEST_ID_LEN];/* request id; also the name of the request directory */
  char *agent;                    /* argv[8]; agent host passed to the notify calls */
  in_port_t agentport;            /* argv[9]; agent port passed to the notify calls */
  char *cli_username;             /* argv[11] */
  char *cli_hostname;             /* argv[12] */
  char *cli_cid;                  /* argv[13]; client component id */
  int client_dsig;                /* argv[3]; data signature of the client */
  char *problem_name;             /* argv[1]; name of the service being run */
  gs_service_error_enum_t err;    /* last error; sent to the client or written to the error file */
  char *bmode;                    /* value of the problem's "BATCH_SUBMIT" info, NULL if absent */
  char *infodir;                  /* argv[14]; directory with server XML, model and coefficient files */
  int agent_taskid;               /* argv[15]; forwarded to the agent notifications */
  double agent_est_time;          /* argv[16]; forwarded to the solve notification */
} gs_service_info_t;
00066
00067 int
00068 gs_read_server_from_file(char *, gs_server_t *),
00069 gs_service_read_coeff(gs_service_info_t *, gs_server_t *),
00070 gs_problem_service(gs_problem_t *),
00071 gs_service_blocking_request(gs_service_info_t *),
00072 gs_service_nonblocking_request(gs_service_info_t *),
00073 gs_service_batch_request(gs_service_info_t *);
00074
/*
 * Forward declarations of the signal handlers defined in this file.
 * gs_dummy_signal_handler used to be listed twice while the batch SIGTERM
 * handler had no prototype at all; each handler is now declared once.
 */
void
  gs_dummy_signal_handler(int),
  gs_service_sigterm_handler(int),
  gs_batch_service_sigterm_handler(int);
00079
00080 double
00081 gs_read_service_et(char *),
00082 gs_pm_problem_service(gs_service_info_t *),
00083 gs_agent_get_server_score(gs_problem_t *, gs_server_t *);
00084
00085
00086
00087
00088
00089
/*
 * Pid of the forked child that is currently running the service (0 when
 * there is none).  Set by the non-blocking and batch request handlers and
 * read by the SIGTERM handlers so the signal can be forwarded to the child.
 */
pid_t gs_service_pid = 0;
00091
00136 int
00137 service_template(int argc, char *argv[])
00138 {
00139 gs_service_info_t sinfo;
00140 gs_server_t *server;
00141 char *service_xml;
00142 double est_time;
00143 char *cwd;
00144
00145 sinfo.err = GS_SVC_ERR_UNSPECIFIED;
00146
00147 if(argc != 17) {
00148 fprintf(stderr, "Bad usage. Anyway, don't use this\n");
00149 fprintf(stderr, "from the command line.\n");
00150 exit(-1);
00151 }
00152
00153 sinfo.problem_name = strdup(argv[1]);
00154 sinfo.tag = atoi(argv[2]);
00155 sinfo.client_dsig = atoi(argv[3]);
00156 sinfo.sock = atoi(argv[4]);
00157 sinfo.gridsolve_root = argv[5];
00158 sinfo.gridsolve_arch = argv[6];
00159 sinfo.blocking = atoi(argv[7]);
00160 sinfo.agent = strdup(argv[8]);
00161 sinfo.agentport = atoi(argv[9]);
00162 sinfo.srv_cid = strdup(argv[10]);
00163 sinfo.cli_username = strdup(argv[11]);
00164 sinfo.cli_hostname = strdup(argv[12]);
00165 sinfo.cli_cid = strdup(argv[13]);
00166 sinfo.infodir = strdup(argv[14]);
00167 sinfo.agent_taskid = atoi(argv[15]);
00168 sinfo.agent_est_time = atof(argv[16]);
00169
00170 server = (gs_server_t *) malloc(sizeof(gs_server_t));
00171
00172 if(!server) {
00173 gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00174 exit(-1);
00175 }
00176
00177 if(gs_service_read_coeff(&sinfo, server) < 0) {
00178 free(server);
00179 server = NULL;
00180 }
00181
00182 service_xml = dstring_sprintf("%s/service/%s/%s.xml", sinfo.gridsolve_root,
00183 sinfo.problem_name, sinfo.problem_name);
00184
00185 if(!service_xml) {
00186 gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00187 exit(-1);
00188 }
00189
00190 sinfo.problem = (gs_problem_t *) malloc(sizeof(gs_problem_t));
00191
00192 if(!sinfo.problem) {
00193 gs_send_tag(sinfo.sock, GS_SVC_ERR_MALLOC);
00194 exit(-1);
00195 }
00196
00197 snprintf(sinfo.srv_job_count, FN_LEN, "%s/%s.%d", sinfo.infodir,
00198 GS_SERVER_JOB_COUNT_FILE_PREFIX, getppid());
00199
00200
00201 if(gs_read_problem_from_file(service_xml, sinfo.problem) < 0) {
00202 ERRPRINTF("Error loading service: '%s'.\n", service_xml);
00203 gs_send_tag(sinfo.sock, GS_SVC_ERR_MISSING_XML);
00204 exit(-1);
00205 }
00206 else {
00207 sinfo.my_dsig = pvmgetdsig();
00208
00209 cwd = CALLOC(MAXPATHLEN, sizeof(char));
00210 if (cwd == NULL) exit(-1);
00211 if (getcwd(cwd, MAXPATHLEN) == NULL) exit(-1);
00212 sinfo.cwd = strdup(cwd);
00213 FREE(cwd);
00214
00215 if (!sinfo.cwd) {
00216 ERRPRINTF("Can't get current working directory.\n");
00217 gs_send_tag(sinfo.sock, GS_SVC_ERR_GETCWD);
00218 exit(-1);
00219 }
00220
00221 sprintf(sinfo.request_id, REQUEST_ID_TEMPLATE, sinfo.srv_cid,
00222 (int) getpid());
00223
00224 if(gs_create_request_id(sinfo.request_id) < 0) {
00225 ERRPRINTF("Error creating request id.\n");
00226 gs_send_tag(sinfo.sock, GS_SVC_ERR_REQID);
00227 exit(-1);
00228 }
00229
00230 if(mkdir(sinfo.request_id, 0700) < 0) {
00231 ERRPRINTF("Could not create directory '%s' ", sinfo.request_id);
00232 ERRPRINTF("to store output (cwd = '%s')\n", sinfo.cwd);
00233 gs_send_tag(sinfo.sock, GS_SVC_ERR_MKDIR);
00234 exit(-1);
00235 }
00236
00237 if(chdir(sinfo.request_id) < 0) {
00238 ERRPRINTF("Could not cd to request directory '%s'.\n", sinfo.request_id);
00239 gs_send_tag(sinfo.sock, GS_SVC_ERR_CHDIR);
00240 exit(-1);
00241 }
00242
00243 if(gs_increment_job_count(sinfo.srv_job_count) < 0)
00244 ERRPRINTF("Warning: failed to increment job count.\n");
00245
00246 if(gs_send_tag(sinfo.sock, GS_PROT_OK) < 0) {
00247 ERRPRINTF("Error sending GS_PROT_OK.\n");
00248 goto service_abnormal_exit;
00249 }
00250
00251 if(gs_send_string(sinfo.sock, sinfo.request_id) < 0) {
00252 ERRPRINTF("Error sending request id.\n");
00253 goto service_abnormal_exit;
00254 }
00255
00256
00257
00258
00259 if(sinfo.tag == GS_PROT_PROBLEM_SOLVE_ASSIGNED) {
00260 char *problemstring = NULL;
00261 char dsig_string[256];
00262
00263 sprintf(dsig_string, "%d", sinfo.my_dsig);
00264
00265 if(gs_send_string(sinfo.sock, dsig_string) < 0) {
00266 ERRPRINTF("Error sending server data signature.\n");
00267 goto service_abnormal_exit;
00268 }
00269
00270 if(gs_encode_problem(&problemstring, sinfo.problem) < 0) {
00271 ERRPRINTF("Error encoding problem description.\n");
00272 goto service_abnormal_exit;
00273 }
00274
00275 if(gs_send_string(sinfo.sock, problemstring) < 0) {
00276 ERRPRINTF("Error sending problem description.\n");
00277 goto service_abnormal_exit;
00278 }
00279 }
00280
00281
00282 #ifdef GS_SMART_GRIDSOLVE
00283 if(gs_recv_int(sinfo.sock, &sinfo.problem->has_smart_arg_comm) < 0) {
00284 ERRPRINTF("Error sending problem description.\n");
00285 goto service_abnormal_exit;
00286 }
00287
00288 if(sinfo.problem->has_smart_arg_comm==1){
00289 if(gs_smart_recv_map_info(sinfo.sock, sinfo.problem)<0){
00290 ERRPRINTF("Error receiving remote comm info.\n");
00291 goto service_abnormal_exit;
00292 }
00293 }
00294
00295 if(sinfo.problem->has_smart_arg_comm==1){
00296 if(gs_smart_recv_input_args(sinfo.sock, server, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig)<0){
00297 ERRPRINTF("SMART: Error receiving smart input args.\n");
00298 goto service_abnormal_exit;
00299 }
00300 }
00301 else{
00302 if(gs_recv_input_args(sinfo.sock, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig) < 0) {
00303 ERRPRINTF("Error receiving input args.\n");
00304 goto service_abnormal_exit;
00305 }
00306
00307 }
00308
00309 #else
00310
00311
00312
00313 if(gs_recv_input_args(sinfo.sock, sinfo.problem, sinfo.client_dsig, sinfo.my_dsig) < 0) {
00314 ERRPRINTF("Error receiving input args.\n");
00315 goto service_abnormal_exit;
00316 }
00317 #endif
00318
00319 if(server)
00320 est_time = gs_agent_get_server_score(sinfo.problem, server);
00321 else
00322 est_time = 2000.0;
00323
00324 if(gs_notify_agent_problem_solve(sinfo.agent, sinfo.agentport,
00325 sinfo.problem_name, est_time, sinfo.srv_cid, sinfo.cli_username,
00326 sinfo.cli_hostname, sinfo.cli_cid, sinfo.request_id,
00327 sinfo.agent_taskid, sinfo.agent_est_time) < 0)
00328 ERRPRINTF("Warning: failed sending problem solve notification.\n");
00329
00330 sinfo.bmode = gs_problem_getinfo(sinfo.problem, "BATCH_SUBMIT", NULL);
00331
00332 if(sinfo.bmode) {
00333 if(gs_service_batch_request(&sinfo) < 0) {
00334 gs_send_tag(sinfo.sock, sinfo.err);
00335 goto service_abnormal_exit;
00336 }
00337 }
00338 else if(sinfo.blocking) {
00339 if(gs_service_blocking_request(&sinfo) < 0)
00340 goto service_abnormal_exit;
00341 }
00342 else {
00343 if(gs_service_nonblocking_request(&sinfo) < 0)
00344 goto service_abnormal_exit;
00345 }
00346 }
00347
00348 gs_close_socket(sinfo.sock);
00349 exit(0);
00350
00351 service_abnormal_exit:
00352
00353
00354
00355
00356 if(!sinfo.blocking || sinfo.bmode) {
00357 if(gs_decrement_job_count(sinfo.srv_job_count) < 0)
00358 ERRPRINTF("Warning: failed to decrement job count.\n");
00359
00360 if(gs_create_error_file(".", sinfo.err) < 0)
00361 ERRPRINTF("Could not create 'error' file.\n");
00362 }
00363
00364
00365
00366
00367 if(gs_create_timestamp_file(".", "cancelled", 0.0) < 0)
00368 ERRPRINTF("Could not create 'cancelled' file.\n");
00369 ERRPRINTF("Service terminating abnormally\n");
00370 exit(-1);
00371 }
00372
/**
 * Signal handler that deliberately does nothing.  It is installed for
 * SIGCHLD by the non-blocking and batch request handlers.
 *
 * @param sig -- the delivered signal number (unused)
 */
void
gs_dummy_signal_handler(int sig)
{
  (void)sig;
}
00387
00396 void
00397 gs_service_sigterm_handler(int sig)
00398 {
00399 if(gs_service_pid > 0) {
00400 if(kill(gs_service_pid, sig) < 0)
00401 ERRPRINTF("Failed to kill service process [pid = %d]\n", gs_service_pid);
00402 }
00403
00404 return;
00405 }
00406
00416 void
00417 gs_batch_service_sigterm_handler(int sig)
00418 {
00419 if(gs_service_pid > 0) {
00420 if(kill(gs_service_pid, sig) < 0)
00421 ERRPRINTF("Failed to kill batch service process [pid = %d]\n", gs_service_pid);
00422 }
00423
00424 return;
00425 }
00426
00437 int
00438 gs_service_read_coeff(gs_service_info_t *s, gs_server_t *server)
00439 {
00440 char *server_xml, *service_coeff, *service_model;
00441 int mfd;
00442
00443 server_xml = dstring_sprintf(GS_SERVER_XML_TEMPLATE, s->infodir);
00444
00445 if(!server_xml) {
00446 s->err = GS_SVC_ERR_MALLOC;
00447 return -1;
00448 }
00449
00450 if(gs_read_server_from_file(server_xml, server) < 0) {
00451 s->err = GS_SVC_ERR_MISSING_SV_XML;
00452 return -1;
00453 }
00454
00455 server->workload = gs_get_workload();
00456
00457 service_model = dstring_sprintf("%s/%s.mdl", s->infodir, s->problem_name);
00458
00459 if(!service_model) {
00460 s->err = GS_SVC_ERR_MALLOC;
00461 return -1;
00462 }
00463
00464 service_coeff = dstring_sprintf("%s/%s.coe", s->infodir, s->problem_name);
00465
00466 if(!service_coeff) {
00467 s->err = GS_SVC_ERR_MALLOC;
00468 return -1;
00469 }
00470
00471 mfd = open(service_model, O_RDONLY, 0600);
00472
00473 if(mfd < 0) {
00474 server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00475 return 0;
00476 }
00477
00478
00479
00480
00481
00482
00483
00484 if(gs_lock_fd(mfd, F_RDLCK) < 0) {
00485 close(mfd);
00486 server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00487 return 0;
00488 }
00489
00490 if(gs_get_contents_of_file(service_coeff, &(server->perf_expr)) < 0) {
00491 ERRPRINTF("Warning: failed to read coefficient file '%s'\n", service_coeff);
00492 gs_unlock_fd(mfd);
00493 close(mfd);
00494 server->perf_expr = strdup(GS_NO_MODEL_UPDATE);
00495 return 0;
00496 }
00497
00498 if(server->perf_expr[strlen(server->perf_expr)-1] == '\n')
00499 server->perf_expr[strlen(server->perf_expr)-1] = 0;
00500
00501 gs_unlock_fd(mfd);
00502 close(mfd);
00503
00504 return 0;
00505 }
00506
00515 int
00516 gs_service_blocking_request(gs_service_info_t *s)
00517 {
00518 double service_et;
00519
00520 service_et = gs_pm_problem_service(s);
00521
00522 if(gs_decrement_job_count(s->srv_job_count) < 0)
00523 ERRPRINTF("Warning: failed to decrement job count.\n");
00524
00525
00526
00527
00528 if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00529 char *origcwd, *newcwd;
00530
00531 ERRPRINTF("Could not cd back to request directory '%s/%s'.\n",
00532 s->cwd, s->request_id);
00533 gs_send_tag(s->sock, GS_SVC_ERR_CHDIR);
00534
00535
00536
00537
00538
00539
00540 origcwd = dstring_sprintf("%s/%s", s->cwd, s->request_id);
00541 newcwd = getcwd(NULL, MAXPATHLEN);
00542
00543 if(!strcmp(newcwd, origcwd))
00544 return -1;
00545
00546 exit(-1);
00547 }
00548
00549 if(gs_send_tag(s->sock, GS_PROT_OK) < 0) {
00550 ERRPRINTF("Error sending tag.\n");
00551 return -1;
00552 }
00553
00554
00555
00556 #ifdef GS_SMART_GRIDSOLVE
00557 int pid;
00558 if(s->problem->has_smart_arg_comm==1){
00559 if(gs_smart_send_output_args_to_client(s->sock ,s->problem, s->my_dsig)<0){
00560 ERRPRINTF("SMART : Error sending smart sending arguments\n");
00561 return -1;
00562 }
00563 pid=fork();
00564 if(pid==-1){
00565 ERRPRINTF("SMART: Out of memory could not fork\n");
00566 return -1;
00567 }
00568
00569 if(pid==0){
00570 gs_server_t * src_server = (gs_server_t *)calloc(1,sizeof(gs_server_t));
00571 if(gs_service_read_coeff(s, src_server) < 0) {
00572 free(src_server);
00573 src_server = NULL;
00574 }
00575
00576 if(gs_smart_send_output_args_remotely(s->sock ,src_server, s->problem, s->my_dsig)<0){
00577 ERRPRINTF("SMART : Error sending smart sending arguments\n");
00578 return -1;
00579 }
00580 _exit(0);
00581 }
00582 }
00583 else{
00584 if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
00585 ERRPRINTF("Error sending output args.\n");
00586 return -1;
00587 }
00588 }
00589 #else
00590 if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
00591 ERRPRINTF("Error sending output args.\n");
00592 return -1;
00593 }
00594 #endif
00595
00596
00597
00598 if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
00599 s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
00600 s->agent_taskid, service_et) < 0)
00601 ERRPRINTF("Warning: failed sending problem solve notification.\n");
00602
00603 if(gs_create_timestamp_file(".", "retrieved", 0.0))
00604 ERRPRINTF("Warning: failed to create 'retrieved' file.\n");
00605
00606 return 0;
00607 }
00608
00617 int
00618 gs_service_nonblocking_request(gs_service_info_t *s)
00619 {
00620 char *problemstr = NULL;
00621 FILE *xmlfile;
00622 double service_et;
00623 pid_t pid;
00624 int fd;
00625
00626 gs_service_pid = 0;
00627
00628
00629
00630
00631
00632
00633 if((gs_signal(SIGCHLD, gs_dummy_signal_handler) == SIG_ERR) ||
00634 (gs_signal(SIGTERM, gs_service_sigterm_handler) == SIG_ERR)) {
00635 ERRPRINTF("Error: could not set signal handlers\n");
00636 s->err = GS_SVC_ERR_SIGNALS;
00637 return -1;
00638 }
00639
00640
00641
00642 pid = fork();
00643
00644 if(pid == -1) {
00645 ERRPRINTF("Failed to fork\n");
00646 s->err = GS_SVC_ERR_FORK;
00647 return -1;
00648 }
00649
00650 if(pid == 0) {
00651
00652
00653 setbuf(stdout, NULL);
00654 setbuf(stderr, NULL);
00655
00656 service_et = gs_pm_problem_service(s);
00657
00658 if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00659 ERRPRINTF("Could not cd back to request directory '%s/%s'.\n",
00660 s->cwd, s->request_id);
00661 _exit(GS_SVC_ERR_CHDIR);
00662 }
00663
00664 xmlfile = fopen("problem.xml", "w");
00665
00666 if(!xmlfile) {
00667 ERRPRINTF("Could not create xml file.\n");
00668 _exit(GS_SVC_ERR_CREATE_XML);
00669 }
00670
00671 if(gs_encode_problem(&problemstr, s->problem) < 0) {
00672 ERRPRINTF("Could not encode problem.\n");
00673 _exit(GS_SVC_ERR_PROBLEM_ENC);
00674 }
00675
00676 fprintf(xmlfile, "%s\n", problemstr);
00677
00678 fclose(xmlfile);
00679
00680 fd = open("data", O_WRONLY | O_CREAT, 0600);
00681
00682 if(fd < 0) {
00683 ERRPRINTF("Could not create data file.\n");
00684 _exit(GS_SVC_ERR_CREAT_DATA_FILE);
00685 }
00686
00687
00688 #ifdef GS_SMART_GRIDSOLVE
00689 if(s->problem->has_smart_arg_comm==1){
00690 gs_server_t * src_server = (gs_server_t *)calloc(1,sizeof(gs_server_t));
00691 if(gs_service_read_coeff(s, src_server) < 0) {
00692 free(src_server);
00693 src_server = NULL;
00694 }
00695 if(gs_smart_save_output_args_to_file(s->sock, src_server, fd, s->problem, s->my_dsig) < 0) {
00696 ERRPRINTF("Error sending output args.\n");
00697 _exit(GS_SVC_ERR_IO);
00698 }
00699
00700 }
00701 else{
00702 if(gs_save_output_args_to_file(fd, s->problem, s->my_dsig) < 0) {
00703 ERRPRINTF("Error sending output args.\n");
00704 _exit(GS_SVC_ERR_IO);
00705 }
00706 }
00707
00708 #else
00709 if(gs_save_output_args_to_file(fd, s->problem, s->my_dsig) < 0) {
00710 ERRPRINTF("Error sending output args.\n");
00711 _exit(GS_SVC_ERR_IO);
00712 }
00713
00714 #endif
00715
00716 close(fd);
00717
00718 if(gs_create_timestamp_file(".", "done", service_et) < 0) {
00719 ERRPRINTF("Could not create completion file.\n");
00720 _exit(GS_SVC_ERR_COMPLETION_FILE);
00721 }
00722
00723 _exit(0);
00724 }
00725 else {
00726 pid_t child;
00727 int cstat_loc, status;
00728
00729 gs_service_pid = pid;
00730
00731
00732
00733
00734
00735 child = waitpid(pid, &cstat_loc, 0);
00736
00737 if(child < 0) {
00738 ERRPRINTF("Error waiting for service process %d.\n", (int)pid);
00739 s->err = GS_SVC_ERR_WAITPID;
00740 return -1;
00741 }
00742
00743 if(WIFEXITED(cstat_loc) == 0) {
00744 ERRPRINTF("service process %d did not terminate.\n", (int)pid);
00745 s->err = GS_SVC_ERR_ABNORMAL_EXIT;
00746 return -1;
00747 }
00748
00749 status = WEXITSTATUS(cstat_loc);
00750
00751 if(status != 0) {
00752 ERRPRINTF("service process %d terminated abnormally (status %d).\n",
00753 (int)pid, (char)status);
00754 s->err = (char)status > 0 ? (char)status : GS_SVC_ERR_UNSPECIFIED;
00755 return -1;
00756 }
00757
00758 service_et = gs_read_service_et("done");
00759
00760 if(gs_decrement_job_count(s->srv_job_count) < 0)
00761 ERRPRINTF("Warning: failed to decrement job count.\n");
00762
00763 if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
00764 s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
00765 s->agent_taskid, service_et) < 0)
00766 ERRPRINTF("Warning: failed sending problem solve notification.\n");
00767 }
00768
00769 return 0;
00770 }
00771
/**
 * Reads the service execution time from a file: the first line is skipped
 * and the second line is converted to a double.
 *
 * It is called on the "done" file, which is created through
 * gs_create_timestamp_file() -- presumably a timestamp line followed by
 * the elapsed time; confirm against that helper.
 *
 * @param file -- name of the file to read
 *
 * @returns the value found on the second line, or 0.0 if the file cannot
 *   be opened or has no second line.
 */
double
gs_read_service_et(char *file)
{
  char line[128];
  double et = 0.0;
  FILE *fp = fopen(file, "r");

  if(fp == NULL)
    return et;

  /* consume the first line; only the second one carries the value */
  fgets(line, 128, fp);

  if(fgets(line, 128, fp) != NULL)
    et = atof(line);

  fclose(fp);

  return et;
}
00800
00807 int
00808 gs_exec_batch_service(gs_service_info_t *s)
00809 {
00810 int status;
00811 char *cmd, *orig_exe, *new_exe;
00812
00813 unlink("gs_batch_id");
00814
00815 #ifdef __CYGWIN__
00816 cmd = dstring_sprintf("%s/service/%s/gs_submit %s/service/%s/%s_batch_service > gs_batch_id",
00817 s->gridsolve_root, s->problem_name, s->gridsolve_root, s->problem_name, s->problem_name);
00818 #else
00819 new_exe = dstring_sprintf("%s/%s/%s_batch_service", s->cwd,
00820 s->request_id, s->problem_name);
00821 orig_exe = dstring_sprintf("%s/service/%s/%s_batch_service",
00822 s->gridsolve_root, s->problem_name, s->problem_name);
00823
00824 if(symlink(orig_exe, new_exe) < 0) {
00825 ERRPRINTF("failed to create symlink (%s -> %s)\n",
00826 new_exe, orig_exe);
00827 return -1;
00828 }
00829 cmd = dstring_sprintf("%s/service/%s/gs_submit %s > gs_batch_id",
00830 s->gridsolve_root, s->problem_name, new_exe);
00831 #endif
00832
00833 if(!cmd) {
00834 ERRPRINTF("failed to create command string\n");
00835 return -1;
00836 }
00837
00838 DBGPRINTF("cmd: %s\n", cmd);
00839 status = system(cmd);
00840
00841 if((status < 0) || (WEXITSTATUS(status) != 0)) {
00842 ERRPRINTF("command failed: '%s'\n", cmd);
00843 return -1;
00844 }
00845
00846 return 0;
00847 }
00848
00853 int
00854 gs_wait_for_batch_job_completion(gs_service_info_t *s)
00855 {
00856 char buf[256], *cmd;
00857 int status;
00858 FILE *f;
00859
00860
00861
00862
00863 if((chdir(s->cwd) < 0) || (chdir(s->request_id) < 0)) {
00864 ERRPRINTF("can't cd back to %s/%s\n", s->cwd, s->request_id);
00865 return -1;
00866 }
00867
00868 if((f = fopen("gs_batch_id", "r")) == NULL) {
00869 ERRPRINTF("failed to open file gs_batch_id\n");
00870 return -1;
00871 }
00872
00873 if(!fgets(buf, 256, f)) {
00874 ERRPRINTF("failed to read ID from file gs_batch_id\n");
00875 return -1;
00876 }
00877
00878 fclose(f);
00879
00880 buf[strlen(buf)-1] = '\0';
00881
00882 cmd = dstring_sprintf("%s/service/%s/gs_probe %s", s->gridsolve_root, s->problem_name, buf);
00883
00884 if(!cmd) {
00885 ERRPRINTF("malloc");
00886 return -1;
00887 }
00888
00889 for(;;) {
00890 status = system(cmd);
00891
00892 if(status < 0) {
00893 ERRPRINTF("command failed: '%s'\n", cmd);
00894 return -1;
00895 }
00896
00897 if(WEXITSTATUS(status) != 0)
00898 break;
00899
00900 sleep(5);
00901 }
00902
00903 return 0;
00904 }
00905
00914 int
00915 gs_service_batch_request(gs_service_info_t *s)
00916 {
00917 char *problemstr = NULL;
00918 double service_et;
00919 FILE *xmlfile;
00920 pid_t pid;
00921
00922 gs_service_pid = 0;
00923
00924 if((gs_signal(SIGCHLD, gs_dummy_signal_handler) == SIG_ERR) ||
00925 (gs_signal(SIGTERM, gs_batch_service_sigterm_handler) == SIG_ERR)) {
00926 ERRPRINTF("Error: could not ignore SIGCHLD\n");
00927 s->err = GS_SVC_ERR_SIGNALS;
00928 return -1;
00929 }
00930
00931
00932 xmlfile = fopen(GS_BATCH_XML, "w");
00933
00934 if(!xmlfile) {
00935 ERRPRINTF("Could not create xml file.\n");
00936 s->err = GS_SVC_ERR_CREATE_XML;
00937 return -1;
00938 }
00939
00940 if(gs_encode_problem(&problemstr, s->problem) < 0) {
00941 ERRPRINTF("Could not encode problem.\n");
00942 s->err = GS_SVC_ERR_PROBLEM_ENC;
00943 return -1;
00944 }
00945
00946 fprintf(xmlfile, "%s\n", problemstr);
00947
00948 fclose(xmlfile);
00949
00950
00951
00952 if(gs_save_input_args_to_file("input", s->problem, s->my_dsig, GS_CALL_FROM_C,
00953 s->problem->major) < 0) {
00954 ERRPRINTF("Error saving input args.\n");
00955 s->err = GS_SVC_ERR_CREAT_DATA_FILE;
00956 return -1;
00957 }
00958
00959
00960 pid = fork();
00961
00962 if(pid == -1) {
00963 ERRPRINTF("Failed to fork\n");
00964 s->err = GS_SVC_ERR_FORK;
00965 return -1;
00966 }
00967
00968 if(pid == 0) {
00969 if(gs_exec_batch_service(s) < 0)
00970 _exit(s->err);
00971
00972 _exit(0);
00973 }
00974 else {
00975 int cstat_loc, status;
00976 pid_t child;
00977
00978 gs_service_pid = pid;
00979
00980
00981
00982 child = waitpid(pid, &cstat_loc, 0);
00983
00984 if(child < 0) {
00985 ERRPRINTF("Error waiting for batch service process %d.\n", (int)pid);
00986 s->err = GS_SVC_ERR_WAITPID;
00987 return -1;
00988 }
00989
00990 if(WIFEXITED(cstat_loc) == 0) {
00991 ERRPRINTF("batch service process %d did not terminate.\n", (int)pid);
00992 s->err = GS_SVC_ERR_ABNORMAL_EXIT;
00993 return -1;
00994 }
00995
00996 status = WEXITSTATUS(cstat_loc);
00997
00998 if(status != 0) {
00999 ERRPRINTF("batch service process %d terminated abnormally (status %d).\n",
01000 (int)pid, (char)status);
01001 s->err = (char)status > 0 ? (char)status : GS_SVC_ERR_UNSPECIFIED;
01002 return -1;
01003 }
01004
01005 if(gs_wait_for_batch_job_completion(s) < 0) {
01006 ERRPRINTF("Failed to wait for job completion.\n");
01007 s->err = GS_SVC_ERR_WAITPID;
01008 return -1;
01009 }
01010
01011 if(s->blocking) {
01012 char filename[5];
01013 int fd;
01014
01015 sprintf(filename, "data");
01016 if((fd = open(filename, O_RDONLY)) == -1) {
01017 ERRPRINTF("failed to open output data\n");
01018 s->err = GS_SVC_ERR_OPEN_DATA_FILE;
01019 return -1;
01020 }
01021
01022 if(gs_restore_output_args_from_file(fd, s->problem, s->my_dsig) < 0) {
01023 ERRPRINTF("failed to restore output data from disk\n");
01024 close(fd);
01025 s->err = GS_SVC_ERR_RESTORE_ARGS;
01026 return -1;
01027 }
01028
01029 close(fd);
01030
01031 if(gs_send_tag(s->sock, GS_PROT_OK) < 0) {
01032 ERRPRINTF("Error sending tag.\n");
01033 s->err = GS_SVC_ERR_IO;
01034 return -1;
01035 }
01036
01037 if(gs_send_output_args(s->sock, s->problem, s->my_dsig) < 0) {
01038 ERRPRINTF("Error sending output args.\n");
01039 s->err = GS_SVC_ERR_IO;
01040 return -1;
01041 }
01042
01043 if(gs_create_timestamp_file(".", "retrieved", 0.0))
01044 ERRPRINTF("Warning: failed to create 'retrieved' file.\n");
01045 }
01046
01047 service_et = gs_read_service_et("done");
01048
01049 if(gs_decrement_job_count(s->srv_job_count) < 0)
01050 ERRPRINTF("Warning: failed to decrement job count.\n");
01051
01052 if(gs_notify_agent_problem_complete(s->agent, s->agentport, s->problem_name,
01053 s->srv_cid, s->cli_username, s->cli_hostname, s->cli_cid, s->request_id,
01054 s->agent_taskid, service_et) < 0)
01055 ERRPRINTF("Warning: failed sending problem solve notification.\n");
01056 }
01057
01058 return 0;
01059 }
01060
01061
01062 int
01063 gs_get_category_names(gs_pm_model_t *model, gs_problem_t *prob, char ***arr)
01064 {
01065 gs_argument_t *argptr;
01066 char **cat_names;
01067 int i;
01068
01069 cat_names = (char **)malloc(model->nb_categories * sizeof(char *));
01070
01071 if(!cat_names)
01072 return -1;
01073
01074 i = 0;
01075 for(argptr=prob->arglist; argptr != NULL; argptr=argptr->next) {
01076 if(argptr->arg_enum) {
01077 cat_names[i] = argptr->name;
01078 i++;
01079 }
01080 }
01081
01082 *arr = cat_names;
01083
01084 return 0;
01085 }
01086
01087 int
01088 gs_get_param_exprs(gs_pm_model_t *model, char *comp_model, char ***arr)
01089 {
01090 char *cm_copy, *cp, **pexp;
01091 int i;
01092
01093 cm_copy = strdup(comp_model);
01094 pexp = (char **)malloc(model->nb_params * sizeof(char *));
01095
01096 if(!cm_copy || !pexp) {
01097 if(cm_copy) free(cm_copy);
01098 if(pexp) free(pexp);
01099 return -1;
01100 }
01101
01102 cp = cm_copy;
01103 i = 0;
01104
01105 while(cp) {
01106 pexp[i] = cp;
01107 i++;
01108 cp = strchr(cp, ';');
01109
01110 if(cp) {
01111 *cp = 0;
01112 cp++;
01113 }
01114 }
01115
01116 *arr = pexp;
01117
01118 return 0;
01119 }
01120
01121 int
01122 gs_gen_expr(int i, int numrows, char **cat_names, char **param_expr,
01123 double **cat_mat, double **coef_mat, gs_pm_model_t *model, FILE *cf)
01124 {
01125 int j;
01126
01127 if(i == numrows) {
01128 fprintf(cf, "-1");
01129 return 0;
01130 }
01131
01132 fprintf(cf, "(");
01133 for(j=0;j<model->nb_categories;j++) {
01134 fprintf(cf, "(%s == %g)", cat_names[j], cat_mat[i][j]);
01135
01136 if(j<model->nb_categories-1)
01137 fprintf(cf, " && ");
01138 }
01139 fprintf(cf, ")");
01140
01141 fprintf(cf, "?");
01142
01143 fprintf(cf, "(");
01144 for(j=0;j<model->nb_params;j++) {
01145 fprintf(cf, " (%g * (%s)) ", coef_mat[i][j], param_expr[j]);
01146
01147 if(j<model->nb_params-1)
01148 fprintf(cf, " + ");
01149 }
01150 fprintf(cf, ")");
01151 fprintf(cf, ":");
01152 fprintf(cf, "(");
01153 gs_gen_expr(i+1, numrows, cat_names, param_expr, cat_mat, coef_mat, model, cf);
01154 fprintf(cf, ")");
01155
01156 return 0;
01157 }
01158
01159 int
01160 gs_generate_pm_expr(gs_pm_model_t *model, char *comp_model, gs_problem_t *prob, FILE *cf)
01161 {
01162 char **cat_names, **param_expr;
01163 double **cat_mat, **coef_mat;
01164 int numrows;
01165
01166 numrows = gs_pm_all_models(model, &cat_mat, &coef_mat);
01167
01168 if(gs_get_category_names(model, prob, &cat_names) < 0) {
01169 ERRPRINTF("Error getting category names\n");
01170 return -1;
01171 }
01172
01173 if(gs_get_param_exprs(model, comp_model, ¶m_expr) < 0) {
01174 ERRPRINTF("Error getting category names\n");
01175 if(cat_names)
01176 free(cat_names);
01177 return -1;
01178 }
01179
01180 if(numrows > 0) {
01181 gs_gen_expr(0, numrows, cat_names, param_expr, cat_mat, coef_mat, model, cf);
01182 fprintf(cf, "\n");
01183 }
01184
01185 free(cat_names);
01186 free(param_expr);
01187
01188 return 0;
01189 }
01190
01201 int
01202 gs_update_perf_model(gs_service_info_t *s, char *model_fname, char *coef_fname,
01203 double elapsed_time)
01204 {
01205 int i, new_model, num_expr, fd;
01206 char *comp_model, *cm_copy, *tok;
01207 gs_arg_enum_t *arg_enum = NULL;
01208 gs_argument_t *argptr;
01209 gs_pm_model_t *model;
01210 struct stat stbuf;
01211 icl_hash_t *symtab;
01212 FILE *coef_file;
01213 double j;
01214
01215 model = NULL;
01216
01217 comp_model = gs_problem_getinfo(s->problem, "COMPLEXITY_MODEL", NULL);
01218
01219 if(!comp_model)
01220 return 0;
01221
01222 new_model = stat(model_fname, &stbuf) < 0;
01223
01224 if((fd = gs_open_locked_file(model_fname, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
01225 ERRPRINTF("Warning: failed to open perf model file '%s'.\n", model_fname);
01226 return -1;
01227 }
01228
01229 if(new_model) {
01230 int num_categories = 0;
01231
01232
01233
01234 num_expr = 1;
01235 for(i=0;i<strlen(comp_model);i++)
01236 if(comp_model[i] == ';')
01237 num_expr++;
01238
01239 for(argptr=s->problem->arglist; argptr != NULL; argptr=argptr->next)
01240 if(argptr->arg_enum)
01241 num_categories++;
01242
01243 model = gs_pm_init_model(num_categories, num_expr, GS_PM_MAX_RUNS);
01244 }
01245 else {
01246
01247
01248 model = gs_pm_load(fd);
01249 }
01250
01251 if(!model) {
01252 ERRPRINTF("Failed to intialize model\n");
01253 gs_unlock_fd(fd);
01254 close(fd);
01255 return -1;
01256 }
01257
01258 if(gs_construct_scalar_hashtable(&symtab, s->problem, GS_IN) < 0) {
01259 ERRPRINTF("Failed to construct hash table for scalars\n");
01260 gs_unlock_fd(fd);
01261 close(fd);
01262 return -1;
01263 }
01264
01265
01266 cm_copy = strdup(comp_model);
01267
01268 if(!cm_copy) {
01269 ERRPRINTF("strdup failed\n");
01270 icl_hash_destroy(symtab, NULL, NULL);
01271 gs_unlock_fd(fd);
01272 close(fd);
01273 return -1;
01274 }
01275
01276 for(i=0, tok=NULL; (tok = strtok(tok ? NULL : cm_copy, ";")); i++) {
01277 if(gs_expr_d(tok, &(model->params[i]), symtab) < 0)
01278 ERRPRINTF("Warning: failed to evaluate model expression '%s'\n", tok);
01279 }
01280
01281 i = 0;
01282 for(argptr=s->problem->arglist; argptr != NULL; argptr=argptr->next) {
01283
01284 j = 0.0;
01285
01286 if(argptr->arg_enum) {
01287 int found_enum_match = 0;
01288
01289 for(arg_enum=argptr->arg_enum; arg_enum != NULL; arg_enum=arg_enum->next) {
01290 if((strcmp(arg_enum->val, "other") == 0) ||
01291 ((argptr->datatype == GS_CHAR) && !strncmp(argptr->data, arg_enum->val, 1)) ||
01292 ((argptr->datatype != GS_CHAR) && (argptr->expr_val == atof(arg_enum->val))))
01293 {
01294 found_enum_match = 1;
01295 model->categories[i] = j;
01296 break;
01297 }
01298
01299 j += 1.0;
01300 }
01301
01302 if(!found_enum_match) {
01303 ERRPRINTF("No match in model for arg %s\n", argptr->name);
01304 icl_hash_destroy(symtab, NULL, NULL);
01305 gs_unlock_fd(fd);
01306 close(fd);
01307 return -1;
01308 }
01309
01310 i++;
01311 }
01312 }
01313
01314 gs_pm_store_timing(elapsed_time, model);
01315
01316 lseek(fd, 0, SEEK_SET);
01317
01318
01319 if(gs_pm_save(model, fd) < 0) {
01320 ERRPRINTF("Failed to save model to disk\n");
01321 icl_hash_destroy(symtab, NULL, NULL);
01322 gs_unlock_fd(fd);
01323 close(fd);
01324 return -1;
01325 }
01326
01327 coef_file = fopen(coef_fname, "w");
01328
01329 if(coef_file) {
01330 gs_generate_pm_expr(model, comp_model, s->problem, coef_file);
01331 fclose(coef_file);
01332 }
01333
01334 gs_pm_free_model(model);
01335 icl_hash_destroy(symtab, NULL, NULL);
01336 gs_unlock_fd(fd);
01337 close(fd);
01338
01339 return 0;
01340 }
01341
01349 double
01350 gs_pm_problem_service(gs_service_info_t *s)
01351 {
01352 double start_time, elapsed_time;
01353
01354 start_time = usertime();
01355 gs_problem_service(s->problem);
01356 elapsed_time = usertime() - start_time;
01357
01358 #ifdef GS_PM_DISABLE
01359 if(strcmp(s->infodir, "-") != 0) {
01360
01361
01362 if(elapsed_time > 0.0) {
01363 char *model_fname, *coef_fname;
01364
01365 model_fname = dstring_sprintf("%s/%s.mdl", s->infodir, s->problem->name);
01366 if(!model_fname)
01367 return -1.0;
01368
01369 coef_fname = dstring_sprintf("%s/%s.coe", s->infodir, s->problem->name);
01370 if(!coef_fname) {
01371 free(model_fname);
01372 return -1.0;
01373 }
01374
01375 gs_update_perf_model(s, model_fname, coef_fname, elapsed_time);
01376
01377 free(model_fname);
01378 free(coef_fname);
01379 }
01380 }
01381 #endif
01382
01383 return elapsed_time;
01384 }