#include <stdlib.h>
#include <stdio.h>
#include <glob.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "utility.h"
#include "problem.h"
#include "server.h"
#include "comm_basics.h"
#include "comm_data.h"
#include "comm_encode.h"
#include "general.h"
#include "proxylib.h"
#include "kflops.h"
#include "gs_seq_dsi.h"
Go to the source code of this file.
Defines | |
| #define | GS_MKTIME(t) ((t)->tm_hour*10000 + (t)->tm_min*100 + (t)->tm_sec) |
Functions | |
| static int | gs_probe_batch_request (char *) |
| static int | gs_cancel_batch_request (char *) |
| static int | process_problem_solve (gs_server_t *gs_server, int tag, int sock) |
| Receive a service request and call execv to run the service. | |
| static int | process_probe_request (int sock) |
| Receive a specific request_id and check if it has completed. | |
| static int | process_problem_example (int sock) |
| returns example C code to call the specified problem | |
| static int | process_cancel_request (int sock) |
| Receive a specific request_id and cancel the request. | |
| static int | process_await_completion (int sock) |
| Check if a specific request_id is completed and send back the results. | |
| static int | process_kill_server (int sock) |
| Handle a KILL_SERVER tag. | |
| static int | process_store_file (int) |
| static int | process_ping (int) |
| static int | process_data_transfer_request (int sock) |
| int | gs_server_process_message (gs_server_t *gs_server, int sock) |
| Get the message tag from the socket and take appropriate actions. | |
| int | gs_server_check_job_restriction (void *gs_server, gs_restriction_t *rest, char *msg, int *rtag) |
| int | gs_server_check_time_restriction (void *gs_server, gs_restriction_t *rest, char *msg, int *rtag) |
| int | gs_server_check_restrictions (gs_server_t *gs_server, char *msg, int *rtag) |
| int | gs_server_listen_and_process_messages (gs_server_t *gs_server, int srvsock) |
| Start listening on socket, accept the messages and hand off for processing. | |
| int | gs_find_coefficient_files (gs_server_t *server, icl_list_t *cfiles) |
| int | gs_find_service_directories (icl_list_t *services, int *count) |
| char * | gs_get_service_name_from_path (char *data) |
| int | gs_server_prepare_model_update_message (int sock, gs_server_t *gs_server, gs_problem_t *problem, char **msg) |
| int | gs_server_register_problems (gs_server_t *gs_server) |
| Register problems with the agent. | |
| int | gs_server_register (gs_server_t *gs_server) |
| Register the server with the agent. | |
| int | gs_server_add_restriction (gs_server_t *gs_server, char *type, char *value) |
| int | gs_server_create_info_dir (gs_server_t *gs_server) |
| int | gs_server_save_encoding (gs_server_t *server) |
| gs_server_t * | gs_server_init (char *server_config) |
| Initialize the server data structure. | |
| int | gs_server_dump (gs_server_t *s) |
| Dump the server for debugging purposes. | |
| int | gs_parse_time (const char *time, struct tm *ptm) |
| int | gs_parse_time_range (char *range, struct tm *start, struct tm *end) |
| int | gs_is_current_time_in_range (struct tm *start, struct tm *end) |
Variables | |
| static char * | gridsolve_root = NULL |
| static char * | gridsolve_arch = NULL |
| static char * | gridsolve_agent = NULL |
| char | GS_SERVER_JOB_COUNT_FILE [FN_LEN] |
This file contains most of the server specific routines.
Definition in file server_start.c.
| #define GS_MKTIME | ( | t | ) | ((t)->tm_hour*10000 + (t)->tm_min*100 + (t)->tm_sec) |
| static int gs_cancel_batch_request | ( | char * | request_id | ) |
Cancel a batch job using the cancel script. This is assumed to be called from process_cancel_request(), so we also assume that the current directory is where the requests are stored.
| request_id | -- request id of the request to cancel |
Definition at line 1217 of file server_start.c.
{
  /*
   * Cancel the batch job that belongs to a request by running the
   * service's gs_cancel script on the batch ID stored in
   * "<request_id>/gs_batch_id".
   *
   * request_id -- request id; also used as the name of the request's
   *               directory, relative to the current directory
   *
   * Returns 3 if the batch ID file cannot be opened, -1 on any other
   * failure, otherwise the exit status of the gs_cancel script.
   */
  gs_problem_t *problem;
  char buf[256], fname[256], *cmd;
  int status, len;
  FILE *f;

  if(!request_id)
    return -1;

  len = snprintf(fname, sizeof(fname), "%s/gs_batch_id", request_id);
  if(len < 0 || (size_t)len >= sizeof(fname)) {
    ERRPRINTF("path too long for request '%s'\n", request_id);
    return -1;
  }

  /* a missing batch ID file is not treated as an error */
  if((f = fopen(fname, "r")) == NULL)
    return 3;

  if(!fgets(buf, sizeof(buf), f)) {
    ERRPRINTF("failed to read ID from file gs_batch_id\n");
    fclose(f);
    return -1;
  }
  fclose(f);

  /* strip the newline only when there is one (the last line of a file
   * may be unterminated, and the buffer may even be empty) */
  buf[strcspn(buf, "\n")] = '\0';

  problem = (gs_problem_t *) malloc(sizeof(gs_problem_t));
  if(!problem) {
    ERRPRINTF("malloc");
    return -1;
  }

  len = snprintf(fname, sizeof(fname), "%s/%s", request_id, GS_BATCH_XML);
  if(len < 0 || (size_t)len >= sizeof(fname)) {
    ERRPRINTF("path too long for request '%s'\n", request_id);
    free(problem);
    return -1;
  }

  if(gs_read_problem_from_file(fname, problem) < 0) {
    ERRPRINTF("Error loading service: '%s'.\n", GS_BATCH_XML);
    /* only the struct allocated above is released here; whether a failed
     * read leaves anything else attached to it is not visible from this
     * function */
    free(problem);
    return -1;
  }

  cmd = dstring_sprintf("%s/service/%s/gs_cancel %s", gridsolve_root,
          problem->name, buf);
  gs_free_problem(problem);

  if(!cmd) {
    ERRPRINTF("malloc");
    return -1;
  }

  /* NOTE(review): the batch ID read from disk is handed to the shell
   * unescaped -- confirm that file can only be written by the server. */
  status = system(cmd);

  if(status < 0) {
    ERRPRINTF("command failed: '%s'\n", cmd);
    free(cmd);
    return -1;
  }
  free(cmd);

  /* WEXITSTATUS is only meaningful for a normal exit */
  if(!WIFEXITED(status))
    return -1;

  return WEXITSTATUS(status);
}


| int gs_find_coefficient_files | ( | gs_server_t * | server, | |
| icl_list_t * | cfiles | |||
| ) |
Finds coefficient files.
| server | -- this server's information | |
| cfiles | -- a linked list initialized by the caller into which the file names will be stored |
Definition at line 1894 of file server_start.c.
{
  /*
   * Append a strdup'ed copy of the path of every "*.coe" file found in
   * server->infodir to the list cfiles.
   *
   * Returns -1 if an argument is NULL or memory runs out, 0 otherwise
   * (including when the server has no infodir or nothing matches).
   */
  size_t i;
  int rc;
  char *globpattern, *path;
  glob_t matches;

  if(!cfiles || !server)
    return -1;

  if(!server->infodir)
    return 0;

  /* glob the info directory to find coefficient files */
  globpattern = dstring_sprintf("%s/*.coe", server->infodir);
  if(!globpattern)
    return -1;

  /* start from a zeroed structure so globfree() is well defined even if
   * glob() bails out early */
  memset(&matches, 0, sizeof(matches));

  rc = glob(globpattern, GLOB_NOSORT, NULL, &matches);
  free(globpattern);

  if(rc != 0) {
    /* no match (or a glob failure) is not reported as an error */
    globfree(&matches);
    return 0;
  }

  /* For each coefficient file */
  for(i = 0; i < matches.gl_pathc; i++) {
    path = strdup(matches.gl_pathv[i]);
    if(!path) {
      /* do not put a NULL entry on the caller's list */
      globfree(&matches);
      return -1;
    }
    icl_list_append(cfiles, path);
  }

  globfree(&matches);

  return 0;
}


| int gs_find_service_directories | ( | icl_list_t * | services, | |
| int * | count | |||
| ) |
Finds the services that are stored in $GRIDSOLVE_ROOT/service.
| services | -- a linked list initialized by the caller into which the paths of the service description (.xml) files will be stored | |
| count | -- set upon return to the number of entries appended to the list |
Definition at line 1952 of file server_start.c.
{
  /*
   * Append a strdup'ed copy of the path of every file matching
   * "<gridsolve_root>/service/<dir>/<name>.xml" to the list services and
   * store the number of entries appended in *count.
   *
   * Returns 0 on success, -1 if an argument is NULL, memory runs out, or
   * the glob finds nothing / fails.
   */
  size_t i;
  int rc;
  char *globpattern, *path;
  glob_t matches;

  /* keep the old contract of zeroing the counter first, but do not
   * dereference a NULL pointer to do so */
  if(count)
    *count = 0;

  if(!services || !count)
    return -1;

  DBGPRINTF("Entering\n");

  /* glob the services directory to find services */
  globpattern = dstring_sprintf("%s/service/*/*.xml",
          gridsolve_root);
  if(!globpattern)
    return -1;

  /* start from a zeroed structure so globfree() is well defined even if
   * glob() bails out early */
  memset(&matches, 0, sizeof(matches));

  rc = glob(globpattern, GLOB_NOSORT, NULL, &matches);
  if(rc != 0) {
    ERRPRINTF("No problems or services found using path: %s\n ", globpattern);
    ERRPRINTF("\t glob returned %d\n", rc);
    free(globpattern);
    globfree(&matches);
    return -1;
  }
  free(globpattern);

  /* For each file containing a problem description */
  for(i = 0; i < matches.gl_pathc; i++) {
    path = strdup(matches.gl_pathv[i]);
    if(!path) {
      /* do not put a NULL entry on the caller's list */
      globfree(&matches);
      return -1;
    }
    icl_list_append(services, path);
    (*count)++;
  }

  globfree(&matches);

  return 0;
}


| char* gs_get_service_name_from_path | ( | char * | data | ) |
Definition at line 2006 of file server_start.c.
{
  /*
   * Return a newly allocated string holding the last '/'-separated
   * component of data, cut off at its first '.'.  The caller frees it.
   *
   * Returns NULL if data is NULL, if the copy cannot be allocated, or if
   * the last component contains no '.'.
   */
  const char *base;
  char *name, *dot;

  if(data == NULL)
    return NULL;

  /* the file name is whatever follows the final '/', or the whole
   * string when there is no '/' at all */
  base = strrchr(data, '/');
  base = (base != NULL) ? base + 1 : data;

  name = strdup(base);
  if(name == NULL)
    return NULL;

  dot = strchr(name, '.');
  if(dot == NULL) {
    free(name);
    return NULL;
  }

  *dot = '\0';
  return name;
}

| int gs_is_current_time_in_range | ( | struct tm * | start, | |
| struct tm * | end | |||
| ) |
Checks whether the current local time falls in the range specified by the start and end times.
| start | -- the starting time | |
| end | -- the ending time |
Definition at line 2921 of file server_start.c.
{
  /*
   * Return non-zero if the current local time of day lies within
   * [start, end] (both ends inclusive), 0 otherwise.  Only the hour,
   * minute and second fields of start and end are read.  When start is
   * later than end the range is taken to wrap around midnight.
   */
  time_t now;
  struct tm *now_tm;
  int lo, hi, cur;

  time(&now);
  now_tm = localtime(&now);

/* pack h/m/s into one integer (HHMMSS) so times compare as plain ints */
#define GS_MKTIME(t) ((t)->tm_hour*10000 + (t)->tm_min*100 + (t)->tm_sec)

  lo = GS_MKTIME(start);
  hi = GS_MKTIME(end);
  cur = GS_MKTIME(now_tm);

  if(lo > hi) {
    /* wrapped range, e.g. 22:00-06:00 */
    return (cur >= lo) || (cur <= hi);
  }

  return (cur <= hi) && (cur >= lo);
}

| int gs_parse_time | ( | const char * | time, | |
| struct tm * | ptm | |||
| ) |
Given a string representing the time, parse and store in a tm struct. The string can be formatted as "H:M:S", "H:M", or "H". "am" or "pm" may be appended; otherwise it's assumed to be in 24-hour format.
| time | -- string representation of the time | |
| ptm | -- pointer to tm struct where the time should be stored |
Definition at line 2812 of file server_start.c.
{
  /*
   * Parse a time of day ("H:M:S", "H:M" or "H", optionally followed by
   * "am" or "pm", case-insensitive) into *ptm.
   *
   * time -- string representation of the time
   * ptm  -- receives tm_hour/tm_min/tm_sec; every other field is zeroed
   *         except tm_isdst, which is set to -1
   *
   * With a "pm" suffix hours 0-11 are shifted by 12 and 12 stays 12; with
   * an "am" suffix 12 becomes 0.  Any other trailing text is ignored, as
   * before.
   *
   * Returns 0 on success, -1 if an argument is NULL, nothing could be
   * parsed, memory runs out, or a field is out of range.
   */
  unsigned int hour = 0, min = 0, sec = 0;
  int n, nchars = 0;

  if(!time || !ptm)
    return -1;

  memset(ptm, 0, sizeof(*ptm));
  ptm->tm_isdst = -1;

  /* try the longest form first; %u needs unsigned targets, so parse into
   * locals and copy into the tm fields afterwards */
  n = sscanf(time, "%2u:%2u:%2u%n", &hour, &min, &sec, &nchars);
  if(n != 3) {
    n = sscanf(time, "%2u:%2u%n", &hour, &min, &nchars);
    if(n != 2) {
      n = sscanf(time, "%2u%n", &hour, &nchars);
      if(n != 1)
        return -1;
    }
  }

  time += nchars;

  if(*time) {
    char *ampm;

    /* "%s" stores the token plus a terminating NUL, so reserve one byte
     * more than the longest possible token */
    ampm = malloc(strlen(time) + 1);
    if(!ampm) return -1;

    /* sscanf returns EOF (non-zero) when only whitespace is left, so test
     * for exactly one converted item */
    if(sscanf(time, "%s", ampm) == 1) {
      if(!strcasecmp(ampm, "pm")) {
        if(hour < 12)
          hour += 12;
        else if(hour > 12)
          hour = 24;  /* e.g. "13pm": make the range check below fail */
      }
      else if(!strcasecmp(ampm, "am")) {
        if(hour == 12)
          hour = 0;   /* 12am is midnight */
      }
    }

    free(ampm);
  }

  ptm->tm_hour = (int)hour;
  ptm->tm_min = (int)min;
  ptm->tm_sec = (int)sec;

  if(hour > 23 || min > 59 || sec > 61)
    return -1;

  return 0;
}

| int gs_parse_time_range | ( | char * | range, | |
| struct tm * | start, | |||
| struct tm * | end | |||
| ) |
Parses a time range string (e.g. "9am-5pm") and stores the start and end parts in tm structs.
| range | -- string representing the time range | |
| start | -- pointer to tm struct to store the starting time | |
| end | -- pointer to tm struct to store the ending time |
Definition at line 2869 of file server_start.c.
{
  /*
   * Split range at its first '-' and parse the two halves with
   * gs_parse_time() into *start and *end.
   *
   * Returns 0 on success; -1 if an argument is NULL, the copy cannot be
   * allocated, there is no '-', nothing follows the '-', or either half
   * fails to parse (a message is written to stderr in that last case).
   */
  char *work, *dash, *second;
  int rc = -1;

  if(range == NULL || start == NULL || end == NULL)
    return -1;

  /* work on a private copy because the separator gets overwritten */
  work = strdup(range);
  if(work == NULL)
    return -1;

  dash = strchr(work, '-');
  if(dash != NULL) {
    *dash = '\0';
    second = dash + 1;

    if(*second != '\0') {
      if(gs_parse_time(work, start) < 0)
        fprintf(stderr, "Error parsing start time\n");
      else if(gs_parse_time(second, end) < 0)
        fprintf(stderr, "Error parsing end time\n");
      else
        rc = 0;
    }
  }

  free(work);
  return rc;
}


| static int gs_probe_batch_request | ( | char * | request_id | ) | [static] |
Local function declarations
Probe for completion using the probe script. This is assumed to be called from process_probe_request(), so we also assume that the current directory is where the requests are stored.
| request_id | -- request id of the request to probe |
Definition at line 1153 of file server_start.c.
{
  /*
   * Probe the batch job that belongs to a request by running the
   * service's gs_probe script on the batch ID stored in
   * "<request_id>/gs_batch_id".
   *
   * request_id -- request id; also used as the name of the request's
   *               directory, relative to the current directory
   *
   * Returns 3 if the batch ID file cannot be opened, 0 if it is open but
   * no ID can be read from it, -1 on any other failure, otherwise the
   * exit status of the gs_probe script.
   */
  gs_problem_t *problem;
  char buf[256], fname[256], *cmd;
  int status, len;
  FILE *f;

  if(!request_id)
    return -1;

  len = snprintf(fname, sizeof(fname), "%s/gs_batch_id", request_id);
  if(len < 0 || (size_t)len >= sizeof(fname)) {
    ERRPRINTF("path too long for request '%s'\n", request_id);
    return -1;
  }

  /* a missing batch ID file is not treated as an error */
  if((f = fopen(fname, "r")) == NULL)
    return 3;

  if(!fgets(buf, sizeof(buf), f)) {
    /* nothing to read yet: keep returning 0 as before, but do not leak
     * the stream */
    fclose(f);
    return 0;
  }
  fclose(f);

  /* strip the newline only when there is one (the last line of a file
   * may be unterminated, and the buffer may even be empty) */
  buf[strcspn(buf, "\n")] = '\0';

  problem = (gs_problem_t *) malloc(sizeof(gs_problem_t));
  if(!problem) {
    ERRPRINTF("malloc");
    return -1;
  }

  len = snprintf(fname, sizeof(fname), "%s/%s", request_id, GS_BATCH_XML);
  if(len < 0 || (size_t)len >= sizeof(fname)) {
    ERRPRINTF("path too long for request '%s'\n", request_id);
    free(problem);
    return -1;
  }

  if(gs_read_problem_from_file(fname, problem) < 0) {
    ERRPRINTF("Error loading service: '%s'.\n", GS_BATCH_XML);
    /* only the struct allocated above is released here; whether a failed
     * read leaves anything else attached to it is not visible from this
     * function */
    free(problem);
    return -1;
  }

  cmd = dstring_sprintf("%s/service/%s/gs_probe %s", gridsolve_root,
          problem->name, buf);
  gs_free_problem(problem);

  if(!cmd) {
    ERRPRINTF("malloc");
    return -1;
  }

  /* NOTE(review): the batch ID read from disk is handed to the shell
   * unescaped -- confirm that file can only be written by the server. */
  status = system(cmd);

  if(status < 0) {
    ERRPRINTF("command failed: '%s'\n", cmd);
    free(cmd);
    return -1;
  }
  free(cmd);

  /* WEXITSTATUS is only meaningful for a normal exit */
  if(!WIFEXITED(status))
    return -1;

  return WEXITSTATUS(status);
}


| int gs_server_add_restriction | ( | gs_server_t * | gs_server, | |
| char * | type, | |||
| char * | value | |||
| ) |
Adds the specified restriction to the server's restriction list.
| gs_server | -- pointer to server to add restriction to | |
| type | -- restriction type | |
| value | -- string value of restriction |
Definition at line 2431 of file server_start.c.
{
  /*
   * Build a restriction record from a (type, value) pair and push it onto
   * the front of gs_server->restrictions.
   *
   * type  -- "RESTRICT_JOBS" or "RESTRICT_TIME" (case-insensitive)
   * value -- for RESTRICT_JOBS the maximum job count (converted with
   *          atoi); for RESTRICT_TIME a range accepted by
   *          gs_parse_time_range()
   *
   * Returns 0 on success; -1 if an argument is NULL, allocation fails,
   * the type is not recognised, or the time range cannot be parsed.
   */
  gs_restriction_t *entry;

  if(gs_server == NULL || type == NULL || value == NULL)
    return -1;

  entry = (gs_restriction_t *)calloc(1, sizeof(gs_restriction_t));
  if(entry == NULL)
    return -1;

  if(strcasecmp(type, "RESTRICT_JOBS") == 0) {
    entry->restriction_type = GS_JOB_RESTRICT;
    entry->restriction.job_restriction.max_jobs = atoi(value);
    entry->check_func = gs_server_check_job_restriction;
  }
  else if(strcasecmp(type, "RESTRICT_TIME") == 0) {
    entry->restriction_type = GS_TIME_RESTRICT;

    if(gs_parse_time_range(value,
         &(entry->restriction.time_restriction.start),
         &(entry->restriction.time_restriction.end)) < 0) {
      ERRPRINTF("Skipping time restriction '%s' ", value);
      ERRPRINTF("since the time(s) could not be parsed\n");
      free(entry);
      return -1;
    }

    entry->check_func = gs_server_check_time_restriction;
  }
  else {
    ERRPRINTF("Invalid restriction type '%s'\n", type);
    free(entry);
    return -1;
  }

  /* link the new record in at the head of the list */
  entry->next = gs_server->restrictions;
  gs_server->restrictions = entry;

  return 0;
}


| int gs_server_check_job_restriction | ( | void * | gs_server, | |
| gs_restriction_t * | rest, | |||
| char * | msg, | |||
| int * | rtag | |||
| ) |
Checks the server's current job count against a job-count restriction.
| gs_server | -- pointer to server whose restrictions are to be checked (not used by this check) | |
| rest | -- the job restriction holding the maximum number of jobs | |
| msg | -- the problem solve request message (not used by this check) | |
| rtag | -- set upon return. if the current job count is below the maximum, rtag is set to GS_PROT_OK. otherwise the tag is set to GS_SVC_ERR_RESTRICT_JOBS. |
Definition at line 1544 of file server_start.c.
{
  /*
   * Compare the job count read from GS_SERVER_JOB_COUNT_FILE with the
   * restriction's max_jobs and report the verdict through *rtag.
   *
   * Returns -1 if the job count cannot be read (*rtag is then left
   * untouched), 0 otherwise.
   */
  int running;

  if(gs_get_job_count(GS_SERVER_JOB_COUNT_FILE, &running) < 0)
    return -1;

  /* a new job is accepted only while the count is strictly below the cap */
  *rtag = (running < rest->restriction.job_restriction.max_jobs)
            ? GS_PROT_OK
            : GS_SVC_ERR_RESTRICT_JOBS;

  return 0;
}

| int gs_server_check_restrictions | ( | gs_server_t * | gs_server, | |
| char * | msg, | |||
| int * | rtag | |||
| ) |
Checks the problem solve request against the server's restrictions.
| gs_server | -- pointer to server whose restrictions are to be checked | |
| msg | -- the problem solve request message | |
| rtag | -- set upon return. if no restrictions are violated, rtag is set to GS_PROT_OK. otherwise the tag is set to something specific to the violated restriction. |
Definition at line 1604 of file server_start.c.
{
  /*
   * Run every restriction's check_func in list order.
   *
   * Stops at the first check that fails outright (returns -1) or that
   * sets *rtag to something other than GS_PROT_OK (returns 0 with that
   * tag).  If the list is exhausted, *rtag is GS_PROT_OK and 0 is
   * returned.
   */
  gs_restriction_t *cur = gs_server->restrictions;

  while(cur != NULL) {
    if(cur->check_func(gs_server, cur, msg, rtag) < 0)
      return -1;

    /* the first violated restriction decides the reply tag */
    if(*rtag != GS_PROT_OK)
      return 0;

    cur = cur->next;
  }

  *rtag = GS_PROT_OK;
  return 0;
}

| int gs_server_check_time_restriction | ( | void * | gs_server, | |
| gs_restriction_t * | rest, | |||
| char * | msg, | |||
| int * | rtag | |||
| ) |
Checks the current local time against a time restriction.
| gs_server | -- pointer to server whose restrictions are to be checked (not used by this check) | |
| rest | -- the time restriction holding the permitted start and end times | |
| msg | -- the problem solve request message (not used by this check) | |
| rtag | -- set upon return. if the time restriction is not violated, rtag is set to GS_PROT_OK. otherwise the tag is set to GS_SVC_ERR_RESTRICT_TIME. |
Definition at line 1573 of file server_start.c.
{
  /*
   * Set *rtag to GS_PROT_OK when the current local time lies inside the
   * restriction's start/end window, and to GS_SVC_ERR_RESTRICT_TIME
   * (after logging the rejection) when it does not.  Always returns 0.
   */
  int allowed;

  allowed = gs_is_current_time_in_range(
              &(rest->restriction.time_restriction.start),
              &(rest->restriction.time_restriction.end));

  if(allowed) {
    *rtag = GS_PROT_OK;
    return 0;
  }

  LOGPRINTF("Rejecting job due to time restriction.\n");
  *rtag = GS_SVC_ERR_RESTRICT_TIME;
  return 0;
}

| int gs_server_create_info_dir | ( | gs_server_t * | gs_server | ) |
If the server_info directory doesn't exist, create it.
| gs_server | -- server struct containing the ip address and port to use. upon return, the infodir field will contain the subdirectory name. If the subdirectory already exists, then the componentid field will also be overwritten here with the ID from the previous run. |
Definition at line 2489 of file server_start.c.
{
  /*
   * Thin wrapper around gs_create_info_dir(): passes the server's
   * component id, IP address and port, and lets it fill in
   * gs_server->infodir.
   *
   * Returns 0 on success, -1 (after logging) on failure.
   */
  int rc;

  rc = gs_create_info_dir(gs_server->componentid, gs_server->ipaddress,
                          gs_server->port, &(gs_server->infodir));
  if(rc >= 0)
    return 0;

  ERRPRINTF("Creating server infodir failed.\n");
  return -1;
}


| int gs_server_dump | ( | gs_server_t * | s | ) |
Dump the server for debugging purposes.
| s | - the server data structure |
Definition at line 2787 of file server_start.c.
{
/* Print a one-line summary of the server structure's fields to stderr.
 * Always returns 0. */
/* scratch buffers for the printable component ID and the two IP addresses */
char cid_string[2*CID_LEN+1], server_dottedIP[20], proxy_dottedIP[20];
/* render the component ID and both addresses as strings via the proxy
 * library helpers */
proxy_cid_to_str(cid_string, s->componentid);
proxy_ip_to_str(s->ipaddress, server_dottedIP);
proxy_ip_to_str(s->proxyip, proxy_dottedIP);
fprintf(stderr, "SERVER: ");
/* NOTE(review): hostname, arch and agenthost are passed to %s without a
 * NULL check -- confirm callers only dump fully initialized servers. */
fprintf(stderr, "hostname %s ipaddress %s port %hu proxyip %s proxyport %hu componentid %s arch %s data_format %d kflops %d workload %d ncpu %d status %d availcpu %g availmem %g nproblems %d agenthost %s agentport %d score %g\n", s->hostname, server_dottedIP, s->port, proxy_dottedIP, s->proxyport, cid_string, s->arch, s->data_format, s->kflops, s->workload, s->ncpu, s->status, s->availcpu, s->availmem, s->nproblems, s->agenthost, s->agentport, s->score);
return 0;
}


| gs_server_t* gs_server_init | ( | char * | server_config | ) |
Initialize the server data structure.
Create a new server data structure, and initialize various fields such as the agent, the root of the service, the machine name/ip, any proxy information...
Definition at line 2554 of file server_start.c.
{
/*
 * Allocate a gs_server_t and fill it in from the environment, the server
 * config file and the local machine.
 *
 * server_config -- path of the server config file; must not be NULL
 *
 * Returns the new server structure, or NULL on failure.
 *
 * Side effects visible in this function: sets the file-scope
 * gridsolve_root / gridsolve_arch / gridsolve_agent pointers, fills in
 * GS_SERVER_JOB_COUNT_FILE, initializes the proxy library, runs the kflops
 * benchmark, and may chdir() to the data directory.
 *
 * NOTE(review): only some error paths call gs_server_free(gs_server); the
 * others return NULL without releasing the structure.  Confirm whether
 * that is intentional (e.g. the caller exits) before relying on it.
 */
char *strp, *gs_agent_copy, *gs_agent_env, *gs_agent_cfg,
*srv_port_env, *srv_port_cfg, *data_dir_env, *data_dir;
gs_server_t *gs_server;
pid_t pid;
/* start from a clean slate; the three gridsolve_* names are file-scope */
gridsolve_agent = gridsolve_root = gridsolve_arch = data_dir = NULL;
gs_agent_env = gs_agent_cfg = srv_port_env = srv_port_cfg = NULL;
gs_server = (gs_server_t*)CALLOC(1, sizeof(gs_server_t));
if(!gs_server) {
ERRPRINTF("Could not allocate memory.\n");
return NULL;
}
/* root and arch: environment first, build-time defaults otherwise */
gridsolve_root = getenv("GRIDSOLVE_ROOT");
if(!gridsolve_root)
gridsolve_root = GRIDSOLVE_TOP_BUILD_DIR;
gridsolve_arch = getenv("GRIDSOLVE_ARCH");
if(!gridsolve_arch)
gridsolve_arch = GRIDSOLVE_ARCH_FROM_CONFIGURE;
data_dir_env = getenv("GRIDSOLVE_DATA_DIR");
/* if SMART_GRIDSOLVE enabled, make the default gs_server->smart setting 1 */
#ifdef GS_SMART_GRIDSOLVE
gs_server->smart = 1;
struct stat stbuf;
/* create the smart object directory if it does not exist yet */
if(stat(smart_obj_file_dir, &stbuf) != 0) {
if(mkdir(smart_obj_file_dir, 0700) < 0) {
ERRPRINTF("Could not create directory '%s' ", smart_obj_file_dir);
return NULL;
}
}
total_bytes_stored=0;
stored_arg_list=NULL;
last_stored_arg=stored_arg_list;
#endif
/* Parse server config file */
if(!server_config) {
gs_server_free(gs_server);
ERRPRINTF("Failed to generate server_config file name\n");
return NULL;
}
/* NOTE(review): a non-zero return from gs_parse_config_file() is silently
 * ignored here; only the environment and defaults are used in that case. */
if(gs_parse_config_file(server_config, &(gs_server->sa_list)) == 0) {
if(gs_server->sa_list) {
gs_info_t *p;
int rlen;
rlen = strlen("RESTRICT_");
/* walk the (type, value) pairs; the *_cfg and data_dir pointers alias
 * the list entries' value strings, they are not copies */
for(p = gs_server->sa_list; p != NULL; p = p->next) {
if(!strcasecmp(p->type, "AGENT"))
gs_agent_cfg = p->value;
else if(!strcasecmp(p->type, "PORT"))
srv_port_cfg = p->value;
else if(!strcasecmp(p->type, "OUTPUT_TTL"))
gs_server->output_ttl = atol(p->value);
else if(!strncasecmp(p->type, "RESTRICT_", rlen))
/* return value not checked: a bad restriction is skipped */
gs_server_add_restriction(gs_server, p->type, p->value);
else if(!strcasecmp(p->type, "GRIDSOLVE_DATA_DIR"))
data_dir = p->value;
else if(!strcasecmp(p->type, "SMART_GRIDSOLVE"))
gs_server->smart = atoi(p->value);
#ifdef GS_SMART_GRIDSOLVE
else if(!strcasecmp(p->type, "TOTAL_BYTES"))
total_allowed_bytes = atoi(p->value);
#endif
}
}
}
/* override config file setting if environment variable is set */
if(data_dir_env)
data_dir = data_dir_env;
/* if SmartGridSolve wasn't enabled at configuration time, then
* make sure any setting parsed above is clobbered.
*/
#ifndef GS_SMART_GRIDSOLVE
gs_server->smart = 0;
#endif
/* Setup agent name:
* if GRIDSOLVE_AGENT is set in the environment, we use
* that value, otherwise use the value set in the config
* file. If neither can be found, then return an error.
*/
gs_agent_env = getenv("GRIDSOLVE_AGENT");
if(gs_agent_env)
gridsolve_agent = gs_agent_env;
else if(gs_agent_cfg)
gridsolve_agent = gs_agent_cfg;
else {
ERRPRINTF("Could not determine GRIDSOLVE_AGENT.\n");
return NULL;
}
/* Do some final sanity checks, even though we don't
* expect any of these to be NULL at this point.
*/
if(!gridsolve_root) {
ERRPRINTF("Could not determine GRIDSOLVE_ROOT.\n");
return NULL;
}
if(!gridsolve_arch) {
ERRPRINTF("Could not determine GRIDSOLVE_ARCH.\n");
return NULL;
}
if(!gridsolve_agent) {
ERRPRINTF("Could not determine GRIDSOLVE_AGENT.\n");
return NULL;
}
/* set up server port number */
/* precedence: environment, then config file, then compiled-in default */
srv_port_env = getenv("GRIDSOLVE_SERVER_PORT");
if(srv_port_env)
gs_server->port = atoi(srv_port_env);
else if(srv_port_cfg)
gs_server->port = atoi(srv_port_cfg);
else
gs_server->port = GRIDSOLVE_SERVER_PORT_DEFAULT;
/* Initialize the proxy library */
proxy_init("");
gs_server->hostname = gs_get_machine_name();
gs_server->ipaddress = proxy_get_my_ipaddr();
gs_server->proxyport = ntohs(proxy_get_proxy_port());
gs_server->data_format = pvmgetdsig();
LOGPRINTF("Running benchmark on %s\n", gs_server->hostname);
gs_server->kflops = kflops();
LOGPRINTF("%s kflops %d\n", gs_server->hostname, gs_server->kflops);
gs_server->ncpu = 1; /* TODO */
gs_server->status = -1; /* TODO */
gs_server->availcpu = -1; /* TODO */
gs_server->availmem = -1; /* TODO */
gs_server->problemlist = NULL;
gs_server->nproblems = 0;
gs_server->agenthost = NULL;
/* arch is the getenv()/build-default pointer chosen above, not a copy */
gs_server->arch = gridsolve_arch;
/* Parse the gridsolve_agent to get agent:port or use agent and default port */
gs_agent_copy = strdup(gridsolve_agent);
if(!gs_agent_copy) {
gs_server_free(gs_server);
ERRPRINTF("strdup failed\n");
return NULL;
}
DBGPRINTF("gs_agent %s \n", gs_agent_copy);
if((strp = strchr(gs_agent_copy, ':')) != NULL) {
gs_server->agentport = atoi(strp+1);
/* cut the copy at the ':' so only the host part remains */
*strp = '\0';
} else {
gs_server->agentport = getenv_int("GRIDSOLVE_AGENT_PORT", GRIDSOLVE_AGENT_PORT_DEFAULT);
}
gs_server->agenthost = strdup(gs_agent_copy);
if(!gs_server->agenthost) {
gs_server_free(gs_server);
free(gs_agent_copy);
ERRPRINTF("strdup failed\n");
return NULL;
}
FREE(gs_agent_copy);
/* failure to set up the info dir is only a warning; infodir is NULL then */
if(gs_server_create_info_dir(gs_server) < 0) {
ERRPRINTF("Warning: failed to create and/or get info from server info dir\n");
gs_server->infodir = NULL;
}
gs_clean_up_old_temp_files(gs_server->infodir);
pid = getpid();
/* job count file name: <infodir or GS_INFODIR_PATH>/<prefix>.<pid> */
snprintf(GS_SERVER_JOB_COUNT_FILE, FN_LEN, "%s/%s.%d",
gs_server->infodir ? gs_server->infodir : GS_INFODIR_PATH,
GS_SERVER_JOB_COUNT_FILE_PREFIX, pid);
#ifdef GS_SMART_GRIDSOLVE
snprintf(GS_SMART_OBJ_FILE_EXT, FN_LEN, "%s/%s.%d",
gs_server->infodir ? gs_server->infodir : GS_INFODIR_PATH,
GS_SMART_OBJ_FILE_PREFIX, pid);
#endif
if(gs_init_job_count(GS_SERVER_JOB_COUNT_FILE) < 0) {
ERRPRINTF("Failed to initialize job count file '%s'\n",
GS_SERVER_JOB_COUNT_FILE);
return NULL;
}
/* failing to save the encoding is only a warning */
if(gs_server_save_encoding(gs_server) < 0)
ERRPRINTF("Warning: Could not save server encoding.\n");
/* finally switch to the data directory, if one was configured */
if(data_dir) {
if(chdir(data_dir) < 0) {
ERRPRINTF("Could not chdir to specified data directory '%s'\n", data_dir);
return NULL;
}
}
return gs_server;
}


| int gs_server_listen_and_process_messages | ( | gs_server_t * | gs_server, | |
| int | srvsock | |||
| ) |
Start listening on socket, accept the messages and hand off for processing.
| gs_server | - the server data structure | |
| srvsock | - the socket on which to listen |
Definition at line 1780 of file server_start.c.
{
  /*
   * Accept connections on srvsock forever and fork one child per
   * connection to process the incoming message; the parent closes its
   * copy of the connection and goes back to accepting.
   *
   * gs_server -- the server data structure
   * srvsock   -- the listening socket
   *
   * Without GS_SMART_GRIDSOLVE the loop never exits.  With it, a failure
   * while reading the tag or version string returns -1.
   */
  int sock;

  DBGPRINTF("Entering\n");

  /* Recv and process connections */
  for (;;) {
    pid_t pid;

    DBGPRINTF("Waiting for a connection\n");

    /* if accept fails, go back to the top of the loop */
    if((sock = gs_accept_connection(srvsock)) == -1) {
      ERRPRINTF("Failed to accept connection on socket, return to listening\n");
      continue;
    }

#ifdef GS_SMART_GRIDSOLVE
    int tag;
    char *version_str;
    int retval;
    int recv_transaction=0;

    /* NOTE(review): the error returns in this section leave the accept
     * loop (and sock open) on a single bad client -- confirm intended. */
    if(gs_recv_tag(sock, &tag) < 0) {
      ERRPRINTF("error reading tag\n");
      return -1;
    }

    DBGPRINTF("Waiting for message on socket %d\n", sock);

    /* the smart store/recv/delete tags skip the version handshake */
    if((tag!= GS_SMART_STORE_ARG_TO_FILE) &&
       (tag!= GS_SMART_STORE_ARG_TO_MEMORY) &&
       (tag!= GS_SMART_RECV_ARG_FROM_MEMORY) &&
       (tag!= GS_SMART_DELETE_ARG_FROM_FILE) &&
       (tag!=GS_SMART_DELETE_ARG_FROM_MEMORY)){
      if((gs_recv_string(sock, &version_str)) < 0) {
        ERRPRINTF("could not read version string\n");
        return -1;
      }

      if(gs_versions_incompatible(version_str, VERSION))
        retval = gs_send_tag(sock, GS_PROT_VERSION_MISMATCH);
      else
        retval = gs_send_tag(sock, GS_PROT_OK);

      free(version_str);
    }
    else{
      retval=0;
    }

    if(retval==-1){
      ERRPRINTF("SMART : Version error problem\n");
      return -1;
    }

    /* these three requests are served in the parent process; the child
     * forked below then has nothing left to do */
    if(tag==GS_SMART_DELETE_ARG_FROM_MEMORY){
      retval = process_smart_delete_arg_from_memory(sock);
      recv_transaction=1;
    }
    if(tag==GS_SMART_STORE_ARG_TO_MEMORY){
      retval = process_smart_store_arg_to_memory(sock);
      recv_transaction=1;
    }
    if(tag==GS_SMART_DELETE_ARG_FROM_FILE){
      retval = process_smart_delete_arg_from_file(sock);
      recv_transaction=1;
    }
#endif

    pid = fork();
    switch(pid) {
      case -1: /* error */
        ERRPRINTF("Failed to fork\n");
        /* no child will service this connection, so release it here
         * instead of leaking one descriptor per failed fork */
        gs_close_socket(sock);
        continue;
      case 0: /* child */
        gs_close_socket(srvsock);
#ifdef GS_SMART_GRIDSOLVE
        if(recv_transaction==0){
          gs_smart_server_process_message(gs_server, sock, tag);
        }
#else
        gs_server_process_message(gs_server, sock);
#endif
        _exit(0);
      default: /* parent */
        gs_close_socket(sock);
    }

    fflush(stderr);
    fflush(stdout);
  }

  /* should not reach here */
  return 0;
}


| int gs_server_prepare_model_update_message | ( | int | sock, | |
| gs_server_t * | gs_server, | |||
| gs_problem_t * | problem, | |||
| char ** | msg | |||
| ) |
Definition at line 2040 of file server_start.c.
{
  /*
   * Read the contents of "<infodir>/<problem name>.coe" and hand them
   * back through *msg.
   *
   * sock      -- not used by this function
   * gs_server -- server whose infodir is searched
   * problem   -- problem whose name selects the file
   * msg       -- receives the buffer produced by gs_get_contents_of_file()
   *
   * Returns 0 on success; -1 if an argument is NULL, the server has no
   * infodir (NULL or "-"), memory runs out, or the file cannot be read.
   */
  char *pfile, *expr;
  int rc;

  if(!gs_server || !problem || !msg)
    return -1;

  if(!gs_server->infodir)
    return -1;

  if(!strcmp(gs_server->infodir, "-"))
    return -1;

  pfile = dstring_sprintf("%s/%s.coe", gs_server->infodir, problem->name);
  if(!pfile)
    return -1;

  rc = gs_get_contents_of_file(pfile, &expr);

  /* the path is only needed for the read above; release it on both the
   * success and the failure path */
  free(pfile);

  if(rc < 0)
    return -1;

  *msg = expr;

  return 0;
}


| int gs_server_process_message | ( | gs_server_t * | gs_server, | |
| int | sock | |||
| ) |
Get the message tag from the socket and take appropriate actions.
| gs_server | - the server data structure |
| sock | - the incoming message socket |
Definition at line 181 of file server_start.c.
{
  /*
   * Read one request (tag + client version string) from sock, answer the
   * version handshake, and dispatch the request to the matching
   * process_* handler.
   *
   * gs_server -- the server data structure (passed to the solve handler)
   * sock      -- the incoming message socket
   *
   * Returns the handler's return value, or -1 if the tag or version
   * cannot be read, the handshake reply cannot be sent, the client's
   * version is incompatible, or the tag is unknown.
   */
  char *version_str;
  int tag, retval;

  DBGPRINTF("Waiting for message on socket %d\n", sock);

  if(gs_recv_tag(sock, &tag) < 0) {
    ERRPRINTF("error reading tag\n");
    return -1;
  }

  if((gs_recv_string(sock, &version_str)) < 0) {
    ERRPRINTF("could not read version string\n");
    return -1;
  }

  if(gs_versions_incompatible(version_str, VERSION)) {
    retval = gs_send_tag(sock, GS_PROT_VERSION_MISMATCH);
    free(version_str);

    if(retval < 0)
      ERRPRINTF("could not send response tag.\n");

    /* the client has just been refused, so its request must not be
     * dispatched */
    LOGPRINTF("Rejecting request %d from incompatible client version\n", tag);
    return -1;
  }

  retval = gs_send_tag(sock, GS_PROT_OK);
  free(version_str);

  if(retval < 0) {
    ERRPRINTF("could not send response tag.\n");
    return -1;
  }

  /* stays -1 when the tag matches no handler */
  retval = -1;

  switch(tag) {
    case GS_PROT_PROBLEM_SOLVE:
    case GS_PROT_PROBLEM_SOLVE_ASSIGNED:
      retval = process_problem_solve(gs_server, tag, sock);
      break;
    case GS_PROT_PROBE_REQUEST:
      retval = process_probe_request(sock);
      break;
    case GS_PROT_KILL_JOB:
      retval = process_cancel_request(sock);
      break;
    case GS_PROT_AWAIT_COMPLETION:
      retval = process_await_completion(sock);
      break;
    case GS_PROT_KILL_SERVER:
      retval = process_kill_server(sock);
      break;
    case GS_PROT_PROBLEM_EXAMPLE:
      retval = process_problem_example(sock);
      break;
    case GS_PROT_PING:
      retval = process_ping(sock);
      break;
    case GS_PROT_STORE_FILE:
      retval = process_store_file(sock);
      break;
    case GS_PROT_DATA_TRANSFER:
      retval = process_data_transfer_request(sock);
      break;
    default:
      LOGPRINTF("Unknown tag %d\n", tag);
      break;
  }

  return retval;
}


| int gs_server_register | ( | gs_server_t * | gs_server | ) |
Register the server with the agent.
| gs_server | - the server data structure |
Definition at line 2352 of file server_start.c.
{
  /*
   * Register this server with the agent: connect, send the
   * GS_PROT_SERVER_REGISTER tag and our version, wait for the agent's OK,
   * send the encoded server structure, and wait for a second OK.
   *
   * gs_server -- the server data structure (agenthost/agentport select
   *              the agent to contact)
   *
   * Returns 0 on success, -1 on any failure.  The agent socket and the
   * encoded string are released on every path.
   */
  int sock_agent, reply_tag;
  int rc = -1;
  char *serverstr = NULL;

  sock_agent = gs_connect_direct(gs_server->agenthost, gs_server->agentport);

  if(sock_agent < 0) {
    ERRPRINTF("Could not connect to the agent\n");
    return -1;
  }

  /* upload server info to agent */
  if(gs_encode_server(&serverstr, gs_server) < 0) {
    ERRPRINTF("Failed to encode the server struct\n");
    /* what a failed encode leaves in serverstr is not visible here, so
     * do not try to free it */
    serverstr = NULL;
    goto done;
  }

  DBGPRINTF("serverstr: %s\n", serverstr);

  if((gs_send_tag(sock_agent, GS_PROT_SERVER_REGISTER) < 0) ||
      (gs_send_string(sock_agent, VERSION) < 0))
  {
    ERRPRINTF("Error communicating with the agent.\n");
    goto done;
  }

  /* if reply not OK, check for more info */
  if(gs_recv_tag(sock_agent, &reply_tag) < 0) {
    ERRPRINTF("Error communicating with the agent.\n");
    goto done;
  }

  if(reply_tag != GS_PROT_OK) {
    if(reply_tag == GS_PROT_VERSION_MISMATCH)
      ERRPRINTF("Agent refused this incompatible version of GridSolve\n");
    else
      ERRPRINTF("Error code %d: Could not register server\n", reply_tag);
    goto done;
  }

  if(gs_send_string(sock_agent, serverstr) < 0) {
    ERRPRINTF("Error communicating with the agent.\n");
    goto done;
  }

  /* if reply not OK, check for more info */
  if(gs_recv_tag(sock_agent, &reply_tag) < 0) {
    ERRPRINTF("Error communicating with the agent.\n");
    goto done;
  }

  if(reply_tag != GS_PROT_OK) {
    ERRPRINTF("Error code %d: Could not register server\n", reply_tag);
    goto done;
  }

  rc = 0;

done:
  /* single exit: previously every error path after the connect leaked
   * the agent socket */
  if(serverstr) {
    FREE(serverstr);
  }
  gs_close_socket(sock_agent);

  if(rc == 0)
    DBGPRINTF("returning\n");

  return rc;
}


| int gs_server_register_problems | ( | gs_server_t * | gs_server | ) |
Register problems with the agent.
Add problems to the servers problemlist by scanning the appropriate directories using file globbing expressions. Limitation: Currently only expects one problem description per problem file.
| gs_server | - the server data structure |
Definition at line 2080 of file server_start.c.
{
int i, count, sock_agent, reply_tag, problem_already_exists, total_services;
char *problemstring, temp_cid[CID_LEN*2+1], **model_msg;
gs_problem_t *problem, *pp, *prev;
icl_list_t *services, *l;
DBGPRINTF("Entering\n");
services = icl_list_new();
total_services = 0;
if(!services)
return -1;
if(gs_find_service_directories(services, &total_services) < 0)
ERRPRINTF("Warning: no services found.\n");
model_msg = NULL;
if(total_services > 0) {
model_msg = (char **)malloc(total_services * sizeof(char *));
if(!model_msg) {
ERRPRINTF("malloc failed.\n");
goto error_registering_problems;
}
}
proxy_cid_to_str(temp_cid, gs_server->componentid);
sock_agent = gs_connect_direct(gs_server->agenthost, gs_server->agentport);
if(sock_agent < 0) {
ERRPRINTF("Unable to connect to agent.\n");
goto error_registering_problems;
}
if((gs_send_tag(sock_agent, GS_PROT_PROBLEM_REGISTER) < 0) ||
(gs_send_string(sock_agent, VERSION) < 0))
{
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
if(gs_recv_tag(sock_agent, &reply_tag) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
if(reply_tag != GS_PROT_OK) {
if(reply_tag == GS_PROT_VERSION_MISMATCH)
ERRPRINTF("Agent refused this incompatible version of GridSolve\n");
else
ERRPRINTF("Error code %d: Could not register problem\n", reply_tag);
gs_close_socket(sock_agent);
goto error_registering_problems;
}
if((gs_send_string(sock_agent, temp_cid) < 0) ||
(gs_send_int(sock_agent, total_services) < 0)) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
i = 0;
for (l=icl_list_first(services); l!=NULL; l=icl_list_next(services, l)) {
char *path = (char *)l->data;
/* For each problem description file, find size and slurp it into
* an allocated buffer
*/
problem = (gs_problem_t *)CALLOC(1, sizeof(gs_problem_t));
if(!problem) {
ERRPRINTF("Couldn't allocate memory for problem\n");
goto error_registering_problems;
}
if(gs_read_problem_from_file(path, problem) < 0) {
ERRPRINTF("failed to get problem from '%s'\n", path);
continue;
}
/* add problem to the end of the problemlist if it does not exist */
problem_already_exists=0;
for(prev=pp=gs_server->problemlist; pp; pp=pp->next) {
if(strcmp(pp->name,problem->name) == 0) {
problem_already_exists = 1;
break;
}
prev = pp;
}
if(problem_already_exists) {
char *pstr1, *pstr2;
int diff;
pstr1 = pstr2 = NULL;
if((gs_encode_problem(&pstr1, problem) < 0) ||
(gs_encode_problem(&pstr2, pp) < 0)) {
if(pstr1) free(pstr1);
if(pstr2) free(pstr2);
continue;
}
diff = strcmp(pstr1, pstr2);
free(pstr1);
free(pstr2);
if(!diff) {
gs_free_problem(problem);
continue;
}
LOGPRINTF("Problem %s has been changed.\n", problem->name);
/* remove the problem from the server's problem list and
* fall through to the problem registration below.
*/
if(pp == gs_server->problemlist)
gs_server->problemlist = pp->next;
else
prev->next = pp->next;
gs_server->nproblems--;
pp->next = NULL;
gs_free_problem(pp);
}
LOGPRINTF("Registering new service %s\n", path);
if(gs_server->problemlist == NULL)
gs_server->problemlist = problem;
else {
problem->next = gs_server->problemlist;
gs_server->problemlist = problem;
}
gs_server->nproblems++;
if(gs_encode_problem(&problemstring, problem) < 0) {
ERRPRINTF("Failed to encode problem.\n");
goto error_registering_problems;
}
/* Register problem at agent */
if(gs_send_string(sock_agent, problemstring) < 0) {
gs_close_socket(sock_agent);
FREE(problemstring);
ERRPRINTF("Error communicating with agent.\n");
goto error_registering_problems;
}
if(gs_server_prepare_model_update_message(sock_agent, gs_server, problem, &model_msg[i]) < 0)
model_msg[i] = strdup(GS_NO_MODEL_UPDATE);
i++;
}
if(gs_send_string(sock_agent, GS_END_PROB_REG) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
count = i;
for(i=0;i<count;i++) {
if(model_msg && model_msg[i]) {
if(gs_send_string(sock_agent, model_msg[i]) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
}
}
if(gs_send_string(sock_agent, GS_END_PROB_REG) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
/* look for removed services */
for(prev=pp=gs_server->problemlist; pp; pp=pp->next) {
int service_found;
char *sname;
service_found = 0;
for(l=icl_list_first(services); l!=NULL; l=icl_list_next(services, l)) {
sname = gs_get_service_name_from_path((char*)l->data);
if(!sname) continue;
if(!strcmp(sname, pp->name)) {
service_found = 1;
free(sname);
break;
}
free(sname);
}
if(!service_found) {
if(gs_send_string(sock_agent, pp->name) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
LOGPRINTF("Service '%s' was removed.. unregistering.\n", pp->name);
if(pp == gs_server->problemlist)
gs_server->problemlist = pp->next;
else
prev->next = pp->next;
gs_server->nproblems--;
pp->next = NULL;
gs_free_problem(pp);
pp = prev;
}
prev = pp;
}
if(gs_send_string(sock_agent, GS_END_PROB_REG) < 0) {
ERRPRINTF("Error communicating with agent.\n");
gs_close_socket(sock_agent);
goto error_registering_problems;
}
if(gs_recv_tag(sock_agent, &reply_tag) < 0) {
ERRPRINTF("Failed to recv tag.\n");
goto error_registering_problems;
}
if (reply_tag != GS_PROT_OK) {
ERRPRINTF("Could not register problems\n");
goto error_registering_problems;
}
if(services) icl_list_destroy(services, free);
/* disconnect from agent */
gs_close_socket(sock_agent);
return 0;
error_registering_problems:
if(services) icl_list_destroy(services, free);
return -1;
}


| int gs_server_save_encoding | ( | gs_server_t * | server | ) |
Saves the server struct to a file (as XML) to be loaded later by services.
| server | -- the server to be saved |
Definition at line 2510 of file server_start.c.
{
  /* Write the encoded form of the server struct, followed by a newline,
   * to the file whose name is built from GS_SERVER_XML_TEMPLATE and
   * server->infodir.
   *
   * server -- the server to be saved
   *
   * Returns 0 on success, -1 if the file name could not be built, the
   * file could not be opened for writing, or the encoding failed.
   */
  char *strp = NULL, *filename;
  FILE *sfile;

  filename = dstring_sprintf(GS_SERVER_XML_TEMPLATE, server->infodir);
  if(!filename)
    return -1;

  sfile = fopen(filename, "w");
  /* the name is only needed for the fopen call */
  free(filename);
  if(!sfile) {
    ERRPRINTF("Could not create server xml file.\n");
    return -1;
  }

  if(gs_encode_server(&strp, server) < 0) {
    ERRPRINTF("Failed to encode the server struct\n");
    /* do not leak the open stream on the failure path */
    fclose(sfile);
    return -1;
  }

  fprintf(sfile, "%s\n", strp);
  fclose(sfile);
  if(strp) free(strp);
  return 0;
}


| static int process_await_completion | ( | int | sock | ) |
Check if a specific request_id is completed and send back the results.
| sock | - the connection socket |
Definition at line 906 of file server_start.c.
{
  /* Receive a request id from the client, wait (by polling the request
   * directory) until the job is done, then restore the output arguments
   * stored on disk and send them back to the client.
   *
   * sock -- the connection socket
   *
   * Returns 0 when the exchange completed, which includes the case where
   * the request directory does not exist and GS_SVC_ERR_UNKNOWN_REQ was
   * sent.  Returns -1 on any failure.  The socket is closed only on the
   * 0 return path.  As a side effect the process changes its working
   * directory to the request directory.
   */
  char *request_id;
  struct stat stbuf;
  gs_problem_t *problem;

  if(gs_recv_string(sock, &request_id) < 0) {
    ERRPRINTF("gs_recv_string failed\n");
    return -1;
  }

  if(stat(request_id, &stbuf) == 0) {
    int fd, my_dsig = pvmgetdsig();
    char *filename;

    /* the buffer is reused below for "data" and "problem.xml", so it
     * must hold at least 12 bytes even for an empty request id.
     */
    filename = (char *)malloc(strlen(request_id) + 13);
    if(!filename) {
      ERRPRINTF("malloc failed\n");
      free(request_id);
      return -1;
    }

    sprintf(filename, "%s/done", request_id);

    /* if the job is not finished */
    if(stat(filename, &stbuf) != 0) {
      int total_wait_time, wait_time, probe_status;
      char *err_filename;

      err_filename = (char *)malloc(strlen(request_id) + 13);
      if(!err_filename) {
        ERRPRINTF("malloc failed\n");
        free(request_id);
        free(filename);
        return -1;
      }

      sprintf(err_filename, "%s/cancelled", request_id);

      wait_time = 1;
      total_wait_time = 0;

      do {
        sleep(wait_time);

        /* back off: the poll interval grows with the time already
         * waited, capped at GS_MAX_COMPLETION_POLL_FREQ.
         */
        total_wait_time += wait_time;
        wait_time = total_wait_time/10 + 1;
        if(wait_time > GS_MAX_COMPLETION_POLL_FREQ)
          wait_time = GS_MAX_COMPLETION_POLL_FREQ;

        /* look for "cancelled" file */
        if(stat(err_filename, &stbuf) == 0) {
          int err = GS_SVC_ERR_UNSPECIFIED;
          FILE *errfile;

          /* the service must have terminated abnormally.
           * look for an error file and read the code.
           */
          sprintf(err_filename, "%s/error", request_id);
          if((errfile = fopen(err_filename, "r")) != NULL) {
            /* err keeps its default if nothing can be parsed */
            fscanf(errfile, "%d", &err);
            fclose(errfile);
          }

          gs_send_tag(sock, err);
          free(request_id);
          free(filename);
          free(err_filename);
          return -1;
        }

        if(stat(request_id, &stbuf) != 0) {
          LOGPRINTF("Request directory was removed while waiting -- ");
          LOGPRINTF("the service probably terminated abnormally.\n");
          gs_send_tag(sock, GS_SVC_ERR_REQ_DIR_GONE);
          free(request_id);
          free(filename);
          free(err_filename);
          return -1;
        }

        probe_status = gs_probe_batch_request(request_id);
        if(probe_status == 2) {
          ERRPRINTF("Batch queue indicated that the job terminated abnormally.\n");
          gs_send_tag(sock, GS_SVC_ERR_UNSPECIFIED);
          free(request_id);
          free(filename);
          free(err_filename);
          return -1;
        }
        else if(probe_status == 1)
          break;
      } while(stat(filename, &stbuf) != 0);

      free(err_filename);
    }

    /* ok, at this point the job has completed and the
     * results are stored on disk. we need to restore
     * them to the problem struct and send them back
     * to the client. TODO: later it may be a good idea
     * to avoid writing to disk if wait is called before
     * the job completes.
     */
    if(chdir(request_id) < 0) {
      ERRPRINTF("Could not chdir to request dir '%s'\n", request_id);
      free(request_id);
      free(filename);
      return -1;
    }

    sprintf(filename, "data");
    if((fd = open(filename, O_RDONLY)) == -1) {
      free(request_id);
      free(filename);
      ERRPRINTF("failed to open output data\n");
      gs_send_tag(sock, GS_SVC_ERR_OPEN_DATA_FILE);
      return -1;
    }

    sprintf(filename, "problem.xml");

    problem = (gs_problem_t *)CALLOC(1, sizeof(gs_problem_t));
    if(!problem) {
      close(fd);
      free(request_id);
      free(filename);
      ERRPRINTF("failed to malloc space for problem\n");
      gs_send_tag(sock, GS_SVC_ERR_MALLOC);
      return -1;
    }

    if(gs_read_problem_from_file(filename, problem) < 0) {
      /* report before releasing filename: the message prints it */
      ERRPRINTF("failed to get problem from '%s'\n", filename);
      gs_free_problem(problem);
      close(fd);
      free(request_id);
      free(filename);
      gs_send_tag(sock, GS_SVC_ERR_READ_PROBLEM);
      return -1;
    }

#ifdef GS_SMART_GRIDSOLVE
    gs_argument_t *argptr;
    /* for every non-scalar output argument the client sends one int,
     * stored in output_arg_sent_remotely.
     */
    for(argptr= problem->arglist;argptr!=NULL;argptr=argptr->next) {
      if(argptr->objecttype!=GS_SCALAR){
        if( (argptr->inout==GS_INOUT) || (argptr->inout==GS_OUT) ) {
          if(gs_recv_int(sock, &argptr->output_arg_sent_remotely) < 0) {
            ERRPRINTF("gs_recv output remote param\n");
            gs_free_problem(problem);
            close(fd);
            free(request_id);
            free(filename);
            return -1;
          }
        }
      }
    }

    if(gs_smart_restore_output_args_from_file(fd, problem, my_dsig) < 0) {
      gs_free_problem(problem);
      close(fd);
      free(request_id);
      free(filename);
      ERRPRINTF("failed to restore output data from disk\n");
      gs_send_tag(sock, GS_SVC_ERR_RESTORE_ARGS);
      return -1;
    }
#else
    if(gs_restore_output_args_from_file(fd, problem, my_dsig) < 0) {
      gs_free_problem(problem);
      close(fd);
      free(request_id);
      free(filename);
      ERRPRINTF("failed to restore output data from disk\n");
      gs_send_tag(sock, GS_SVC_ERR_RESTORE_ARGS);
      return -1;
    }
#endif

    close(fd);

    if(gs_send_tag(sock, GS_PROT_OK) < 0) {
      gs_free_problem(problem);
      free(request_id);
      free(filename);
      ERRPRINTF("gs_send_tag failed\n");
      return -1;
    }

#ifdef GS_SMART_GRIDSOLVE
    if(gs_smart_send_output_args_to_client(sock, problem, my_dsig) < 0) {
      gs_free_problem(problem);
      free(request_id);
      free(filename);
      ERRPRINTF("failed to send output data to client\n");
      return -1;
    }
#else
    if(gs_send_output_args(sock, problem, my_dsig) < 0) {
      gs_free_problem(problem);
      free(request_id);
      free(filename);
      ERRPRINTF("failed to send output data to client\n");
      return -1;
    }
#endif

    gs_free_problem_and_data(problem);

    /* we are inside the request directory after the chdir above */
    if(gs_create_timestamp_file(".", "retrieved", 0.0) < 0) {
      free(request_id);
      free(filename);
      ERRPRINTF("Could not create 'retrieved' file.\n");
      return -1;
    }

    free(filename);
  }
  else {
    /* no such request exists */
    if(gs_send_tag(sock, GS_SVC_ERR_UNKNOWN_REQ) < 0) {
      free(request_id);
      ERRPRINTF("gs_send_tag failed\n");
      return -1;
    }
  }

  free(request_id);
  gs_close_socket(sock);
  return 0;
}


| static int process_cancel_request | ( | int | sock | ) |
Receive a specific request_id and cancel the request.
| sock | - the connection socket |
Definition at line 1403 of file server_start.c.
{
  /* Receive a request id from the client and cancel that request, first
   * through gs_cancel_batch_request() and, when that reports the request
   * is not a batch job, by signalling the process whose pid is parsed
   * from the request id.
   *
   * sock -- the connection socket
   *
   * Returns 0 once a reply tag has been sent and the socket closed,
   * -1 on failure.
   */
  int tag, cancel_status;
  char *request_id;
  struct stat stbuf;

  if(gs_recv_string(sock, &request_id) < 0) {
    ERRPRINTF("gs_recv_string failed\n");
    return -1;
  }

  cancel_status = gs_cancel_batch_request(request_id);

  switch(cancel_status) {
    case -1:
      ERRPRINTF("Internal error in gs_cancel_batch_request().\n");
      free(request_id);
      return -1;
    case 0:
      /* good, we cancelled the job */
      if(gs_send_tag(sock, GS_PROT_OK) < 0) {
        free(request_id);
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }
      if(gs_create_timestamp_file(request_id, "cancelled", 0.0) < 0)
        ERRPRINTF("Could not create 'cancelled' file.\n");
      free(request_id);
      gs_close_socket(sock);
      return 0;
    case 1:
      ERRPRINTF("Failed to cancel the job via the batch system.\n");
      if(gs_send_tag(sock, GS_SVC_ERR_CANT_KILL) < 0) {
        free(request_id);
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }
      free(request_id);
      gs_close_socket(sock);
      return 0;
    case 3:
      /* this isn't a batch job, so fall through to the normal code below */
      break;
    default:
      /* unexpected values are also handled by the normal code below */
      ERRPRINTF("Unexpected return value from gs_cancel_batch_request()\n");
      break;
  }

  if(stat(request_id, &stbuf) == 0) {
    char *filename;
    int stat_rv;

    /* room for request_id + "/done" + terminator */
    filename = (char *)malloc(strlen(request_id) + 6);
    if(!filename) {
      ERRPRINTF("malloc failed\n");
      free(request_id);
      return -1;
    }

    sprintf(filename, "%s/done", request_id);
    stat_rv = stat(filename, &stbuf);
    free(filename);

    if(stat_rv == 0) {
      /* the job has already finished.. we didn't have to kill it, but
       * we'll go ahead and return "KILLED" anyway.
       */
      if(gs_send_tag(sock, GS_PROT_OK) < 0) {
        free(request_id);
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }
    }
    else {
      /* the job is not finished. kill it. */
      int pid;

      if((pid = gs_parse_pid_from_requestid(request_id)) < 0) {
        free(request_id);
        ERRPRINTF("Failed to parse pid from request ID\n");
        gs_send_tag(sock, GS_SVC_ERR_INVALID_PID);
        return -1;
      }

      if(kill(pid, SIGTERM) < 0)
        tag = GS_SVC_ERR_CANT_KILL;
      else
        tag = GS_PROT_OK;

      if(gs_send_tag(sock, tag) < 0) {
        free(request_id);
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }

      /* we went ahead and sent the response to the client to
       * give it the opportunity for a quick return, but we
       * just want to wait here to give the process a chance
       * to clean itself up and then we'll send the SIGKILL to
       * make sure it's dead.
       */
      sleep(10);
      kill(pid, SIGKILL);
    }

    if(gs_create_timestamp_file(request_id, "cancelled", 0.0) < 0)
      ERRPRINTF("Could not create 'cancelled' file.\n");
  }
  else {
    /* no such request exists */
    if(gs_send_tag(sock, GS_SVC_ERR_UNKNOWN_REQ) < 0) {
      free(request_id);
      ERRPRINTF("gs_send_tag failed\n");
      return -1;
    }
  }

  free(request_id);
  gs_close_socket(sock);
  return 0;
}


| static int process_data_transfer_request | ( | int | sock | ) | [static] |
Process a request for direct data transfer between two servers, used for GridSolve request sequencing.
Definition at line 693 of file server_start.c.
{
  /* Receive an encoded data transfer request, decode it into an
   * LFS_DSI_OBJECT, concatenate the object's path and file_name, and
   * send the contents of that file over the socket.
   *
   * sock -- the connection socket
   *
   * Returns 0 on success, -1 on failure.
   */
  char *request;
  LFS_DSI_OBJECT *obj;
  char *path, *file_name, *full_name;
  size_t path_len, name_len;
  int send_rv;

  /* receive data transfer request */
  if (gs_recv_string(sock, &request) < 0) {
    ERRPRINTF("error receiving data transfer request\n");
    return -1;
  }

  /* decode the data transfer request */
  if ((obj = gs_seq_decode_lfs_dsi_object(request)) == NULL) {
    ERRPRINTF("bad data transfer request\n");
    free(request);
    return -1;
  }

  /* NOTE(review): obj is never released in this function; the matching
   * destructor for LFS_DSI_OBJECT is not visible here -- confirm and free.
   */

  /* get path and name of the file storing the requested data */
  path = obj->path;
  file_name = obj->file_name;

  /* make sure the file path and name are ok */
  if (!path || !file_name) {
    ERRPRINTF("bad data file path or name\n");
    free(request);
    return -1;
  }

  /* construct the full path name; path is used as a plain prefix, no
   * separator is inserted between the two parts.
   */
  path_len = strlen(path);
  name_len = strlen(file_name);
  full_name = (char *) malloc(path_len + name_len + 1);
  if (!full_name) {
    /* report first so errno is still the one set by malloc */
    perror("malloc");
    free(request);
    return -1;
  }
  memcpy(full_name, path, path_len);
  /* the +1 copies file_name's terminator as well */
  memcpy(full_name + path_len, file_name, name_len + 1);

  /* send data from the specified file to the requesting server */
  send_rv = gs_send_arg_from_file(sock, full_name);
  if (send_rv < 0)
    ERRPRINTF("error sending data from file\n");

  /* released only after the last use of path/file_name above */
  free(full_name);
  free(request);

  return (send_rv < 0) ? -1 : 0;
}


| static int process_kill_server | ( | int | sock | ) |
Handle a KILL_SERVER tag.
| sock | -- The socket with the message |
Definition at line 863 of file server_start.c.
{
  /* Read a password string from the socket and reply with GS_PROT_OK if
   * it matches, GS_PROT_ERROR otherwise.  On a match, SIGTERM is sent to
   * the parent process after the reply has gone out.
   *
   * sock -- The socket with the message
   *
   * Returns 0 once the reply was sent, -1 on a communication error.
   */
  char *password;
  int accepted;

  DBGPRINTF("receiving password\n");

  if(gs_recv_string(sock,&password) < 0) {
    ERRPRINTF("Error receiving password\n");
    return -1;
  }

  /* the string is only needed for the comparison */
  accepted = (strcmp(password, "GridSolve") == 0);
  free(password);

  if(gs_send_tag(sock, accepted ? GS_PROT_OK : GS_PROT_ERROR) < 0) {
    ERRPRINTF("Error sending confirmaton\n");
    return -1;
  }

  if(accepted)
    kill(getppid(), SIGTERM);

  return 0;
}


| static int process_ping | ( | int | sock | ) |
Responds to a client ping.
| sock | -- The socket with the message |
Definition at line 779 of file server_start.c.
{
  /* Answer a client ping: read a length, read that many bytes, and write
   * the same bytes back.
   *
   * sock -- The socket with the message
   *
   * Returns 0 on success, -1 if the length is outside 1..GS_MAX_PING,
   * on allocation failure, or on a communication error.
   */
  char *msg = NULL;
  int len = 0;

  if(gs_recv_int(sock, &len) < 0) {
    ERRPRINTF("Error recving ping len\n");
    return -1;
  }

  /* reject lengths the peer is not allowed to request */
  if((len < 1) || (len > GS_MAX_PING))
    return -1;

  msg = (char *)malloc(len);
  if(!msg)
    return -1;

  if(gs_tread(sock, msg, len) < 0) {
    ERRPRINTF("Error recving ping msg\n");
    free(msg);
    return -1;
  }

  if(gs_twrite(sock, msg, len) < 0) {
    ERRPRINTF("Communication error.\n");
    free(msg);
    return -1;
  }

  free(msg);
  return 0;
}


| static int process_probe_request | ( | int | sock | ) |
Receive a specific request_id and check if it has completed.
| sock | - the connection socket |
Definition at line 1277 of file server_start.c.
{
  /* Receive a request id from the client and report whether the request
   * has completed by sending a single tag.
   *
   * sock -- the connection socket
   *
   * Returns 0 once a status tag was sent and the socket closed.  Returns
   * -1 on failure, and also when a "cancelled" file exists for the
   * request (the error code is sent to the client in that case).
   */
  char *request_id;
  struct stat stbuf;
  int probe_status, stat_rv;

  if(gs_recv_string(sock, &request_id) < 0) {
    ERRPRINTF("gs_recv_string failed\n");
    return -1;
  }

  if(stat(request_id, &stbuf) == 0) {
    char *filename;

    filename = (char *)malloc(strlen(request_id) + 13);
    if(!filename) {
      ERRPRINTF("malloc failed\n");
      free(request_id);
      return -1;
    }

    sprintf(filename, "%s/done", request_id);
    stat_rv = stat(filename, &stbuf);
    free(filename);

    if(stat_rv == 0) {
      /* the job is finished */
      free(request_id);
      if(gs_send_tag(sock, GS_PROT_OK) < 0) {
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }
    }
    else {
      char *err_filename;
      int tag;

      err_filename = (char *)malloc(strlen(request_id) + 13);
      if(!err_filename) {
        ERRPRINTF("malloc failed\n");
        free(request_id);
        return -1;
      }

      sprintf(err_filename, "%s/cancelled", request_id);

      /* the request exists, but the job is not finished.
       * first check whether the service terminated
       * abnormally.
       */
      if(stat(err_filename, &stbuf) == 0) {
        int err = GS_SVC_ERR_UNSPECIFIED;
        FILE *errfile;

        /* the service didn't finish normally.
         * look for an error file and read the code.
         */
        sprintf(err_filename, "%s/error", request_id);
        if((errfile = fopen(err_filename, "r")) != NULL) {
          /* err keeps its default if nothing can be parsed */
          fscanf(errfile, "%d", &err);
          fclose(errfile);
        }

        gs_send_tag(sock, err);
        free(request_id);
        free(err_filename);
        return -1;
      }

      free(err_filename);

      probe_status = gs_probe_batch_request(request_id);
      free(request_id);

      /* map the batch probe result onto the tag sent to the client */
      switch(probe_status) {
        case 0:
          tag = GS_SVC_ERR_NOT_FINISHED;
          break;
        case 1:
          tag = GS_PROT_OK;
          break;
        case 3:
          tag = GS_SVC_ERR_NOT_FINISHED;
          break;
        default:
          tag = GS_SVC_ERR_UNSPECIFIED;
          break;
      }

      if(gs_send_tag(sock, tag) < 0) {
        ERRPRINTF("gs_send_tag failed\n");
        return -1;
      }
    }
  }
  else {
    /* no such request exists */
    free(request_id);
    if(gs_send_tag(sock, GS_SVC_ERR_UNKNOWN_REQ) < 0) {
      ERRPRINTF("gs_send_tag failed\n");
      return -1;
    }
  }

  gs_close_socket(sock);
  return 0;
}


| static int process_problem_example | ( | int | sock | ) |
returns example C code to call the specified problem
| sock | -- The socket with the message |
Definition at line 822 of file server_start.c.
{
  /* Read a problem name from the socket and send back the contents of
   * that problem's example source file, located under
   * gridsolve_root/service/<name>/.
   *
   * sock -- The socket with the message
   *
   * Returns 0 on success, -1 on failure.
   */
  char *name = NULL, *src_path = NULL;
  int status = -1;

  if(gs_recv_string(sock, &name) < 0) {
    ERRPRINTF("Error recving problem name\n");
    return -1;
  }

  src_path = dstring_sprintf("%s/service/%s/%s_grpc_example.c",
                             gridsolve_root, name, name);

  if(src_path == NULL) {
    ERRPRINTF("Failed to create example filename.\n");
  }
  else if(gs_send_arg_from_file(sock, src_path) < 0) {
    ERRPRINTF("Failed to send example code.\n");
  }
  else {
    status = 0;
  }

  /* single release point; src_path may still be NULL here */
  if(src_path != NULL)
    free(src_path);
  free(name);

  return status;
}


| static int process_problem_solve | ( | gs_server_t * | gs_server, | |
| int | tag, | |||
| int | sock | |||
| ) |
Receive a service request and call execv to run the service.
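| tag | - the protocol tag of the request (e.g. GS_PROT_PROBLEM_SOLVE_ASSIGNED); it is passed on to the service executable. The GS_PROT_BLOCKING / GS_PROT_NONBLOCKING tag is read from the socket. | |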
| sock | - the connection socket |
Definition at line 1630 of file server_start.c.
{
  /* Receive a problem solve request from the socket, check it against
   * the server restrictions, decode it, and replace the current process
   * image with the matching service executable via execv().
   *
   * gs_server -- the server struct (agent host/port, component id and
   *              info directory are passed on to the service)
   * tag       -- the protocol tag of the request; it is passed to the
   *              service and used in the log line
   * sock      -- the connection socket
   *
   * Returns -1 on failure.  On success execv() does not return, so the
   * final "return 0" is not expected to be reached.
   */
  char *problem_name, *service_exe, *msg, **args,
    *user_name, *host_name, *client_cid, srv_cid[CID_LEN*2+1];
  int rtag, blocking_tag, client_dsig, blocking=1, saw_bad_tag = 0,
    agent_taskid, i;
  double agent_est_time;

  if(!gs_server)
    return -1;

  proxy_cid_to_str(srv_cid, gs_server->componentid);

  if(gs_recv_tag(sock, &blocking_tag) < 0) {
    ERRPRINTF("gs_recv_tag failed\n");
    return -1;
  }

  switch(blocking_tag) {
    case GS_PROT_BLOCKING:
      blocking = 1;
      break;
    case GS_PROT_NONBLOCKING:
      blocking = 0;
      break;
    default:
      fprintf(stderr,"Invalid blocking tag specified: %d\n", blocking_tag);
      saw_bad_tag = 1;
  }

  /* the request string is read even after a bad tag, and only then is
   * the error reported to the client.
   */
  if(gs_recv_string(sock, &msg) < 0) {
    ERRPRINTF("gs_recv_string failed\n");
    return -1;
  }

  if(saw_bad_tag) {
    gs_send_tag(sock, GS_SVC_ERR_BAD_NB_TAG);
    free(msg);
    return -1;
  }

  rtag = GS_PROT_OK;

  /* a failure to check the restrictions is only a warning; the job is
   * rejected solely on the tag the check produced.
   */
  if(gs_server_check_restrictions(gs_server, msg, &rtag) < 0)
    ERRPRINTF("Warning: failed to check server restrictions\n");

  if(rtag != GS_PROT_OK) {
    LOGPRINTF("Job rejected due to server restriction(s)\n");
    gs_send_tag(sock, rtag);
    free(msg);
    return -1;
  }

  DBGPRINTF("msg = '%s'\n", msg);

  problem_name = (char *)malloc(strlen(msg));
  user_name = (char *)malloc(strlen(msg));
  host_name = (char *)malloc(strlen(msg));
  client_cid = (char *)malloc(strlen(msg));

  if(!problem_name || !user_name || !host_name || !client_cid) {
    ERRPRINTF("Failed to allocate memory.\n");
    /* release whichever of the buffers were obtained */
    free(problem_name);
    free(user_name);
    free(host_name);
    free(client_cid);
    free(msg);
    return -1;
  }

  /* NOTE(review): msg is not released after this point because the
   * ownership rules of gs_decode_problem_solve_request() are not visible
   * here -- confirm whether the decoded fields may refer to msg.
   */
  if(gs_decode_problem_solve_request(msg, &problem_name, &user_name,
       &host_name, &client_cid, &client_dsig, &agent_taskid,
       &agent_est_time) < 0)
  {
    free(problem_name);
    free(user_name);
    free(host_name);
    free(client_cid);
    ERRPRINTF("failed to decode request\n");
    return -1;
  }

  DBGPRINTF("problem_name = '%s'\n", problem_name);

  service_exe = dstring_sprintf("%s/service/%s/%s_service",gridsolve_root,
                  problem_name, problem_name);
  if(!service_exe) {
    free(problem_name);
    free(user_name);
    free(host_name);
    free(client_cid);
    ERRPRINTF("Failure generating name of service executable.\n");
    return -1;
  }

  /* 17 arguments plus the terminating NULL */
  args = (char **)calloc(18, sizeof(char *));
  if(!args) {
    free(service_exe);
    free(problem_name);
    free(user_name);
    free(host_name);
    free(client_cid);
    ERRPRINTF("Failure allocating space for args.\n");
    return -1;
  }

  LOGPRINTF("Problem Solve: '%s' %s@%s (%s) %s\n", problem_name, user_name,
    host_name, blocking ? "blocking" : "non-blocking",
    tag == GS_PROT_PROBLEM_SOLVE_ASSIGNED ? "[assigned server request]" : "");

  args[0] = dstring_sprintf("%s_service", problem_name);
  args[1] = dstring_sprintf("%s", problem_name);
  args[2] = dstring_sprintf("%d", tag);
  args[3] = dstring_sprintf("%d", client_dsig);
  args[4] = dstring_sprintf("%d", sock);
  args[5] = gridsolve_root;
  args[6] = gridsolve_arch;
  args[7] = dstring_sprintf("%d", blocking);
  args[8] = dstring_sprintf("%s", gs_server->agenthost);
  args[9] = dstring_sprintf("%d", gs_server->agentport);
  args[10] = dstring_sprintf("%s", srv_cid);
  args[11] = dstring_sprintf("%s", user_name);
  args[12] = dstring_sprintf("%s", host_name);
  args[13] = dstring_sprintf("%s", client_cid);
  args[14] = dstring_sprintf("%s", gs_server->infodir ? gs_server->infodir : "-");
  args[15] = dstring_sprintf("%d", agent_taskid);
  args[16] = dstring_sprintf("%lf", agent_est_time);
  args[17] = NULL;

  if(execv(service_exe, args) < 0) {
    if(gs_send_tag(sock, GS_SVC_ERR_EXEC) < 0)
      ERRPRINTF("Error sending GS_SVC_ERR_EXEC\n");

    /* args[5] and args[6] are the global root/arch strings and must not
     * be freed; every other entry came from dstring_sprintf().
     */
    for(i = 0; i < 17; i++) {
      if((i != 5) && (i != 6))
        free(args[i]);
    }
    free(args);

    free(service_exe);
    free(problem_name);
    free(user_name);
    free(host_name);
    free(client_cid);
    return -1;
  }

  /* should not reach here */
  return 0;
}


| static int process_store_file | ( | int | sock | ) |
Responds to a file storage request.
| sock | -- The socket with the message |
Definition at line 753 of file server_start.c.
{
  /* Respond to a file storage request: read a string from the socket and
   * hand it, together with the socket, to gs_recv_arg_into_file().
   *
   * sock -- The socket with the message
   *
   * Returns 0 on success, -1 on failure.
   */
  char *msg = NULL;
  int rv = 0;

  if(gs_recv_string(sock, &msg) < 0) {
    ERRPRINTF("Error recving file storage info.\n");
    return -1;
  }

  /* NOTE(review): msg comes straight from the peer and is used as the
   * destination file name without validation -- confirm callers are
   * trusted or restrict the path.
   */
  if(gs_recv_arg_into_file(sock, msg) < 0) {
    ERRPRINTF("saving to file '%s'.\n", msg);
    rv = -1;
  }

  /* the received string is released on both paths */
  free(msg);
  return rv;
}


char * gridsolve_agent = NULL [static] |
Definition at line 45 of file server_start.c.
char * gridsolve_arch = NULL [static] |
Definition at line 44 of file server_start.c.
char* gridsolve_root = NULL [static] |
Global variables, set once in main routine
Definition at line 43 of file server_start.c.
| char GS_SERVER_JOB_COUNT_FILE[FN_LEN] |
Definition at line 47 of file server_start.c.
1.6.3-20100507