00001
00008
00009
00010
00011 #include <stdlib.h>
00012 #include <stdio.h>
00013 #include <netdb.h>
00014 #include <strings.h>
00015 #include <sys/types.h>
00016 #include <sys/time.h>
00017 #include <sys/un.h>
00018 #include <sys/stat.h>
00019 #include <sys/mman.h>
00020 #include <fcntl.h>
00021 #include <string.h>
00022 #include <signal.h>
00023 #include <time.h>
00024 #include <unistd.h>
00025 #include <glob.h>
00026
00027 #include "utility.h"
00028 #include "proxylib.h"
00029 #include "comm_basics.h"
00030 #include "comm_data.h"
00031 #include "server.h"
00032 #include "agent.h"
00033 #include "problem.h"
00034 #include "comm_encode.h"
00035 #include "gs_storage.h"
00036 #include "mfork.h"
00037 #ifdef GS_SMART_GRIDSOLVE
00038 #include "gs_smart_task_graph.h"
00039 #include "gs_smart_app_pm.h"
00040 #endif
00041
00042 gs_agent_scheduler_t gs_agent_parse_scheduler_name(char *);
00043
00046 char
00047 *agent_cfg = NULL,
00048 GRIDSOLVE_SQLITE_DB[FN_LEN] = "",
00049 GRIDSOLVE_SENSOR_USOCK[FN_LEN] = "",
00050 GRIDSOLVE_STATS_FILE[FN_LEN] = "",
00051 GRIDSOLVE_SCHED_LOCK_FILE[FN_LEN] = "";
00052
00053 gs_agent_stats_t
00054 gs_agent_stats;
00055
00056 gs_agent_scheduler_t
00057 gs_agent_scheduler_selection;
00058
00060 int GS_SENSORFD;
00061 FILE *GS_SENSORFILE;
00062
00063 int
00064 global_taskid = 0,
00065 htm_scheduler_sync = 1,
00066 keep_track_of_task_events = 0;
00067
00068 struct timeval
00069 global_start_time = {0, 0};
00070
00072 sqlite3 *global_db = NULL;
00073
00080 static void
00081 gs_agent_generic_signal_handler(int sig)
00082 {
00083 if(sig == SIGHUP) {
00084 gs_info_t *sa_list;
00085
00086 LOGPRINTF("SIGHUP: reading configuration file '%s'\n", agent_cfg);
00087
00088 if(gs_parse_config_file(agent_cfg, &sa_list) == 0) {
00089 if(sa_list) {
00090 gs_info_t *p;
00091
00092 for(p = sa_list; p != NULL; p = p->next) {
00093 if(!strcasecmp(p->type, "GS_AGENT_SCHEDULER"))
00094 gs_agent_scheduler_selection = gs_agent_parse_scheduler_name(p->value);
00095 else if(!strcasecmp(p->type, "GS_HTM_SYNC"))
00096 htm_scheduler_sync = atoi(p->value);
00097
00098 free(p->type);
00099 free(p->value);
00100 }
00101
00102 free(sa_list);
00103 }
00104 }
00105
00106 return;
00107 }
00108
00109 ERRPRINTF("Agent terminating on signal %d.\n", sig);
00110 unlink(GRIDSOLVE_STATS_FILE);
00111 unlink(GRIDSOLVE_SQLITE_DB);
00112 unlink(GRIDSOLVE_SCHED_LOCK_FILE);
00113 exit(0);
00114 }
00115
/*
 * Temporary SIGHUP handler: forwards SIGHUP to the parent process.
 * The signal number is not used.
 */
