#include <sys/types.h>#include <sys/socket.h>#include <sys/time.h>#include <sys/un.h>#include <stdio.h>#include <string.h>#include <stdlib.h>#include <errno.h>#include <unistd.h>#include "comm_encode.h"#include "comm_basics.h"#include "utility.h"#include "mfork.h"#include "icl_list.h"#include "agent.h"#include "gs_storage.h"
Go to the source code of this file.
Functions | |
| int | gs_sensor_handle_serverlist (int) |
| static void | gs_sensor_generic_signal_handler (int sig) |
| int | gs_sensor_update_all (gs_agent_conn_t *clients, char *msg) |
| int | gs_write_int_nl (int sock, int num) |
| int | gs_write_nl (int sock, char *msg) |
| int | gs_sensor_get_line (int myfd, char *s, int max) |
| void | gs_sensor_send_list (int socket) |
| int | gs_sensor_handle_connection (int sensorsock, int *fd) |
| int | gs_sensor_handle_disconnect (int myfd, gs_agent_conn_t *connections) |
| int | gs_sensor_handle_ping (int myfd) |
| int | gs_sensor_process_agent_message (gs_agent_conn_t *connections, int myfd) |
| int | gs_sensor_process_request (int myfd, gs_agent_conn_t *connections) |
| int | gs_agent_sensor_process_messages (int agentsock, int sensorsock) |
| void | gs_agent_sensor_run (void **args) |
| void | gs_agent_sensor_pre (void **args) |
| void | gs_agent_sensor_post (void **args) |
| void | gs_agent_sensor_exit (void **args) |
| void gs_agent_sensor_exit | ( | void ** | args | ) |
Function to be called by mfork upon terminating.
| args | -- mfork args array. args[0] contains a pipe fd where the parent waits for the sensor to notify it that it's ready to accept a connection. |
Definition at line 744 of file gs_agent_sensor.c.
{
unlink(GRIDSOLVE_SENSOR_USOCK);
}


| void gs_agent_sensor_post | ( | void ** | args | ) |
Function to be called by mfork after the fork.
| args | -- mfork args array. args[0] contains a pipe fd where the parent waits for the sensor to notify it that it's ready to accept a connection. |
Definition at line 712 of file gs_agent_sensor.c.
{
int *pfds, i;
char junk;
if(!args || !args[0]) {
ERRPRINTF("error waiting for sensor: NULL args.\n");
return;
}
pfds = (int *)args[0];
close(pfds[1]);
if((i = read(pfds[0], &junk, 1)) < 0) {
ERRPRINTF("Error waiting for sensor to start.\n");
return;
}
DBGPRINTF("read returned %d.\n", i);
close(pfds[0]);
}

| void gs_agent_sensor_pre | ( | void ** | args | ) |
Function to be called by mfork before the fork.
| args | -- mfork args array. args[0] is allocated 2 bytes to store a pipe fd where the parent waits for the sensor to notify it that it's ready to accept a connection. args[1] points to the agent structure. |
Definition at line 681 of file gs_agent_sensor.c.
{
int *pfds;
if(!args)
return;
pfds = (int *)malloc(2 * sizeof(int));
if(!pfds) {
ERRPRINTF("out of memory.\n");
return;
}
if(pipe(pfds) < 0) {
ERRPRINTF("Cannot create pipe.\n");
return;
}
args[0] = pfds;
}

