#include "portability.h"
#include "server.h"
#include "general.h"
#include "comm_data.h"
#include "comm_basics.h"
#include "comm_encode.h"
#include "comm_protocol.h"
#include <errno.h>
Go to the source code of this file.
Functions | |
| int | gs_server_free (gs_server_t *server) |
| Free the server data structures. | |
| int | gs_do_ping (gs_server_t *srv, char *msg, int len, double *et) |
| int | gs_free_restrictions (gs_restriction_t *rlist) |
| int | gs_clean_up_old_temp_files (char *infodir) |
| int | gs_parse_pid_from_requestid (char *reqid) |
| int | gs_parse_pid_from_tempname (char *tempname) |
| int | gs_get_job_count (char *file, int *jcount) |
| int | gs_modify_job_count (char *file, int op) |
| int | gs_init_job_count (char *file) |
| int | gs_increment_job_count (char *file) |
| int | gs_decrement_job_count (char *file) |
| int | gs_notify_agent_problem_complete (char *agenthost, in_port_t agentport, char *problem_name, char *server_cid, char *user_name, char *host_name, char *client_cid, char *request_id, int agent_taskid, double run_time) |
| int | gs_notify_agent_problem_solve (char *agenthost, in_port_t agentport, char *problem_name, double est_time, char *server_cid, char *user_name, char *host_name, char *client_cid, char *request_id, int agent_taskid, double agent_est_time) |
| int | gs_create_info_dir (char componentid[CID_LEN], ipaddr_t ipaddress, in_port_t port, char **dirname) |
This file contains various utility functions for manipulating servers.
Definition in file server_util.c.
| int gs_clean_up_old_temp_files | ( | char * | infodir | ) |
Attempts to remove old files (pipes, etc) typically stored in /tmp from previous runs of the agent.
Definition at line 169 of file server_util.c.
{
  /*
   * Removes stale temporary files (SQLite db and its journal, stats file,
   * scheduler lock file, job count file, sensor socket) whose names end in
   * the PID of a process that no longer exists.
   *
   * infodir -- directory to scan; if NULL, GS_INFODIR_PATH is used.
   *
   * Returns 0 on completion (including when glob() itself fails, in which
   * case nothing is removed), or -1 if a search pattern does not fit into
   * FN_LEN bytes.
   */
  enum { NUM_PATTERNS = 6 };
  /* listed in the order in which the patterns are handed to glob() */
  const struct {
    const char *file_prefix;
    const char *suffix;
  } kinds[NUM_PATTERNS] = {
    { GRIDSOLVE_SQLITE_DB_PREFIX,       ""         },
    { GRIDSOLVE_SQLITE_DB_PREFIX,       "-journal" },
    { GRIDSOLVE_STATS_FILE_PREFIX,      ""         },
    { GRIDSOLVE_SCHED_LOCK_FILE_PREFIX, ""         },
    { GS_SERVER_JOB_COUNT_FILE_PREFIX,  ""         },
    { GRIDSOLVE_SENSOR_USOCK_PREFIX,    ""         }
  };
  char patterns[NUM_PATTERNS][FN_LEN];
  const char *prefix;
  int k, globflags = GLOB_NOCHECK | GLOB_NOSORT;
  size_t i;
  glob_t globbuf;

  prefix = infodir ? infodir : GS_INFODIR_PATH;

  /* build every pattern up front so a too-long directory name is rejected
   * before anything has been globbed (and instead of overflowing a buffer)
   */
  for(k = 0; k < NUM_PATTERNS; k++) {
    int len = snprintf(patterns[k], sizeof patterns[k], "%s/%s.[0-9]*%s",
                       prefix, kinds[k].file_prefix, kinds[k].suffix);

    if(len < 0 || (size_t)len >= sizeof patterns[k])
      return -1;
  }

  for(k = 0; k < NUM_PATTERNS; k++) {
    /* the first call initializes globbuf, the rest accumulate into it */
    int flags = (k == 0) ? globflags : (globflags | GLOB_APPEND);

    if(glob(patterns[k], flags, NULL, &globbuf)) {
      globfree(&globbuf);
      return 0;
    }
  }

  for(i = 0; i < globbuf.gl_pathc; i++) {
    int pid_to_check, rv;
    struct stat stbuf;

    /* since GLOB_NOCHECK is specified, must skip patterns in gl_pathv[] */
    if(stat(globbuf.gl_pathv[i], &stbuf) != 0)
      continue;

    /* a PID of 0 would make kill() address our own process group, so it is
     * treated like a parse failure
     */
    if((pid_to_check = gs_parse_pid_from_tempname(globbuf.gl_pathv[i])) <= 0)
      continue;

    rv = kill(pid_to_check, 0);

    /* if kill returns 0 or -1 w/EPERM, then there is probably a process with
     * the given PID running now.  in that case we don't want to try to
     * delete this file since it may be in use.
     */
    if((rv == 0) || ((rv < 0) && (errno == EPERM)))
      continue;

    LOGPRINTF("Removing old file '%s'\n", globbuf.gl_pathv[i]);
    unlink(globbuf.gl_pathv[i]);
  }

  globfree(&globbuf);
  return 0;
}