static void
gs_temp_sighup_handler(int sig)
{
  pid_t parent = getppid();

  (void) sig;
  kill(parent, SIGHUP);
}
00128
00136 double
00137 get_time_since_startup()
00138 {
00139 struct timeval tv;
00140 double cur_time, start_time;
00141
00142 gettimeofday(&tv, NULL);
00143
00144 cur_time = (double)tv.tv_sec + (((double)tv.tv_usec) / 1000000.0);
00145 start_time = (double)global_start_time.tv_sec +
00146 (((double)global_start_time.tv_usec) / 1000000.0);
00147
00148 return cur_time - start_time;
00149 }
00150
00163 int
00164 gs_agent_create_info_dir(gs_agent_t *gs_agent)
00165 {
00166 ipaddr_t ipaddress;
00167 char dummy_componentid[CID_LEN];
00168
00169 ipaddress = proxy_get_my_ipaddr();
00170
00171 if(gs_create_info_dir(dummy_componentid, ipaddress,
00172 gs_agent->port, &(gs_agent->infodir)) < 0) {
00173 ERRPRINTF("Creating agent infodir failed.\n");
00174 return -1;
00175 }
00176
00177 return 0;
00178 }
00179
00191 gs_agent_t *
00192 gs_agent_init(char *cfg)
00193 {
00194 gs_agent_t *gs_agent;
00195 int pid;
00196
00197 gettimeofday(&global_start_time, NULL);
00198
00199 gs_agent = (gs_agent_t *) calloc(1, sizeof(gs_agent_t));
00200
00201 if(!gs_agent) return NULL;
00202
00203
00204 gs_agent->hostname = gs_get_machine_name();
00205 gs_agent->servers = NULL;
00206 gs_agent->dsig = pvmgetdsig();
00207 gs_agent->port = getenv_int("GRIDSOLVE_AGENT_PORT", GRIDSOLVE_AGENT_PORT_DEFAULT);
00208
00209 if(gs_agent_create_info_dir(gs_agent) < 0) {
00210 ERRPRINTF("Could not create agent infodir.\n");
00211 return NULL;
00212 }
00213
00214 pid = getpid();
00215
00216 gs_clean_up_old_temp_files(gs_agent->infodir);
00217
00218 snprintf(GRIDSOLVE_SQLITE_DB, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SQLITE_DB_PREFIX, pid);
00219 snprintf(GRIDSOLVE_STATS_FILE, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_STATS_FILE_PREFIX, pid);
00220 snprintf(GRIDSOLVE_SENSOR_USOCK, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SENSOR_USOCK_PREFIX, pid);
00221 snprintf(GRIDSOLVE_SCHED_LOCK_FILE, FN_LEN, "%s/%s.%d", gs_agent->infodir, GRIDSOLVE_SCHED_LOCK_FILE_PREFIX, pid);
00222
00223 gs_init_stats_file(gs_agent, GRIDSOLVE_STATS_FILE);
00224
00225 gs_agent->mysql.host = GRIDSOLVE_DEFAULT_MYSQL_HOST;
00226 gs_agent->mysql.user = GRIDSOLVE_DEFAULT_MYSQL_USER;
00227 gs_agent->mysql.passwd = GRIDSOLVE_DEFAULT_MYSQL_PASSWD;
00228 gs_agent->mysql.db_name = GRIDSOLVE_DEFAULT_MYSQL_DB;
00229 gs_agent->mysql.port = GRIDSOLVE_DEFAULT_MYSQL_PORT;
00230 gs_agent->mysql.unix_socket = GRIDSOLVE_DEFAULT_MYSQL_UNIX_SOCKET;
00231
00232 if(cfg) {
00233 gs_info_t *sa_list;
00234
00235 if(gs_parse_config_file(cfg, &sa_list) == 0) {
00236 if(sa_list) {
00237 gs_info_t *p;
00238
00239 for(p = sa_list; p != NULL; p = p->next) {
00240 if(!strcasecmp(p->type, "GS_MYSQL_HOST"))
00241 gs_agent->mysql.host = p->value;
00242 else if(!strcasecmp(p->type, "GS_MYSQL_USER"))
00243 gs_agent->mysql.user = p->value;
00244 else if(!strcasecmp(p->type, "GS_MYSQL_PASSWD"))
00245 gs_agent->mysql.passwd = p->value;
00246 else if(!strcasecmp(p->type, "GS_MYSQL_DB"))
00247 gs_agent->mysql.db_name = p->value;
00248 else if(!strcasecmp(p->type, "GS_MYSQL_PORT"))
00249 gs_agent->mysql.port = atoi(p->value);
00250 else if(!strcasecmp(p->type, "GS_MYSQL_UNIX_SOCKET"))
00251 gs_agent->mysql.unix_socket = p->value;
00252 else if(!strcasecmp(p->type, "GS_AGENT_SCHEDULER"))
00253 gs_agent_scheduler_selection = gs_agent_parse_scheduler_name(p->value);
00254 else if(!strcasecmp(p->type, "GS_HTM_SYNC"))
00255 htm_scheduler_sync = atoi(p->value);
00256
00257
00258 free(p->type);
00259 }
00260
00261 free(sa_list);
00262 }
00263 }
00264 }
00265
00266 LOGPRINTF("Initialized agent %s\n", gs_agent->hostname);
00267
00268 return (gs_agent);
00269 }
00270
00281 int
00282 gs_init_stats_file(gs_agent_t *gs_agent, char *fname)
00283 {
00284 int fd;
00285
00286 if(!gs_agent || !fname) {
00287 ERRPRINTF("Invalid args\n");
00288 return -1;
00289 }
00290
00291 if((fd = open(fname, O_RDWR | O_CREAT | O_TRUNC, 0600)) < 0) {
00292 ERRPRINTF("Could not create stats file '%s'\n", fname);
00293 return -1;
00294 }
00295
00296 gettimeofday(&gs_agent_stats.start_time, NULL);
00297 strncpy(gs_agent_stats.hostname, gs_agent->hostname, GS_MAX_NAMELEN);
00298 gs_agent_stats.hostname[GS_MAX_NAMELEN-1] = '\0';
00299 gs_agent_stats.port = gs_agent->port;
00300
00301 write(fd, &gs_agent_stats, sizeof(gs_agent_stats_t));
00302
00303 close(fd);
00304 return 0;
00305 }
00306
00315 int
00316 gs_agent_process_availability_request(int sock)
00317 {
00318 gs_server_t srv;
00319 char *req;
00320 int s;
00321
00322 if(gs_recv_string(sock, &req) < 0) {
00323 ERRPRINTF("Error reading request string.\n");
00324 return -1;
00325 }
00326
00327 if(gs_decode_availability_request(req, &srv) < 0) {
00328 ERRPRINTF("Couldn't decode availability request.\n");
00329 if(req) free(req);
00330 return -1;
00331 }
00332
00333 if(req) free(req);
00334
00335 s = gs_connect_to_host(srv.componentid, srv.ipaddress, srv.port,
00336 srv.proxyip, srv.proxyport);
00337
00338 if(s == INVALID_SOCKET) {
00339 ERRPRINTF("Server failed connectivity test.\n");
00340 return -1;
00341 }
00342
00343 if(gs_send_tag(s, GS_PROT_OK) < 0) {
00344 ERRPRINTF("Error sending response tag.\n");
00345 return -1;
00346 }
00347
00348 gs_close_socket(s);
00349
00350 return 0;
00351 }
00352
00364 int
00365 gs_agent_process_problem_list(gs_agent_t *gs_agent, int sock)
00366 {
00367 gs_problem_t **problem_list = NULL;
00368 int i, count;
00369
00370 if(!gs_agent) {
00371 ERRPRINTF("Invalid arg: null agent\n");
00372 return -1;
00373 }
00374
00375 count = gs_get_all_problems(gs_agent, &problem_list, &count);
00376
00377 if(count < 0) {
00378 DBGPRINTF("failed to get list of all problems\n");
00379 return -1;
00380 }
00381
00382
00383 if(gs_send_int(sock, count) < 0) {
00384 DBGPRINTF("failed to send number of problems\n");
00385 return -1;
00386 }
00387
00388 for(i = 0; i < count; i++) {
00389 char *prob = NULL;
00390 if((gs_encode_problem(&prob, problem_list[i]) < 0) ||
00391 (gs_send_string(sock, prob) < 0)) {
00392 FREE(prob);
00393 DBGPRINTF("Failed to send problem list\n");
00394 return -1;
00395 }
00396
00397 FREE(prob);
00398 }
00399
00400 for(i = 0; i < count; i++)
00401 gs_free_problem(problem_list[i]);
00402 FREE(problem_list);
00403 return 0;
00404 }
00405
00417 int
00418 gs_agent_process_server_list(gs_agent_t *gs_agent, int sock)
00419 {
00420 gs_server_t **server_list = NULL;
00421 int i, count;
00422
00423 if(!gs_agent) {
00424 ERRPRINTF("Invalid arg: null agent\n");
00425 return -1;
00426 }
00427
00428 count = gs_get_all_servers(gs_agent, &server_list, &count);
00429
00430 if(count < 0) {
00431 DBGPRINTF("failed to get list of all servers\n");
00432 return -1;
00433 }
00434
00435
00436 if(gs_send_int(sock, count) < 0) {
00437 DBGPRINTF("failed to send number of servers\n");
00438 return -1;
00439 }
00440
00441 for(i = 0; i < count; i++) {
00442 char *srv = NULL;
00443 DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00444 if((gs_encode_server(&srv, server_list[i]) < 0) ||
00445 (gs_send_string(sock, srv) < 0)) {
00446 FREE(srv);
00447 DBGPRINTF("Failed to send server list \n");
00448 return -1;
00449 }
00450
00451 FREE(srv);
00452 }
00453
00454 for(i = 0; i < count; i++)
00455 gs_server_free(server_list[i]);
00456 FREE(server_list);
00457
00458 return 0;
00459 }
00460
00473 int
00474 gs_agent_process_server_ping_update(gs_agent_t *gs_agent, int sock)
00475 {
00476 char *cid = NULL, *msg = NULL;
00477
00478 if(!gs_agent) {
00479 ERRPRINTF("Invalid arg: null agent\n");
00480 return -1;
00481 }
00482
00483 if(gs_recv_string(sock, &cid) < 0) {
00484 ERRPRINTF("gs_recv_string (of cid) failed\n");
00485 return -1;
00486 }
00487
00488 if(gs_recv_string(sock, &msg) < 0) {
00489 ERRPRINTF("gs_recv_string (of msg) failed\n");
00490 return -1;
00491 }
00492
00493 if(gs_update_ping_list(gs_agent, cid, msg) < 0) {
00494 ERRPRINTF("failed to update ping list\n");
00495 return -1;
00496 }
00497
00498 return 0;
00499 }
00500
00513 int
00514 gs_agent_process_server_ping_list(gs_agent_t *gs_agent, int sock)
00515 {
00516 gs_server_t **server_list = NULL;
00517 char *cid = NULL;
00518 int i, count;
00519
00520 if(!gs_agent) {
00521 ERRPRINTF("Invalid arg: null agent\n");
00522 return -1;
00523 }
00524
00525 if(gs_recv_string(sock, &cid) < 0) {
00526 DBGPRINTF("gs_recv_string failed\n");
00527 return -1;
00528 }
00529
00530 count = gs_get_server_ping_list(gs_agent, &server_list, cid, &count);
00531
00532 free(cid);
00533
00534 if(count < 0) {
00535 DBGPRINTF("failed to get list of all servers\n");
00536 return -1;
00537 }
00538
00539
00540 if(gs_send_int(sock, count) < 0) {
00541 DBGPRINTF("failed to send number of servers\n");
00542 return -1;
00543 }
00544
00545 for(i = 0; i < count; i++) {
00546 char *srv = NULL;
00547 DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00548 if((gs_encode_server(&srv, server_list[i]) < 0) ||
00549 (gs_send_string(sock, srv) < 0)) {
00550 FREE(srv);
00551 DBGPRINTF("Failed to send server list \n");
00552 return -1;
00553 }
00554
00555 FREE(srv);
00556 }
00557
00558 for(i = 0; i < count; i++)
00559 gs_server_free(server_list[i]);
00560 FREE(server_list);
00561
00562 return 0;
00563 }
00564
00576 int
00577 gs_agent_process_problem_desc(gs_agent_t * gs_agent, int sock)
00578 {
00579 char *xml_problem = NULL;
00580 gs_server_t **server_list = NULL;
00581 gs_problem_t *problem = NULL;
00582 int i, rc, count = 0;
00583
00584 if(!gs_agent) {
00585 ERRPRINTF("Invalid arg: null agent\n");
00586 return -1;
00587 }
00588
00589 problem = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
00590 if(!problem) {
00591 DBGPRINTF("Could not malloc new problem.\n");
00592 return -1;
00593 }
00594
00595 if(gs_recv_string(sock, &(problem->name)) < 0) {
00596 DBGPRINTF("gs_recv_string failed\n");
00597 FREE(problem);
00598 return -1;
00599 }
00600 DBGPRINTF("prob name = '%s'\n", problem->name);
00601
00602 count =
00603 gs_get_server_list(gs_agent, problem, NULL, &server_list,
00604 &count);
00605
00606 rc = 0;
00607
00608 if(count <= 0) {
00609 DBGPRINTF("could not find problem %s.\n", problem->name);
00610 if(gs_send_string(sock, "not found") < 0) {
00611 DBGPRINTF("Failed to send problem\n");
00612 rc = -1;
00613 }
00614 }
00615 else {
00616 DBGPRINTF("Encoding problem: %s.\n", problem->name);
00617 if((gs_encode_problem(&xml_problem, problem) < 0) ||
00618 (gs_send_string(sock, xml_problem) < 0)) {
00619 DBGPRINTF("Failed to send problem\n");
00620 rc = -1;
00621 }
00622 }
00623
00624 FREE(xml_problem);
00625
00626 for(i = 0; i < count; i++)
00627 gs_server_free(server_list[i]);
00628 free(server_list);
00629
00630 gs_free_problem(problem);
00631
00632 return rc;
00633 }
00634
00650 int
00651 gs_agent_process_problem_submit(gs_agent_t * gs_agent, int sock)
00652 {
00653 char *msg = NULL, cid_string[2 * CID_LEN + 1];
00654 char *client_criteria = NULL;
00655 char *xml_problem = NULL;
00656 gs_server_t **server_list = NULL;
00657 gs_problem_t *problem = NULL;
00658 int i, count = 0, fd;
00659 int sender_dsig = 0;
00660 int sender_major = -1;
00661 int scalar_args_to_be_transferred = 0;
00662 int problem_desc_to_be_transferred = 0;
00663 time_t clock;
00664 struct tm *now;
00665 char subtime[128];
00666
00667 if(!gs_agent) {
00668 ERRPRINTF("Invalid arg: null agent\n");
00669 return -1;
00670 }
00671
00672 problem = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
00673 if(!problem) {
00674 DBGPRINTF("Could not malloc new problem.\n");
00675 return -1;
00676 }
00677
00678 if(gs_recv_string(sock, &msg) < 0) {
00679 DBGPRINTF("gs_agent_process_problem_submit: gs_recv_string failed\n");
00680 return -1;
00681 }
00682 DBGPRINTF("gs_agent_process_problem_submit: msg = '%s'\n", msg);
00683
00684 if(gs_decode_problem_submit_request(msg, &(problem->name), &sender_dsig, &client_criteria) < 0) {
00685 free(msg);
00686 DBGPRINTF("gs_agent_process_problem_submit: failed to decode request\n");
00687 return -1;
00688 }
00689 DBGPRINTF("problem->name = '%s'\n", problem->name);
00690
00691 free(msg);
00692
00693
00694
00695 count =
00696 gs_get_server_list(gs_agent, problem, client_criteria, &server_list,
00697 &count);
00698
00699 if(client_criteria) {
00700 free(client_criteria);
00701 client_criteria = NULL;
00702 }
00703
00704
00705 if(gs_recv_int(sock, &problem_desc_to_be_transferred) < 0)
00706 goto error_comm;
00707 if(problem_desc_to_be_transferred == 1) {
00708 if(count == 0)
00709 problem->description = strdup(GS_UNKNOWN_PROB);
00710 if(gs_encode_problem(&xml_problem, problem) < 0)
00711 goto error_comm;
00712 if(gs_send_string(sock, xml_problem) < 0)
00713 goto error_comm;
00714 FREE(xml_problem);
00715
00716
00717
00718
00719
00720
00721 if(count == 0) {
00722 gs_free_problem(problem);
00723 return 0;
00724 }
00725 }
00726
00727
00728
00729 if(gs_recv_int(sock, &scalar_args_to_be_transferred) < 0)
00730 goto error_comm;
00731
00732 if(scalar_args_to_be_transferred == 1) {
00733 if(gs_recv_input_scalar_args (sock, problem, sender_dsig,
00734 gs_agent->dsig, &sender_major) < 0)
00735 goto error_comm;
00736
00737 if(gs_receiver_compute_arg_sizes(problem, GS_IN) < 0) {
00738 ERRPRINTF("error computing argument sizes.\n");
00739 return -1;
00740 }
00741 }
00742
00743 if((fd = gs_open_locked_file_timeout(GRIDSOLVE_SCHED_LOCK_FILE, F_WRLCK,
00744 O_RDWR | O_CREAT, GS_SCHED_LOCK_TIMEOUT)) < 0) {
00745 ERRPRINTF("Timed out on scheduler lock. Continuing w/o scheduling.\n");
00746 }
00747 else {
00748
00749
00750
00751 DBGPRINTF("Calling the agent side scheduler\n");
00752 if(gs_agent_scheduler(problem, count, server_list) < 0) {
00753 DBGPRINTF("Could not sort servers\n");
00754 gs_unlock_file(fd);
00755 return -1;
00756 }
00757
00758 gs_unlock_file(fd);
00759 }
00760
00761 if(count <= 0) {
00762 DBGPRINTF("No servers for problem '%s'\n", problem->name);
00763 count = 0;
00764 }
00765
00766
00767 if(gs_send_int(sock, global_taskid) < 0)
00768 goto error_comm;
00769
00770
00771 if(gs_send_int(sock, count) < 0)
00772 goto error_comm;
00773
00774
00775 if(count > 0) {
00776 char dummy_taskid[TASK_ID_LEN];
00777 double start_time;
00778
00779 for(i = 0; i < count; i++) {
00780 char *srv = NULL;
00781 DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
00782 if((gs_encode_server(&srv, server_list[i]) < 0) ||
00783 (gs_send_string(sock, srv) < 0)) {
00784 FREE(srv);
00785 DBGPRINTF("Failed to send server list \n");
00786 return -1;
00787 }
00788
00789 FREE(srv);
00790 }
00791
00792 DBGPRINTF("Encoding problem: %s.\n", problem->name);
00793 if((gs_encode_problem(&xml_problem, problem) < 0) ||
00794 (gs_send_string(sock, xml_problem) < 0)) {
00795 FREE(xml_problem);
00796 DBGPRINTF("Failed to send problem\n");
00797 return -1;
00798 }
00799 FREE(xml_problem);
00800
00801
00802
00803 if(scalar_args_to_be_transferred) {
00804 struct timeval tv;
00805
00806 gettimeofday(&tv, NULL);
00807
00808 proxy_cid_to_str(cid_string, server_list[0]->componentid);
00809 start_time = get_time_since_startup();
00810
00811 srand48((double) tv.tv_usec);
00812
00813 for(i = 0; i < TASK_ID_LEN-1; i++)
00814 dummy_taskid[i] = (char) ((int)(drand48() * (double)('z' - 'a')) + 'a');
00815 dummy_taskid[TASK_ID_LEN-1] = 0;
00816
00817 if(gs_insert_submitted_task_guess(cid_string, dummy_taskid, global_taskid,
00818 start_time, server_list[0]->score, 0.0, start_time, 0, 0) < 0)
00819 ERRPRINTF("Warning: error inserting task\n");
00820 }
00821 }
00822
00823 LOGPRINTF("Agent %s got %s request from client and sent %d servers\n",
00824 gs_agent->hostname, problem->name, count);
00825
00826 clock = time(0);
00827 now = localtime(&clock);
00828 snprintf(subtime, 1024, "[(%02d/%02d/%04d) %02d:%02d:%02d]",
00829 now->tm_mon+1,now->tm_mday,now->tm_year+1900,
00830 now->tm_hour,now->tm_min,now->tm_sec);
00831
00832
00833
00834
00835
00836 if (server_list != NULL && count > 0) {
00837 server_list[0]->workload += 100;
00838 if (gs_update_workload(gs_agent, server_list[0]) < 0)
00839 DBGPRINTF("Could not temporarily update workload for the first server \n");
00840 proxy_cid_to_str(cid_string, server_list[0]->componentid);
00841 }
00842
00843 for(i = 0; i < count; i++)
00844 gs_server_free(server_list[i]);
00845 if(server_list) free(server_list);
00846
00847 gs_free_problem_and_data(problem);
00848
00849 return 0;
00850
00851 error_comm:
00852 for(i = 0; i < count; i++)
00853 gs_server_free(server_list[i]);
00854 if(server_list) free(server_list);
00855 gs_free_problem_and_data(problem);
00856 ERRPRINTF("Error handling problem submission\n");
00857 return -1;
00858 }
00859
/*
 * Apply a batch of performance-model updates reported by server 'srv_cid'.
 *
 * 'model_update' is newline-separated.  The first line holds the number of
 * updates; each following line is decoded by gs_decode_model_update() into
 * a (name, expression) pair and stored with gs_update_perf_expr().
 * Undecodable entries are logged and skipped.  A message with fewer lines
 * than announced is processed up to its end instead of passing NULL to the
 * decoder.
 *
 * Returns -1 if 'model_update' is NULL or cannot be copied, else 0.
 */