| int gs_agent_sensor_process_messages | ( | int | agentsock, | |
| int | sensorsock | |||
| ) |
Loops forever (or until some catastrophic failure occurs) processing messages from the agent and monitor clients.
| agentsock | -- socket connected to the agent | |
| sensorsock | -- socket listening on sensor port |
Definition at line 457 of file gs_agent_sensor.c.
{
int maxfd, nready, myfd;
fd_set allset, rset;
struct timeval tv;
gs_agent_conn_t conns;
gs_agent_init_conns(&conns);
/*
* Listen for connections and service requests
*/
DBGPRINTF("AGENT SENSOR RUNNING. PID: %d\n", (int)getpid());
while(1) {
gs_agent_setup_fd_sets(&conns, sensorsock, agentsock, &allset, &maxfd);
rset = allset;
tv.tv_sec = 1;
tv.tv_usec = 0;
myfd = -1;
nready = select(maxfd + 1, &rset, NULL, NULL, &tv);
if((nready < 0) && (errno == EINTR))
continue;
if(nready < 0) {
ERRPRINTF("select failed.. aborting.\n");
break;
}
if(nready == 0) {
if(!mfork_check_parent()) {
ERRPRINTF("Parent died, so I am exiting\n");
break;
}
continue;
}
DBGPRINTF("FDs ready: %d.\n", nready);
/*
** Handle new connection.
**/
if(FD_ISSET(sensorsock, &rset)) {
if(gs_sensor_handle_connection(sensorsock, &myfd) < 0) {
ERRPRINTF("Error handling connection on sensor socket.\n");
continue;
}
if(gs_agent_add_conn(&conns, myfd) < 0) {
ERRPRINTF("Could not add connection for fd %d.\n", myfd);
close(myfd);
continue;
}
DBGPRINTF("Accepted Connection on fd %d.\n", myfd);
continue;
}
else if(FD_ISSET(agentsock, &rset)) {
gs_sensor_process_agent_message(&conns, agentsock);
continue;
}
else {
int i;
myfd = -1;
/*
* Find the FD that's ready
*/
for(i=0;i<=conns.maxfd;i++) {
if(conns.fd[i]) {
if(FD_ISSET(i, &rset)) {
myfd = i;
DBGPRINTF("FD %d is ready to be read.\n", myfd);
break;
}
}
}
}
/*
* Just in case we had an accept error above.
*/
if(myfd < 0) {
DBGPRINTF("Accept failed or couldn't find fd: %d.\n", myfd);
continue;
}
DBGPRINTF("MyFD = %d.\n", myfd);
if(gs_sensor_process_request(myfd, &conns) < 0) {
gs_agent_del_conn(&conns, myfd);
close(myfd);
}
}
return 0;
}


| void gs_agent_sensor_run | ( | void ** | args | ) |
This is the entry point for the sensor server, intended to be started using mfork().
| args | -- mfork args array. args[0] contains a pipe fd where the parent waits for the sensor to notify it that it's ready to accept a connection. args[1] contains a pointer to the agent struct. |
Definition at line 566 of file gs_agent_sensor.c.
{
int *notification_pipe, dump, len, sensorsock, agentsock, tmpsock;
struct sockaddr_un mysaddr;
struct sockaddr_in sclient;
char junk = 'x';
in_port_t port;
if(!args || !args[0] || !args[1]) {
ERRPRINTF("NULL args.. aborting.\n");
_exit(-1);
}
DBGPRINTF("In sensor.\n");
notification_pipe = (int *)args[0];
/* handle all signals except SIGINT. if the agent is running in
* console mode, we only want the parent to get SIGINT, so that all
* the children will realize the parent is dead via mfork_check_parent()
* and cleanly terminate. having all processes catch SIGINT causes
* mfork to try to restart them, depending on whether they catch it
* before or after the parent.
*/
gs_setup_signal_handlers(gs_sensor_generic_signal_handler);
signal (SIGINT, SIG_IGN);
/*
* Setup Client Socket
*/
port = getenv_int("GRIDSOLVE_SENSOR_PORT", GRIDSOLVE_SENSOR_PORT_DEFAULT);
sensorsock = gs_establish_socket(&port, 0);
if(sensorsock < 0) {
ERRPRINTF("Could not open sensor socket.\n");
_exit(-1);
}
gs_listen_on_socket(sensorsock);
DBGPRINTF("Sensor Listening Socket Created on %d.\n", port);
/*
* Connect to DB manager.
*/
if(gs_storage_init(args[1]) < 0) {
ERRPRINTF("Sensor could not connect to DB.\n");
_exit(-1);
}
DBGPRINTF("Sensor Connected to DB Manager.\n");
/*
* setup Unix Domain Socket
*/
tmpsock = socket(PF_UNIX, SOCK_STREAM, 0);
if(tmpsock < 0) {
ERRPRINTF("Could not create agent<->sensor socket.\n");
perror("socket");
_exit(-1);
}
memset(&mysaddr, 0x0, sizeof(struct sockaddr_un));
mysaddr.sun_family = PF_UNIX;
strcpy(mysaddr.sun_path, GRIDSOLVE_SENSOR_USOCK);
unlink(mysaddr.sun_path);
if(bind(tmpsock, (struct sockaddr *) &mysaddr,
sizeof(struct sockaddr_un)) < 0 ) {
ERRPRINTF("Could not bind sensor UDS.\n");
perror("bind");
_exit(-1);
}
if(listen(tmpsock, GRIDSOLVE_AGENT_MAX_CONNECTIONS) < 0) {
ERRPRINTF("Could not listen on sensor UDS.\n");
perror("listen");
_exit(-1);
}
/* don't check for errors on the write() here because in the
* event that this is restarted by mfork, the other end of
* this pipe will be closed already.
*/
DBGPRINTF("Writing 1.\n");
dump = write(notification_pipe[1], &junk, 1);
DBGPRINTF("Write returned %d.\n", dump);
close(notification_pipe[1]);
close(notification_pipe[0]);
/* wait for agent to connect. */
len = sizeof(struct sockaddr_in);
agentsock = accept(tmpsock, (struct sockaddr *) &sclient,
(socklen_t *)&len);
if(agentsock < 0) {
ERRPRINTF("Could not accept connection from agent to sensor.\n");
perror("accept");
_exit(-1);
}
DBGPRINTF("Agent<->Sensor Socket Created.\n");
close(tmpsock);
gs_agent_sensor_process_messages(agentsock, sensorsock);
close(agentsock);
close(sensorsock);
unlink(GRIDSOLVE_SENSOR_USOCK);
}