| int gs_create_info_dir | ( | char | componentid[CID_LEN], | |
| ipaddr_t | ipaddress, | |||
| in_port_t | port, | |||
| char ** | dirname | |||
| ) |
If the info directory doesn't exist, create it.
| componentid | -- on return, contains the previous component ID, if it was available in the infodir, otherwise it will contain a random component ID | |
| ipaddress | -- ip address of this component | |
| port | -- port number of this component | |
| dirname | -- on successful return, points to the directory name |
Definition at line 641 of file server_util.c.
{
  /*
   * Ensures that GS_INFODIR_PATH and the per-component directory
   * GS_INFODIR_PATH/<GS_INFODIR_PREFIX><ip>_<port>_<login>_uid<uid> exist
   * (created with mode 0700 when missing), then synchronizes the component
   * ID with the "cid" file inside that directory:
   *   - no cid file yet: the current local component ID is written to it;
   *   - cid file exists: the ID stored there is installed via
   *     proxy_set_cid_from_str() and becomes the local component ID.
   *
   * componentid -- receives CID_LEN bytes of the resulting component ID
   * ipaddress   -- ip address of this component
   * port        -- port number of this component
   * dirname     -- set to 0 on entry; on success points to the directory
   *                name (the error paths release this string with free(),
   *                so the caller presumably owns it and must free() it)
   *
   * Returns 0 on success, -1 on failure.
   */
  char server_dottedIP[20], cid_string[2*CID_LEN+1], *si_name, *cid_file,
    userid[256], *username;
  PROXY_COMPONENTADDR myaddr;
  struct stat stbuf;
  FILE *f;
  uid_t uid;
  int len;

  *dirname = 0;

  username = gs_get_login_name();
  if(!username) {
    ERRPRINTF("Warning: couldn't get user id\n");
    username = "UnknownUserID";
  }

  uid = getuid();  /* getuid() always succeeds according to the man page */

  /* bounded formatting: an over-long login name must not overrun userid[] */
  len = snprintf(userid, sizeof userid, "%s_uid%d", username, (int)uid);
  if(len < 0 || (size_t)len >= sizeof userid)
    return -1;

  /* hand back the current ID right away so that componentid is populated
   * even if one of the steps below fails
   */
  myaddr = proxy_get_local_addr();
  memcpy(componentid, myaddr.ID, CID_LEN);

  /* top-level info directory */
  si_name = dstring_sprintf("%s", GS_INFODIR_PATH);
  if(!si_name)
    return -1;

  if(stat(si_name, &stbuf) != 0) {
    if(mkdir(si_name, 0700) < 0) {
      free(si_name);
      return -1;
    }
  }
  free(si_name);

  /* directory specific to this address/port/user */
  proxy_ip_to_str(ipaddress, server_dottedIP);

  si_name = dstring_sprintf("%s/%s%s_%hu_%s", GS_INFODIR_PATH,
              GS_INFODIR_PREFIX, server_dottedIP, port, userid);
  if(!si_name)
    return -1;

  if(stat(si_name, &stbuf) != 0) {
    if(mkdir(si_name, 0700) < 0) {
      free(si_name);
      return -1;
    }
  }

  cid_file = dstring_sprintf("%s/cid", si_name);
  if(!cid_file) {
    free(si_name);
    return -1;
  }

  if(stat(cid_file, &stbuf) != 0) {
    /* no stored ID yet: record the current one */
    f = fopen(cid_file, "w");
    free(cid_file);  /* the name is not needed once the open was attempted */
    if(f == NULL) {
      free(si_name);
      return -1;
    }

    proxy_cid_to_str(cid_string, myaddr.ID);
    fprintf(f, "%s\n", cid_string);
    fclose(f);
  }
  else {
    /* reuse the ID stored by a previous run */
    f = fopen(cid_file, "r");
    free(cid_file);
    if(f == NULL) {
      free(si_name);
      return -1;
    }

    if(fgets(cid_string, sizeof cid_string, f) == NULL) {
      fclose(f);
      free(si_name);
      return -1;
    }
    cid_string[2*CID_LEN] = 0;
    fclose(f);

    proxy_set_cid_from_str(cid_string);
    myaddr = proxy_get_local_addr();
  }

  memcpy(componentid, myaddr.ID, CID_LEN);
  *dirname = si_name;
  return 0;
}


