00001
00008
00009
00010
00011 #include <string.h>
00012 #include <sys/types.h>
00013 #include <sys/socket.h>
00014 #include <sys/stat.h>
00015 #include <sys/time.h>
00016 #include <netinet/in.h>
00017 #include <netdb.h>
00018 #include <stdio.h>
00019 #include <unistd.h>
00020 #include <glob.h>
00021 #include <dirent.h>
00022 #include <errno.h>
00023 #include <utime.h>
00024
00025 #include "server.h"
00026 #include "utility.h"
00027 #include "comm_basics.h"
00028 #include "comm_data.h"
00029 #include "comm_encode.h"
00030 #include "general.h"
00031 #include "gs_pm_model.h"
00032
00033
00034
00035 #define GSREQUEST_ALL_GLOB_PATTERN "gsrequest_%s_[0-9]*_*"
00036
00037
00038
00039 static int
00040 gs_send_workload_report(int, gs_workload_packet *),
00041 gs_remove_output_data(char *),
00042 gs_remove_dead_blocking_dirs(gs_server_t *),
00043 gs_find_and_remove_retrieved_results(gs_server_t *, char *),
00044 gs_prepare_coefficient_updates(gs_server_t *, char **);
00045
00057 void
00058 gs_workload_report(void **args)
00059 {
00060 gs_server_t *gs_server;
00061 PROXY_COMPONENTADDR myaddr;
00062 int sock;
00063 gs_workload_packet wp;
00064 int retVal;
00065 char *portstr;
00066
00067 if(!args || !args[0])
00068 return;
00069
00070 gs_server = args[0];
00071
00072 wp.coeff_update = NULL;
00073
00074 gs_prepare_coefficient_updates(gs_server, &wp.coeff_update);
00075
00076 if(!wp.coeff_update)
00077 wp.coeff_update = strdup("0");
00078
00079 wp.agent_host = getenv("GRIDSOLVE_AGENT");
00080
00081 if(!wp.agent_host)
00082 wp.agent_host = gs_server->agenthost;
00083
00084 if(!wp.agent_host) {
00085 ERRPRINTF("agent host not in server_config or GRIDSOLVE_AGENT env var\n");
00086 return;
00087 }
00088
00089 portstr = getenv("GRIDSOLVE_AGENT_PORT");
00090
00091 if(portstr)
00092 wp.agent_port = atoi(portstr);
00093 else
00094 wp.agent_port = GRIDSOLVE_AGENT_PORT_DEFAULT;
00095
00096
00097
00098
00099
00100 wp.msgtype = GS_PROT_WORKLOAD_REPORT;
00101
00102 myaddr = proxy_get_local_addr();
00103 memcpy(wp.server_cid, myaddr.ID, CID_LEN);
00104
00105 wp.server_workload = gs_get_workload();
00106
00107
00108
00109 if(wp.server_workload < 0) {
00110 DBGPRINTF("Unable to get workload. Using default of 50\n");
00111 wp.server_workload = 50;
00112 }
00113
00114 wp.nproblems = gs_server->nproblems;
00115
00116 sock = gs_connect_direct(wp.agent_host, wp.agent_port);
00117
00118 if(sock < 0) {
00119 DBGPRINTF("Could not connect to agent.\n");
00120 return;
00121 }
00122
00123 if( (retVal = gs_send_workload_report(sock, &wp)) < 0){
00124 DBGPRINTF("Error sending workload report.\n");
00125 if(retVal == -2) {
00126
00127
00128
00129
00130
00131 if(gs_server->problemlist)
00132 gs_free_problem(gs_server->problemlist);
00133 gs_server->problemlist = NULL;
00134
00135 close(sock);
00136 gs_server_register(gs_server);
00137 }
00138 }
00139
00140 close(sock);
00141
00142 gs_find_and_remove_retrieved_results(gs_server, "retrieved");
00143 gs_find_and_remove_retrieved_results(gs_server, "cancelled");
00144 gs_remove_dead_blocking_dirs(gs_server);
00145
00146 gs_server_register_problems(gs_server);
00147
00148 return;
00149 }
00150
00160 static int
00161 gs_prepare_coefficient_updates(gs_server_t *gs_server, char **msg)
00162 {
00163 icl_list_t *coef_files, *l;
00164 char *cinfo, *update;
00165 int num_updates;
00166
00167 num_updates = 0;
00168
00169 update = dstring_sprintf("");
00170 if(!update) return -1;
00171
00172 coef_files = icl_list_new();
00173
00174 if(coef_files) {
00175 if(gs_find_coefficient_files(gs_server, coef_files) < 0)
00176 ERRPRINTF("Error finding coefficient files.\n");
00177
00178 for(l=icl_list_first(coef_files); l!=NULL; l=icl_list_next(coef_files, l)) {
00179 char *filename = (char *)l->data;
00180 long age;
00181
00182 age = gs_seconds_since_modified(filename);
00183
00184 if(age < GS_UPDATE_FREQUENCY) {
00185 char *model_filename, *probname, *expr;
00186 int i, mfd;
00187
00188 model_filename = strdup(filename);
00189
00190 if(!model_filename) {
00191 ERRPRINTF("strdup failed\n");
00192 return -1;
00193 }
00194
00195 strncpy(model_filename + (strlen(model_filename) - 3),
00196 "mdl", 3);
00197
00198 mfd = open(model_filename, O_RDONLY, 0600);
00199
00200 if(mfd < 0) {
00201 ERRPRINTF("Warning: failed to open model file '%s'\n",
00202 model_filename);
00203 continue;
00204 }
00205
00206
00207
00208
00209
00210
00211
00212 if(gs_lock_fd_nowait(mfd, F_RDLCK) < 0) {
00213 struct timeval time_now[2];
00214
00215 ERRPRINTF("Skipping update of '%s': don't want to wait for lock",
00216 filename);
00217
00218 close(mfd);
00219
00220
00221 gettimeofday(&time_now[0], NULL);
00222 time_now[0].tv_sec += 10;
00223 time_now[1] = time_now[0];
00224
00225 #ifdef __INTERIX
00226
00227 {
00228 struct utimbuf time_now_utimbuf;
00229 time_now_utimbuf.actime = time_now[0].tv_sec;
00230 time_now_utimbuf.modtime = time_now[1].tv_sec;
00231 utime(filename, &time_now_utimbuf);
00232 }
00233 #else
00234 utimes(filename, time_now);
00235 #endif
00236
00237 continue;
00238 }
00239
00240 if(gs_get_contents_of_file(filename, &expr) < 0) {
00241 ERRPRINTF("Warning: failed to read coefficient file '%s'\n", filename);
00242 gs_unlock_fd(mfd);
00243 close(mfd);
00244 continue;
00245 }
00246
00247 if(expr[strlen(expr)-1] == '\n')
00248 expr[strlen(expr)-1] = 0;
00249
00250 gs_unlock_fd(mfd);
00251 close(mfd);
00252
00253 for(i = strlen(filename) - 1; (i >= 0) && (filename[i] != '/'); i--)
00254 ;
00255
00256 probname = filename + i + 1;
00257
00258 for(i=0;i<strlen(probname);i++)
00259 if(probname[i] == '.') {
00260 probname[i] = 0;
00261 break;
00262 }
00263
00264 if(gs_encode_model_update(&cinfo, probname, expr) == 0) {
00265
00266 update = dstring_append(update, cinfo);
00267 num_updates++;
00268 }
00269
00270 free(expr);
00271 }
00272 }
00273
00274 icl_list_destroy(coef_files, free);
00275 }
00276
00277 *msg = dstring_sprintf("%d\n%s", num_updates, update);
00278 free(update);
00279
00280 return 0;
00281 }
00282
00293 static int
00294 gs_send_workload_report(int sock, gs_workload_packet *wp)
00295 {
00296 char *msg, *cu_msg, temp_cid[CID_LEN*2+1];
00297 int tag;
00298
00299 proxy_cid_to_str(temp_cid, wp->server_cid);
00300
00301 if(gs_encode_workload_report(&msg, wp->server_workload, wp->nproblems,
00302 temp_cid) < 0)
00303 return -1;
00304
00305 if((gs_send_tag(sock, wp->msgtype) < 0) ||
00306 (gs_send_string(sock, VERSION) < 0)) {
00307 free(msg);
00308 return -1;
00309 }
00310
00311 if(gs_recv_tag(sock,&tag) < 0)
00312 return -1;
00313
00314 if(tag != GS_PROT_OK) {
00315 if(tag == GS_PROT_VERSION_MISMATCH)
00316 ERRPRINTF("Warning: agent is an incompatible version\n");
00317 return -1;
00318 }
00319
00320 if(gs_send_string(sock, msg) < 0) {
00321 free(msg);
00322 return -1;
00323 }
00324
00325 cu_msg = wp->coeff_update ? wp->coeff_update : "0";
00326
00327 if(gs_send_string(sock, cu_msg) < 0) {
00328 free(msg);
00329 return -1;
00330 }
00331
00332 free(msg);
00333
00334
00335 if(gs_recv_tag(sock, &tag) < 0)
00336 return -1;
00337
00338 if(tag != GS_PROT_OK) {
00339 if(tag == GS_PROT_UNKNOWN_SERVER)
00340 return -2;
00341
00342 ERRPRINTF("Agent rejected workload report for some unspecified reason\n");
00343 return -1;
00344 }
00345
00346 return 0;
00347 }
00348
00359 int
00360 gs_remove_results_if_too_old(gs_server_t *serv, char *reqdir)
00361 {
00362 struct stat stbuf;
00363 struct timeval tv;
00364 long age;
00365
00366 if(!reqdir) return -1;
00367
00368 if(stat(reqdir, &stbuf) < 0)
00369 return -1;
00370
00371 gettimeofday(&tv, NULL);
00372
00373 age = tv.tv_sec - stbuf.st_atime;
00374
00375 if((serv->output_ttl > 0) && (age > serv->output_ttl)) {
00376 LOGPRINTF("Removing expired output directory '%s'\n", reqdir);
00377 if(gs_remove_output_data(reqdir) < 0) {
00378 ERRPRINTF("Unable to remove output for '%s'\n", reqdir);
00379 return -1;
00380 }
00381 }
00382
00383 return 0;
00384 }
00385
/*
 * Count the entries readdir() returns for directory `name`.
 *
 * The count includes "." and ".." whenever the filesystem reports them.
 * Returns 0 if the directory cannot be opened.
 */