int
gs_agent_update_all_perf_models(char *model_update, char *srv_cid)
{
  int i, num_updates;
  char *mcopy, *tok;

  if(!model_update)
    return -1;

  /* strtok modifies its input, so work on a private copy. */
  mcopy = strdup(model_update);

  if(!mcopy)
    return -1;

  tok = strtok(mcopy, "\n");

  if(!tok) {
    /* Empty message: nothing to update (previously atoi(NULL)). */
    free(mcopy);
    return 0;
  }

  num_updates = atoi(tok);

  for(i = 0; i < num_updates; i++) {
    char *name, *expr;

    tok = strtok(NULL, "\n");

    if(!tok) {
      ERRPRINTF("Performance model update truncated (%d of %d entries)\n",
                i, num_updates);
      break;
    }

    if(gs_decode_model_update(tok, &name, &expr) < 0) {
      ERRPRINTF("Error parsing performance model update\n");
      continue;
    }

    if(gs_update_perf_expr(srv_cid, name, expr) < 0) {
      ERRPRINTF("Error updating performance model\n");
    }

    free(name);
    free(expr);
  }

  free(mcopy);

  return 0;
}
00900
00912 int
00913 gs_agent_process_workload_report(gs_agent_t * gs_agent, int sock)
00914 {
00915 char *msg, temp_cid[CID_LEN * 2 + 1];
00916 gs_server_t srv;
00917
00918 if(!gs_agent) {
00919 ERRPRINTF("Invalid arg: null agent\n");
00920 return -1;
00921 }
00922
00923 if(gs_recv_string(sock, &msg) < 0) {
00924 DBGPRINTF("gs_agent_process_workload_report: gs_recv_string failed\n");
00925 return -1;
00926 }
00927
00928 if(gs_decode_workload_report(msg, &srv.workload, &srv.nproblems,
00929 temp_cid) < 0)
00930 {
00931 DBGPRINTF("Failed to decode workload report\n");
00932 free(msg);
00933 return -1;
00934 }
00935
00936 free(msg);
00937
00938 if(gs_recv_string(sock, &msg) < 0) {
00939 DBGPRINTF("gs_agent_process_workload_report: gs_recv_string failed\n");
00940 return -1;
00941 }
00942
00943 gs_agent_update_all_perf_models(msg, temp_cid);
00944
00945 free(msg);
00946
00947 proxy_str_to_cid(srv.componentid, temp_cid);
00948 DBGPRINTF("gs_agent_process_workload_report: workload = %d serverID %s\n",
00949 srv.workload, temp_cid);
00950
00951 if(gs_update_workload(gs_agent, &srv) < 0) {
00952 DBGPRINTF("gs_agent_process_workload_report: report from unknown server\n");
00953 if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0)
00954 return -1;
00955 }
00956 else {
00957 if(gs_send_tag(sock, GS_PROT_OK) < 0)
00958 return -1;
00959 }
00960
00961 DBGPRINTF("Agent %s handled a workload report setting server %s to %d\n",
00962 gs_agent->hostname, temp_cid, srv.workload);
00963 return 0;
00964 }
00965
00977 int
00978 gs_agent_process_server_registration(gs_agent_t * gs_agent, int sock)
00979 {
00980 char *serverstr = NULL;
00981 gs_server_t *gs_server;
00982 time_t clock;
00983 struct tm *now;
00984 char subtime[128];
00985
00986 if(!gs_agent) {
00987 ERRPRINTF("Invalid arg: null agent\n");
00988 return -1;
00989 }
00990
00991 gs_server = (gs_server_t *) CALLOC(1, sizeof(gs_server_t));
00992
00993 if(!gs_server) {
00994 ERRPRINTF("Failed to malloc server struct\n");
00995 return -1;
00996 }
00997
00998 DBGPRINTF("Entering \n");
00999
01000
01001 if((gs_recv_string(sock, &serverstr) < 0) ||
01002 (gs_decode_server(serverstr, gs_server) < 0))
01003 goto error;
01004
01005 FREE(serverstr);
01006
01007 gs_server->last_update = time(NULL);
01008
01009
01010 ASSERT_EXPR((gs_add_server(gs_agent, gs_server) >= 0), goto error);
01011
01012
01013 if(gs_send_tag(sock, GS_PROT_OK) < 0)
01014 goto error;
01015
01016 LOGPRINTF("Agent %s registered server %s\n", gs_agent->hostname, gs_server->hostname);
01017 clock = time(0);
01018 now = localtime(&clock);
01019 snprintf(subtime, 1024, "[(%02d/%02d/%04d) %02d:%02d:%02d]",
01020 now->tm_mon+1,now->tm_mday,now->tm_year+1900,
01021 now->tm_hour,now->tm_min,now->tm_sec);
01022
01023 gs_server_free(gs_server);
01024
01025 return 0;
01026
01027 error:
01028 ERRPRINTF("Error registering server\n");
01029 gs_server_free(gs_server);
01030 FREE(serverstr);
01031 gs_send_tag(sock, GS_PROT_ERROR);
01032 return -1;
01033 }
01034
01047 int
01048 gs_agent_task_terminated(char *server_cid, char *request_id, int agent_taskid,
01049 double run_time)
01050 {
01051 gs_htm_task *task;
01052 double end_time;
01053
01054 end_time = get_time_since_startup();
01055
01056 if(gs_insert_completed_task(server_cid, request_id, agent_taskid, end_time,
01057 0.0, 0.0, end_time, 0, 1) < 0)
01058 ERRPRINTF("Warning: could not insert completed task\n");
01059
01060 task = (gs_htm_task *) malloc(sizeof(gs_htm_task));
01061
01062 if(!task) {
01063 ERRPRINTF("malloc...\n");
01064
01065 return 0;
01066 }
01067
01068 if(gs_get_task_by_agent_taskid(agent_taskid, task) < 0) {
01069 free(task);
01070
01071 return 0;
01072 }
01073
01074 LOGPRINTF("task %d: total time = %lf, server run time = %lf\n", agent_taskid,
01075 end_time-task->start, run_time);
01076
01077 free(task);
01078
01079 return 0;
01080 }
01081
01093 int
01094 gs_agent_process_notify_cancel(int sock)
01095 {
01096 char *msg, *server_cid, *request_id;
01097 int agent_taskid;
01098
01099 if(gs_recv_string(sock, &msg) < 0) {
01100 ERRPRINTF("Error communicating with server.\n");
01101 return -1;
01102 }
01103
01104 LOGPRINTF("msg = '%s'\n", msg);
01105 SENSORPRINTF("CANCEL %s\n", msg);
01106
01107
01108 if(!keep_track_of_task_events) {
01109 free(msg);
01110 return 0;
01111 }
01112
01113 server_cid = (char *)malloc(strlen(msg));
01114
01115 if(!server_cid) {
01116 ERRPRINTF("malloc failed.\n");
01117 return -1;
01118 }
01119
01120 request_id = (char *)malloc(strlen(msg));
01121
01122 if(!request_id) {
01123 ERRPRINTF("malloc failed.\n");
01124 free(server_cid);
01125 return -1;
01126 }
01127
01128 if(gs_decode_cancel_notification(msg, &server_cid, &request_id,
01129 &agent_taskid) < 0)
01130 ERRPRINTF("Could not decode cancel notifiction.\n");
01131 else
01132 gs_agent_task_terminated(server_cid, request_id, agent_taskid, 0.0);
01133
01134 free(request_id);
01135 free(server_cid);
01136 free(msg);
01137
01138 return 0;
01139 }
01140
01152 int
01153 gs_agent_process_notify_failure(int sock)
01154 {
01155 char *msg, *server_cid, *request_id;
01156 int agent_taskid;
01157
01158 if(gs_recv_string(sock, &msg) < 0) {
01159 ERRPRINTF("Error communicating with server.\n");
01160 return -1;
01161 }
01162
01163 LOGPRINTF("msg = '%s'\n", msg);
01164 SENSORPRINTF("FAIL %s\n", msg);
01165
01166
01167 if(!keep_track_of_task_events) {
01168 free(msg);
01169 return 0;
01170 }
01171
01172 server_cid = (char *)malloc(strlen(msg));
01173
01174 if(!server_cid) {
01175 ERRPRINTF("malloc failed.\n");
01176 return -1;
01177 }
01178
01179 request_id = (char *)malloc(strlen(msg));
01180
01181 if(!request_id) {
01182 ERRPRINTF("malloc failed.\n");
01183 free(server_cid);
01184 return -1;
01185 }
01186
01187 if(gs_decode_cancel_notification(msg, &server_cid, &request_id,
01188 &agent_taskid) < 0)
01189 ERRPRINTF("Could not decode failure notifiction.\n");
01190 else
01191 gs_agent_task_terminated(server_cid, request_id, agent_taskid, 0.0);
01192
01193 free(request_id);
01194 free(server_cid);
01195 free(msg);
01196
01197 return 0;
01198 }
01199
01211 int
01212 gs_agent_process_notify_complete(int sock)
01213 {
01214 char *msg, *server_cid, *request_id;
01215 double service_et;
01216 int agent_taskid;
01217
01218 if(gs_recv_string(sock, &msg) < 0) {
01219 ERRPRINTF("Error communicating with server.\n");
01220 return -1;
01221 }
01222
01223 LOGPRINTF("msg = '%s'\n", msg);
01224 SENSORPRINTF("COMPLE %s\n", msg);
01225
01226
01227 if(!keep_track_of_task_events) {
01228 free(msg);
01229 return 0;
01230 }
01231
01232 server_cid = (char *)malloc(strlen(msg));
01233
01234 if(!server_cid) {
01235 ERRPRINTF("malloc failed.\n");
01236 return -1;
01237 }
01238
01239 request_id = (char *)malloc(strlen(msg));
01240
01241 if(!request_id) {
01242 ERRPRINTF("malloc failed.\n");
01243 free(server_cid);
01244 return -1;
01245 }
01246
01247 if(gs_decode_problem_complete_notification(msg, &server_cid, &request_id,
01248 &agent_taskid, &service_et) < 0)
01249 ERRPRINTF("Could not decode completion notifiction.\n");
01250 else
01251 gs_agent_task_terminated(server_cid, request_id, agent_taskid, service_et);
01252
01253 free(request_id);
01254 free(server_cid);
01255 free(msg);
01256
01257 return 0;
01258 }
01259
01260 int
01261 gs_dump_task_list(char *server_cid)
01262 {
01263 gs_htm_task **tasks = NULL;
01264 int i, count;
01265
01266 gs_get_tasks_for_server(server_cid, &tasks, &count, 1);
01267
01268 if(count < 0) {
01269 ERRPRINTF("failed to get list of all tasks\n");
01270 return -1;
01271 }
01272
01273 printf("task list for '%s':\n", server_cid);
01274
01275 for(i=0;i<count;i++) {
01276 printf("%s:\n", tasks[i]->id);
01277 printf("start=%10.2lf, duration=%10.2lf, remaining=%10.2lf, end=%10.2lf, active=%d, finished=%d\n",
01278 tasks[i]->start, tasks[i]->duration, tasks[i]->remaining,
01279 tasks[i]->end, tasks[i]->active, tasks[i]->finished);
01280
01281 free(tasks[i]);
01282 }
01283
01284 free(tasks);
01285
01286 return 0;
01287 }
01288
01306 int
01307 gs_agent_reassign_task(char *cid_string, char *taskid, int agent_taskid,
01308 double duration, double agent_est_time)
01309 {
01310 double start_time;
01311 gs_htm_task *task;
01312
01313 task = (gs_htm_task *) malloc(sizeof(gs_htm_task));
01314
01315 if(!task) {
01316 ERRPRINTF("malloc...\n");
01317 return -1;
01318 }
01319
01320 start_time = get_time_since_startup();
01321
01322 if(agent_taskid == -1) {
01323
01324 if(gs_insert_submitted_task(cid_string, taskid, agent_taskid, start_time,
01325 duration, agent_est_time, start_time, 0, 0) < 0)
01326 {
01327 free(task);
01328 return -1;
01329 }
01330 }
01331 else {
01332 if(gs_get_task_by_agent_taskid(agent_taskid, task) < 0) {
01333
01334
01335
01336 if(gs_insert_submitted_task(cid_string, taskid, agent_taskid,
01337 start_time, duration, agent_est_time, start_time, 0, 0) < 0)
01338 {
01339 free(task);
01340 return -1;
01341 }
01342 }
01343 else {
01344
01345
01346
01347
01348 LOGPRINTF("end time from db = %g, agent est end time = %g\n",
01349 task->end, agent_est_time);
01350
01351 if(gs_update_task(cid_string, task->id, taskid, agent_taskid, task->start,
01352 duration, task->remaining, task->end, task->active,
01353 task->finished) < 0)
01354 {
01355 free(task);
01356 return -1;
01357 }
01358 }
01359 }
01360
01361 free(task);
01362
01363 return 0;
01364 }
01365
01377 int
01378 gs_agent_process_notify_submit(int sock)
01379 {
01380 char *msg, *problem_name, *server_cid, *user_name, *host_name,
01381 *client_cid, *request_id;
01382 int agent_taskid;
01383 double est_time, agent_est_time;
01384
01385 if(gs_recv_string(sock, &msg) < 0) {
01386 ERRPRINTF("Error communicating with server.\n");
01387 return -1;
01388 }
01389
01390 if(!msg) {
01391 ERRPRINTF("Bad notification message.\n");
01392 return -1;
01393 }
01394
01395 LOGPRINTF("msg = '%s'\n", msg);
01396 SENSORPRINTF("SUBMIT %s\n", msg);
01397
01398
01399 if(!keep_track_of_task_events) {
01400 free(msg);
01401 return 0;
01402 }
01403
01404 if(gs_decode_problem_solve_notification(msg, &problem_name, &est_time,
01405 &server_cid, &user_name, &host_name, &client_cid, &request_id,
01406 &agent_taskid, &agent_est_time) < 0) {
01407 free(msg);
01408 ERRPRINTF("Error communicating with server.\n");
01409 return -1;
01410 }
01411
01412 LOGPRINTF("in agent, est time = %lf, agent est time = %lf\n", est_time, agent_est_time);
01413
01414 if(gs_agent_reassign_task(server_cid, request_id, agent_taskid, est_time, agent_est_time) < 0)
01415 ERRPRINTF("Warning: could not assign task\n");
01416
01417 free(msg);
01418 free(problem_name);
01419 free(server_cid);
01420 free(user_name);
01421 free(host_name );
01422 free(client_cid);
01423 free(request_id);
01424
01425 return 0;
01426 }
01427
01440 int
01441 gs_agent_process_problem_registration(gs_agent_t * gs_agent, int sock)
01442 {
01443 char *problemstr, *temp_cid, **model_strings, **removed_probs;
01444 gs_problem_t **problem = NULL;
01445 int i, num_services, num_removed;
01446
01447 if(!gs_agent) {
01448 ERRPRINTF("Invalid arg: null agent\n");
01449 return -1;
01450 }
01451
01452 if(gs_recv_string(sock, &temp_cid) < 0) {
01453 ERRPRINTF("Error communicating with server.\n");
01454 return -1;
01455 }
01456
01457 if(gs_recv_int(sock, &num_services) < 0) {
01458 ERRPRINTF("Error communicating with server.\n");
01459 return -1;
01460 }
01461
01462 problem = (gs_problem_t **)calloc(num_services, sizeof(gs_problem_t *));
01463 if(!problem) {
01464 ERRPRINTF("malloc failed.\n");
01465 return -1;
01466 }
01467
01473 model_strings = (char **)calloc(num_services+1, sizeof(char *));
01474 if(!model_strings) {
01475 ERRPRINTF("malloc failed.\n");
01476 return -1;
01477 }
01478
01479 removed_probs = (char **)calloc(num_services+1, sizeof(char *));
01480 if(!removed_probs) {
01481 ERRPRINTF("malloc failed.\n");
01482 return -1;
01483 }
01484
01485 i = 0;
01486 for(;;) {
01487
01488 if(gs_recv_string(sock, &problemstr) < 0) {
01489 ERRPRINTF("Error communicating with the server\n");
01490 goto error;
01491 }
01492
01493
01494 if(!strcmp(problemstr, GS_END_PROB_REG)) {
01495 FREE(problemstr);
01496 break;
01497 }
01498
01499 problem[i] = (gs_problem_t *) CALLOC(1, sizeof(gs_problem_t));
01500
01501 if(!problem[i]) {
01502 ERRPRINTF("Failed to allocate problem struct\n");
01503 goto error;
01504 }
01505
01506 if(gs_decode_problem(problemstr, problem[i]) < 0) {
01507 ERRPRINTF("Error decoding problem struct\n");
01508 FREE(problemstr);
01509 goto error;
01510 }
01511
01512 FREE(problemstr);
01513
01514 LOGPRINTF("Received problem %s\n", problem[i]->name);
01515
01516 i++;
01517 }
01518
01519
01520
01521
01522 num_services = i;
01523
01524 if(num_services > 0)
01525 LOGPRINTF("Received %d new services\n", num_services);
01526
01527 i = 0;
01528 for(;;) {
01529 if(gs_recv_string(sock, &model_strings[i]) < 0) {
01530 ERRPRINTF("Error communicating with the server\n");
01531 goto error;
01532 }
01533
01534
01535 if(!strcmp(model_strings[i], GS_END_PROB_REG))
01536 break;
01537
01538 i++;
01539 }
01540
01541 i = 0;
01542 for(;;) {
01543
01544 if(gs_recv_string(sock, &removed_probs[i]) < 0) {
01545 ERRPRINTF("Error communicating with the server\n");
01546 goto error;
01547 }
01548
01549
01550 if(!strcmp(removed_probs[i], GS_END_PROB_REG)) {
01551 break;
01552 }
01553
01554 i++;
01555 }
01556
01557 num_removed = i;
01558
01559
01560 if(gs_register_problem_changes(gs_agent, problem, num_services,
01561 model_strings, removed_probs, num_removed, temp_cid) < 0) {
01562 ERRPRINTF("Failed to add problem to database\n");
01563 goto error;
01564 }
01565
01566
01567 if(gs_send_tag(sock, GS_PROT_OK) < 0)
01568 goto error;
01569
01570 FREE(temp_cid);
01571 return 0;
01572
01573 error:
01574 ERRPRINTF("Agent could not process problem registration \n");
01575
01576 if(problem) {
01577 for(i=0;i<num_services;i++)
01578 if(problem[i])
01579 gs_free_problem(problem[i]);
01580 free(problem);
01581 }
01582
01583 FREE(temp_cid);
01584 gs_send_tag(sock, GS_PROT_ERROR);
01585 return -1;
01586 }
01587
01598 int
01599 gs_agent_process_kill_agent(gs_agent_t *gs_agent, int sock)
01600 {
01601 char *msg;
01602
01603 if(!gs_agent) {
01604 ERRPRINTF("Invalid arg: null agent\n");
01605 return -1;
01606 }
01607
01608
01609
01610 if(gs_recv_string(sock, &msg) < 0) {
01611 ERRPRINTF("Error receving Password \n");
01612 return -1;
01613 }
01614
01615 if(!strcmp(msg, "GridSolve")) {
01616 free(msg);
01617
01618 if(gs_send_tag(sock, GS_PROT_OK) < 0) {
01619 ERRPRINTF("Error sending confirmation \n");
01620 return -1;
01621 }
01622
01623 ERRPRINTF("Agent terminating...\n");
01624
01625
01626 gs_close_socket(gs_agent->sock);
01627
01628 kill(getppid(),SIGTERM);
01629 }
01630 else {
01631 free(msg);
01632
01633 if(gs_send_tag(sock, GS_PROT_ERROR) < 0) {
01634 ERRPRINTF("Error sending confirmation \n");
01635 return -1;
01636 }
01637 }
01638
01639 return 0;
01640 }
01641
01642
01653 int
01654 gs_agent_process_kill_server(gs_agent_t *gs_agent, int sock)
01655 {
01656 gs_server_t **server_list = NULL;
01657 int i, count, tag;
01658 char *msg;
01659
01660 if(!gs_agent) {
01661 ERRPRINTF("Invalid arg: null agent\n");
01662 return -1;
01663 }
01664
01668 if(gs_recv_string(sock, &msg) < 0) {
01669 ERRPRINTF("Error receving Server\n");
01670 return -1;
01671 }
01672
01673 if((strlen(msg) >= 2) && !strncmp(msg, "-c", 2)) {
01674 LOGPRINTF("GS_PROT_KILL_SERVER request for server cid '%s': ", msg+2);
01675
01676 server_list = (gs_server_t **) calloc(1, sizeof(gs_server_t *));
01677 if(!server_list) {
01678 ERRPRINTF("malloc failed\n");
01679 free(msg);
01680 return -1;
01681 }
01682
01683 server_list[0] = (gs_server_t *) calloc(1, sizeof(gs_server_t));
01684 if(!server_list[0]) {
01685 ERRPRINTF("malloc failed\n");
01686 free(server_list);
01687 free(msg);
01688 return -1;
01689 }
01690
01691 if(gs_get_server_by_cid(gs_agent, msg+2, server_list[0]) < 0) {
01692 ERRPRINTF("No such server: '%s'.\n", msg+2);
01693
01694 free(server_list[0]);
01695 free(server_list);
01696 free(msg);
01697
01698 if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0) {
01699 ERRPRINTF("Unsuccessful (Sending No Server tag)\n");
01700 return -1;
01701 }
01702
01703 return -1;
01704 }
01705
01706 count = 1;
01707 }
01708 else {
01709 LOGPRINTF("GS_PROT_KILL_SERVER request for server '%s': ", msg);
01710
01711 count = gs_get_all_servers_by_hostname(gs_agent, msg, &server_list, &count);
01712
01713 if(count < 1) {
01714 ERRPRINTF("No such server: '%s'.\n", msg);
01715
01716 free(msg);
01717
01718 if(gs_send_tag(sock, GS_PROT_UNKNOWN_SERVER) < 0) {
01719 ERRPRINTF("Unsuccessful (Sending No Server tag)\n");
01720 return -1;
01721 }
01722
01723 return -1;
01724 }
01725
01726 if(count > 1) {
01727 ERRPRINTF("Multiple matches for server: '%s'.\n", msg);
01728
01729 free(msg);
01730
01731 for(i = 0; i < count; i++)
01732 gs_server_free(server_list[i]);
01733 FREE(server_list);
01734
01735 if(gs_send_tag(sock, GS_PROT_MULTIPLE_SERVER) < 0) {
01736 ERRPRINTF("Unsuccessful (Sending Multiple Server tag)\n");
01737 return -1;
01738 }
01739
01740 return -1;
01741 }
01742 }
01743
01744 free(msg);
01745
01746 if(gs_send_tag(sock, GS_PROT_OK) < 0) {
01747 ERRPRINTF("Unsuccessful (Sending No Server)\n");
01748 return -1;
01749 }
01750
01751 if(gs_encode_server(&msg, server_list[0]) < 0) {
01752 ERRPRINTF("Unsuccessful (Encoding Server)\n");
01753 gs_server_free(server_list[0]);
01754 return -1;
01755 }
01756
01757 if(gs_send_string(sock, msg) < 0) {
01758 ERRPRINTF("Unsuccessful (Sending Server)\n");
01759 free(msg);
01760 gs_server_free(server_list[0]);
01761 return -1;
01762 }
01763
01764 free(msg);
01765
01766 if(gs_recv_tag(sock, &tag) < 0) {
01767 ERRPRINTF("Error receving Confirmation\n");
01768 return -1;
01769 }
01770
01771 if(tag != GS_PROT_OK) {
01772 ERRPRINTF("Unsuccessful (rejected response from client).\n");
01773 gs_server_free(server_list[0]);
01774 return -1;
01775 }
01776
01777 gs_delete_server(gs_agent, server_list[0]);
01778 gs_server_free(server_list[0]);
01779
01780 LOGPRINTF("Server successfully killed.\n");
01781
01782 FREE(server_list);
01783 return 0;
01784 }
01785
01786
01787
01788 #ifdef GS_SMART_GRIDSOLVE
01789
01806 int gs_smart_fault_update_pm(gs_agent_t * gs_agent, int sock){
01807
01808
01809
01810 char *serverstr = NULL;
01811 gs_server_t *gs_server;
01812 time_t clock;
01813 struct tm *now;
01814 char subtime[128];
01815
01816 if(!gs_agent) {
01817 ERRPRINTF("Invalid arg: null agent\n");
01818 return -1;
01819 }
01820
01821 gs_server = (gs_server_t *) CALLOC(1, sizeof(gs_server_t));
01822
01823 if(!gs_server) {
01824 ERRPRINTF("Failed to malloc server struct\n");
01825 return -1;
01826 }
01827
01828 DBGPRINTF("Entering \n");
01829
01830
01831 if((gs_recv_string(sock, &serverstr) < 0) ||
01832 (gs_decode_server(serverstr, gs_server) < 0))
01833 return -1;
01834
01835 FREE(serverstr);
01836
01837
01838
01839 LOGPRINTF("SMART: Server %s has failed during execution .\n", gs_server->hostname);
01840
01841 if(gs_delete_server(gs_agent, gs_server) < 0){
01842 ERRPRINTF("Failed to delete server '%s'\n", gs_server->hostname);
01843 return -1;
01844 }
01845 LOGPRINTF("SMART: Server %s has been removed from database .\n", gs_server->hostname);
01846
01847 gs_server_free(gs_server);
01848
01849
01850 if(gs_send_tag(sock, GS_PROT_OK) < 0)
01851 return -1;
01852
01853
01854 }
01855
01856
01857
01858
01859
01860
01861
01862
01863
01864
01865
01866
01867
01868
01869
01870
01871
01872
01873
01874
01875
01876
01877
01878 int gs_smart_add_idl_info_to_app_pm(gs_agent_t * gs_agent , gs_smart_app_pm * app_pm){
01879 gs_server_t **server_list;
01880 int count;
01881 int i;
01882 char * app_pm_problem;
01883
01884 for(i=0; i<app_pm->nb_tasks;i++){
01885 app_pm->tasks[i]->problem = (gs_problem_t *)calloc(1, sizeof(gs_problem_t));
01886 if(!app_pm->tasks[i]->problem){
01887 ERRPRINTF("SMART: Error allocating to problem %d\n", i);
01888 return -1;
01889 }
01890 app_pm->tasks[i]->problem->name=app_pm->tasks[i]->nickname;
01891 if(gs_encode_problem(&app_pm_problem, app_pm->tasks[i]->problem) < 0){
01892 ERRPRINTF("SMART: Error encoding problem\n");
01893 return -1;
01894 }
01895
01896 count = gs_get_server_list(gs_agent, app_pm->tasks[i]->problem, NULL, &server_list, &count);
01897 }
01898 return 0;
01899
01900 }
01901
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914
01915
01916
01917
01918 int gs_smart_add_servers_to_tg(gs_agent_t * gs_agent, gs_smart_tg * task_graph){
01919 int i, count, node_index=0;
01920 gs_server_t ***server_list;
01921 char *client_criteria = NULL;
01922 server_list= (gs_server_t ***)calloc(task_graph->nb_nodes , sizeof(gs_server_t **));
01923 if(!server_list){
01924 ERRPRINTF("SMART: Error allocating to server list\n");
01925 return -1;
01926 }
01927 for(i=0; i<task_graph->nb_nodes;i++){
01928 if(task_graph->task_nodes[i]->node_type==GS_SMART_TG_REM_TASK_NODE){
01929 count = gs_get_server_list(gs_agent, task_graph->task_nodes[i]->tg_rem_task_node->problem, client_criteria, &server_list[node_index], &count);
01930 if( ( !server_list[node_index] ) || ( count<=0 ) ){
01931 ERRPRINTF("SMART: Error receiving server list or no servers in list\n");
01932 return -1;
01933 }
01934 task_graph->task_nodes[i]->tg_rem_task_node->nb_servers=count;
01935 task_graph->task_nodes[i]->tg_rem_task_node->avail_servers=server_list[node_index];
01936 node_index++;
01937 }
01938 }
01939 return 0;
01940 }
01941
01942
01943
01966 int
01967 gs_smart_agent_process_app_pm_model(gs_agent_t * gs_agent, int sock){
01968 gs_smart_tg * tg;
01969 gs_server_t **server_list = NULL;
01970 gs_smart_app_pm * app_pm;
01971 char * app_pm_str=NULL;
01972 int tag, i, count;
01973 int graph_type;
01974 char * comm_type=NULL;
01975
01976
01977
01978
01979
01980
01981
01982
01983
01984
01985
01986
01987
01988
01989
01990
01991
01992 if(gs_recv_string(sock, &(comm_type)) < 0) {
01993 ERRPRINTF("SMART: Error receiving xml info string\n");
01994 return -1;
01995 }
01996 if(strcmp(comm_type,"enable_remote_comm")==0){
01997 graph_type=GS_SMART_CL_SRV_BRDCST_GRAPH;
01998 }
01999 else if(strcmp(comm_type, "no_dep")==0){
02000 graph_type=GS_SMART_NO_DEP_GRAPH;
02001 }
02002 else if(strcmp(comm_type, "server_comm")==0){
02003 graph_type=GS_SMART_SRV_BRDCST_GRAPH;
02004 }
02005 else if(strcmp(comm_type, "disable_remote_comm")==0){
02006 graph_type=GS_SMART_CLNT_BRDCAST_COMM;
02007 }
02008 else{
02009 graph_type=GS_SMART_CL_SRV_BRDCST_GRAPH;
02010 }
02011 app_pm=(gs_smart_app_pm *)calloc(1,sizeof(gs_smart_app_pm));
02012
02013 if(!gs_agent) {
02014 ERRPRINTF("SMART: Invalid arg: null agent\n");
02015 if(app_pm_str) free(app_pm_str);
02016 if(app_pm) gs_smart_app_pm_free(app_pm);
02017 return -1;
02018 }
02019
02020 if(gs_recv_string(sock, &(app_pm_str)) < 0) {
02021 ERRPRINTF("SMART: Error receiving app_pm string\n");
02022 if(app_pm_str) free(app_pm_str);
02023 if(app_pm) gs_smart_app_pm_free(app_pm);
02024 return -1;
02025 }
02026
02027 if(gs_smart_decode_app_pm(app_pm_str,app_pm)<0){
02028 ERRPRINTF("SMART: Error decoding app_pm string\n");
02029 if(app_pm_str) free(app_pm_str);
02030 if(app_pm) gs_smart_app_pm_free(app_pm);
02031 return -1;
02032 }
02033
02034 if(app_pm_str) free(app_pm_str);
02035 tg = (gs_smart_tg *)calloc(1, sizeof(gs_smart_tg ));
02036
02037 if(!tg){
02038 ERRPRINTF("SMART: Error allocating memory for task graph\n");
02039 if(app_pm_str) free(app_pm_str);
02040 if(app_pm) gs_smart_app_pm_free(app_pm);
02041 if(tg) gs_smart_tg_free(tg);
02042 return -1;
02043 }
02044
02045 tag=GS_SMART_GRAPH_OK;
02046
02047 gs_problem_t **problem_list;
02048 problem_list=(gs_problem_t **)calloc(app_pm->nb_tasks , sizeof(gs_problem_t *));
02049
02050 if(!problem_list){
02051 ERRPRINTF("SMART: Error allocating to problem list\n");
02052 return -1;
02053 }
02054
02055 for(i=0; i<app_pm->nb_tasks;i++){
02056 problem_list[i] = (gs_problem_t *)calloc(1, sizeof(gs_problem_t));
02057 if(!problem_list[i]){
02058 ERRPRINTF("SMART: Error allocating to problem %d\n", i);
02059 return -1;
02060 }
02061 }
02062
02063 if(gs_smart_add_idl_info_to_app_pm(gs_agent, app_pm)<0){
02064 ERRPRINTF("SMART: Error getting problem list for application performance model\n");
02065 return -1;
02066 }
02067
02068 if(gs_smart_tg_construct_tg(app_pm, tg, graph_type)<0){
02069 ERRPRINTF("SMART: Constructing task nodes for task graph\n");
02070 tag=GS_SMART_GRAPH_FAIL;
02071 }
02072
02073 if(tag==GS_SMART_GRAPH_OK){
02074 if(gs_smart_add_servers_to_tg(gs_agent, tg)<0){
02075 ERRPRINTF("SMART: Error adding servers to task graph\n");
02076 tag=GS_SMART_GRAPH_FAIL;
02077 }
02078 }
02079 if(gs_send_tag(sock, tag)<0){
02080 ERRPRINTF("SMART: Error sending tag\n");
02081 if(tg) gs_smart_tg_free(tg);
02082 return -1;
02083 }
02084 if(tag==GS_SMART_GRAPH_OK){
02085 DBGPRINTF("SMART: Task graph created successfully\n");
02086 }
02087 if(tag==GS_SMART_GRAPH_FAIL){
02088 ERRPRINTF("SMART: Error creating task graph. This may be due to no servers registered\n");
02089 if(tg) gs_smart_tg_free(tg);
02090 return -1;
02091 }
02092
02093 if(gs_smart_send_tg(sock, tg)<0){
02094 ERRPRINTF("SMART: gs_send_task_graph failed\n");
02095 if(tg) gs_smart_tg_free(tg);
02096 return -1;
02097 }
02098
02099 count = gs_get_all_servers(gs_agent, &server_list, &count);
02100 if(count < 0) {
02101 ERRPRINTF("failed to get list of all servers\n");
02102 return -1;
02103 }
02104
02105 if(gs_smart_send_all_servers(sock, server_list, count)<0){
02106 ERRPRINTF("SMART: Error sending server link info\n");
02107 return -1;
02108 }
02109
02110 for(i = 0; i < count; i++)
02111 gs_server_free(server_list[i]);
02112
02113 FREE(server_list);
02114
02115 if(gs_send_tag(sock, GS_PROT_OK) < 0)
02116 return -1;
02117
02118
02119 if(app_pm){
02120 if(gs_smart_app_pm_free(app_pm)<0){
02121 ERRPRINTF("SMART : Error freeing application performance model\n");
02122 return -1;
02123 }
02124 }
02125
02126 if(tg){
02127 if(gs_smart_tg_free(tg)<0){
02128 ERRPRINTF("SMART: Error Freeing task graph\n");
02129 return -1;
02130 }
02131 }
02132
02133 return 0;
02134 }
02135
02136 #endif
02137
02138
02139
02140
02141
02142
02153 int
02154 gs_agent_process_message(gs_agent_t * gs_agent, int sock)
02155 {
02156 char *version_str;
02157 int tag, retval;
02158
02159 if((gs_recv_tag(sock, &tag)) < 0) {
02160 ERRPRINTF("could not get tag\n");
02161 return -1;
02162 }
02163
02164 if((gs_recv_string(sock, &version_str)) < 0) {
02165 ERRPRINTF("could not read version string\n");
02166 return -1;
02167 }
02168
02169 if(gs_versions_incompatible(version_str, VERSION))
02170 retval = gs_send_tag(sock, GS_PROT_VERSION_MISMATCH);
02171 else
02172 retval = gs_send_tag(sock, GS_PROT_OK);
02173
02174 free(version_str);
02175
02176 if(retval < 0) {
02177 ERRPRINTF("could not send response tag.\n");
02178 return -1;
02179 }
02180
02181 if(gs_storage_init(gs_agent) < 0) {
02182 ERRPRINTF("Could not init database.\n");
02183 return -1;
02184 }
02185
02186 retval = -1;
02187
02188 switch (tag) {
02189 case GS_PROT_PROBLEM_SUBMIT:
02190 retval = gs_agent_process_problem_submit(gs_agent, sock);
02191 break;
02192 case GS_PROT_SERVER_REGISTER:
02193 retval = gs_agent_process_server_registration(gs_agent, sock);
02194 break;
02195 case GS_PROT_PROBLEM_REGISTER:
02196 retval = gs_agent_process_problem_registration(gs_agent, sock);
02197 break;
02198 case GS_PROT_WORKLOAD_REPORT:
02199 retval = gs_agent_process_workload_report(gs_agent, sock);
02200 break;
02201 case GS_PROT_KILL_AGENT:
02202 retval = gs_agent_process_kill_agent(gs_agent, sock);
02203 break;
02204 case GS_PROT_KILL_SERVER:
02205 retval = gs_agent_process_kill_server(gs_agent, sock);
02206 break;
02207 case GS_PROT_SERVER_LIST:
02208 retval = gs_agent_process_server_list(gs_agent, sock);
02209 break;
02210 case GS_PROT_SERVER_PING_LIST:
02211 retval = gs_agent_process_server_ping_list(gs_agent, sock);
02212 break;
02213 case GS_PROT_SERVER_PING_UPDATE:
02214 retval = gs_agent_process_server_ping_update(gs_agent, sock);
02215 break;
02216 case GS_PROT_PROBLEM_LIST:
02217 retval = gs_agent_process_problem_list(gs_agent, sock);
02218 break;
02219 case GS_PROT_PROBLEM_DESC:
02220 retval = gs_agent_process_problem_desc(gs_agent, sock);
02221 break;
02222 case GS_PROT_NOTIFY_SUBMIT:
02223 retval = gs_agent_process_notify_submit(sock);
02224 break;
02225 case GS_PROT_NOTIFY_FAILURE:
02226 retval = gs_agent_process_notify_failure(sock);
02227 break;
02228 case GS_PROT_NOTIFY_COMPLETE:
02229 retval = gs_agent_process_notify_complete(sock);
02230 break;
02231 case GS_PROT_NOTIFY_CANCEL:
02232 retval = gs_agent_process_notify_cancel(sock);
02233 break;
02234 case GS_PROT_AVAILABILITY_REQ:
02235 retval = gs_agent_process_availability_request(sock);
02236 break;
02237 #ifdef GS_SMART_GRIDSOLVE
02238 case GS_PROT_APP_PM_MODEL:
02239 retval = gs_smart_agent_process_app_pm_model(gs_agent, sock);
02240 break;
02241 case GS_SMART_FAULT_UPDATE_PM:
02242 retval = gs_smart_fault_update_pm(gs_agent, sock);
02243 break;
02244 #endif
02245 default:
02246 LOGPRINTF("Unknown tag %d\n", tag);
02247 break;
02248 }
02249
02250 gs_storage_finalize(gs_agent);
02251
02252 return retval;
02253 }
02254
02266 int
02267 gs_agent_listen_and_process_messages(gs_agent_t *gs_agent)
02268 {
02269 int sock = -1;
02270 int accept_error = 0;
02271
02272 if(!gs_agent) {
02273 ERRPRINTF("Invalid arg: null agent\n");
02274 return -1;
02275 }
02276
02277
02278 proxy_init("");
02279
02280 gs_agent->sock = gs_establish_socket(&(gs_agent->port), TRUE);
02281
02282 if(gs_agent->sock == -1) {
02283 ERRPRINTF("Could not bind to port %d \n", gs_agent->port);
02284 return (-1);
02285 }
02286 LOGPRINTF("Agent %s listening at port %d\n",
02287 gs_agent->hostname, gs_agent->port);
02288
02289 gs_listen_on_socket(gs_agent->sock);
02290
02291 for(;;) {
02292 pid_t pid;
02293
02294
02295 if((sock = gs_accept_connection(gs_agent->sock)) == -1) {
02296
02297 if(accept_error != 1)
02298 ERRPRINTF
02299 ("Failed to accept connection on socket, return to listening \n");
02300 accept_error = 1;
02301 continue;
02302 }
02303 else {
02304 accept_error = 0;
02305 }
02306
02307
02308 global_taskid++;
02309
02310 pid = fork();
02311
02312 switch(pid) {
02313 case -1:
02314 ERRPRINTF("Failed to fork\n");
02315 gs_close_socket(sock);
02316 continue;
02317 case 0:
02318 gs_close_socket(gs_agent->sock);
02319 gs_agent_process_message(gs_agent, sock);
02320 _exit(0);
02321 default:
02322 gs_close_socket(sock);
02323 }
02324
02325 fflush(stderr);
02326 fflush(stdout);
02327 }
02328
02329 return 0;
02330 }
02331
02340 gs_agent_scheduler_t
02341 gs_agent_parse_scheduler_name(char *name)
02342 {
02343 if(!name) return GS_INVALID_SCHEDULER;
02344
02345 if(!strcasecmp(name, "default_mct"))
02346 return GS_DEFAULT_MCT;
02347
02348
02349
02350
02351 if(!strncasecmp(name, "htm", 3)) {
02352 keep_track_of_task_events = 1;
02353
02354 if(!strcasecmp(name, "htm_ml"))
02355 return GS_HTM_ML;
02356
02357 if(!strcasecmp(name, "htm_msf"))
02358 return GS_HTM_MSF;
02359
02360 if(!strcasecmp(name, "htm_hmct"))
02361 return GS_HTM_HMCT;
02362
02363 if(!strcasecmp(name, "htm_mp"))
02364 return GS_HTM_MP;
02365 }
02366
02367 return GS_INVALID_SCHEDULER;
02368 }
02369
02386 int
02387 gs_agent_parse_cmd_line(int argc, char **argv, char **logfile, int *daemon,
02388 int *httpd_port, char **config_file)
02389 {
02390 char *httpd_port_env;
02391 int c;
02392
02393 if(!argv || !logfile || !daemon) {
02394 ERRPRINTF("Invalid args\n");
02395 return -1;
02396 }
02397
02398 *logfile = NULL;
02399 *daemon = 1;
02400 *config_file = NULL;
02401
02402
02403
02404 if((httpd_port_env = getenv("GRIDSOLVE_HTTPD_PORT")) &&
02405 (!strcmp(httpd_port_env, "disable")))
02406 *httpd_port = -1;
02407 else
02408 *httpd_port = getenv_int("GRIDSOLVE_HTTPD_PORT",
02409 GRIDSOLVE_HTTPD_PORT_DEFAULT);
02410
02411
02412
02413
02414
02415
02416 #define GS_AGENT_USAGE_STR "Usage: GS_agent [-l logfile] [-c] [-w httpd port] [-s config file]"
02417
02418 while((c = getopt(argc,argv,"cl:w:s:")) != EOF) {
02419 switch(c) {
02420 case 'w':
02421 if(!strcmp(optarg, "disable"))
02422 *httpd_port = -1;
02423 else
02424 *httpd_port = atoi(optarg);
02425 break;
02426 case 's':
02427 *config_file = strdup(optarg);
02428 break;
02429 case 'l':
02430 *logfile = strdup(optarg);
02431 break;
02432 case 'c':
02433 *daemon = 0;
02434 break;
02435 case '?':
02436 return -1;
02437 break;
02438 default:
02439 ERRPRINTF("Bad arg: '%c'.\n",c);
02440 return -1;
02441 }
02442 }
02443
02444
02445 if(!*config_file) {
02446 char *gridsolve_root;
02447
02448 if((gridsolve_root = getenv("GRIDSOLVE_ROOT")) == NULL)
02449 gridsolve_root = GRIDSOLVE_TOP_BUILD_DIR;
02450 if(!gridsolve_root) {
02451 ERRPRINTF("Warning: GRIDSOLVE_ROOT unknown, assuming cwd.\n");
02452 gridsolve_root = strdup(".");
02453 }
02454
02455 *config_file = dstring_sprintf("%s/agent_config", gridsolve_root);
02456 if(!*config_file) {
02457 fprintf(stderr,"Error generating agent config file name.\n");
02458 exit(EXIT_FAILURE);
02459 }
02460 }
02461
02462 return 0;
02463 }
02464
02474 int
02475 gs_spawn_http_server(gs_agent_t *gs_agent, int httpd_port)
02476 {
02477 void **httpd_args;
02478 int *port;
02479 pid_t pid;
02480
02481 httpd_args = (void **) malloc(2 * sizeof(void *));
02482 if(!httpd_args) {
02483 ERRPRINTF("Failed to allocate memory for child args\n");
02484 return -1;
02485 }
02486
02487 port = (int *)malloc(sizeof(int));
02488 if(!port) {
02489 ERRPRINTF("Failed to allocate memory for port arg\n");
02490 return -1;
02491 }
02492
02493 *port = httpd_port;
02494
02495 httpd_args[0] = port;
02496 httpd_args[1] = gs_agent;
02497
02498
02499 pid = mfork(gs_agent_httpd, -1, httpd_args, NULL, NULL, NULL, 10);
02500
02501 if(pid < 0) {
02502 fprintf(stderr,"Failed to fork http server.\n");
02503 return -1;
02504 }
02505
02506 return 0;
02507 }
02508
02520 int
02521 gs_spawn_server_expiration(gs_agent_t *gs_agent, int timeout)
02522 {
02523 extern void gs_server_expiration_pre(void**);
02524 extern void gs_server_expiration_post(void**);
02525 void **se_args;
02526 pid_t pid;
02527 int *tout;
02528
02529 se_args = (void **) malloc(2 * sizeof(void *));
02530 if(!se_args) {
02531 ERRPRINTF("Failed to allocate memory for child args\n");
02532 return -1;
02533 }
02534
02535 tout = (int *) malloc(sizeof(int));
02536 if(!tout) {
02537 ERRPRINTF("Failed to allocate memory for timeout arg\n");
02538 return -1;
02539 }
02540
02541 *tout = timeout;
02542
02543 se_args[0] = gs_agent;
02544 se_args[1] = tout;
02545
02546 if(gs_signal(SIGHUP, gs_temp_sighup_handler) == SIG_ERR)
02547 ERRPRINTF("error setting up temp signal handler\n");
02548
02549 pid = mfork(gs_server_expiration, GS_EXPIRE_FREQUENCY, se_args,
02550 gs_server_expiration_pre, NULL, gs_server_expiration_post, 30);
02551
02552 if(gs_signal(SIGHUP, gs_agent_generic_signal_handler) == SIG_ERR)
02553 ERRPRINTF("error restoring signal handler\n");
02554
02555 if(pid < 0) {
02556 ERRPRINTF("Failed to fork server expiration process.\n");
02557 return -1;
02558 }
02559
02560 return 0;
02561 }
02562
02570 int
02571 gs_spawn_sensor(gs_agent_t *gs_agent)
02572 {
02573 struct sockaddr_un sensoraddr;
02574 void **sens_args;
02575 pid_t pid;
02576 int err;
02577
02578 sens_args = (void **) malloc(2 * sizeof(void *));
02579 if(!sens_args) {
02580 ERRPRINTF("Failed to allocate memory for child args\n");
02581 return -1;
02582 }
02583 sens_args[0] = NULL;
02584 sens_args[1] = gs_agent;
02585
02586
02587 pid = mfork(gs_agent_sensor_run, -1, sens_args,
02588 gs_agent_sensor_pre, gs_agent_sensor_post,
02589 gs_agent_sensor_exit, 30);
02590
02591 if(pid < 0) {
02592 ERRPRINTF("Failed to fork sensor process.\n");
02593 return -1;
02594 }
02595
02596
02597 GS_SENSORFD = socket(PF_UNIX, SOCK_STREAM, 0);
02598 if ( GS_SENSORFD < 0 )
02599 {
02600 ERRPRINTF("Could not open socket to sensor.\n");
02601 return -1;
02602 }
02603
02604 memset(&sensoraddr, 0x0, sizeof(struct sockaddr_un));
02605 sensoraddr.sun_family = PF_UNIX;
02606 strcpy(sensoraddr.sun_path, GRIDSOLVE_SENSOR_USOCK);
02607
02608 err = connect(GS_SENSORFD, (struct sockaddr*) &sensoraddr,
02609 sizeof(struct sockaddr_un));
02610 if ( err < 0 )
02611 {
02612 ERRPRINTF("Warning: Connect on sensor socket failed.");
02613 ERRPRINTF(" logging events to stdout.\n");
02614 GS_SENSORFILE = stdout;
02615 }
02616 else {
02617 GS_SENSORFILE = fdopen(GS_SENSORFD, "w+");
02618 LOGPRINTF("Connected to sensor.\n");
02619 }
02620
02621 return 0;
02622 }
02623
02634 int
02635 main(int argc, char **argv)
02636 {
02637 gs_agent_t *gs_agent = NULL;
02638 int status = -1, daemon;
02639 int httpd_port;
02640 char *logfile, *gridsolve_root;
02641
02642 gs_agent_scheduler_selection = GS_DEFAULT_MCT;
02643
02644 gs_setup_signal_handlers(gs_agent_generic_signal_handler);
02645
02646 if((gridsolve_root = getenv("GRIDSOLVE_ROOT")) == NULL)
02647 gridsolve_root = GRIDSOLVE_TOP_BUILD_DIR;
02648
02649 if(!gridsolve_root) {
02650 ERRPRINTF("Warning: GRIDSOLVE_ROOT unknown, assuming cwd.\n");
02651 gridsolve_root = strdup(".");
02652 }
02653
02654 if(gs_agent_parse_cmd_line(argc, argv, &logfile, &daemon, &httpd_port,
02655 &agent_cfg) < 0) {
02656 fprintf(stderr, "%s\n", GS_AGENT_USAGE_STR);
02657 exit(EXIT_FAILURE);
02658 }
02659
02660 if(!logfile) {
02661 logfile = dstring_sprintf("%s/gs_agent.log", gridsolve_root);
02662 if(!logfile) {
02663 ERRPRINTF("Error generating log file name.\n");
02664 exit(EXIT_FAILURE);
02665 }
02666 }
02667
02668 if(daemon && gs_daemon_init(gridsolve_root, logfile) < 0) {
02669 fprintf(stderr, "Failed to start agent as a daemon.\n");
02670 exit(EXIT_FAILURE);
02671 }
02672
02673 gs_agent = gs_agent_init(agent_cfg);
02674 if(!gs_agent) {
02675 ERRPRINTF("Failed to initialize agent\n");
02676 exit(EXIT_FAILURE);
02677 }
02678
02679 if(gs_agent_scheduler_selection == GS_INVALID_SCHEDULER) {
02680 fprintf(stderr, "Invalid scheduler type specified. Legal values are:\n");
02681 fprintf(stderr, " default_mct -- original gridsolve scheduler\n");
02682 fprintf(stderr, " htm_ml -- HTM minimum length\n");
02683 fprintf(stderr, " htm_msf -- HTM minimum sumflow\n");
02684 fprintf(stderr, " htm_hmct -- HTM historical minimum completion time\n");
02685 fprintf(stderr, " htm_mp -- HTM minimum perturbation\n");
02686 exit(EXIT_FAILURE);
02687 }
02688
02689 gs_log_init(argv[0], -1, -1, "agent");
02690
02691
02692 #ifdef GS_USE_MYSQL
02693 if(gs_mysql_init_db(gs_agent) < 0) {
02694 printf("MySQL db could not be initialized\n");
02695 exit(-1);
02696 }
02697 #else
02698 if(gs_init_db(&global_db) < 0) {
02699 ERRPRINTF("SQLite could not be initialized\n");
02700 exit(EXIT_FAILURE);
02701 }
02702 #endif
02703
02704 if(gs_spawn_sensor(gs_agent) < 0) {
02705 ERRPRINTF("Failed to spawn sensor\n");
02706 exit(EXIT_FAILURE);
02707 }
02708
02709 if(gs_spawn_server_expiration(gs_agent, GS_EXPIRE_TIMEOUT) < 0) {
02710 ERRPRINTF("Failed to spawn server expiration process\n");
02711 exit(EXIT_FAILURE);
02712 }
02713
02714 if(httpd_port >= 0)
02715 if(gs_spawn_http_server(gs_agent, httpd_port) < 0) {
02716 ERRPRINTF("Failed to spawn http server\n");
02717 exit(EXIT_FAILURE);
02718 }
02719
02720 status = gs_agent_listen_and_process_messages(gs_agent);
02721
02722 LOGPRINTF("Agent %s exiting with status %d\n", gs_agent->hostname, status);
02723
02724 exit(status);
02725 }
02726
02736 int
02737 gs_agent_init_conns(gs_agent_conn_t *conns)
02738 {
02739 int i;
02740
02741 if(!conns) {
02742 ERRPRINTF("Invalid arg: null connections ptr\n");
02743 return -1;
02744 }
02745
02746 for(i=0; i < GS_AGENT_MAX_FD; i++)
02747 conns->fd[i] = 0;
02748 conns->maxfd = 0;
02749
02750 return 0;
02751 }
02752
02762 int
02763 gs_agent_add_conn(gs_agent_conn_t *conns, int fd)
02764 {
02765 if(!conns) {
02766 ERRPRINTF("Invalid arg: null connections ptr\n");
02767 return -1;
02768 }
02769
02770 if(fd > GS_AGENT_MAX_FD)
02771 return -1;
02772
02773 conns->fd[fd] = 1;
02774
02775 if(conns->maxfd < fd)
02776 conns->maxfd = fd;
02777
02778 return 0;
02779 }
02780
02790 int
02791 gs_agent_del_conn(gs_agent_conn_t *conns, int fd)
02792 {
02793 int i;
02794
02795 if(!conns) {
02796 ERRPRINTF("Invalid arg: null connections ptr\n");
02797 return -1;
02798 }
02799
02800 conns->fd[fd] = 0;
02801
02802 if(fd == conns->maxfd) {
02803 for(i=fd;i>=0;i--)
02804 if(conns->fd[i]) {
02805 conns->maxfd = i;
02806 break;
02807 }
02808
02809 if(i < 0)
02810 conns->maxfd = 0;
02811 }
02812
02813 return 0;
02814 }
02815
02830 int
02831 gs_agent_setup_fd_sets(gs_agent_conn_t *connections, int listensock,
02832 int extrasock, fd_set *allset, int *maxfd)
02833 {
02834 int i;
02835
02836 if(!connections || !allset || !maxfd) {
02837 ERRPRINTF("Invalid args\n");
02838 return -1;
02839 }
02840
02841 *maxfd = extrasock;
02842 if(listensock > *maxfd)
02843 *maxfd = listensock;
02844
02845 FD_ZERO(allset);
02846 if(extrasock >= 0)
02847 FD_SET(extrasock, allset);
02848 if(listensock >= 0)
02849 FD_SET(listensock, allset);
02850
02851 for(i=0; i <= connections->maxfd; i++) {
02852 if(connections->fd[i]) {
02853 FD_SET(i, allset);
02854 if(i > *maxfd)
02855 *maxfd = i;
02856 }
02857 }
02858
02859 return 0;
02860 }