| int gs_decrement_job_count | ( | char * | file | ) |
Decrements the count of running jobs.
| file | - name of file containing the job count |
Definition at line 449 of file server_util.c.
{
  /* '-' selects the "decrement by 1" operation of gs_modify_job_count();
   * its result is passed straight through to the caller.
   */
  int status = gs_modify_job_count(file, '-');

  return status;
}


| int gs_do_ping | ( | gs_server_t * | srv, | |
| char * | msg, | |||
| int | len, | |||
| double * | et | |||
| ) |
Performs a ping to the specified server and measures the time.
| srv | -- server to ping | |
| msg | -- data to send | |
| len | -- length of data | |
| et | -- (set upon return to) the elapsed time to send/recv the data |
Definition at line 65 of file server_util.c.
{
  /*
   * Connects to the server, performs the GS_PROT_PING handshake (tag,
   * version string, GS_PROT_OK reply, data length), then writes 'len' bytes
   * of 'msg' and reads 'len' bytes back into 'msg', timing only the
   * write/read exchange.
   *
   * Returns 0 on success and -1 on failure.  Once a connection exists, *et
   * is always assigned: the measured time on success, 0.0 otherwise.  When
   * the connection itself fails, *et is left untouched.
   */
  int status = -1, reply, fd;
  double began, took = 0.0;

  fd = gs_connect_to_host(srv->componentid, srv->ipaddress, srv->port,
         srv->proxyip, srv->proxyport);
  if(fd == INVALID_SOCKET) {
    ERRPRINTF("invalid socket\n");
    return -1;
  }

  /* each step is attempted only if all previous ones succeeded */
  if(gs_send_tag(fd, GS_PROT_PING) < 0) {
    ERRPRINTF("failed to send tag\n");
  }
  else if(gs_send_string(fd, VERSION) < 0) {
    ERRPRINTF("failed to send version string %s\n", VERSION);
  }
  else if(gs_recv_tag(fd, &reply) < 0) {
    ERRPRINTF("failed to recv response tag\n");
  }
  else if(reply != GS_PROT_OK) {
    ERRPRINTF("response tag != GS_PROT_OK\n");
  }
  else if(gs_send_int(fd, len) < 0) {
    ERRPRINTF("failed to send length of data\n");
  }
  else {
    /* the clock covers only the payload round trip */
    began = walltime();

    if(gs_twrite(fd, msg, len) <= 0) {
      ERRPRINTF("error sending ping data\n");
    }
    else if(gs_tread(fd, msg, len) <= 0) {
      ERRPRINTF("error recving ping data\n");
    }
    else {
      took = walltime() - began;
      status = 0;
    }
  }

  *et = took;
  gs_close_socket(fd);
  return status;
}


| int gs_free_restrictions | ( | gs_restriction_t * | rlist | ) |
Frees server restrictions list.
| rlist | -- the list of restrictions to be freed |
Definition at line 142 of file server_util.c.
{
  /* Walks the singly linked list and releases every node; the successor is
   * saved before the node holding it is freed.  Always returns 0.
   */
  gs_restriction_t *node, *following;

  for(node = rlist; node != NULL; node = following) {
    following = node->next;
    free(node);
  }

  return 0;
}

| int gs_get_job_count | ( | char * | file, | |
| int * | jcount | |||
| ) |
Gets the number of jobs currently running.
| file | - name of file containing the job count | |
| jcount | - set upon return to the number of running jobs |
Definition at line 325 of file server_util.c.
{
  /*
   * Reads the job count (one raw int) from 'file' while holding a read lock
   * obtained through gs_open_locked_file().
   *
   * Returns 0 and stores the count in *jcount on success.  Returns -1 if the
   * file cannot be opened/locked or the read fails; *jcount is then left
   * untouched.
   */
  int fd, count = 0;
  ssize_t n;

  if((fd = gs_open_locked_file(file, F_RDLCK, O_RDONLY)) < 0)
    return -1;

  n = read(fd, &count, sizeof(count));

  gs_unlock_file(fd);
  close(fd);

  if(n < 0)
    return -1;

  /* an empty file, or one holding fewer bytes than a full int, carries no
   * valid count; report 0 rather than a partially filled value
   */
  if((size_t)n != sizeof(count))
    count = 0;

  *jcount = count;
  return 0;
}