static int
gs_count_dir_entries(char *name)
{
  DIR *dir = opendir(name);
  int entries = 0;

  if(dir == NULL)
    return 0;

  while(readdir(dir) != NULL)
    ++entries;

  closedir(dir);
  return entries;
}
00415
00425 static int
00426 gs_remove_dead_blocking_dirs(gs_server_t *serv)
00427 {
00428 char *globpattern, temp_cid[CID_LEN*2+1];
00429 int globflags = GLOB_NOSORT;
00430 glob_t pglob;
00431 int i;
00432
00433 proxy_cid_to_str(temp_cid, serv->componentid);
00434
00435 globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);
00436
00437 if(!globpattern) {
00438 ERRPRINTF("Error creating glob pattern\n");
00439 return -1;
00440 }
00441
00442 if(glob(globpattern, globflags, NULL, &pglob) != 0) {
00443 free(globpattern);
00444 globfree(&pglob);
00445 return 0;
00446 }
00447
00448 for (i=0; i<pglob.gl_pathc; i++) {
00449
00450
00451 if(gs_count_dir_entries(pglob.gl_pathv[i]) < 3) {
00452 int pid_to_check, rv;
00453
00454 if((pid_to_check = gs_parse_pid_from_requestid(pglob.gl_pathv[i])) < 0)
00455 continue;
00456
00457 rv = kill(pid_to_check, 0);
00458
00459
00460
00461
00462
00463 if((rv == 0) || ((rv < 0) && (errno == EPERM)))
00464 continue;
00465
00466 LOGPRINTF("Removing failed blocking request dir '%s'\n",
00467 pglob.gl_pathv[i]);
00468 rmdir(pglob.gl_pathv[i]);
00469
00470 if(gs_decrement_job_count(GS_SERVER_JOB_COUNT_FILE) < 0)
00471 ERRPRINTF("Warning: failed to decrement job count.\n");
00472 }
00473 }
00474
00475 free(globpattern);
00476 globfree(&pglob);
00477 return 0;
00478 }
00479
00492 static int
00493 gs_find_and_remove_retrieved_results(gs_server_t *serv, char *sfile)
00494 {
00495 char *globpattern, temp_cid[CID_LEN*2+1];
00496 int globflags = GLOB_NOSORT;
00497 glob_t *pglob;
00498 int i, rv;
00499
00500 proxy_cid_to_str(temp_cid, serv->componentid);
00501
00502 globpattern = dstring_sprintf(GSREQUEST_ALL_GLOB_PATTERN, temp_cid);
00503
00504 if(!globpattern) {
00505 ERRPRINTF("Error creating glob pattern\n");
00506 return -1;
00507 }
00508
00509 pglob = (glob_t*)malloc(sizeof(glob_t));
00510
00511 if(!pglob || !sfile) {
00512 free(globpattern);
00513 return -1;
00514 }
00515
00516 if(glob(globpattern, globflags, NULL, pglob) != 0) {
00517 free(globpattern);
00518 globfree(pglob);
00519 free(pglob);
00520 return 0;
00521 }
00522
00523 for (i=0; i<pglob->gl_pathc; i++) {
00524 struct stat stbuf;
00525 char *rfile;
00526
00527 rfile = malloc(strlen(pglob->gl_pathv[i]) + strlen(sfile) + 2);
00528 if(!rfile) {
00529 ERRPRINTF("Couldn't malloc space for filename\n");
00530 free(globpattern);
00531 globfree(pglob);
00532 free(pglob);
00533 return -1;
00534 }
00535
00536 sprintf(rfile, "%s/%s", pglob->gl_pathv[i], sfile);
00537
00538 rv = stat(rfile, &stbuf);
00539
00540 free(rfile);
00541
00542 if(rv == 0) {
00543 if(gs_remove_output_data(pglob->gl_pathv[i]) < 0) {
00544 ERRPRINTF("Unable to remove output for '%s'\n", pglob->gl_pathv[i]);
00545 continue;
00546 }
00547 }
00548 else if(serv->output_ttl > 0)
00549 gs_remove_results_if_too_old(serv, pglob->gl_pathv[i]);
00550 }
00551
00552 free(globpattern);
00553 globfree(pglob);
00554 free(pglob);
00555
00556 return 0;
00557 }
00558
/*
 * Recursively delete the directory request_id and everything beneath it.
 *
 * Works by chdir()ing into the directory, removing its entries (recursing
 * into subdirectories), chdir()ing back with "..", and finally rmdir()ing
 * it.  request_id is therefore expected to be a single path component
 * relative to the current working directory.  The process cwd changes for
 * the duration of the call.
 *
 * Entries are examined with lstat() and classified with S_ISDIR():
 *   - a symbolic link is unlinked rather than followed.  Following a link
 *     to a directory would delete files outside the request directory and
 *     make the later chdir("..") land in the wrong place.
 *   - the old "st_mode & S_IFDIR" test also matched sockets and block
 *     devices, whose type bits overlap S_IFDIR.
 *
 * Returns 0 on success, -1 if the directory or any entry could not be
 * removed.
 */
static int
gs_remove_output_data(char *request_id)
{
  struct dirent *dp;
  DIR *dirp;
  int rv = 0;

  if(chdir(request_id) < 0)
    return -1;

  dirp = opendir(".");

  /* If the listing fails we still try to go back up; the final rmdir()
   * then reports failure for a non-empty directory. */
  if(dirp) {
    while((dp = readdir(dirp)) != NULL) {
      struct stat buf;

      if(!strcmp(dp->d_name, ".") || !strcmp(dp->d_name, ".."))
        continue;

      if(lstat(dp->d_name, &buf) != 0)
        continue;

      if(S_ISDIR(buf.st_mode)) {
        if(gs_remove_output_data(dp->d_name) < 0)
          rv = -1;
      }
      else if(unlink(dp->d_name) < 0)
        rv = -1;
    }

    closedir(dirp);
  }

  /* rmdir() below is relative to the parent; without a successful return
   * it would target the wrong directory. */
  if(chdir("..") < 0)
    return -1;

  if(rmdir(request_id) < 0)
    return -1;

  return rv;
}