00001
00008
00009
00010
00011
#include "portability.h"
#include "server.h"
#include "general.h"
#include "comm_data.h"
#include "comm_basics.h"
#include "comm_encode.h"
#include "comm_protocol.h"

#include <errno.h>
#include <limits.h>
00021
00035 int
00036 gs_server_free(gs_server_t *server)
00037 {
00038 if(!server) return -1;
00039
00040 if(server->hostname) free(server->hostname);
00041 if(server->arch) free(server->arch);
00042 gs_free_problem(server->problemlist);
00043 if(server->agenthost) free(server->agenthost);
00044 if(server->sa_list) gs_free_infolist(server->sa_list);
00045 if(server->restrictions) gs_free_restrictions(server->restrictions);
00046 if(server->perf_expr) free(server->perf_expr);
00047 free(server);
00048
00049 return 0;
00050 }
00051
00052
00064 int
00065 gs_do_ping(gs_server_t *srv, char *msg, int len, double *et)
00066 {
00067 double start_time, elapsed_time;
00068 int rv, tag, sock;
00069
00070 elapsed_time = 0.0;
00071 rv = -1;
00072
00073 sock = gs_connect_to_host(srv->componentid, srv->ipaddress, srv->port,
00074 srv->proxyip, srv->proxyport);
00075
00076 if(sock == INVALID_SOCKET) {
00077 ERRPRINTF("invalid socket\n");
00078 return -1;
00079 }
00080
00081 if(gs_send_tag(sock, GS_PROT_PING) < 0) {
00082 ERRPRINTF("failed to send tag\n");
00083 goto ping_err;
00084 }
00085
00086 if(gs_send_string(sock, VERSION) < 0) {
00087 ERRPRINTF("failed to send version string %s\n", VERSION);
00088 goto ping_err;
00089 }
00090
00091 if(gs_recv_tag(sock, &tag) < 0) {
00092 ERRPRINTF("failed to recv response tag\n");
00093 goto ping_err;
00094 }
00095
00096 if(tag != GS_PROT_OK) {
00097 ERRPRINTF("response tag != GS_PROT_OK\n");
00098 goto ping_err;
00099 }
00100
00101 if(gs_send_int(sock, len) < 0) {
00102 ERRPRINTF("failed to send length of data\n");
00103 goto ping_err;
00104 }
00105
00106 start_time = walltime();
00107
00108 if(gs_twrite(sock, msg, len) <= 0) {
00109 ERRPRINTF("error sending ping data\n");
00110 goto ping_err;
00111 }
00112
00113 if(gs_tread(sock, msg, len) <= 0) {
00114 ERRPRINTF("error recving ping data\n");
00115 goto ping_err;
00116 }
00117
00118 elapsed_time = walltime() - start_time;
00119
00120
00121 rv = 0;
00122
00123 ping_err:
00124
00125 *et = elapsed_time;
00126 gs_close_socket(sock);
00127
00128 return rv;
00129 }
00130
00131
00132
00141 int
00142 gs_free_restrictions(gs_restriction_t *rlist)
00143 {
00144 gs_restriction_t *rp, *tmp;
00145
00146 rp = rlist;
00147
00148 while(rp) {
00149 tmp = rp->next;
00150 free(rp);
00151 rp = tmp;
00152 }
00153
00154 return 0;
00155 }
00156
00157
00158
00159
00160 #ifndef WIN32
00161
00168 int
00169 gs_clean_up_old_temp_files(char *infodir)
00170 {
00171 char sqlite_db_pattern[FN_LEN], stats_file_pattern[FN_LEN],
00172 sensor_usock_pattern[FN_LEN], jobcount_pattern[FN_LEN],
00173 lock_file_pattern[FN_LEN], journal_pattern[FN_LEN], *prefix;
00174 int i, globflags = GLOB_NOCHECK | GLOB_NOSORT;
00175 glob_t *pglob;
00176
00177 pglob = (glob_t *) malloc(sizeof(glob_t));
00178
00179 if(!pglob) return -1;
00180
00181 prefix = infodir ? infodir : GS_INFODIR_PATH;
00182
00183 sprintf(sqlite_db_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SQLITE_DB_PREFIX);
00184 sprintf(journal_pattern, "%s/%s.[0-9]*-journal", prefix, GRIDSOLVE_SQLITE_DB_PREFIX);
00185 sprintf(stats_file_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_STATS_FILE_PREFIX);
00186 sprintf(jobcount_pattern, "%s/%s.[0-9]*", prefix, GS_SERVER_JOB_COUNT_FILE_PREFIX);
00187 sprintf(sensor_usock_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SENSOR_USOCK_PREFIX);
00188 sprintf(lock_file_pattern, "%s/%s.[0-9]*", prefix, GRIDSOLVE_SCHED_LOCK_FILE_PREFIX);
00189
00190 if(glob(sqlite_db_pattern, globflags, NULL, pglob) ||
00191 glob(journal_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00192 glob(stats_file_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00193 glob(lock_file_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00194 glob(jobcount_pattern, globflags | GLOB_APPEND, NULL, pglob) ||
00195 glob(sensor_usock_pattern, globflags | GLOB_APPEND, NULL, pglob))
00196 {
00197 globfree(pglob);
00198 free(pglob);
00199 return 0;
00200 }
00201
00202 for (i=0; i<pglob->gl_pathc; i++) {
00203 int pid_to_check, rv;
00204 struct stat stbuf;
00205
00206
00207 if(stat(pglob->gl_pathv[i], &stbuf) != 0)
00208 continue;
00209
00210 if((pid_to_check = gs_parse_pid_from_tempname(pglob->gl_pathv[i])) < 0)
00211 continue;
00212
00213 rv = kill(pid_to_check, 0);
00214
00215
00216
00217
00218
00219 if((rv == 0) || ((rv < 0) && (errno == EPERM)))
00220 continue;
00221
00222 LOGPRINTF("Removing old file '%s'\n", pglob->gl_pathv[i]);
00223 unlink(pglob->gl_pathv[i]);
00224 }
00225
00226 globfree(pglob);
00227 free(pglob);
00228
00229 return 0;
00230 }
00231
/**
 * Extracts the process id from a request id string.
 *
 * The request id is treated as a sequence of '_'-separated fields; the
 * pid is the third field and must itself be followed by a '_'.  The
 * field is parsed as a decimal number, and parsing stops at the first
 * character that is not part of the number.
 *
 * @param reqid -- the request id string (not modified)
 *
 * @return the pid on success; -1 if reqid is NULL, has fewer than three
 *   '_' separators, or if the third field does not start with a
 *   non-negative number that fits in an int.
 */
int
gs_parse_pid_from_requestid(char *reqid)
{
  const char *field;
  char *end;
  long value;
  int skipped;

  if(!reqid)
    return -1;

  /* step over the first two fields; check strchr's result BEFORE
   * advancing, otherwise a missing separator can never be detected
   */
  field = reqid;
  for(skipped = 0; skipped < 2; skipped++) {
    field = strchr(field, '_');
    if(!field)
      return -1;
    field++;
  }

  /* the pid field has to be terminated by another separator */
  if(!strchr(field, '_'))
    return -1;

  errno = 0;
  value = strtol(field, &end, 10);

  if(end == field || errno == ERANGE || value < 0 || value > INT_MAX)
    return -1;

  return (int)value;
}
00280
/**
 * Extracts the process id from the name of a temporary file.
 *
 * Only the final path component is examined, so '.' characters in
 * directory names do not disturb the result.  Within that component the
 * pid is the second '.'-separated field (e.g. "prefix.1234" or
 * "prefix.1234-journal"); empty fields are ignored.  Parsing stops at the
 * first character that is not part of the number.
 *
 * @param tempname -- file name, with or without leading directories
 *   (not modified)
 *
 * @return the pid on success; -1 if tempname is NULL, has no second
 *   field, or if that field does not start with a non-negative number
 *   that fits in an int.
 */
int
gs_parse_pid_from_tempname(char *tempname)
{
  const char *base, *field;
  char *end;
  long value;

  if(!tempname)
    return -1;

  /* restrict the search to the file name itself */
  base = strrchr(tempname, '/');
  base = base ? base + 1 : tempname;

  /* first field: skip leading dots, then run up to the next dot */
  base += strspn(base, ".");
  field = strchr(base, '.');
  if(!field)
    return -1;

  /* second field starts after the separator(s) */
  field += strspn(field, ".");
  if(*field == '\0')
    return -1;

  errno = 0;
  value = strtol(field, &end, 10);

  if(end == field || errno == ERANGE || value < 0 || value > INT_MAX)
    return -1;

  return (int)value;
}
00314
/**
 * Reads the job count stored in the given file.
 *
 * The file is opened with a read lock and one int is read from its
 * start.  A file that does not contain a complete int (empty or
 * truncated) is reported as a count of 0.
 *
 * @param file -- name of the job count file
 * @param jcount -- set on success to the job count
 *
 * @return 0 on success, -1 if an argument is NULL, the file cannot be
 *   opened/locked, or the read fails.
 */
int
gs_get_job_count(char *file, int *jcount)
{
  int fd, count = 0;
  ssize_t n;

  if(!file || !jcount)
    return -1;

  if((fd = gs_open_locked_file(file, F_RDLCK, O_RDONLY)) < 0)
    return -1;

  n = read(fd, &count, sizeof(count));

  gs_unlock_file(fd);
  close(fd);

  if(n < 0)
    return -1;

  /* anything short of a whole int would leave count partly
   * uninitialised, so treat it like an empty file
   */
  if(n != (ssize_t)sizeof(count))
    count = 0;

  *jcount = count;
  return 0;
}
00347
/**
 * Updates the job count stored in the given file.
 *
 * The file is opened (and created if necessary) with a write lock, the
 * current count is read, modified according to op, clamped at 0 and
 * written back to the start of the file.  A file that does not contain a
 * complete int is treated as holding a count of 0.
 *
 * @param file -- name of the job count file
 * @param op -- '+' to increment, '-' to decrement, 'i' to reset to 0;
 *   any other value logs a warning and resets the count to 0
 *
 * @return 0 on success, -1 if the file cannot be opened/locked, read,
 *   or if the new count cannot be written back.
 */
int
gs_modify_job_count(char *file, int op)
{
  int fd, count = 0, rv = 0;
  ssize_t n;

  if((fd = gs_open_locked_file(file, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
    ERRPRINTF("Failed to open/lock file '%s' for job count\n", file);
    return -1;
  }

  if((n = read(fd, &count, sizeof(count))) < 0) {
    ERRPRINTF("read failure getting job count\n");
    gs_unlock_file(fd);
    close(fd);
    return -1;
  }

  /* empty or truncated file: there is no valid previous count */
  if(n != (ssize_t)sizeof(count))
    count = 0;

  switch(op) {
    case '+':
      count++;
      break;
    case '-':
      count--;
      break;
    case 'i':
      count = 0;
      break;
    default:
      ERRPRINTF("Warning: bad op type\n");
      count = 0;
  }

  /* never let the stored count go negative */
  if(count < 0)
    count = 0;

  /* the read above moved the file offset, so rewind before storing */
  if((lseek(fd, 0, SEEK_SET) == (off_t)-1) ||
     (write(fd, &count, sizeof(count)) != (ssize_t)sizeof(count)))
  {
    ERRPRINTF("Failed to store job count in file '%s'\n", file);
    rv = -1;
  }

  gs_unlock_file(fd);
  close(fd);

  return rv;
}
00405
/**
 * Initializes the job count file.
 *
 * If the file already exists it is removed first; the count is then
 * reset through gs_modify_job_count() with the 'i' operation.
 *
 * @param file -- name of the job count file
 *
 * @return -1 if an existing file could not be removed, otherwise the
 *   result of gs_modify_job_count().
 */
int
gs_init_job_count(char *file)
{
  struct stat existing;

  /* unlink() is attempted only when stat() says the file is there */
  if((stat(file, &existing) == 0) && (unlink(file) < 0))
    return -1;

  return gs_modify_job_count(file, 'i');
}
00425
/**
 * Adds one to the job count stored in the given file.
 *
 * @param file -- name of the job count file
 *
 * @return the result of gs_modify_job_count() for the '+' operation.
 */
int
gs_increment_job_count(char *file)
{
  const int op = '+';

  return gs_modify_job_count(file, op);
}
00439
/**
 * Subtracts one from the job count stored in the given file.
 *
 * @param file -- name of the job count file
 *
 * @return the result of gs_modify_job_count() for the '-' operation.
 */
int
gs_decrement_job_count(char *file)
{
  const int op = '-';

  return gs_modify_job_count(file, op);
}
00453
00473 int
00474 gs_notify_agent_problem_complete(char *agenthost, in_port_t agentport,
00475 char *problem_name, char *server_cid, char *user_name,
00476 char *host_name, char *client_cid, char *request_id, int agent_taskid,
00477 double run_time)
00478 {
00479 int sock_agent, tag;
00480 pid_t newpid;
00481 char *msg;
00482
00483 newpid = fork();
00484
00485 if(newpid < 0) {
00486 ERRPRINTF("fork() failed.\n");
00487 return -1;
00488 }
00489
00490 if(newpid == 0) {
00491 if(gs_encode_problem_complete_notification(&msg, server_cid, request_id,
00492 agent_taskid, run_time) < 0)
00493 {
00494 ERRPRINTF("Unable to encode message.\n");
00495 _exit(-1);
00496 }
00497
00498 sock_agent = gs_connect_direct(agenthost, agentport);
00499
00500 if(sock_agent < 0) {
00501 ERRPRINTF("Unable to connect to agent.\n");
00502 _exit(-1);
00503 }
00504
00505 if((gs_send_tag(sock_agent, GS_PROT_NOTIFY_COMPLETE) < 0) ||
00506 (gs_send_string(sock_agent, VERSION) < 0)) {
00507 ERRPRINTF("failed to send tag\n");
00508 gs_close_socket(sock_agent);
00509 _exit(-1);
00510 }
00511
00512 if(gs_recv_tag(sock_agent, &tag) < 0) {
00513 ERRPRINTF("Error communicating with agent.\n");
00514 gs_close_socket(sock_agent);
00515 _exit(-1);
00516 }
00517
00518 if(tag != GS_PROT_OK) {
00519 if(tag == GS_PROT_VERSION_MISMATCH)
00520 ERRPRINTF("Error: Agent is an incompatible version\n");
00521 else
00522 ERRPRINTF("Error: Agent refused with code %d\n", tag);
00523 gs_close_socket(sock_agent);
00524 _exit(-1);
00525 }
00526
00527 if(gs_send_string(sock_agent, msg) < 0) {
00528 ERRPRINTF("Error communicating with agent.\n");
00529 gs_close_socket(sock_agent);
00530 _exit(-1);
00531 }
00532
00533 gs_close_socket(sock_agent);
00534 _exit(0);
00535 }
00536
00537 return 0;
00538 }
00539
00559 int
00560 gs_notify_agent_problem_solve(char *agenthost, in_port_t agentport,
00561 char *problem_name, double est_time, char *server_cid, char *user_name,
00562 char *host_name, char *client_cid, char *request_id, int agent_taskid,
00563 double agent_est_time)
00564 {
00565 int sock_agent, tag;
00566 pid_t newpid;
00567 char *msg;
00568
00569 newpid = fork();
00570
00571 if(newpid < 0) {
00572 ERRPRINTF("fork() failed.\n");
00573 return -1;
00574 }
00575
00576 if(newpid == 0) {
00577 if(gs_encode_problem_solve_notification(&msg, problem_name, est_time,
00578 server_cid, user_name, host_name, client_cid, request_id,
00579 agent_taskid, agent_est_time) < 0)
00580 {
00581 ERRPRINTF("Unable to encode message.\n");
00582 _exit(-1);
00583 }
00584
00585 sock_agent = gs_connect_direct(agenthost, agentport);
00586
00587 if(sock_agent < 0) {
00588 ERRPRINTF("Unable to connect to agent.\n");
00589 _exit(-1);
00590 }
00591
00592 if((gs_send_tag(sock_agent, GS_PROT_NOTIFY_SUBMIT) < 0) ||
00593 (gs_send_string(sock_agent, VERSION) < 0)) {
00594 ERRPRINTF("failed to send tag\n");
00595 gs_close_socket(sock_agent);
00596 _exit(-1);
00597 }
00598
00599 if(gs_recv_tag(sock_agent, &tag) < 0) {
00600 ERRPRINTF("Error communicating with agent.\n");
00601 gs_close_socket(sock_agent);
00602 _exit(-1);
00603 }
00604
00605 if(tag != GS_PROT_OK) {
00606 if(tag == GS_PROT_VERSION_MISMATCH)
00607 ERRPRINTF("Error: Agent is an incompatible version\n");
00608 else
00609 ERRPRINTF("Error: Agent refused with code %d\n", tag);
00610 gs_close_socket(sock_agent);
00611 _exit(-1);
00612 }
00613
00614 if(gs_send_string(sock_agent, msg) < 0) {
00615 ERRPRINTF("Error communicating with agent.\n");
00616 gs_close_socket(sock_agent);
00617 _exit(-1);
00618 }
00619
00620 gs_close_socket(sock_agent);
00621 _exit(0);
00622 }
00623
00624 return 0;
00625 }
00626
00640 int
00641 gs_create_info_dir(char componentid[CID_LEN], ipaddr_t ipaddress, in_port_t port, char **dirname)
00642 {
00643 char server_dottedIP[20], cid_string[2*CID_LEN+1], *si_name, *cid_file,
00644 userid[256], *username;
00645 PROXY_COMPONENTADDR myaddr;
00646 struct stat stbuf;
00647 FILE *f;
00648 uid_t uid;
00649
00650 *dirname = 0;
00651
00652 username = gs_get_login_name();
00653 if(!username) {
00654 ERRPRINTF("Warning: couldn't get user id\n");
00655 username = "UnknownUserID";
00656 }
00657
00658 uid = getuid();
00659
00660 sprintf(userid, "%s_uid%d", username, (int)uid);
00661
00662 myaddr = proxy_get_local_addr();
00663 memcpy(componentid, myaddr.ID, CID_LEN);
00664
00665 si_name = dstring_sprintf("%s", GS_INFODIR_PATH);
00666
00667 if(stat(si_name, &stbuf) != 0) {
00668
00669 if(mkdir(si_name, 0700) < 0) {
00670 free(si_name);
00671 return -1;
00672 }
00673 }
00674
00675 free(si_name);
00676
00677 proxy_ip_to_str(ipaddress, server_dottedIP);
00678
00679 si_name = dstring_sprintf("%s/%s%s_%hu_%s", GS_INFODIR_PATH,
00680 GS_INFODIR_PREFIX, server_dottedIP, port, userid);
00681
00682 if(stat(si_name, &stbuf) != 0) {
00683 if(mkdir(si_name, 0700) < 0) {
00684 free(si_name);
00685 return -1;
00686 }
00687 }
00688
00689 cid_file = dstring_sprintf("%s/%s%s_%hu_%s/cid", GS_INFODIR_PATH,
00690 GS_INFODIR_PREFIX, server_dottedIP, port, userid);
00691
00692 if(stat(cid_file, &stbuf) != 0) {
00693 if((f = fopen(cid_file, "w")) == NULL) {
00694 free(si_name);
00695 free(cid_file);
00696 return -1;
00697 }
00698
00699 proxy_cid_to_str(cid_string, myaddr.ID);
00700
00701 fprintf(f, "%s\n", cid_string);
00702 fclose(f);
00703 }
00704 else {
00705 if((f = fopen(cid_file, "r")) == NULL) {
00706 free(si_name);
00707 free(cid_file);
00708 return -1;
00709 }
00710
00711 free(cid_file);
00712
00713 if(fgets(cid_string, 2*CID_LEN+1, f) == NULL) {
00714 fclose(f);
00715 free(si_name);
00716 return -1;
00717 }
00718
00719 cid_string[2*CID_LEN] = 0;
00720
00721 fclose(f);
00722
00723 proxy_set_cid_from_str(cid_string);
00724 myaddr = proxy_get_local_addr();
00725 }
00726
00727 memcpy(componentid, myaddr.ID, CID_LEN);
00728
00729 *dirname = si_name;
00730
00731 return 0;
00732 }
00733
00734
00735
00736 #endif