| int gs_increment_job_count | ( | char * | file | ) |
Increments the count of running jobs.
| file | - name of file containing the job count |
Definition at line 435 of file server_util.c.
{
  /* '+' selects the "increment by 1" operation of gs_modify_job_count();
   * its result is passed straight through to the caller.
   */
  int status = gs_modify_job_count(file, '+');

  return status;
}


| int gs_init_job_count | ( | char * | file | ) |
Initializes the job counter.
| file | - name of file containing the job count |
Definition at line 415 of file server_util.c.
{
  /* Starts from a clean slate: an existing counter file is removed first
   * (failure to remove it yields -1), then the 'i' operation of
   * gs_modify_job_count() stores a count of 0.
   */
  struct stat sb;

  if((stat(file, &sb) == 0) && (unlink(file) < 0))
    return -1;

  return gs_modify_job_count(file, 'i');
}


| int gs_modify_job_count | ( | char * | file, | |
| int | op | |||
| ) |
Modifies the count of the number of jobs currently running.
| file | - name of file containing the job count | |
| op | - operation to perform: '+' - increment by 1; '-' - decrement by 1; 'i' - initialize to 0 |
Definition at line 361 of file server_util.c.
{
  /*
   * Updates the job count (one raw int) stored in 'file' while holding a
   * write lock obtained through gs_open_locked_file().
   *
   * op -- '+' increments, '-' decrements, 'i' resets to 0; any other value
   *       logs a warning and also resets to 0.  The stored count never
   *       drops below 0.
   *
   * Returns 0 once the new count has been written, -1 if the file cannot be
   * opened/locked, read, or written.
   */
  int fd, count = 0, rv = -1;
  ssize_t n;

  if((fd = gs_open_locked_file(file, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
    ERRPRINTF("Failed to open/lock file '%s' for job count\n", file);
    return -1;
  }

  n = read(fd, &count, sizeof(count));
  if(n < 0) {
    ERRPRINTF("read failure getting job count\n");
    goto done;
  }

  /* an empty (e.g. freshly created) file, or one holding fewer bytes than a
   * full int, carries no valid count; start from 0
   */
  if((size_t)n != sizeof(count))
    count = 0;

  switch(op) {
    case '+':
      count++;
      break;
    case '-':
      count--;
      break;
    case 'i':
      count = 0;
      break;
    default:
      ERRPRINTF("Warning: bad op type\n");
      count = 0;
      break;
  }

  if(count < 0)
    count = 0;

  /* rewind and overwrite the stored value; a failure here means the file
   * still holds the old count, so it must not be reported as success
   */
  if(lseek(fd, 0, SEEK_SET) == (off_t)-1 ||
     write(fd, &count, sizeof(count)) != (ssize_t)sizeof(count))
  {
    ERRPRINTF("write failure updating job count\n");
    goto done;
  }

  rv = 0;

done:
  gs_unlock_file(fd);
  close(fd);
  return rv;
}


| int gs_notify_agent_problem_complete | ( | char * | agenthost, | |
| in_port_t | agentport, | |||
| char * | problem_name, | |||
| char * | server_cid, | |||
| char * | user_name, | |||
| char * | host_name, | |||
| char * | client_cid, | |||
| char * | request_id, | |||
| int | agent_taskid, | |||
| double | run_time | |||
| ) |
Send notification to the agent that we have successfully handled the request. The agent uses this for monitoring purposes.
| agenthost | -- host name of agent to be notified | |
| agentport | -- agent port number | |
| problem_name | -- name of problem | |
| server_cid | -- server's component ID | |
| user_name | -- client user name | |
| host_name | -- client host name | |
| client_cid | -- client component ID | |
| request_id | -- request ID | |
| agent_taskid | -- agent-generated task id | |
| run_time | -- the server execution time |
Definition at line 474 of file server_util.c.
{
  /*
   * Forks a child that delivers a "problem complete" notification to the
   * agent; the parent returns immediately (0, or -1 if fork() fails) and
   * does not wait for the child.
   *
   * The child encodes the message from server_cid, request_id, agent_taskid
   * and run_time, connects to agenthost:agentport, sends
   * GS_PROT_NOTIFY_COMPLETE plus the version string, expects GS_PROT_OK,
   * sends the encoded message, and leaves via _exit() (0 on success, -1 on
   * any failure).  The remaining parameters are not used in this body.
   */
  int agent_fd, reply, child_status;
  pid_t child;
  char *encoded;

  child = fork();

  if(child < 0) {
    ERRPRINTF("fork() failed.\n");
    return -1;
  }

  if(child == 0) {
    /* failures before a socket exists exit directly */
    if(gs_encode_problem_complete_notification(&encoded, server_cid,
         request_id, agent_taskid, run_time) < 0)
    {
      ERRPRINTF("Unable to encode message.\n");
      _exit(-1);
    }

    agent_fd = gs_connect_direct(agenthost, agentport);
    if(agent_fd < 0) {
      ERRPRINTF("Unable to connect to agent.\n");
      _exit(-1);
    }

    /* from here on every outcome closes the socket before exiting */
    child_status = -1;

    if((gs_send_tag(agent_fd, GS_PROT_NOTIFY_COMPLETE) < 0) ||
       (gs_send_string(agent_fd, VERSION) < 0))
    {
      ERRPRINTF("failed to send tag\n");
    }
    else if(gs_recv_tag(agent_fd, &reply) < 0) {
      ERRPRINTF("Error communicating with agent.\n");
    }
    else if(reply != GS_PROT_OK) {
      if(reply == GS_PROT_VERSION_MISMATCH) {
        ERRPRINTF("Error: Agent is an incompatible version\n");
      }
      else {
        ERRPRINTF("Error: Agent refused with code %d\n", reply);
      }
    }
    else if(gs_send_string(agent_fd, encoded) < 0) {
      ERRPRINTF("Error communicating with agent.\n");
    }
    else {
      child_status = 0;
    }

    gs_close_socket(agent_fd);
    _exit(child_status);
  }

  /* parent */
  return 0;
}


| int gs_notify_agent_problem_solve | ( | char * | agenthost, | |
| in_port_t | agentport, | |||
| char * | problem_name, | |||
| double | est_time, | |||
| char * | server_cid, | |||
| char * | user_name, | |||
| char * | host_name, | |||
| char * | client_cid, | |||
| char * | request_id, | |||
| int | agent_taskid, | |||
| double | agent_est_time | |||
| ) |
Send notification to the agent that we are handling a problem submission. The agent uses this for monitoring purposes.
| agenthost | -- host name of agent to be notified | |
| agentport | -- agent port number | |
| problem_name | -- name of problem | |
| server_cid | -- server's component ID | |
| user_name | -- client user name | |
| host_name | -- client host name | |
| client_cid | -- client component ID | |
| request_id | -- request ID | |
| agent_taskid | -- agent-generated task id | |
| agent_est_time | -- agent's estimate of run time |
Definition at line 560 of file server_util.c.
{
  /*
   * Forks a child that delivers a "problem submitted" notification to the
   * agent; the parent returns immediately (0, or -1 if fork() fails) and
   * does not wait for the child.
   *
   * The child encodes the message from the problem/client/request details,
   * connects to agenthost:agentport, sends GS_PROT_NOTIFY_SUBMIT plus the
   * version string, expects GS_PROT_OK, sends the encoded message, and
   * leaves via _exit() (0 on success, -1 on any failure).
   */
  pid_t child;

  child = fork();

  if(child < 0) {
    ERRPRINTF("fork() failed.\n");
    return -1;
  }

  if(child == 0) {
    char *encoded;
    int agent_fd, reply, child_status = -1;

    /* failures before a socket exists exit directly */
    if(gs_encode_problem_solve_notification(&encoded, problem_name, est_time,
         server_cid, user_name, host_name, client_cid, request_id,
         agent_taskid, agent_est_time) < 0)
    {
      ERRPRINTF("Unable to encode message.\n");
      _exit(-1);
    }

    agent_fd = gs_connect_direct(agenthost, agentport);
    if(agent_fd < 0) {
      ERRPRINTF("Unable to connect to agent.\n");
      _exit(-1);
    }

    if((gs_send_tag(agent_fd, GS_PROT_NOTIFY_SUBMIT) < 0) ||
       (gs_send_string(agent_fd, VERSION) < 0))
    {
      ERRPRINTF("failed to send tag\n");
      goto child_done;
    }

    if(gs_recv_tag(agent_fd, &reply) < 0) {
      ERRPRINTF("Error communicating with agent.\n");
      goto child_done;
    }

    if(reply != GS_PROT_OK) {
      if(reply == GS_PROT_VERSION_MISMATCH) {
        ERRPRINTF("Error: Agent is an incompatible version\n");
      }
      else {
        ERRPRINTF("Error: Agent refused with code %d\n", reply);
      }
      goto child_done;
    }

    if(gs_send_string(agent_fd, encoded) < 0) {
      ERRPRINTF("Error communicating with agent.\n");
      goto child_done;
    }

    child_status = 0;

  child_done:
    /* single exit point for everything that happens after the connect */
    gs_close_socket(agent_fd);
    _exit(child_status);
  }

  /* parent */
  return 0;
}


| int gs_parse_pid_from_requestid | ( | char * | reqid | ) |
Parses out the pid from the given request id. The format of the request id is defined in src/problem/service_template.c; currently it is gsrequest_CID_PID_XXXXXXXXXXXX, where CID is the server component ID, PID is the process id, and each 'X' is replaced by a random letter or digit.
| reqid | - the request id to parse |
Definition at line 245 of file server_util.c.
{
  /*
   * Extracts the PID field from a request id, i.e. the text between the
   * second and the third '_' of the string.
   *
   * Returns the PID (0 if the field does not start with a number), or -1 if
   * reqid is NULL, contains fewer than three '_' characters, or the number
   * does not fit into an int.
   */
  const char *first_sep, *pid_field;
  long pid;

  if(!reqid) return -1;

  /* each separator must be checked BEFORE stepping past it: advancing a
   * NULL result would produce a bogus non-NULL pointer
   */
  first_sep = strchr(reqid, '_');
  if(!first_sep) return -1;

  pid_field = strchr(first_sep + 1, '_');
  if(!pid_field) return -1;
  pid_field++;

  /* the field must be terminated by a third '_' */
  if(!strchr(pid_field, '_')) return -1;

  /* no copy of the string is needed: conversion stops at the first
   * non-digit, which is the terminating '_' at the latest
   */
  errno = 0;
  pid = strtol(pid_field, NULL, 10);
  if(errno == ERANGE || (long)(int)pid != pid)
    return -1;

  return (int)pid;
}

| int gs_parse_pid_from_tempname | ( | char * | tempname | ) |
Parses out the pid from the given temporary file name. The format of GridSolve temp files is PREFIX.PID, where PREFIX is some name like "/tmp/gssensor" and PID is the process id.
| tempname | - the temporary file name to parse |
Definition at line 292 of file server_util.c.
{
  /*
   * Extracts the PID from a temporary file name of the form PREFIX.PID
   * (trailing text after the number, e.g. "-journal", is ignored).
   *
   * The PID is taken from behind the LAST '.' of the final path component,
   * so dots in the directory part or in the prefix do not confuse the
   * parse.
   *
   * Returns the PID, or -1 if tempname is NULL, its final path component
   * contains no '.', no number follows the '.', or the number does not fit
   * into an int.
   */
  const char *base, *dot;
  char *end;
  long pid;

  if(!tempname) return -1;

  /* restrict the search to the file name itself */
  base = strrchr(tempname, '/');
  base = base ? base + 1 : tempname;

  dot = strrchr(base, '.');
  if(!dot) return -1;

  errno = 0;
  pid = strtol(dot + 1, &end, 10);

  /* end == dot + 1 means that not a single digit was converted */
  if(end == dot + 1 || errno == ERANGE || (long)(int)pid != pid)
    return -1;

  return (int)pid;
}

| int gs_server_free | ( | gs_server_t * | server | ) |
Free the server data structures.
| server | - the server data structure |
This routine was put in comm basics because the server data structure is freed in several locations, and we need to be able to call it from the client. We do not want to have to link the server library into the client executable.
Definition at line 36 of file server_util.c.
{
  /*
   * Releases everything owned by the server structure and then the
   * structure itself.  Returns -1 when handed a NULL pointer, 0 otherwise.
   *
   * Plain string members go straight to free(), which ignores NULL.  The
   * info list and restriction list are handed to their dedicated free
   * routines only when present; the problem list is always passed to
   * gs_free_problem().
   */
  if(server == NULL)
    return -1;

  free(server->hostname);
  free(server->arch);

  gs_free_problem(server->problemlist);

  free(server->agenthost);

  if(server->sa_list != NULL)
    gs_free_infolist(server->sa_list);

  if(server->restrictions != NULL)
    gs_free_restrictions(server->restrictions);

  free(server->perf_expr);

  free(server);
  return 0;
}


1.6.3-20100507