| static void gs_sensor_generic_signal_handler | ( | int | sig | ) | [static] |
Signal handler for various signals. Exit with normal status.
| sig | -- the signal that was caught |
Definition at line 30 of file gs_agent_sensor.c.
{
/* pass along SIGHUP to parent */
if(sig == SIGHUP) {
kill(getppid(), SIGHUP);
return;
}
ERRPRINTF("Sensor terminating on signal %d.\n", sig);
unlink(GRIDSOLVE_SENSOR_USOCK);
_exit(0);
}


| int gs_sensor_get_line | ( | int | myfd, | |
| char * | s, | |||
| int | max | |||
| ) |
Reads up to max-1 characters from the specified socket, stores into the buffer and null terminates it.
| myfd | -- socket on which to read | |
| s | -- the buffer in which to store the string. should have at least max bytes allocated. | |
| max | -- size of the buffer |
Definition at line 125 of file gs_agent_sensor.c.
{
int len;
if(!s) {
ERRPRINTF("Invalid args: null buffer\n");
return -1;
}
len = proxy_read_timeout(myfd, s, max-1, PROXY_TIMEOUT_DEFAULT);
if(len <= 0)
return -1;
else if (len <= max)
s[len] = '\0';
return 0;
}


| int gs_sensor_handle_connection | ( | int | sensorsock, | |
| int * | fd | |||
| ) |
Handles acceptance of a new monitor connection on the sensor socket.
| sensorsock | -- the listening socket | |
| fd | -- upon return, contains the accepted socket descriptor |
Definition at line 204 of file gs_agent_sensor.c.
{
struct sockaddr_in sclient;
int myfd, len;
char *tok;
char s[GS_SENSOR_MAX_LINE];
if(!fd) {
ERRPRINTF("Invalid arg: null fd\n");
return -1;
}
DBGPRINTF("Received new connection from Agent.\n");
len = sizeof(struct sockaddr_in);
myfd = accept(sensorsock, (struct sockaddr *) &sclient,
(socklen_t *)&len);
if(myfd < 0) {
if(errno == EINTR) {
return -1;
}
else {
perror("accept");
return -1;
}
}
/* send sensor version */
gs_write_nl(myfd, "1.3");
/*
** Handle message.
*/
if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
close(myfd);
return -1;
}
DBGPRINTF("READ: %s\n", s);
tok = strtok(s, " \n\r");
if(!tok) {
DBGPRINTF("Badly formed request.\n");
close(myfd);
return -1;
}
if(!strcmp(tok, "NS_SERVERLIST"))
gs_sensor_handle_serverlist(myfd);
else {
DBGPRINTF("Unknown request.\n");
close(myfd);
return -1;
}
*fd = myfd;
return 0;
}


