00001
00016
00017
00018
00019 #ifdef HAVE_CONFIG_H
00020 #include "config.h"
00021 #endif
00022
00023 #ifndef _REENTRANT
00024 #define _REENTRANT
00025 #endif
00026
00027 #include <pthread.h>
00028 #include <stdio.h>
00029 #include <stdlib.h>
00030 #include <sys/time.h>
00031 #include <sys/types.h>
00032 #include <unistd.h>
00033 #include <sys/socket.h>
00034 #include <netdb.h>
00035 #include <netinet/in.h>
00036 #include <netinet/tcp.h>
00037 #include <arpa/inet.h>
00038 #include <sys/errno.h>
00039 #include <memory.h>
00040 #include <signal.h>
00041 #include <fcntl.h>
00042 #include <errno.h>
00043 #include <sys/ioctl.h>
00044
00045 #ifdef HAVE_CONFIG_H
00046 #include "config.h"
00047 #endif
00048
00049 #include "symtab.h"
00050 #include "queue.h"
00051 #include "proxy_utils.h"
00052 #include "proxy_server.h"
00053
00054 SYMTABLE *accept_table;
00055 pthread_mutex_t accept_tab_mutex;
00056 QUEUE *removed_components_queue;
00057
00067 void
00068 proxy_print_component_tag(FILE *f, char *prefix, char *ctag, char *suffix)
00069 {
00070 int id_size, port_size;
00071 in_port_t server_port;
00072 CID server_id;
00073
00074 id_size = sizeof(server_id.id);
00075 port_size = sizeof(server_port);
00076
00077 memcpy(&server_port, ctag + id_size, port_size);
00078
00079 if(prefix)
00080 fprintf(f, "%s ", prefix);
00081 proxy_print_componentID(f, ctag);
00082 fprintf(f, "/%d", ntohs(server_port));
00083 if(suffix)
00084 fprintf(f, " %s", suffix);
00085 fprintf(f, "\n");
00086 }
00087
00103 int
00104 proxy_daemon_init(char *root, char *logfile) {
00105 pid_t pid;
00106 int fd;
00107
00108 if(!root || !logfile)
00109 return -1;
00110
00111 pid = fork();
00112
00113 if(pid < 0)
00114 return -1;
00115
00116 if(pid != 0)
00117 exit(0);
00118
00119 setsid();
00120
00121 pid = fork();
00122
00123 if(pid < 0)
00124 return -1;
00125
00126 if(pid != 0)
00127 exit(0);
00128
00129 chdir(root);
00130 umask(0);
00131
00132
00133 fd = open("/dev/null", O_RDWR|O_CREAT, 0644);
00134 if(fd < 0) {
00135 perror("open()");
00136 return -1;
00137 }
00138 close(0);
00139 if(dup2(fd, 0) < 0) {
00140 fprintf(stderr,"dup2 failed");
00141 perror("dup2()");
00142 return -1;
00143 }
00144 close(fd);
00145
00146 fprintf(stderr, "\n"
00147 "==============================================================\n"
00148 "| -- Giving up terminal control! --\n|\n"
00149 "| All further terminal output will be sent to the log file:\n"
00150 "|\t\"%s\"\n"
00151 "=============================================================="
00152 "\n\n", logfile);
00153
00154
00155 remove(logfile);
00156 fd = open(logfile, O_RDWR|O_CREAT, 0644);
00157 if(fd < 0){
00158 fprintf(stderr,"dup2 failed");
00159 perror("open()");
00160 return -1;
00161 }
00162
00163
00164 close(1);
00165 if(dup2(fd, 1) < 0){
00166 fprintf(stderr,"dup2 failed");
00167 perror("dup2()");
00168 return -1;
00169 }
00170 close(2);
00171 if(dup2(fd, 2) < 0){
00172 fprintf(stderr,"dup2 failed");
00173 perror("dup2()");
00174 return -1;
00175 }
00176 close(fd);
00177
00178 return 0;
00179 }
00180
00187 int
00188 proxy_block_sigpipe()
00189 {
00190 sigset_t sigvec;
00191
00192 if (sigemptyset(&sigvec) < 0 ||
00193 sigaddset(&sigvec, SIGPIPE) < 0 ||
00194 pthread_sigmask(SIG_BLOCK, &sigvec, NULL) != 0)
00195 {
00196 fprintf(stderr, "Unable to set thread signal mask. Aborting\n");
00197 return -1;
00198 }
00199
00200 return 0;
00201 }
00202
00203 int proxy_krb5_enabled = FALSE;
00204
00213 static int
00214 getenv_int(char* name, int defval)
00215 {
00216 char *envstr = NULL;
00217 long int longval = -1;
00218 char *endptr;
00219 extern int errno;
00220
00221 if (name == NULL) return defval;
00222
00223
00224 if ((envstr = getenv(name)) == NULL) return defval;
00225
00226
00227 longval = strtol(envstr, &endptr, 10);
00228 if ((errno == ERANGE) || (longval==0 && endptr==envstr))
00229 return defval;
00230
00231 return (int)longval;
00232 }
00233
00250 int
00251 proxy_server_parse_cmd_line(int argc, char **argv, char **logfile,
00252 int *daemon, int *kerberos)
00253 {
00254 int c;
00255
00256 *logfile = NULL;
00257 *daemon = 1;
00258 *kerberos = 0;
00259
00260
00261
00262
00263
00264
00265 #define PROXY_SERVER_USAGE_STR "Usage: proxy_server [-l logfile] [-c] [-k]"
00266
00267 while((c = getopt(argc,argv,"kcl:")) != EOF) {
00268 switch(c) {
00269 case 'l':
00270 *logfile = strdup(optarg);
00271 break;
00272 case 'c':
00273 *daemon = 0;
00274 break;
00275 case 'k':
00276 *kerberos = 1;
00277 break;
00278 case '?':
00279 return -1;
00280 break;
00281 default:
00282 fprintf(stderr,"Bad arg: '%c'.\n",c);
00283 return -1;
00284 }
00285 }
00286
00287 return 0;
00288 }
00289
00301 int
00302 main(int argc, char *argv[])
00303 {
00304 int listen_sock, daemon;
00305 int proxy_listen_port = -1;
00306 socklen_t clilen;
00307 struct sockaddr_in tcp_serv, cliaddr;
00308 void *proxy_generic_conn_handler(void *);
00309 void *proxy_purge_old_components(void *);
00310 char *logfile;
00311 pthread_t tid;
00312
00313
00314 if(proxy_server_parse_cmd_line(argc, argv, &logfile, &daemon,
00315 &proxy_krb5_enabled) < 0)
00316 {
00317 fprintf(stderr, "%s\n", PROXY_SERVER_USAGE_STR);
00318 exit(EXIT_FAILURE);
00319 }
00320
00321 #ifdef KERBEROS5
00322 if (proxy_check_krb5_envvars() != 0)
00323 exit(EXIT_FAILURE);
00324 #else
00325 if(proxy_krb5_enabled) {
00326 fprintf(stderr, "Warning! Kerberos was not enabled during compilation ");
00327 fprintf(stderr, "(-k ignored).\n\n");
00328 }
00329 #endif
00330
00331
00332 if(!logfile)
00333 logfile = PROXY_LOGFILE;
00334
00335
00336 if(daemon && proxy_daemon_init(".", logfile) < 0) {
00337 fprintf(stderr, "Failed to start proxy server as a daemon.\n");
00338 exit(EXIT_FAILURE);
00339 }
00340
00341 if (proxy_block_sigpipe() < 0) {
00342 fprintf(stderr, "Couldn't block SIGPIPE. Aborting.\n");
00343 exit(EXIT_FAILURE);
00344 }
00345
00346 removed_components_queue = new_queue();
00347 if (!removed_components_queue) {
00348 fprintf(stderr, "Bad news! out of memory allocating queue\n");
00349 exit(EXIT_FAILURE);
00350 }
00351
00352
00353 accept_table = new_symtable(PROXY_ACCEPT_TABLE_SIZE);
00354
00355 if (!accept_table) {
00356 fprintf(stderr, "Bad news! out of memory allocating table\n");
00357 exit(EXIT_FAILURE);
00358 }
00359
00360 if (pthread_mutex_init(&accept_tab_mutex, NULL)) {
00361 fprintf(stderr, "Bad news! cannot create mutex\n");
00362 exit(EXIT_FAILURE);
00363 }
00364
00365 proxy_listen_port = getenv_int("PROXY_LISTEN_PORT", PROXY_LISTEN_PORT_DEFAULT);
00366 if (proxy_init_sock(&listen_sock, &tcp_serv, "tcp", FALSE, proxy_listen_port) < 0) {
00367 fprintf(stderr, "Failed to create socket\n");
00368 perror("proxy_init_sock");
00369 exit(EXIT_FAILURE);
00370 }
00371
00372 if(pthread_create(&tid, NULL, &proxy_purge_old_components, NULL)) {
00373 fprintf(stderr, "Warning: error creating new thread.\n");
00374 perror("pthread_create");
00375 }
00376
00377 if(pthread_detach(tid))
00378 fprintf(stderr,"Warning: could not detach thread after creation.\n");
00379
00380 printf("proxy IP %u\n", proxy_get_my_ipaddr());
00381 printf("proxy listening on port %d\n", ntohs(tcp_serv.sin_port));
00382
00383 if (listen(listen_sock, 50) < 0) {
00384 perror("listen");
00385 exit(EXIT_FAILURE);
00386 }
00387
00388 for (;;) {
00389 int *connfd;
00390
00391 clilen = sizeof(cliaddr);
00392
00393
00394 connfd = (int *) malloc(sizeof(int));
00395
00396 if (!connfd) {
00397 fprintf(stderr, "Cannot malloc. going to sleep for a while.\n");
00398 sleep(3);
00399 continue;
00400 }
00401
00402 *connfd = accept(listen_sock, (struct sockaddr *) &cliaddr, &clilen);
00403
00404 if (*connfd < 0) {
00405 free(connfd);
00406 perror("accept");
00407 continue;
00408 }
00409
00410 printf("new client connected!\n");
00411
00412 if (proxy_auth(*connfd) < 0) {
00413 fprintf(stderr, "Warning: failed to authenticate client.\n");
00414 close(*connfd);
00415 continue;
00416 }
00417
00418 if (pthread_create(&tid, NULL, &proxy_generic_conn_handler, connfd)) {
00419 fprintf(stderr, "Warning: error creating new thread.\n");
00420 close(*connfd);
00421 free(connfd);
00422 continue;
00423 }
00424
00425 if(pthread_detach(tid))
00426 fprintf(stderr,"Warning: could not detach thread after creation.\n");
00427 }
00428 }
00429
00439 int
00440 proxy_auth(int s)
00441 {
00442 proxy_tag_t response;
00443
00444 #ifdef KERBEROS5
00445 if (proxy_krb5_enabled) {
00446 response = PROXY_KRB5_AUTH_REQUIRED;
00447 if (write(s, &response, sizeof(response)) < 0)
00448 return -1;
00449
00450
00451 if (proxy_recv_krb5_credentials(s) < 0) {
00452
00453
00454
00455
00456
00457
00458 response = PROXY_AUTH_FAILED;
00459
00460 if (write(s, &response, sizeof(response)) == -1)
00461 return -1;
00462
00463 return -1;
00464 }
00465
00466 response = PROXY_AUTH_ACCEPTED;
00467 }
00468 else
00469 response = PROXY_AUTH_ACCEPTED;
00470 #else
00471 response = PROXY_AUTH_ACCEPTED;
00472 #endif
00473
00474 if (write(s, &response, sizeof(response)) < 0)
00475 return -1;
00476
00477 return 0;
00478 }
00479
00487 void
00488 dump_component(char *key, void *var)
00489 {
00490 COMPONENT *item = (COMPONENT *)var;
00491
00492 proxy_print_component_tag(stdout, "COMPONENT key =", key, NULL);
00493
00494 printf("\tID = ");
00495 proxy_print_componentID(stdout, item->id.id);
00496 printf("\n");
00497 printf("\tport = %d\n", ntohs(item->port));
00498 printf("\tsockfd = %d\n", item->sockfd);
00499
00500 return;
00501 }
00502
00515 void *
00516 proxy_purge_old_components(void *arg)
00517 {
00518 COMPONENT *component;
00519
00520 for(;;) {
00521 sleep(PROXY_PURGE_FREQ);
00522
00523 if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
00524 printf("\n");
00525 printf("--------- ACCEPT TABLE [%d items] ---------\n", accept_table->num_items);
00526 hash_dump(accept_table, dump_component);
00527 printf("------------------------------------------\n");
00528 printf("\n");
00529
00530 pthread_mutex_unlock(&accept_tab_mutex);
00531 }
00532
00533 if(pthread_mutex_lock(removed_components_queue->mutex) == 0) {
00534
00535
00536
00537 while((component = (COMPONENT *)dequeue(removed_components_queue))) {
00538 destroy_queue(component->msg_queue);
00539 free(component);
00540 }
00541
00542 pthread_mutex_unlock(removed_components_queue->mutex);
00543 } else {
00544 fprintf(stderr, "Warning: couldn't lock removed components queue.\n");
00545 }
00546 }
00547
00548 return NULL;
00549 }
00550
00560 void *
00561 proxy_generic_conn_handler(void *arg)
00562 {
00563 int s, n;
00564 proxy_tag_t tag;
00565
00566 s = *((int *) arg);
00567 free(arg);
00568
00569 n = proxy_tread(s, (void *) &tag, 1, PROXY_TIMEOUT_DEFAULT);
00570
00571 if (n <= 0) {
00572 fprintf(stderr, "Warning: error reading tag. Closing connection.\n");
00573 close(s);
00574 return NULL;
00575 }
00576
00577 printf("tag = %d\n", tag);
00578
00579 switch (tag) {
00580
00581 case PROXY_CONTROL_CONNECTION:
00582 proxy_handle_control_connection(s);
00583 break;
00584
00585 case PROXY_CONNECT:
00586 proxy_handle_connection_request(s);
00587 break;
00588
00589
00590 case 'Z':
00591 fprintf(stderr, "TERMINATING...\n");
00592 exit(EXIT_SUCCESS);
00593 break;
00594
00595 default:
00596 fprintf(stderr, "Warning: unknown tag %d\n", tag);
00597 close(s);
00598 break;
00599 }
00600
00601 return NULL;
00602 }
00603
00611 COMPONENT *
00612 proxy_new_component()
00613 {
00614 COMPONENT *new;
00615
00616 new = (COMPONENT *) malloc(sizeof(COMPONENT));
00617
00618 if (!new)
00619 return NULL;
00620
00621 new->id.id[0] = '\0';
00622 new->port = 0;
00623 new->sockfd = 0;
00624 new->to_be_freed = FALSE;
00625 new->msg_queue = NULL;
00626
00627 return new;
00628 }
00629
00645 COMPONENT *
00646 proxy_get_request_header(int connfd, CID * client_id)
00647 {
00648 int id_size, port_size, to;
00649 COMPONENT *component;
00650 in_port_t dest_port;
00651 HASHNODE *ht;
00652 CID dest_id;
00653 char component_tag[sizeof(CID) + sizeof(dest_port) + 1];
00654
00655 to = PROXY_TIMEOUT_DEFAULT;
00656 id_size = sizeof(client_id->id);
00657 port_size = sizeof(dest_port);
00658
00659 if (proxy_tread(connfd, (void *) client_id->id, id_size, to) <= 0 ||
00660 proxy_tread(connfd, (void *) dest_id.id, id_size, to) <= 0 ||
00661 proxy_tread(connfd, (void *) &dest_port, port_size, to) <= 0) {
00662 fprintf(stderr, "Error reading IDs. Aborting connection.\n");
00663 return NULL;
00664 }
00665
00666
00667 memcpy(component_tag, dest_id.id, id_size);
00668 memcpy(component_tag + id_size, &dest_port, port_size);
00669 *(component_tag + id_size + port_size) = '\0';
00670
00671 proxy_print_component_tag(stdout, "CONNECT REQ dest = ",
00672 component_tag, NULL);
00673
00676 if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
00677 ht = hash_lookup(accept_table, (void *) component_tag);
00678
00679 if (!ht) {
00680 proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00681
00682 fprintf(stderr, "Component entry not found!\n");
00683 pthread_mutex_unlock(&accept_tab_mutex);
00684 write(connfd, &response, sizeof(response));
00685 return NULL;
00686 }
00687
00688 component = (COMPONENT *) (ht->item);
00689
00690 printf("found entry.. socket desc = %d\n", component->sockfd);
00691
00692 pthread_mutex_unlock(&accept_tab_mutex);
00693 } else {
00694 fprintf(stderr, "proxy_get_request_header(): Error locking.\n");
00695 return NULL;
00696 }
00697
00698 return component;
00699 }
00700
00718 int
00719 proxy_send_cid_to_control_conn(int s, COMPONENT * c, CID client_id,
00720 in_port_t port)
00721 {
00722
00723
00724 printf("waiting for msg queue lock\n");
00725
00726 if (pthread_mutex_lock(c->msg_queue->mutex) == 0) {
00727 COMPONENT *client_component;
00728
00729 client_component = proxy_new_component();
00730
00731
00732 if (c->to_be_freed || !client_component) {
00733 proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00734
00735 if (client_component)
00736 free(client_component);
00737 pthread_mutex_unlock(c->msg_queue->mutex);
00738 write(s, &response, sizeof(response));
00739 return -1;
00740 }
00741
00742 memcpy(client_component->id.id, client_id.id, sizeof(client_id.id));
00743 client_component->port = port;
00744
00745 printf("putting component in queue\n");
00746
00747 enqueue(c->msg_queue, (void *) client_component);
00748 pthread_cond_signal(c->msg_queue->notEmpty);
00749 pthread_mutex_unlock(c->msg_queue->mutex);
00750 } else {
00751 fprintf(stderr, "Error locking.\n");
00752 return -1;
00753 }
00754
00755 return 0;
00756 }
00757
00774 int
00775 proxy_wait_for_server_conn(int connfd, int sock2)
00776 {
00777 int maxfd, nready;
00778 fd_set allset, rset;
00779 struct timeval tv;
00780 int newfd = -1;
00781 struct sockaddr_in cliaddr;
00782 socklen_t clilen;
00783
00784
00785
00786 maxfd = sock2;
00787 FD_ZERO(&allset);
00788 FD_SET(sock2, &allset);
00789
00790 rset = allset;
00791 tv.tv_sec = 15;
00792 tv.tv_usec = 0;
00793
00794 printf("waiting for 2nd connection\n");
00795 nready = select(maxfd + 1, &rset, NULL, NULL, &tv);
00796
00797 if (nready <= 0) {
00798 proxy_tag_t response = PROXY_CONNECTION_REFUSED;
00799
00800 printf("select error %d (0 means timed out)\n", nready);
00801 write(connfd, &response, sizeof(response));
00802 return -1;
00803 }
00804
00805 if (FD_ISSET(sock2, &rset)) {
00806 clilen = sizeof(cliaddr);
00807
00808 printf("something happened on the second listening socket!\n");
00809
00810 newfd = accept(sock2, (struct sockaddr *) &cliaddr, &clilen);
00811
00812 printf("after accept, new fd = %d\n", newfd);
00813
00814 if (proxy_auth(newfd) < 0) {
00815 fprintf(stderr, "Warning: failed to authenticate client.\n");
00816 close(newfd);
00817 }
00818 }
00819
00820 return newfd;
00821 }
00822
00834 int
00835 proxy_handle_data_transfer(int client_fd, int serv_fd)
00836 {
00837 int maxfd, nready;
00838 fd_set allset, rset;
00839
00840 maxfd = serv_fd > client_fd ? serv_fd : client_fd;
00841
00842 FD_ZERO(&allset);
00843 FD_SET(serv_fd, &allset);
00844 FD_SET(client_fd, &allset);
00845
00846 for (;;) {
00847 int n;
00848 char buf[PROXY_XFER_CHUNKSIZE];
00849
00850 rset = allset;
00851
00852 nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
00853
00854 if (nready < 0) {
00855
00856 if (errno == EINTR)
00857 continue;
00858
00859 return -1;
00860 }
00861
00862 if (nready == 0)
00863 continue;
00864
00865 if (FD_ISSET(client_fd, &rset)) {
00866 n = proxy_read_timeout(client_fd, buf, PROXY_XFER_CHUNKSIZE,
00867 PROXY_TIMEOUT_DEFAULT);
00868
00869 if (n <= 0) {
00870 printf("seems the requesting host (client) broke connection\n");
00871 break;
00872 }
00873
00874 n = write(serv_fd, buf, n);
00875 } else if (FD_ISSET(serv_fd, &rset)) {
00876 n = proxy_read_timeout(serv_fd, buf, PROXY_XFER_CHUNKSIZE,
00877 PROXY_TIMEOUT_DEFAULT);
00878
00879 if (n <= 0) {
00880 printf("seems the proxied host (server) broke connection\n");
00881 break;
00882 }
00883
00884 n = write(client_fd, buf, n);
00885 }
00886 }
00887
00888 return 0;
00889 }
00890
00906 int
00907 proxy_check_client_match(int data_conn_fd, int client_fd,
00908 in_port_t port, CID client_id)
00909 {
00910 proxy_tag_t response = PROXY_CONNECTION_ACCEPTED;
00911 in_port_t new_port;
00912 CID new_cid;
00913 int n;
00914
00915 printf("sending connection accepted to client\n");
00916 n = write(client_fd, &response, sizeof(response));
00917
00918 if (n <= 0) {
00919 fprintf(stderr, "Error writing response to client\n");
00920 return -1;
00921 }
00922
00923 printf("getting response\n");
00924
00925 n = proxy_tread(data_conn_fd, (char *)&response, sizeof(response),
00926 PROXY_TIMEOUT_DEFAULT);
00927 if (n <= 0) {
00928 fprintf(stderr, "Error getting response from client\n");
00929 return -1;
00930 }
00931
00932 if (response != PROXY_CONNECT_REPLY) {
00933 fprintf(stderr, "Didn't get the expected tag!!\n");
00934 return -1;
00935 }
00936
00937 printf("getting cid\n");
00938 n = proxy_tread(data_conn_fd, (char *)&(new_cid.id), sizeof(new_cid.id),
00939 PROXY_TIMEOUT_DEFAULT);
00940 if (n <= 0) {
00941 fprintf(stderr, "Error getting ID from client\n");
00942 return -1;
00943 }
00944
00945 printf("getting port\n");
00946 n = proxy_tread(data_conn_fd, (char *)&new_port, sizeof(new_port),
00947 PROXY_TIMEOUT_DEFAULT);
00948 if (n <= 0) {
00949 fprintf(stderr, "Error getting port from client\n");
00950 return -1;
00951 }
00952
00953
00954 if (memcmp(client_id.id, new_cid.id, sizeof(new_cid.id))
00955 || (port != new_port)) {
00956 fprintf(stderr, "BAD MATCH!!\n");
00957 response = PROXY_CONNECTION_REFUSED;
00958 write(data_conn_fd, &response, sizeof(response));
00959
00960 return -1;
00961 }
00962
00963 printf("IDs match, now sending connection accepted msg to server.\n");
00964
00965
00966 response = PROXY_CONNECTION_ACCEPTED;
00967 n = write(data_conn_fd, &response, sizeof(response));
00968 if (n <= 0) {
00969 fprintf(stderr, "Error writing response to client\n");
00970 return -1;
00971 }
00972
00973 return 0;
00974 }
00975
00985 int
00986 proxy_handle_connection_request(int connfd)
00987 {
00988 int new_incoming_sock, data_conn_fd;
00989 struct sockaddr_in tcp_serv;
00990 CID client_id;
00991 COMPONENT *component;
00992
00993 if (proxy_block_sigpipe() < 0)
00994 return -1;
00995
00996 component = proxy_get_request_header(connfd, &client_id);
00997
00998 if (!component) {
00999 fprintf(stderr, "Error getting component header. Aborting connection.\n");
01000 close(connfd);
01001 return -1;
01002 }
01003
01004 if (proxy_init_sock(&new_incoming_sock, &tcp_serv, "tcp", FALSE, 0) < 0) {
01005 fprintf(stderr, "Error creating new incoming socket.\n");
01006 close(connfd);
01007 return -1;
01008 }
01009
01010 printf("new port --> %d\n", ntohs(tcp_serv.sin_port));
01011
01012 if (listen(new_incoming_sock, 5) < 0) {
01013 proxy_tag_t response = PROXY_CONNECTION_REFUSED;
01014
01015 write(connfd, &response, sizeof(response));
01016 close(connfd);
01017 return -1;
01018 }
01019
01020 if (proxy_send_cid_to_control_conn(connfd, component, client_id, tcp_serv.sin_port) < 0) {
01021 fprintf(stderr, "Error queueing control id.\n");
01022 close(connfd);
01023 return -1;
01024 }
01025
01026 data_conn_fd = proxy_wait_for_server_conn(connfd, new_incoming_sock);
01027
01028 if (data_conn_fd > 0) {
01029 if (proxy_check_client_match(data_conn_fd, connfd, tcp_serv.sin_port, client_id) < 0) {
01030 printf("bad match\n");
01031 } else
01032 proxy_handle_data_transfer(connfd, data_conn_fd);
01033
01034 close(data_conn_fd);
01035 }
01036
01037 close(connfd);
01038 close(new_incoming_sock);
01039
01040 return 0;
01041 }
01042
01054 COMPONENT *
01055 proxy_init_server_component(int connfd, char *component_tag)
01056 {
01057 COMPONENT *component;
01058 CID server_id;
01059 in_port_t server_port;
01060 int id_size, to;
01061 int port_size;
01062
01063 to = PROXY_TIMEOUT_DEFAULT;
01064 id_size = sizeof(server_id.id);
01065 port_size = sizeof(server_port);
01066
01067 component = proxy_new_component();
01068 if (component)
01069 component->msg_queue = new_queue();
01070
01071 if (!component || !component->msg_queue) {
01072 fprintf(stderr, "Cannot malloc, aborting connection\n");
01073 if (component) {
01074 if (component->msg_queue)
01075 destroy_queue(component->msg_queue);
01076 free(component);
01077 }
01078 return NULL;
01079 }
01080
01081 printf("id size = %d, port size = %d\n", id_size, port_size);
01082
01083 if (proxy_tread(connfd, (void *) server_id.id, id_size, to) < 0 ||
01084 proxy_tread(connfd, (void *) &server_port, port_size, to) < 0) {
01085 if (component->msg_queue)
01086 destroy_queue(component->msg_queue);
01087 free(component);
01088 return NULL;
01089 }
01090
01091 printf("port = %d\n", ntohs(server_port));
01092
01093
01094 memcpy(component_tag, server_id.id, id_size);
01095 memcpy(component_tag + id_size, &server_port, port_size);
01096 *(component_tag + id_size + port_size) = '\0';
01097
01098 proxy_print_component_tag(stdout, "component_tag =", component_tag, NULL);
01099
01100 component->id = server_id;
01101 component->port = server_port;
01102 component->sockfd = connfd;
01103
01104 if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
01105 HASHNODE *ht;
01106
01107 ht = hash_lookup(accept_table, (void *) component_tag);
01108
01109 if(ht) {
01110 proxy_print_component_tag(stderr, "Component ", component_tag,
01111 " already exists in the table!\n");
01112 pthread_mutex_unlock(&accept_tab_mutex);
01113 return NULL;
01114 }
01115
01116 hash_insert(accept_table, (void *) component, (void *) component_tag);
01117 pthread_mutex_unlock(&accept_tab_mutex);
01118 } else {
01119 if (component->msg_queue)
01120 destroy_queue(component->msg_queue);
01121 free(component);
01122 return NULL;
01123 }
01124
01125 return component;
01126 }
01127
01141 COMPONENT *
01142 proxy_wait_for_queue_message(int connfd, COMPONENT * component)
01143 {
01144 COMPONENT *item;
01145 int keepalive_timer;
01146
01147 keepalive_timer = PROXY_KEEPALIVE_FREQ;
01148
01149 if (pthread_mutex_lock(component->msg_queue->mutex) == 0) {
01150 while (is_empty(component->msg_queue)) {
01151 struct timespec timeout;
01152 struct timeval tv;
01153 int rv;
01154
01155 gettimeofday(&tv, NULL);
01156
01157 timeout.tv_sec = tv.tv_sec + PROXY_MSG_QUEUE_FREQ;
01158 timeout.tv_nsec = 0;
01159
01160
01161 rv = pthread_cond_timedwait(component->msg_queue->notEmpty,
01162 component->msg_queue->mutex, &timeout);
01163
01164 keepalive_timer -= PROXY_MSG_QUEUE_FREQ;
01165
01166 if (rv != 0) {
01167 int remove_srv = 0;
01168
01169
01170
01171 if(keepalive_timer <= 0) {
01172 keepalive_timer = PROXY_KEEPALIVE_FREQ;
01173 remove_srv = (proxy_send_keepalive(connfd) < 0);
01174 }
01175 else {
01176 remove_srv = (proxy_is_something_on_socket(connfd) == 0);
01177 }
01178
01179 if(remove_srv) {
01180 char component_tag[sizeof(CID) + sizeof(in_port_t) + 1];
01181 int id_size = sizeof(component->id);
01182 int port_size = sizeof(component->port);
01183
01184 printf("server must have gone away!\n");
01185
01186
01187 memcpy(component_tag, &(component->id), id_size);
01188 memcpy(component_tag + id_size, &(component->port), port_size);
01189 *(component_tag + id_size + port_size) = '\0';
01190
01191 proxy_delete_server_tag(component_tag, component);
01192 close(component->sockfd);
01193
01194 return NULL;
01195 }
01196 }
01197 }
01198 item = (COMPONENT *) dequeue(component->msg_queue);
01199 pthread_mutex_unlock(component->msg_queue->mutex);
01200 } else
01201 return NULL;
01202
01203 return item;
01204 }
01205
01221 int
01222 proxy_send_conn_request_to_server(int connfd, char *component_tag,
01223 COMPONENT * component, COMPONENT * item)
01224 {
01225 proxy_tag_t response = PROXY_CONNECT_REQUEST;
01226
01227 printf("going to write to the server now (port = %d)...\n", ntohs(item->port));
01228 if (write(connfd, &response, sizeof(response)) < 0 ||
01229 write(connfd, item->id.id, sizeof(item->id.id)) < 0 ||
01230 write(connfd, &item->port, sizeof(item->port)) < 0) {
01231 fprintf(stderr, "Can't write to server, aborting.\n");
01232
01233 proxy_delete_server_tag(component_tag, component);
01234
01235 return -1;
01236 }
01237
01238 return 0;
01239 }
01240
01251 int
01252 proxy_delete_server_tag(char *component_tag, COMPONENT * component)
01253 {
01254
01255 proxy_print_component_tag(stdout, "going to attempt to delete component: ",
01256 component_tag, NULL);
01257
01258 if (pthread_mutex_lock(&accept_tab_mutex) == 0) {
01259 HASHNODE *ht;
01260
01261 printf("going to delete component_tag now...\n");
01262 ht = hash_delete(accept_table, (void *) component_tag);
01263
01264 if (!ht)
01265 printf("warning: expected to find component in hash table\n");
01266 else if (component != (COMPONENT *) (ht->item))
01267 printf("warning: wrong component! \n");
01268
01269 if(ht) free(ht);
01270 pthread_mutex_unlock(&accept_tab_mutex);
01271 } else
01272 return -1;
01273
01274 component->to_be_freed = TRUE;
01275
01276
01277
01278 if (pthread_mutex_lock(removed_components_queue->mutex) == 0) {
01279
01280 printf("putting component in removal queue\n");
01281
01282 enqueue(removed_components_queue, (void *) component);
01283 pthread_mutex_unlock(removed_components_queue->mutex);
01284 } else {
01285 fprintf(stderr, "Warning: couldn't lock removed components queue.\n");
01286 }
01287
01288 return 0;
01289 }
01290
01300 int
01301 proxy_handle_control_connection(int connfd)
01302 {
01303 COMPONENT *component;
01304 char component_tag[sizeof(CID) + sizeof(in_port_t) + 1];
01305
01306 if (proxy_block_sigpipe() < 0)
01307 return -1;
01308
01309 printf("i am a thread handling a control connection %d\n", connfd);
01310
01311 component = proxy_init_server_component(connfd, component_tag);
01312 if (!component) {
01313 fprintf(stderr, "Error intializing server component.\n");
01314 close(connfd);
01315 return -1;
01316 }
01317
01318 for (;;) {
01319 COMPONENT *item;
01320
01321 item = proxy_wait_for_queue_message(connfd, component);
01322
01323 if (!item) {
01324 close(connfd);
01325 return -1;
01326 }
01327
01328 if (proxy_send_conn_request_to_server(connfd, component_tag, component, item) < 0) {
01329 free(item);
01330 close(connfd);
01331 return -1;
01332 }
01333
01334
01335
01336 free(item);
01337 }
01338
01339 return 0;
01340 }
01341
01358 int
01359 proxy_init_sock(int *sfd, struct sockaddr_in *serv, char *proto, int nb, int port)
01360 {
01361 int r, flag = 1;
01362 socklen_t namelen;
01363 char istcp = strcmp(proto, "udp");
01364
01365 if ((*sfd = socket(AF_INET, istcp ? SOCK_STREAM : SOCK_DGRAM, 0)) < 0)
01366 return *sfd;
01367
01368 if ((r = setsockopt(*sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof(flag))) < 0)
01369 return r;
01370
01371 if(istcp)
01372 setsockopt(*sfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
01373
01374 serv->sin_family = AF_INET;
01375 serv->sin_addr.s_addr = htonl(INADDR_ANY);
01376 serv->sin_port = htons(port);
01377
01378 if ((r = bind(*sfd, (struct sockaddr *) serv, sizeof(*serv))) < 0)
01379 return r;
01380
01381 namelen = sizeof(*serv);
01382 if ((r = getsockname(*sfd, (struct sockaddr *) serv, &namelen)) < 0)
01383 return r;
01384
01385 if (nb)
01386 if ((r = fcntl(*sfd, F_SETFL, O_NDELAY)) < 0)
01387 return r;
01388
01389 return 0;
01390 }
01391
01398 #ifdef KERBEROS5
01399 int
01400 proxy_check_krb5_envvars()
01401 {
01402
01403 if (getenv("GRIDSOLVE_KEYTAB") == NULL) {
01404 fprintf(stderr, "Environment variable GRIDSOLVE_KEYTAB not defined\n");
01405 return -1;
01406 }
01407
01408 if (getenv("GRIDSOLVE_USERS") == NULL) {
01409 fprintf(stderr, "Environment variable GRIDSOLVE_USERS not defined\n");
01410 return -1;
01411 }
01412
01413 return 0;
01414 }
01415 #endif