| int gs_sensor_handle_disconnect | ( | int | myfd, | |
| gs_agent_conn_t * | connections | |||
| ) |
Handles a DISCONNECT message from a client.
| myfd | -- descriptor connected to the client | |
| connections | -- list of all open connections |
Definition at line 272 of file gs_agent_sensor.c.
{
char s[GS_SENSOR_MAX_LINE];
if(!connections) {
ERRPRINTF("Invalid arg: null connections ptr\n");
return -1;
}
DBGPRINTF("DISCONNECT\n");
gs_write_nl(myfd, "DISCONNECTING");
if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
close(myfd);
return -1;
}
/* remove connection */
gs_agent_del_conn(connections, myfd);
close(myfd);
DBGPRINTF("Sensor removed client %d.\n", myfd);
return 0;
}


| int gs_sensor_handle_ping | ( | int | myfd | ) |
Handles a PING message from a client. sends back a "PONG" response.
| myfd | -- descriptor connected to the client |
Definition at line 349 of file gs_agent_sensor.c.
{
DBGPRINTF("PING\n");
return gs_write_nl(myfd, "PONG") < 0 ? -1 : 0;
}


| int gs_sensor_handle_serverlist | ( | int | myfd | ) |
Handles a SERVERLIST message from a client. sends list of all GridSolve servers.
| myfd | -- descriptor connected to the client |
Definition at line 308 of file gs_agent_sensor.c.
{
char s[GS_SENSOR_MAX_LINE];
DBGPRINTF("NS_SERVERLIST\n");
/* send md5 hash */
gs_write_nl(myfd, "00000000000000000000000000000000");
/* get response (ignored) */
if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
close(myfd);
return -1;
}
gs_write_nl(myfd, "ACCEPT");
gs_write_int_nl(myfd, myfd);
/* send sensor version */
gs_write_nl(myfd, "-2");
gs_write_nl(myfd, "1.3");
gs_write_nl(myfd, "3.0");
/* send server list */
gs_sensor_send_list(myfd);
DBGPRINTF("BACK FROM gs_sensor_send_list.\n");
return 0;
}


| int gs_sensor_process_agent_message | ( | gs_agent_conn_t * | connections, | |
| int | myfd | |||
| ) |
Gets a message sent by the agent and forwards it to all the monitor clients currently connected.
| connections | -- list of all open connections | |
| myfd | -- descriptor connected to the agent |
Definition at line 367 of file gs_agent_sensor.c.
{
char s[GS_SENSOR_MAX_LINE];
if(!connections) {
ERRPRINTF("Invalid arg: null connections ptr\n");
return -1;
}
DBGPRINTF("Processing request on fd %d\n", myfd);
/*
* Handle message.
*/
if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
close(myfd);
return -1;
}
DBGPRINTF("READ: %s\n", s);
return gs_sensor_update_all(connections, s);
}


| int gs_sensor_process_request | ( | int | myfd, | |
| gs_agent_conn_t * | connections | |||
| ) |
Handles a request made by a monitor client (server list, disconnect, etc).
| myfd | -- descriptor connected to the client | |
| connections | -- list of all open connections |
Definition at line 402 of file gs_agent_sensor.c.
{
/* to keep purify quiet... ugh. */
char tmp_svlist[] = {"NS_SERVERSIST"}, tmp_ping[] = {"PING"},
tmp_dis[] = {"DISCONNECT"}, tmp_0900[] = {"0900"};
char s[GS_SENSOR_MAX_LINE], *tok;
if(!connections) {
ERRPRINTF("Invalid arg: null connections ptr\n");
return -1;
}
DBGPRINTF("Processing request on fd %d\n", myfd);
/*
* Handle message.
*/
if(gs_sensor_get_line(myfd, s, GS_SENSOR_MAX_LINE) < 0) {
close(myfd);
return -1;
}
DBGPRINTF("READ: %s\n", s);
tok = strtok(s, " \n\r");
if(!tok)
return -1;
DBGPRINTF("request tok = '%s'\n", tok);
if(!strcmp(tok, tmp_svlist))
return gs_sensor_handle_serverlist(myfd);
else if(!strcmp(tok, tmp_ping))
return gs_sensor_handle_ping(myfd);
else if(!strcmp(tok, tmp_dis))
return gs_sensor_handle_disconnect(myfd, connections);
else if(!strcmp(tok, tmp_0900))
return 0;
else
ERRPRINTF("Unknown message type, tok = %s\n", tok);
return 0;
}


| void gs_sensor_send_list | ( | int | socket | ) |
Sends server list over the socket.
| socket | -- the socket on which to send the list |
Definition at line 151 of file gs_agent_sensor.c.
{
int cnt, i;
gs_server_t **servers;
char temp_cid[CID_LEN * 2 + 1];
if(gs_get_all_servers(NULL, &servers, &cnt) < 0)
{
ERRPRINTF("Sensor can't get server list.\n");
/*
** send empty list to client
*/
cnt = -1;
}
DBGPRINTF("CNT: %d. SOCKET: %d\n", cnt, socket);
/*
** send server list to client
*/
i = gs_write_int_nl(socket, cnt);
for(i = 0; i < cnt; i++)
{
struct in_addr tmpaddr;
gs_write_nl(socket, servers[i]->hostname);
proxy_cid_to_str(temp_cid, servers[i]->componentid);
gs_write_nl(socket, temp_cid);
tmpaddr.s_addr = servers[i]->ipaddress;
gs_write_nl(socket, inet_ntoa(tmpaddr));
gs_write_int_nl(socket, servers[i]->port);
gs_write_int_nl(socket, servers[i]->workload);
gs_write_nl(socket, "standard");
gs_write_nl(socket, "0");
gs_write_nl(socket, "0");
}
for(i = 0; i < cnt; i++)
gs_server_free(servers[i]);
FREE(servers);
}


| int gs_sensor_update_all | ( | gs_agent_conn_t * | clients, | |
| char * | msg | |||
| ) |
Sends the specified string to all listening clients.
| clients | -- list of connected clients | |
| msg | -- string to send to the clients |
Definition at line 53 of file gs_agent_sensor.c.
{
int i;
if(!clients || !msg) {
ERRPRINTF("Invalid args\n");
return -1;
}
for(i=0;i<=clients->maxfd;i++)
if(clients->fd[i])
gs_writen(i, msg, strlen(msg));
return 0;
}


| int gs_write_int_nl | ( | int | sock, | |
| int | num | |||
| ) |
Write an integer (represented as a string followed by newline) on the specified socket.
| sock | -- socket on which to write the message | |
| num | -- the integer to be sent |
Definition at line 80 of file gs_agent_sensor.c.
{
char s[256];
sprintf(s, "%d\n", num);
return gs_writen(sock, s, strlen(s)) < 0 ? -1 : 0;
}


| int gs_write_nl | ( | int | sock, | |
| char * | msg | |||
| ) |
Appends a newline to the specified string and writes it on the socket.
| sock | -- socket on which to write the message | |
| msg | -- the string to be sent |
Definition at line 98 of file gs_agent_sensor.c.
{
if(!msg) {
ERRPRINTF("Invalid arg: null msg\n");
return -1;
}
if(gs_writen(sock, msg, strlen(msg)) < 0 ||
gs_writen(sock, "\n", 1) < 0)
return -1;
return 0;
}


1.6.3-20100507