00001
00008
00009
00010
00011 #include <sys/types.h>
00012 #include <sys/stat.h>
00013 #include <fcntl.h>
00014 #include <string.h>
00015 #include <stdarg.h>
00016 #include <errno.h>
00017
00018 #ifdef HAVE_CONFIG_H
00019 #include "config.h"
00020 #endif
00021
00022 #include "portability.h"
00023 #include "utility.h"
00024 #include "comm_basics.h"
00025 #include "comm_encode.h"
00026 #include "comm_data.h"
00027 #include "grpc.h"
00028 #include "gs_seq_dsi.h"
00029 #include "gs_seq_data_handle.h"
00030
00031
00032
00033 static char *client_callsig = NULL;
00034 static char *server_callsig = NULL;
00035
00036
00042 static int gs_send_data_transfer_request(
00043 char *data_handle, gs_argument_t *argptr, int my_dsig) {
00044 LFS_DSI_OBJECT *obj;
00045 char *server_name, *path, *file_name, *full_name;
00046 int server_sock, tag, port;
00047 int len, sender_major, sender_dsig;
00048 struct hostent *hp;
00049 char cid[CID_LEN];
00050 ipaddr_t ipaddr;
00051
00052
00053 if (!data_handle || !argptr) {
00054 fprintf(stderr, "bad data handle or argument pointer \
00055 in gs_send_data_transfer_request\n");
00056 return -1;
00057 }
00058
00059
00060 if ((obj = gs_seq_decode_lfs_dsi_object(data_handle)) == NULL) {
00061 fprintf(stderr, "error decoding data handle\n");
00062 return -1;
00063 }
00064
00065 if (!DATA_RECV_FILE_PATH) {
00066 gs_seq_set_lfs_dsi_data_storage_path();
00067 }
00068
00069
00070 path = DATA_RECV_FILE_PATH;
00071 file_name = obj->file_name;
00072
00073 full_name = (char *) malloc(sizeof(char)
00074 * (strlen(path) + strlen(file_name) + 1));
00075 if (!full_name) {
00076 perror("malloc");
00077 return -1;
00078 }
00079
00080 strcpy(full_name, path);
00081 full_name[strlen(path)] = '\0';
00082
00083 strcat(full_name, file_name);
00084 full_name[strlen(path) + strlen(file_name)] = '\0';
00085
00086
00087 server_name = obj->source;
00088
00089 memset(cid, 0xFF, CID_LEN);
00090
00091
00092 port = GRIDSOLVE_SERVER_PORT_DEFAULT;
00093
00094
00095 if ((hp = gethostbyname(server_name)) == NULL) {
00096 errno = errno_socket();
00097 fprintf(stderr,
00098 "could not gethostbyname for %s (errno %d) \n", server_name, errno);
00099 return INVALID_SOCKET;
00100 }
00101
00102 memcpy((void *) &ipaddr, hp->h_addr_list[0], sizeof(ipaddr));
00103
00104
00105 memset(cid, 0xFF, CID_LEN);
00106
00107
00108 server_sock = gs_connect_to_host(cid, ipaddr, port, 0, 0);
00109
00110 if(server_sock < 0) {
00111 ERRPRINTF("unsuccessful (connecting server)\n");
00112 return -1;
00113 }
00114
00115
00116 if((gs_send_tag(server_sock, GS_PROT_DATA_TRANSFER) < 0) ||
00117 (gs_send_string(server_sock, VERSION) < 0)) {
00118 ERRPRINTF("unsuccessful (sending tag to Server)\n");
00119 return -1;
00120 }
00121
00122
00123 if(gs_recv_tag(server_sock, &tag) < 0) {
00124 ERRPRINTF("error communicating with server.\n");
00125 return -1;
00126 }
00127
00128 if(tag != GS_PROT_OK) {
00129 if(tag == GS_PROT_VERSION_MISMATCH)
00130 ERRPRINTF("error: server is an incompatible version\n");
00131 else
00132 ERRPRINTF("error: server refused\n");
00133 return -1;
00134 }
00135
00136
00137 if (gs_send_string(server_sock, data_handle) < 0) {
00138 ERRPRINTF("unsuccessful (sending data handle)\n");
00139 return -1;
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159 if (gs_recv_int(server_sock, &len) < 0) {
00160 fprintf(stderr, "error receiving file length\n");
00161 return -1;
00162 }
00163
00164 if (gs_recv_tag(server_sock, &sender_major) < 0) {
00165 fprintf(stderr, "error receiving sender major\n");
00166 return -1;
00167 }
00168
00169 if (gs_recv_int(server_sock, &sender_dsig) < 0) {
00170 fprintf(stderr, "error receiving sender data signature\n");
00171 return -1;
00172 }
00173
00174 if (gs_recv_arg(server_sock, argptr, sender_major,
00175 sender_dsig, my_dsig, GS_SERVER_SIDE) < 0) {
00176 fprintf(stderr, "error receiving argument\n");
00177 return -1;
00178 }
00179
00180 return 0;
00181 }
00182
00183
00198 int
00199 gs_send_tag(SOCKET sock, int tag)
00200 {
00201 unsigned char bytetag;
00202
00203 DBGPRINTF("sock %d tag %d\n", sock, tag);
00204
00205 if (tag<0 || tag>127) {
00206 ERRPRINTF("Tag value %d cannot be held in one byte!!!\n", tag);
00207 return -1;
00208 }
00209 bytetag = tag;
00210 if (gs_twrite(sock, &bytetag, sizeof(bytetag)) <= 0) {
00211 ERRPRINTF("Could not write tag %d to the network\n", tag);
00212 return -1;
00213 }
00214 return 0;
00215 }
00216
00228 int
00229 gs_recv_tag(SOCKET sock, int *tag)
00230 {
00231 char bytetag = -1;
00232
00233 if (gs_tread(sock, &bytetag, sizeof(bytetag)) == -1) {
00234 ERRPRINTF("Error in reading tag \n");
00235 return(-1);
00236 }
00237 *tag = bytetag;
00238 return 0;
00239 }
00240
00251 int
00252 gs_send_int(SOCKET sock, int tosend)
00253 {
00254 uint32_t uint1 = (uint32_t)tosend;
00255 uint32_t uint2 = htonl(uint1);
00256
00257 if (tosend < 0) {
00258 ERRPRINTF("The integer must be unsigned %d\n", tosend);
00259 return -1;
00260 }
00261
00262 if (gs_twrite(sock, &uint2, sizeof(uint32_t)) <= 0) {
00263 ERRPRINTF("Could not send int %d\n", tosend);
00264 return -1;
00265 }
00266
00267 return 0;
00268 }
00269
00280 int
00281 gs_recv_int(SOCKET sock, int *torecv)
00282 {
00283 uint32_t uint2;
00284
00285 if (gs_tread(sock, (char *)&uint2, sizeof(uint32_t)) == -1) {
00286 ERRPRINTF("Error receiving int \n");
00287 return(-1);
00288 }
00289
00290 *torecv = (int)ntohl(uint2);
00291
00292 return 0;
00293 }
00294
00305 int
00306 gs_send_string(SOCKET sock, char *tosend)
00307 {
00308 int len = -1;
00309
00310 ASSERT_EXPR((tosend != NULL), return(-1));
00311 len = strlen(tosend);
00312
00313 #ifdef GS_DEBUG
00314 {
00315 char *tmps = strdup(tosend);
00316 if (strlen(tmps) > 50) tmps[50]='\0';
00317 DBGPRINTF("sending string of len %d to sock %d: '%s...'\n", len, sock, tmps);
00318 FREE(tmps);
00319 }
00320 #else
00321 DBGPRINTF("sending string of len %d to sock %d \n", len, sock);
00322 #endif
00323
00324 if(gs_send_int(sock, len) < 0) {
00325 ERRPRINTF("Error sending string length\n");
00326 return -1;
00327 }
00328
00329 if(gs_twrite(sock, tosend, len*sizeof(char)) < 0) {
00330 ERRPRINTF("Error sending string\n");
00331 return -1;
00332 }
00333
00334 return 0;
00335 }
00336
00348 int
00349 gs_recv_string(SOCKET sock, char **torecv)
00350 {
00351 int len = -1;
00352
00353 if(gs_recv_int(sock, &len) < 0) {
00354 ERRPRINTF("Error receiving string length\n");
00355 return -1;
00356 }
00357
00358 if((*torecv = (char *)CALLOC(len+1, sizeof(char))) == NULL) {
00359 ERRPRINTF("Error allocating memory for string\n");
00360 return -1;
00361 }
00362
00363 if(gs_tread(sock, *torecv, len*sizeof(char)) < 0) {
00364 free(*torecv);
00365 ERRPRINTF("Error reading string\n");
00366 return -1;
00367 }
00368
00369 #ifdef GS_DEBUG
00370 {
00371 char *tmps = strdup(*torecv);
00372 if (strlen(tmps) > 50) tmps[51]='\0';
00373 DBGPRINTF("recvd string of len %d: '%s...'\n", len, tmps);
00374 FREE(tmps);
00375 }
00376 #else
00377 DBGPRINTF("recvd string of len %d\n", len); fflush(NULL);
00378 #endif
00379
00380 return 0;
00381 }
00382
00393 int
00394 gs_get_element_size(enum datatype data_type, int dsig)
00395 {
00396 switch(data_type) {
00397 case GS_INT:
00398 return DSIG_INT_SIZE(dsig);
00399 case GS_FLOAT:
00400 return DSIG_FLOAT_SIZE(dsig);
00401 case GS_DOUBLE:
00402 return DSIG_DOUBLE_SIZE(dsig);
00403 case GS_SCOMPLEX:
00404 return 2 * DSIG_FLOAT_SIZE(dsig);
00405 case GS_DCOMPLEX:
00406 return 2 * DSIG_DOUBLE_SIZE(dsig);
00407 case GS_CHAR:
00408 return sizeof(char);
00409 default:
00410 ERRPRINTF("gs_get_element_size: bad data type\n");
00411 return 1;
00412 }
00413 }
00414
00424 int
00425 gs_recv_arg_into_file(SOCKET sock, char *fname)
00426 {
00427 char fxbuf[FILE_XFER_BUFSZ];
00428 int n, nleft, len, fd;
00429
00430 fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0666);
00431 if(fd < 0) {
00432 ERRPRINTF("File '%s' could not be opened\n", fname);
00433 return -1;
00434 }
00435
00436
00437
00438
00439 if(gs_recv_int(sock, &len) < 0) {
00440 ERRPRINTF("error in receiving file len\n");
00441 close(fd);
00442 return -1;
00443 }
00444
00445 nleft = len;
00446 while(nleft > 0) {
00447 n = gs_tread(sock, fxbuf, MIN(FILE_XFER_BUFSZ, nleft));
00448
00449 if(n < 0) {
00450 if(errno_socket() == EINTR)
00451 continue;
00452 else {
00453 ERRPRINTF("error in reading argument data from sock\n");
00454 close(fd);
00455 return -1;
00456 }
00457 } else if (n == 0)
00458 break;
00459
00460 if(write(fd, fxbuf, n) != n) {
00461 ERRPRINTF("error in writing argument data to file\n");
00462 close(fd);
00463 return -1;
00464 }
00465
00466 nleft -= n;
00467 }
00468
00469 close(fd);
00470
00471 if(nleft == 0)
00472 return 0;
00473 else
00474 return -1;
00475 }
00476
00489 int
00490 gs_recv_file_scalar(SOCKET sock, gs_argument_t *arg, gs_side_t side)
00491 {
00492 if(side == GS_SERVER_SIDE) {
00493 if(gs_recv_arg_into_file(sock, arg->name) < 0) {
00494 ERRPRINTF("error receiving arg data into file\n");
00495 return -1;
00496 }
00497
00498 arg->data = strdup(arg->name);
00499
00500 if(!arg->data) {
00501 ERRPRINTF("error allocating string for arg.\n");
00502 return -1;
00503 }
00504 }
00505 else if(side == GS_CLIENT_SIDE) {
00506 if(gs_recv_arg_into_file(sock, arg->data) < 0) {
00507 ERRPRINTF("error receiving arg data into file\n");
00508 return -1;
00509 }
00510 }
00511 else {
00512 ERRPRINTF("Bad 'side' argument.\n");
00513 return -1;
00514 }
00515
00516 return 0;
00517 }
00518
00531 int
00532 gs_recv_file_vector(SOCKET sock, gs_argument_t *arg, gs_side_t side)
00533 {
00534 char **data;
00535 int i;
00536
00537 if(!arg) return -1;
00538
00539 if(side == GS_SERVER_SIDE) {
00540 arg->data = malloc(arg->rows * sizeof(char *));
00541 if(!arg->data) {
00542 ERRPRINTF("error allocating memory for filenames.\n");
00543 return -1;
00544 }
00545
00546 data = (char **)arg->data;
00547
00548 for(i = 0; i < arg->rows; i++) {
00549 data[i] = dstring_sprintf("%s_%d", arg->name, i);
00550
00551 if(!data[i]) {
00552 ERRPRINTF("error allocating string for arg.\n");
00553 return -1;
00554 }
00555
00556 if(gs_recv_arg_into_file(sock, data[i]) < 0) {
00557 ERRPRINTF("error receiving arg data into file\n");
00558 return -1;
00559 }
00560 }
00561 }
00562 else if(side == GS_CLIENT_SIDE) {
00563 data = (char **)arg->data;
00564
00565 for(i = 0; i < arg->rows; i++) {
00566 if(gs_recv_arg_into_file(sock, data[i]) < 0) {
00567 ERRPRINTF("error receiving arg data into file\n");
00568 return -1;
00569 }
00570 }
00571 }
00572 else {
00573 ERRPRINTF("Bad 'side' argument.\n");
00574 return -1;
00575 }
00576
00577 return 0;
00578 }
00579
00597 int
00598 gs_recv_arg(SOCKET sock, gs_argument_t *arg, int sender_major,
00599 int sender_dsig, int my_dsig, gs_side_t side)
00600 {
00601 int sender_elsize, data_size;
00602 char *data;
00603
00604 if(!arg || !arg->prob)
00605 return -1;
00606
00607 if(arg->objecttype == GS_FILE)
00608 return gs_recv_file_scalar(sock, arg, side);
00609 else if(arg->objecttype == GS_PACKEDFILE)
00610 return gs_recv_file_vector(sock, arg, side);
00611
00612
00613 sender_elsize = gs_get_element_size(arg->datatype, sender_dsig);
00614 data_size = arg->rows * arg->cols * sender_elsize;
00615
00616 data = arg->data;
00617
00618 DBGPRINTF("receiving arg %s, type=%s, rows=%d, cols=%d, bytes=%d\n",
00619 arg->name, gs_c_datatype[arg->datatype], arg->rows, arg->cols, data_size);
00620
00621 if(arg->dsi) {
00622 if(side == GS_SERVER_SIDE) {
00623 if(gs_recv_dsi_input_arg(sock, arg, data_size) < 0) {
00624 ERRPRINTF("Error receiving DSI input arg\n");
00625 return -1;
00626 }
00627
00628 data = arg->data;
00629 }
00630 else {
00631 if(gs_recv_dsi_output_arg(sock, arg, data_size) < 0) {
00632 ERRPRINTF("Error receiving DSI output arg\n");
00633 return -1;
00634 }
00635
00636
00637
00638
00639
00640 return 0;
00641 }
00642 }
00643 else {
00644
00645
00646
00647
00648
00649 if(arg->inout == GS_VAROUT) {
00650 char **dptr;
00651 int srv_data_size;
00652
00653 if(!arg->data) {
00654 arg->data = (char **) malloc(sizeof(char *));
00655 if(!arg->data) {
00656 ERRPRINTF("malloc\n");
00657 return -1;
00658 }
00659 }
00660
00661 dptr = arg->data;
00662
00663 if(gs_recv_int(sock, &srv_data_size) < 0) {
00664 ERRPRINTF("Error reading arg size\n");
00665 return -1;
00666 }
00667
00668 data_size = srv_data_size;
00669
00670 *dptr = (char *)malloc(data_size);
00671 data = *dptr;
00672
00673 if(!data) {
00674 ERRPRINTF("error allocating space for arg\n");
00675 return -1;
00676 }
00677 }
00678
00679 if(!arg->data) {
00680 arg->data = (char *)malloc(data_size);
00681
00682 if(!arg->data) {
00683 ERRPRINTF("error allocating space for argument ");
00684 ERRPRINTF("%s (rows %s (%d) cols %s (%d))(%d bytes)\n",
00685 arg->name, arg->rowexp, arg->rows, arg->colexp, arg->cols, data_size);
00686 return -1;
00687 }
00688
00689 data = arg->data;
00690 }
00691
00692 if(gs_tread(sock, data, data_size) == -1) {
00693 ERRPRINTF("error in receiving argument ");
00694 ERRPRINTF("%s (rows %s (%d) cols %s (%d))(%d bytes)\n",
00695 arg->name, arg->rowexp, arg->rows, arg->colexp, arg->cols, data_size);
00696 return -1;
00697 }
00698 }
00699
00700 if((arg->objecttype == GS_MATRIX) && (sender_major != arg->prob->major))
00701 gs_transpose_matrix(arg, data, side);
00702 if(sender_dsig != my_dsig)
00703 if(gs_convert_arg(arg, data, sender_dsig, my_dsig) < 0)
00704 return -1;
00705
00706 return 0;
00707 }
00708
00718 int
00719 gs_send_arg_from_file(SOCKET sock, char *filename)
00720 {
00721 struct stat st;
00722 char fxbuf[FILE_XFER_BUFSZ];
00723 int n, fd;
00724
00725 if(stat(filename, &st) == -1) {
00726 ERRPRINTF("Could not stat file '%s'\n", filename);
00727 return -1;
00728 }
00729
00730 fd = open(filename, O_RDONLY, 0666);
00731 if(fd < 0) {
00732 ERRPRINTF("File '%s' could not be opened\n", filename);
00733 return -1;
00734 }
00735
00736
00737
00738
00739 if(gs_send_int(sock, (int)st.st_size) < 0) {
00740 ERRPRINTF("error in sending file len\n");
00741 close(fd);
00742 return -1;
00743 }
00744
00745 while((n=read(fd, fxbuf, FILE_XFER_BUFSZ)) > 0) {
00746 if(gs_twrite(sock, fxbuf, n) != n) {
00747 ERRPRINTF("error in sending argument data\n");
00748 close(fd);
00749 return -1;
00750 }
00751 }
00752
00753 close(fd);
00754
00755 if(n < 0) {
00756 ERRPRINTF("Error reading from file\n");
00757 return -1;
00758 }
00759
00760 return 0;
00761 }
00762
00773 int
00774 gs_send_dsi_input_arg(SOCKET sock, gs_argument_t *arg, int my_dsig)
00775 {
00776 int data_size;
00777 char *data_encoding;
00778
00779 if(!arg || !arg->data)
00780 return -1;
00781
00782 data_size = gs_get_element_size(arg->datatype, my_dsig) *
00783 arg->rows * arg->cols;
00784
00785 DBGPRINTF("sending DSI arg %s, type=%s, rows=%d, cols=%d, bytes=%d\n",
00786 arg->name, gs_c_datatype[arg->datatype], arg->rows, arg->cols, data_size);
00787
00788 if(gs_encode_dsi_object(&data_encoding, arg->data) < 0) {
00789 ERRPRINTF("Failed to encode DSI object\n");
00790 return -1;
00791 }
00792
00793 if(gs_send_string(sock, data_encoding) < 0) {
00794 ERRPRINTF("failed to send encoded DSI object\n");
00795 free(data_encoding);
00796 return -1;
00797 }
00798
00799 free(data_encoding);
00800
00801 return 0;
00802 }
00803
00814 int
00815 gs_send_dsi_output_arg(SOCKET sock, gs_argument_t *arg, int my_dsig)
00816 {
00817 int data_size;
00818 char *data_encoding;
00819
00820 if(!arg || !arg->data)
00821 return -1;
00822
00823 data_size = gs_get_element_size(arg->datatype, my_dsig) *
00824 arg->rows * arg->cols;
00825
00826 DBGPRINTF("sending DSI arg %s, type=%s, rows=%d, cols=%d, bytes=%d\n",
00827 arg->name, gs_c_datatype[arg->datatype], arg->rows, arg->cols, data_size);
00828
00838 if(gs_encode_dsi_object(&data_encoding, arg->dsi_obj) < 0) {
00839 ERRPRINTF("Failed to encode DSI object\n");
00840 return -1;
00841 }
00842
00843 if(gs_send_string(sock, data_encoding) < 0) {
00844 ERRPRINTF("failed to send encoded DSI object\n");
00845 free(data_encoding);
00846 return -1;
00847 }
00848
00849 free(data_encoding);
00850
00851 return 0;
00852 }
00853
00863 int
00864 gs_recv_dsi_input_arg(SOCKET sock, gs_argument_t *arg, int data_size)
00865 {
00866 DSI_OBJECT *obj;
00867 int bytes_read;
00868 char *encoding;
00869 void *data = NULL;
00870
00871 data = (void *)malloc(data_size);
00872
00873 if(!data) {
00874 ERRPRINTF("failed to allocate data\n");
00875 return -1;
00876 }
00877
00878 obj = (DSI_OBJECT *)calloc(1, sizeof(DSI_OBJECT));
00879
00880 if(!obj) {
00881 ERRPRINTF("malloc failed\n");
00882 return -1;
00883 }
00884
00885 if(gs_recv_string(sock, &encoding) < 0) {
00886 ERRPRINTF("error in receiving DSI object encoding\n");
00887 return -1;
00888 }
00889
00890 if(gs_decode_dsi_object(encoding, obj) < 0) {
00891 ERRPRINTF("error decoding DSI object\n");
00892 free(encoding);
00893 free(data);
00894 free(obj);
00895 return -1;
00896 }
00897
00898 arg->dsi_obj = obj;
00899
00900 if(grpc_dsi_read_vector(obj, data, arg->rows * arg->cols,
00901 arg->datatype, &bytes_read) != GRPC_NO_ERROR)
00902 {
00903 ERRPRINTF("error reading DSI object\n");
00904 free(encoding);
00905 free(data);
00906 free(obj);
00907 return -1;
00908 }
00909
00910 if(!data) {
00911 ERRPRINTF("Empty data after reading DSI object\n");
00912 free(encoding);
00913 free(data);
00914 free(obj);
00915 return -1;
00916 }
00917
00918 arg->data = (char *)data;
00919
00920 free(encoding);
00921
00922 return 0;
00923 }
00924
00934 int
00935 gs_recv_dsi_output_arg(SOCKET sock, gs_argument_t *arg, int data_size)
00936 {
00937 char *encoding;
00938
00939 if(gs_recv_string(sock, &encoding) < 0) {
00940 ERRPRINTF("error in receiving DSI object encoding\n");
00941 return -1;
00942 }
00943
00944
00945
00946
00947
00948
00949
00950 free(encoding);
00951
00952 return 0;
00953 }
00954
00965 int
00966 gs_send_arg(SOCKET sock, gs_argument_t *arg, int my_dsig)
00967 {
00968 int data_size;
00969 char *data;
00970
00971 if(!arg || !arg->data)
00972 return -1;
00973
00974 data_size = gs_get_element_size(arg->datatype, my_dsig) *
00975 arg->rows * arg->cols;
00976
00977 data = arg->data;
00978
00979 DBGPRINTF("sending arg %s, type=%s, rows=%d, cols=%d, bytes=%d\n",
00980 arg->name, gs_c_datatype[arg->datatype], arg->rows, arg->cols, data_size);
00981
00982 if(arg->objecttype == GS_FILE) {
00983 if(gs_send_arg_from_file(sock, data) < 0) {
00984 ERRPRINTF("error arg data from file '%s'\n", data);
00985 return -1;
00986 }
00987
00988 return 0;
00989 }
00990 else if(arg->objecttype == GS_PACKEDFILE) {
00991 char **filenames;
00992 int i;
00993
00994 filenames = (char **)arg->data;
00995
00996 for(i=0; i < arg->rows; i++)
00997 if(gs_send_arg_from_file(sock, filenames[i]) < 0) {
00998 ERRPRINTF("error arg data from file '%s'\n", filenames[i]);
00999 return -1;
01000 }
01001
01002 return 0;
01003 }
01004
01005 if(arg->inout == GS_VAROUT) {
01006 if(gs_send_int(sock, data_size) < 0) {
01007 ERRPRINTF("error in sending arg len\n");
01008 return -1;
01009 }
01010
01011 if(arg->objecttype != GS_SCALAR)
01012 data = *((char **)(arg->data));
01013 }
01014
01015 if(gs_twrite(sock, data, data_size) != data_size) {
01016 ERRPRINTF("error in sending argument data\n");
01017 return -1;
01018 }
01019
01020 return 0;
01021 }
01022
01035 int
01036 gs_convert_arg(gs_argument_t *arg, char *data, int sender_dsig, int my_dsig)
01037 {
01038 int i;
01039
01040 if(!arg || !arg->data)
01041 return -1;
01042
01043 switch(arg->datatype) {
01044 case GS_INT:
01045 if(DSIG_INT_SIZE(sender_dsig) == DSIG_INT_SIZE(my_dsig)) {
01046 for(i=0; i< arg->rows * arg->cols; i++)
01047 gs_int_swap_func[DSIG_INT_ORDER(sender_dsig)][DSIG_INT_ORDER(my_dsig)](
01048 ((int *)data)+i);
01049 }
01050 else {
01051 ERRPRINTF("Error: different int sizes not supported yet\n");
01052 return -1;
01053 }
01054 break;
01055 case GS_FLOAT:
01056 if(DSIG_FLOAT_SIZE(sender_dsig) == DSIG_FLOAT_SIZE(my_dsig)) {
01057 for(i=0; i< arg->rows * arg->cols; i++)
01058 gs_float_swap_func[DSIG_FLOAT_ORDER(sender_dsig)][DSIG_FLOAT_ORDER(my_dsig)](((float *)data)+i);
01059 }
01060 else {
01061 ERRPRINTF("Error: different float sizes not supported yet\n");
01062 return -1;
01063 }
01064 break;
01065 case GS_DOUBLE:
01066 if(DSIG_DOUBLE_SIZE(sender_dsig) == DSIG_DOUBLE_SIZE(my_dsig)) {
01067 for(i=0; i< arg->rows * arg->cols; i++)
01068 gs_double_swap_func[DSIG_DOUBLE_ORDER(sender_dsig)][DSIG_DOUBLE_ORDER(my_dsig)](((double *)data)+i);
01069 }
01070 else {
01071 ERRPRINTF("Error: different double sizes not supported yet\n");
01072 return -1;
01073 }
01074 break;
01075
01076 case GS_SCOMPLEX:
01077 if(DSIG_FLOAT_SIZE(sender_dsig) == DSIG_FLOAT_SIZE(my_dsig)) {
01078 gs_scomplex *iptr;
01079
01080 for(i=0; i< arg->rows * arg->cols; i++) {
01081 iptr = ((gs_scomplex *)data) + i;
01082
01083 gs_float_swap_func[DSIG_FLOAT_ORDER(sender_dsig)][DSIG_FLOAT_ORDER(my_dsig)](&(iptr->r));
01084 gs_float_swap_func[DSIG_FLOAT_ORDER(sender_dsig)][DSIG_FLOAT_ORDER(my_dsig)](&(iptr->i));
01085 }
01086
01087 }
01088 else {
01089 ERRPRINTF("Error: different scomplex sizes not supported yet\n");
01090 return -1;
01091 }
01092 break;
01093 case GS_DCOMPLEX:
01094 if(DSIG_DOUBLE_SIZE(sender_dsig) == DSIG_DOUBLE_SIZE(my_dsig)) {
01095 gs_dcomplex *iptr;
01096
01097 for(i=0; i< arg->rows * arg->cols; i++) {
01098 iptr = ((gs_dcomplex *)data) + i;
01099
01100 gs_double_swap_func[DSIG_DOUBLE_ORDER(sender_dsig)][DSIG_DOUBLE_ORDER(my_dsig)](&(iptr->r));
01101 gs_double_swap_func[DSIG_DOUBLE_ORDER(sender_dsig)][DSIG_DOUBLE_ORDER(my_dsig)](&(iptr->i));
01102 }
01103
01104 }
01105 else {
01106 ERRPRINTF("Error: different dcomplex sizes not supported yet\n");
01107 return -1;
01108 }
01109 break;
01110 case GS_CHAR:
01111
01112 break;
01113 case GS_BAD_DTYPE:
01114 ERRPRINTF("Error: argument has invalid data type.\n");
01115 return -1;
01116 break;
01117 }
01118
01119 return 0;
01120 }
01121
01133 int
01134 gs_set_dsi_flags(gs_problem_t *prob, char *callsig)
01135 {
01136 gs_argument_t *argptr;
01137 int i;
01138
01139 if(!prob) return -1;
01140
01141 if(prob->callsig)
01142 free(prob->callsig);
01143
01144 prob->callsig = callsig;
01145
01146 if(!strcmp(callsig, GS_NO_CALL_SIG)) {
01147 for(argptr=prob->arglist; argptr != NULL; argptr=argptr->next) {
01148 argptr->dsi = 0;
01149 argptr->dsi_obj = NULL;
01150 }
01151 return 0;
01152 }
01153
01154 i = 0;
01155 for(argptr=prob->arglist; argptr != NULL; argptr=argptr->next, i++) {
01156 if(prob->callsig) {
01157 if(prob->callsig[i] == 'D')
01158 argptr->dsi = 1;
01159 }
01160 else
01161 argptr->dsi = 0;
01162
01163 argptr->dsi_obj = NULL;
01164 }
01165
01166 return 0;
01167 }
01168
01169
01174 int gs_set_pass_back_flags(gs_problem_t *prob, char *callsig) {
01175 gs_argument_t *argptr;
01176 int i;
01177
01178 if (!prob) return -1;
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199
01200
01201 i = 0;
01202 for (argptr = prob->arglist; argptr != NULL; argptr = argptr->next, i++) {
01203 if (prob->callsig[i] == 'R') {
01204 argptr->pass_back = 0;
01205 argptr->data_handle = 0;
01206 }
01207 else if (prob->callsig[i] == 'S') {
01208 argptr->pass_back = 0;
01209 argptr->data_handle = 1;
01210 }
01211 else if (prob->callsig[i] == 'T') {
01212 argptr->pass_back = 1;
01213 argptr->data_handle = 1;
01214 }
01215 else {
01216 argptr->pass_back = 1;
01217 argptr->data_handle = 0;
01218 }
01219 }
01220
01221 return 0;
01222 }
01223
01224
01234 int
01235 gs_set_call_signature(gs_problem_t *prob_desc)
01236 {
01237 gs_argument_t *argptr;
01238 char *callsig;
01239 int i, count;
01240
01241 if(!prob_desc) return -1;
01242
01243
01244 count = 0;
01245 for(argptr=prob_desc->arglist; argptr != NULL; argptr=argptr->next)
01246 count++;
01247
01248 callsig = (char *)malloc(count + 1);
01249 if(!callsig)
01250 return -1;
01251
01252 i = 0;
01253 for(argptr=prob_desc->arglist; argptr != NULL; argptr=argptr->next, i++) {
01254 argptr->dsi = 0;
01255
01256 if(argptr->data) {
01257 if(gs_is_dsi_object(argptr->data)) {
01258 callsig[i] = 'D';
01259 argptr->dsi = 1;
01260
01261 if((argptr->objecttype == GS_FILE) ||
01262 (argptr->objecttype == GS_PACKEDFILE) ||
01263 (argptr->inout == GS_INOUT) ||
01264 (argptr->inout == GS_OUT) ||
01265 (argptr->inout == GS_VAROUT) ||
01266 (argptr->inout == GS_WORKSPACE))
01267 {
01268 ERRPRINTF("DSI only supported for INPUT variables ");
01269 ERRPRINTF("(offending variable = %s)\n", argptr->name);
01270 free(callsig);
01271 return -1;
01272 }
01273 }
01274
01275
01276
01277
01278
01279 if (!argptr->pass_back && !argptr->data_handle) {
01280 callsig[i] = 'R';
01281 }
01282 else if (!argptr->pass_back && argptr->data_handle) {
01283 callsig[i] = 'S';
01284 }
01285 else if (argptr->pass_back && argptr->data_handle) {
01286 callsig[i] = 'T';
01287 }
01288 else
01289 callsig[i] = 'P';
01290 }
01291 else
01292 callsig[i] = 'N';
01293
01294 argptr->dsi_obj = NULL;
01295 }
01296
01297 callsig[i] = 0;
01298
01299 if(prob_desc->callsig)
01300 free(prob_desc->callsig);
01301
01302 prob_desc->callsig = callsig;
01303
01304 client_callsig = callsig;
01305
01306 return 0;
01307 }
01308
01330 int
01331 gs_send_input_scalar_args(gs_va_list *args, void **argstack, SOCKET sock,
01332 gs_problem_t *problem, int my_dsig, int lang, int major)
01333 {
01334 gs_argument_t *argptr;
01335
01336 if(!problem)
01337 return -1;
01338
01339
01340
01341
01342
01343 if(args || argstack)
01344 if(gs_sender_compute_arg_sizes(args, argstack, problem, lang, major) < 0)
01345 return -1;
01346
01347 if(gs_send_tag(sock, major) < 0) {
01348 ERRPRINTF("failed to send major %d\n", problem->major);
01349 return -1;
01350 }
01351
01352
01353
01354 if(!strcmp(problem->callsig, GS_NO_CALL_SIG))
01355 if(gs_set_call_signature(problem) < 0)
01356 return -1;
01357
01358 if(gs_send_string(sock, problem->callsig) < 0) {
01359 ERRPRINTF("failed to send call signature '%s'\n", problem->callsig);
01360 return -1;
01361 }
01362
01363
01364
01365
01366
01367 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
01368 {
01369 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
01370 (argptr->objecttype == GS_SCALAR))
01371 {
01372 if(gs_send_arg(sock, argptr, my_dsig) < 0) {
01373 ERRPRINTF("error in sending input scalar argument\n");
01374 return -1;
01375 }
01376 }
01377 }
01378
01379 return 0;
01380 }
01381
01382
01396 int
01397 gs_send_input_nonscalar_args(SOCKET sock, gs_problem_t *problem, int my_dsig)
01398 {
01399 gs_argument_t *argptr;
01400 char *data_handle;
01401 int i;
01402
01403
01404 i = 0;
01405 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next, i++)
01406 {
01407 argptr->index = i;
01408 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
01409 (argptr->objecttype != GS_SCALAR))
01410 {
01411 if(argptr->dsi) {
01412 if(gs_send_dsi_input_arg(sock, argptr, my_dsig) < 0) {
01413 ERRPRINTF("error in sending input DSI argument\n");
01414 return -1;
01415 }
01416 }
01417
01418
01419 else if (argptr->data_handle) {
01420 data_handle = find_data_handle(
01421 problem->name, problem->seq_id, argptr->index, argptr->data);
01422 if (!data_handle) {
01423 return -1;
01424 }
01425
01426
01427 if (gs_send_string(sock, data_handle) < 0) {
01428 fprintf(stderr, "error in sending data handle - client side\n");
01429 return -1;
01430 }
01431 }
01432
01433 else {
01434 if(gs_send_arg(sock, argptr, my_dsig) < 0) {
01435 fprintf(stderr, "error in sending input argument\n");
01436 return -1;
01437 }
01438 }
01439 }
01440 }
01441
01442 return 0;
01443 }
01444
01464 int
01465 gs_send_input_args(gs_va_list *args, void **argstack, SOCKET sock,
01466 gs_problem_t *problem, int my_dsig, int lang, int major)
01467 {
01468
01469 if(!problem)
01470 return -1;
01471
01472
01473 if(gs_send_input_scalar_args(args, argstack, sock, problem,
01474 my_dsig, lang, major) < 0)
01475 return -1;
01476
01477
01478
01479 if(gs_send_input_nonscalar_args(sock, problem, my_dsig) < 0)
01480 return -1;
01481
01482 return 0;
01483 }
01484
01494 int
01495 gs_wait_for_output(SOCKET sock)
01496 {
01497 int nready;
01498
01499 nready = proxy_readable_timeout(sock, -1);
01500
01501 return (nready > 0) ? 0 : -1;
01502 }
01503
01516 int
01517 gs_recv_output_args(SOCKET sock, gs_problem_t *problem, int sender_dsig,
01518 int my_dsig)
01519 {
01520 gs_argument_t *argptr;
01521 char *data_handle;
01522 int sender_major;
01523 int i;
01524
01525 if(!problem)
01526 return -1;
01527
01528 if(gs_wait_for_output(sock) < 0) {
01529 ERRPRINTF("error waiting for output to be ready\n");
01530 return -1;
01531 }
01532
01533 if(gs_recv_tag(sock, &sender_major) < 0) {
01534 ERRPRINTF("error in receiving major\n");
01535 return -1;
01536 }
01537
01538
01539
01540
01541 gs_set_pass_back_flags(problem, client_callsig);
01542
01543 i = 0;
01544 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next, i++)
01545 {
01546 argptr->index = i;
01547 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
01548 (argptr->inout == GS_VAROUT))
01549 {
01550
01551
01552
01553
01554 if (!argptr->pass_back) {
01555
01556 if (gs_recv_string(sock, &data_handle) < 0) {
01557 ERRPRINTF("error in receiving data handle - client side\n");
01558 return -1;
01559 }
01560
01561
01562 if (insert_data_handle(problem->name, problem->seq_id,
01563 argptr->index, argptr->data, data_handle) < 0) {
01564 return -1;
01565 }
01566 }
01567 else {
01568 if(gs_recv_arg(sock, argptr, sender_major, sender_dsig, my_dsig,
01569 GS_CLIENT_SIDE) < 0)
01570 {
01571 ERRPRINTF("error in receiving output arguments\n");
01572 return -1;
01573 }
01574 }
01575 }
01576 }
01577
01578 return 0;
01579 }
01580
01592 int
01593 gs_send_output_args(SOCKET sock, gs_problem_t *problem, int my_dsig)
01594 {
01595 gs_argument_t *argptr;
01596 char *full_name, *path, *file_name, *data_handle;
01597 LFS_DSI_OBJECT *obj;
01598 int i;
01599
01600 if(!problem)
01601 return -1;
01602
01603 if(gs_send_tag(sock, problem->major) < 0) {
01604 ERRPRINTF("failed to send major\n");
01605 return -1;
01606 }
01607
01608 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
01609 {
01610 ERRPRINTF("could not compute argument sizes.\n");
01611 return -1;
01612 }
01613
01614
01615
01616 gs_set_pass_back_flags(problem, server_callsig);
01617
01618 i = 0;
01619 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next, i++)
01620 {
01621
01622 argptr->index = i;
01623 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
01624 (argptr->inout == GS_VAROUT))
01625 {
01626 if(argptr->dsi) {
01627 if(gs_send_dsi_output_arg(sock, argptr, my_dsig) < 0) {
01628 ERRPRINTF("error in sending output DSI argument\n");
01629 return -1;
01630 }
01631 }
01632
01633
01634
01635 if (!argptr->pass_back) {
01636 struct sockaddr_in addr;
01637 socklen_t addr_length;
01638
01639
01640 if (getsockname(sock, (struct sockaddr *) &addr, &addr_length) < 0) {
01641 fprintf(stderr, "error getting local host name\n");
01642 return -1;
01643 }
01644 argptr->hostname = inet_ntoa(addr.sin_addr);
01645
01646
01647 file_name = gs_seq_create_unique_file_name(argptr);
01648
01649 if (!file_name) {
01650 fprintf(stderr, "error generating unique data file name\n");
01651 return -1;
01652 }
01653
01654 if (!DATA_SEND_FILE_PATH) {
01655 gs_seq_set_lfs_dsi_data_storage_path();
01656 }
01657
01658 path = DATA_SEND_FILE_PATH;
01659
01660
01661 full_name = (char *) malloc(sizeof(char)
01662 * (strlen(path) + strlen(file_name) + 1));
01663 if (!full_name) {
01664 perror("malloc");
01665 return -1;
01666 }
01667
01668 strcpy(full_name, path);
01669 full_name[strlen(path)] = '\0';
01670
01671 strcat(full_name, file_name);
01672 full_name[strlen(path) + strlen(file_name)] = '\0';
01673
01674
01675 if (gs_seq_save_data_into_file(
01676 full_name, argptr, problem->major, my_dsig) < 0) {
01677 fprintf(stderr, "error saving data to file\n");
01678 return -1;
01679 }
01680
01681
01682
01683 obj = gs_seq_create_data_handle(
01684 argptr, path, file_name, problem->major);
01685 if (!obj) {
01686 fprintf(stderr, "error creating data handle\n");
01687 return -1;
01688 }
01689
01690
01691 data_handle = gs_seq_encode_lfs_dsi_object(obj);
01692
01693
01694 if (!data_handle) {
01695 fprintf(stderr, "bad data handle - server side\n");
01696 return -1;
01697 }
01698
01699
01700 if (gs_send_string(sock, data_handle) < 0) {
01701 fprintf(stderr, "error in sending data handle - server side\n");
01702 return -1;
01703 }
01704 }
01705 else {
01706 if(gs_send_arg(sock, argptr, my_dsig) < 0) {
01707 fprintf(stderr, "error in sending output arguments\n");
01708 return -1;
01709 }
01710 }
01711 }
01712 }
01713
01714 return 0;
01715 }
01716
01734 int
01735 gs_recv_input_scalar_args(SOCKET sock, gs_problem_t *problem, int sender_dsig,
01736 int my_dsig, int *sender_major_p)
01737 {
01738 gs_argument_t *argptr;
01739 char *sender_callsig;
01740 int sender_major;
01741
01742 if(!problem)
01743 return -1;
01744
01745 if(gs_recv_tag(sock, &sender_major) < 0) {
01746 ERRPRINTF("error in receiving major\n");
01747 return -1;
01748 }
01749 *sender_major_p = sender_major;
01750
01751 if(gs_recv_string(sock, &sender_callsig) < 0) {
01752 ERRPRINTF("error in receiving sender's call signature\n");
01753 return -1;
01754 }
01755
01756 gs_set_dsi_flags(problem, sender_callsig);
01757
01758 gs_set_pass_back_flags(problem, sender_callsig);
01759 server_callsig = strdup(sender_callsig);
01760
01761
01762
01763
01764
01765 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
01766 {
01767 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
01768 (argptr->objecttype == GS_SCALAR))
01769 {
01770
01771 argptr->rows = argptr->cols = 1;
01772 argptr->data = NULL;
01773
01774 if(gs_recv_arg(sock, argptr, sender_major, sender_dsig, my_dsig,
01775 GS_SERVER_SIDE) < 0)
01776 {
01777 ERRPRINTF("error in receiving input scalar argument\n");
01778 return -1;
01779 }
01780 }
01781 }
01782
01783 return 0;
01784 }
01785
01798 int
01799 gs_recv_input_args(SOCKET sock, gs_problem_t *problem, int sender_dsig,
01800 int my_dsig)
01801 {
01802 gs_argument_t *argptr;
01803 int sender_major;
01804 char *data_handle;
01805
01806 if(!problem)
01807 return -1;
01808
01809
01810 if (gs_recv_input_scalar_args(sock, problem, sender_dsig,
01811 my_dsig, &sender_major) < 0)
01812 {
01813 ERRPRINTF("Error in receiving input scalar args\n");
01814 return -1;
01815 }
01816
01817
01818
01819
01820 if(gs_receiver_compute_arg_sizes(problem, GS_IN) < 0)
01821 {
01822 ERRPRINTF("error computing argument sizes.\n");
01823 return -1;
01824 }
01825
01826
01827
01828 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
01829 {
01830 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
01831 (argptr->objecttype != GS_SCALAR))
01832 {
01833
01834
01835
01836 if (argptr->data_handle) {
01837
01838 if (gs_recv_string(sock, &data_handle) < 0) {
01839 ERRPRINTF("error receiving data handle - server side\n");
01840 return -1;
01841 }
01842
01843 if (!data_handle) {
01844 ERRPRINTF("bad data handle\n");
01845 return -1;
01846 }
01847
01848
01849
01850 if (gs_send_data_transfer_request(data_handle, argptr, my_dsig) < 0) {
01851 return -1;
01852 }
01853 }
01854 else {
01855 if(gs_recv_arg(sock, argptr, sender_major, sender_dsig, my_dsig,
01856 GS_SERVER_SIDE) < 0)
01857 {
01858 ERRPRINTF("error receiving input non-scalar arguments\n");
01859 return -1;
01860 }
01861 }
01862 }
01863
01864 if((argptr->objecttype == GS_FILE) && (argptr->inout == GS_OUT)) {
01865 argptr->data = strdup(argptr->name);
01866
01867
01868
01869
01870
01871 if(!argptr->data) {
01872 ERRPRINTF("error allocating space for output arg\n");
01873 return -1;
01874 }
01875 }
01876 else if((argptr->objecttype == GS_PACKEDFILE) && (argptr->inout == GS_OUT)) {
01877 char **data;
01878 int i;
01879
01880 argptr->data = malloc(argptr->rows * sizeof(char *));
01881 if(!argptr->data) {
01882 ERRPRINTF("error allocating memory for filenames.\n");
01883 return -1;
01884 }
01885
01886 data = (char **)argptr->data;
01887
01888 for(i = 0; i < argptr->rows; i++) {
01889 data[i] = dstring_sprintf("%s_%d", argptr->name, i);
01890 if(!data[i]) {
01891 ERRPRINTF("error allocating memory for filename.\n");
01892 return -1;
01893 }
01894 }
01895 }
01896 else if((argptr->inout == GS_OUT) || (argptr->inout == GS_WORKSPACE)) {
01897 int elsize;
01898
01899
01900
01901
01902
01903 elsize = gs_get_element_size(argptr->datatype, my_dsig);
01904
01905 argptr->data = (char *)CALLOC(argptr->rows * argptr->cols, elsize);
01906
01907 if(!argptr->data) {
01908 ERRPRINTF("error allocating space for output arg\n");
01909 return -1;
01910 }
01911 }
01912 else if(argptr->inout == GS_VAROUT)
01913 {
01914 char **foo = (char **)malloc(sizeof(char*));
01915 argptr->data = foo;
01916 }
01917 }
01918
01919 return 0;
01920 }
01921
01932 int
01933 gs_dump_args(FILE *out, gs_problem_t *problem)
01934 {
01935 gs_argument_t *args;
01936 int i=0;
01937
01938 if(!out || !problem)
01939 return -1;
01940
01941 fprintf(out, "Arguments:\n");
01942
01943 for(args = problem->arglist; args != NULL; args = args->next)
01944 {
01945 fprintf(out, "arg %d: name='%s', rowexp='%s', ",
01946 i, args->name, args->rowexp);
01947 fprintf(out, "colexp='%s', rows=%d, cols=%d, major=%c\n",
01948 args->colexp, args->rows, args->cols, args->prob->major);
01949
01950 if(!args->data) continue;
01951
01952 switch(args->datatype) {
01953 case GS_INT:
01954 for(i=0;i<args->rows*args->cols;i++)
01955 fprintf(out,"%d\n", ((int*)args->data)[i]);
01956 break;
01957 case GS_FLOAT:
01958 for(i=0;i<args->rows*args->cols;i++)
01959 fprintf(out,"%f\n", ((float*)args->data)[i]);
01960 break;
01961 case GS_DOUBLE:
01962 for(i=0;i<args->rows*args->cols;i++)
01963 fprintf(out,"%g\n", ((double*)args->data)[i]);
01964 break;
01965 case GS_SCOMPLEX:
01966 for(i=0;i<args->rows*args->cols;i++)
01967 fprintf(out,"%g + %gi\n", ((gs_scomplex*)args->data)[i].r,
01968 ((gs_scomplex*)args->data)[i].i);
01969 break;
01970 case GS_DCOMPLEX:
01971 for(i=0;i<args->rows*args->cols;i++)
01972 fprintf(out,"%g + %gi\n", ((gs_dcomplex*)args->data)[i].r,
01973 ((gs_dcomplex*)args->data)[i].i);
01974 break;
01975 case GS_CHAR:
01976 for(i=0;i<args->rows*args->cols;i++)
01977 fprintf(out,"%c\n", ((char*)args->data)[i]);
01978 break;
01979 case GS_BAD_DTYPE:
01980 fprintf(out,"[INVALID DATA TYPE]\n");
01981 break;
01982 }
01983 }
01984
01985 return 0;
01986 }
01987
02001 int
02002 gs_construct_scalar_hashtable(icl_hash_t **new_hash, gs_problem_t *prob_desc,
02003 enum inout cond)
02004 {
02005 gs_argument_t *argptr;
02006 icl_hash_t *symtab;
02007
02008 if(!prob_desc)
02009 return -1;
02010
02011 symtab = icl_hash_create(11, NULL);
02012
02013 if(!symtab)
02014 return -1;
02015
02016 for(argptr = prob_desc->arglist; argptr != NULL; argptr=argptr->next)
02017 {
02018 if(((argptr->inout == GS_IN) || (argptr->inout == cond) ||
02019 (argptr->inout == GS_INOUT)) && (argptr->objecttype == GS_SCALAR))
02020 {
02021 if(!argptr->data) {
02022 icl_hash_destroy(symtab, NULL, NULL);
02023 return -1;
02024 }
02025
02026 switch(argptr->datatype) {
02027 case GS_INT:
02028 argptr->expr_val = (double) *((int *)(argptr->data));
02029 break;
02030 case GS_CHAR:
02031 argptr->expr_val = (double) *((char *)(argptr->data));
02032 break;
02033 case GS_FLOAT:
02034 argptr->expr_val = (double) *((float *)(argptr->data));
02035 break;
02036 case GS_DOUBLE:
02037 argptr->expr_val = (double) *((double *)(argptr->data));
02038 break;
02039 case GS_SCOMPLEX:
02040 argptr->expr_val = (double) ((gs_scomplex *)(argptr->data))->r;
02041 break;
02042 case GS_DCOMPLEX:
02043 argptr->expr_val = (double) ((gs_dcomplex *)(argptr->data))->r;
02044 break;
02045 default:
02046 ERRPRINTF("Bad data type\n");
02047 icl_hash_destroy(symtab, NULL, NULL);
02048 return -1;
02049 }
02050 }
02051 else
02052 argptr->expr_val = 0.0;
02053
02054 icl_hash_insert(symtab, argptr->name, &(argptr->expr_val));
02055 }
02056
02057 *new_hash = symtab;
02058
02059 return 0;
02060 }
02061
02075 int
02076 gs_receiver_compute_arg_sizes(gs_problem_t *prob_desc, enum inout cond)
02077 {
02078 icl_hash_t *symtab;
02079
02080 if(!prob_desc)
02081 return -1;
02082
02083 if(gs_construct_scalar_hashtable(&symtab, prob_desc, cond) < 0) {
02084 ERRPRINTF("error setting up hash table.\n");
02085 return -1;
02086 }
02087
02088 if(gs_setup_workspace(prob_desc, symtab) < 0) {
02089 ERRPRINTF("error setting up workspace for matrix transpose.\n");
02090 return -1;
02091 }
02092
02093 icl_hash_destroy(symtab, NULL, NULL);
02094
02095 return 0;
02096 }
02097
02116 int
02117 gs_sender_compute_arg_sizes(gs_va_list *ap, void **argstack, gs_problem_t *prob_desc,
02118 int language, int major)
02119 {
02120 gs_argument_t *argptr;
02121 icl_hash_t *symtab;
02122 int i, j;
02123
02124 if (prob_desc->size_computed) return 0;
02125
02126 if(!prob_desc)
02127 return -1;
02128
02129 symtab = icl_hash_create(11, NULL);
02130
02131 if(!symtab)
02132 return -1;
02133
02134 prob_desc->major = major;
02135
02136 i = -1;
02137 j = 0;
02138 for(argptr=prob_desc->arglist; argptr != NULL; argptr=argptr->next)
02139 {
02140 argptr->prob = prob_desc;
02141
02142
02143
02144
02145 if(argptr->inout == GS_WORKSPACE) {
02146 argptr->data = NULL;
02147 continue;
02148 }
02149
02150
02151 i++;
02152
02153 if((argptr->objecttype == GS_SCALAR) && (argptr->inout == GS_IN))
02154 {
02155 switch(argptr->datatype) {
02156 case GS_INT:
02157 if(ap) {
02158 if(language == GS_CALL_FROM_C) {
02159 argptr->scalar_val.int_val = (int)va_arg(ap->args, int);
02160 } else
02161 argptr->scalar_val.int_val = *((int*)va_arg(ap->args, int*));
02162 }
02163 else {
02164 argptr->scalar_val.int_val = *((int *)argstack[i]);
02165 }
02166
02167 argptr->data = &(argptr->scalar_val.int_val);
02168 argptr->expr_val = (double) argptr->scalar_val.int_val;
02169 break;
02170 case GS_CHAR:
02171 if(ap) {
02172
02173 if(language == GS_CALL_FROM_C)
02174 argptr->scalar_val.char_val = (char)va_arg(ap->args, int);
02175 else
02176 argptr->scalar_val.char_val = *((char*)va_arg(ap->args, char*));
02177 }
02178 else {
02179 argptr->scalar_val.char_val = *((char *)argstack[i]);
02180 }
02181
02182 argptr->data = &(argptr->scalar_val.char_val);
02183 argptr->expr_val = (double) argptr->scalar_val.char_val;
02184 break;
02185 case GS_FLOAT:
02186 if(ap) {
02187
02188 if(language == GS_CALL_FROM_C)
02189 argptr->scalar_val.float_val = (float)va_arg(ap->args, double);
02190 else
02191 argptr->scalar_val.float_val = *((float*)va_arg(ap->args, float*));
02192 }
02193 else {
02194 argptr->scalar_val.float_val = *((float *)argstack[i]);
02195 }
02196
02197 argptr->data = &(argptr->scalar_val.float_val);
02198 argptr->expr_val = (double) argptr->scalar_val.float_val;
02199 break;
02200 case GS_DOUBLE:
02201 if(ap) {
02202 if(language == GS_CALL_FROM_C)
02203 argptr->scalar_val.double_val = (double)va_arg(ap->args, double);
02204 else
02205 argptr->scalar_val.double_val = *((double*)va_arg(ap->args, double*));
02206 }
02207 else {
02208 argptr->scalar_val.double_val = *((double *)argstack[i]);
02209 }
02210
02211 argptr->data = &(argptr->scalar_val.double_val);
02212 argptr->expr_val = argptr->scalar_val.double_val;
02213 break;
02214 case GS_SCOMPLEX:
02215 if(ap) {
02216 if(language == GS_CALL_FROM_C)
02217 argptr->scalar_val.scomplex_val = va_arg(ap->args, gs_scomplex);
02218 else
02219 argptr->scalar_val.scomplex_val = *((gs_scomplex*)va_arg(ap->args, gs_scomplex*));
02220 }
02221 else {
02222 argptr->scalar_val.scomplex_val = *((gs_scomplex *)argstack[i]);
02223 }
02224
02225 argptr->data = &(argptr->scalar_val.scomplex_val);
02226 argptr->expr_val = argptr->scalar_val.scomplex_val.r;
02227 break;
02228 case GS_DCOMPLEX:
02229 if(ap) {
02230 if(language == GS_CALL_FROM_C)
02231 argptr->scalar_val.dcomplex_val = va_arg(ap->args, gs_dcomplex);
02232 else
02233 argptr->scalar_val.dcomplex_val = *((gs_dcomplex*)va_arg(ap->args, gs_dcomplex*));
02234 }
02235 else {
02236 argptr->scalar_val.dcomplex_val = *((gs_dcomplex *)argstack[i]);
02237 }
02238
02239 argptr->data = &(argptr->scalar_val.dcomplex_val);
02240 argptr->expr_val = argptr->scalar_val.dcomplex_val.r;
02241 break;
02242 default:
02243 ERRPRINTF("Bad data type\n");
02244 return -1;
02245 }
02246
02247 icl_hash_insert(symtab, argptr->name, &(argptr->expr_val));
02248 }
02249 else {
02250 if(argptr->datatype == GS_INT) {
02251 if(ap)
02252 argptr->data = (int*)va_arg(ap->args, int*);
02253 else
02254 argptr->data = (int*)argstack[i];
02255 }
02256 else if(argptr->datatype == GS_CHAR) {
02257 if(ap)
02258 argptr->data = (char*)va_arg(ap->args, char*);
02259 else
02260 argptr->data = (char*)argstack[i];
02261 }
02262 else if(argptr->datatype == GS_FLOAT) {
02263 if(ap)
02264 argptr->data = (float*)va_arg(ap->args, float*);
02265 else
02266 argptr->data = (float*)argstack[i];
02267 }
02268 else if(argptr->datatype == GS_DOUBLE) {
02269 if(ap)
02270 argptr->data = (double*)va_arg(ap->args, double*);
02271 else
02272 argptr->data = (double*)argstack[i];
02273 }
02274 else if(argptr->datatype == GS_SCOMPLEX) {
02275 if(ap)
02276 argptr->data = (gs_scomplex*)va_arg(ap->args, gs_scomplex*);
02277 else
02278 argptr->data = (gs_scomplex*)argstack[i];
02279 }
02280 else if(argptr->datatype == GS_DCOMPLEX) {
02281 if(ap)
02282 argptr->data = (gs_dcomplex*)va_arg(ap->args, gs_dcomplex*);
02283 else
02284 argptr->data = (gs_dcomplex*)argstack[i];
02285 }
02286
02287
02288
02289
02290
02291 if((argptr->objecttype == GS_SCALAR) && (argptr->inout == GS_OUT)) {
02292 argptr->expr_val = 0.0;
02293 icl_hash_insert(symtab, argptr->name, &(argptr->expr_val));
02294 }
02295 }
02296 }
02297
02298 if(ap)
02299 va_end(ap->args);
02300
02301 if(gs_setup_workspace(prob_desc, symtab) < 0) {
02302 ERRPRINTF("problem setting up workspace for matrix transpose.\n");
02303 return -1;
02304 }
02305
02306 icl_hash_destroy(symtab, NULL, NULL);
02307
02308 return 0;
02309 }
02310
02322 int
02323 gs_transpose_matrix(gs_argument_t *arg, char *data, gs_side_t side)
02324 {
02325 int *move,mn,iwrk,iok,rows,cols;
02326
02327 if(!arg)
02328 return -1;
02329
02330 mn = arg->rows * arg->cols;
02331
02332 if(!arg->prob->work) {
02333 iwrk = (arg->rows + arg->cols)/2;
02334 move = (int*)malloc(iwrk*sizeof(int));
02335 }
02336 else {
02337 move = arg->prob->work;
02338 iwrk = arg->prob->worksize;
02339 }
02340
02341 if(!move)
02342 return -1;
02343
02344 if(side == GS_CLIENT_SIDE) {
02345 rows = arg->rows;
02346 cols = arg->cols;
02347 }
02348 else {
02349 rows = arg->cols;
02350 cols = arg->rows;
02351 }
02352
02353 iok = -1;
02354
02355 switch(arg->datatype) {
02356 case GS_INT:
02357 itrans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02358 break;
02359 case GS_DOUBLE:
02360 dtrans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02361 break;
02362 case GS_DCOMPLEX:
02363 dctrans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02364 break;
02365 case GS_SCOMPLEX:
02366 sctrans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02367 break;
02368 case GS_FLOAT:
02369 strans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02370 break;
02371 case GS_CHAR:
02372 ctrans_(data,&rows,&cols,&mn,move,&iwrk,&iok);
02373 break;
02374 default:
02375 ERRPRINTF("gs_transpose_matrix: bad data type\n");
02376 }
02377
02378 if(!arg->prob->work)
02379 free(move);
02380
02381 return iok ? -1 : 0;
02382 }
02383
02401 int
02402 gs_save_input_args_to_file(char *name, gs_problem_t *problem, int my_dsig,
02403 int lang, int major)
02404 {
02405 gs_argument_t *argptr;
02406 gs_problem_t *pcopy;
02407 int fd;
02408
02409 fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, 0666);
02410 if(fd < 0) {
02411 ERRPRINTF("File '%s' could not be opened\n", name);
02412 return -1;
02413 }
02414
02415 pcopy = (gs_problem_t *) malloc(sizeof(gs_problem_t));
02416 if(!pcopy) {
02417 close(fd);
02418 return -1;
02419 }
02420
02421 if(gs_dup_problem(pcopy, problem) < 0) {
02422 ERRPRINTF("Failed to duplicate problem\n");
02423 close(fd);
02424 free(pcopy);
02425 return -1;
02426 }
02427
02428
02429
02430
02431
02432 for(argptr=pcopy->arglist; argptr != NULL; argptr=argptr->next)
02433 argptr->dsi = 0;
02434
02435 if(gs_send_input_args(NULL, NULL, fd, pcopy, my_dsig, lang, major) < 0) {
02436 ERRPRINTF("Failed to save input args to file\n");
02437 gs_free_problem(pcopy);
02438 close(fd);
02439 return -1;
02440 }
02441
02442 gs_free_problem(pcopy);
02443 close(fd);
02444 return 0;
02445 }
02446
02459 int
02460 gs_restore_input_args_from_file(char *name, gs_problem_t *problem, int sender_dsig,
02461 int my_dsig)
02462 {
02463 int fd;
02464 char *c;
02465
02466 fd = open(name, O_RDONLY, 0666);
02467 if(fd < 0) {
02468 ERRPRINTF("File '%s' could not be opened\n", name);
02469 return -1;
02470 }
02471
02472
02473 c = problem->callsig;
02474 while(*c) {
02475 if(*c == 'D')
02476 *c = 'P';
02477 c++;
02478 }
02479
02480 if(gs_recv_input_args(fd, problem, sender_dsig, my_dsig) < 0) {
02481 ERRPRINTF("Error reading input args from file\n");
02482 return -1;
02483 }
02484
02485 return 0;
02486 }
02487
02488
02502 int
02503 gs_save_output_args_to_file(int fd, gs_problem_t *problem, int my_dsig)
02504 {
02505 gs_argument_t *argptr;
02506
02507 if(!problem)
02508 return -1;
02509
02510 if(gs_send_tag(fd, problem->major) < 0) {
02511 ERRPRINTF("failed to send major\n");
02512 return -1;
02513 }
02514
02515
02516
02517
02518
02519 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
02520 {
02521 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT) ||
02522 (argptr->inout == GS_OUT)) && (argptr->objecttype == GS_SCALAR))
02523 {
02524 if(gs_send_arg(fd, argptr, my_dsig) < 0) {
02525 ERRPRINTF("error in writing input argument\n");
02526 return -1;
02527 }
02528 }
02529 }
02530
02531 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
02532 {
02533 ERRPRINTF("could not compute argument sizes.\n");
02534 return -1;
02535 }
02536
02537 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
02538 {
02539 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
02540 (argptr->inout == GS_VAROUT))
02541 {
02542 if(argptr->objecttype == GS_FILE) {
02543 if(gs_send_string(fd, argptr->data) < 0) {
02544 ERRPRINTF("error in writing file name\n");
02545 return -1;
02546 }
02547 }
02548 else if(argptr->objecttype == GS_PACKEDFILE) {
02549 int i;
02550
02551 if(gs_send_int(fd, argptr->rows) < 0) {
02552 ERRPRINTF("Error sending string length\n");
02553 return -1;
02554 }
02555
02556 for(i = 0; i < argptr->rows; i++) {
02557 if(gs_send_string(fd, ((char **)(argptr->data))[i]) < 0) {
02558 ERRPRINTF("error in writing file name\n");
02559 return -1;
02560 }
02561 }
02562 }
02563 else {
02564 if(gs_send_arg(fd, argptr, my_dsig) < 0) {
02565 ERRPRINTF("error in writing input argument\n");
02566 return -1;
02567 }
02568 }
02569 }
02570 }
02571
02572 return 0;
02573 }
02574
02588 int
02589 gs_restore_output_args_from_file(int fd, gs_problem_t *problem,
02590 int my_dsig)
02591 {
02592 gs_argument_t *argptr;
02593 int sender_major;
02594
02595 if(!problem)
02596 return -1;
02597
02598 if(gs_recv_tag(fd, &sender_major) < 0) {
02599 ERRPRINTF("error in receiving major\n");
02600 return -1;
02601 }
02602
02603 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
02604 {
02605 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT) ||
02606 (argptr->inout == GS_OUT)) && (argptr->objecttype == GS_SCALAR))
02607 {
02608 argptr->rows = argptr->cols = 1;
02609 argptr->data = NULL;
02610
02611 if(gs_recv_arg(fd, argptr, sender_major, my_dsig, my_dsig,
02612 GS_CLIENT_SIDE) < 0)
02613 {
02614 ERRPRINTF("error in reading input argument\n");
02615 return -1;
02616 }
02617 }
02618 }
02619
02620 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
02621 {
02622 ERRPRINTF("could not compute argument sizes.\n");
02623 return -1;
02624 }
02625
02626 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
02627 {
02628 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
02629 (argptr->inout == GS_VAROUT))
02630 {
02631 if(argptr->objecttype == GS_FILE) {
02632 char *fname;
02633
02634 if(gs_recv_string(fd, &fname) < 0) {
02635 ERRPRINTF("error in reading file name\n");
02636 return -1;
02637 }
02638
02639 argptr->data = fname;
02640 }
02641 else if(argptr->objecttype == GS_PACKEDFILE) {
02642 char *fname;
02643 int i, n;
02644
02645 if(gs_recv_int(fd, &n) < 0) {
02646 ERRPRINTF("error in reading number of packed files\n");
02647 return -1;
02648 }
02649
02650 argptr->data = malloc(n * sizeof(char *));
02651
02652 for(i = 0; i < n; i++) {
02653 if(gs_recv_string(fd, &fname) < 0) {
02654 ERRPRINTF("error in reading file name\n");
02655 return -1;
02656 }
02657
02658 ((char **)(argptr->data))[i] = fname;
02659 }
02660 }
02661 else {
02662 if(gs_recv_arg(fd, argptr, sender_major, my_dsig, my_dsig,
02663 GS_CLIENT_SIDE) < 0)
02664 {
02665 ERRPRINTF("error in reading input argument\n");
02666 return -1;
02667 }
02668 }
02669 }
02670 }
02671
02672 return 0;
02673 }
02674
02687 int
02688 gs_setup_workspace(gs_problem_t *prob_desc, icl_hash_t *symtab)
02689 {
02690 int worksz, row_max = 0, col_max = 0;
02691 gs_argument_t *argptr;
02692
02693 if(!prob_desc)
02694 return -1;
02695
02696 for(argptr = prob_desc->arglist; argptr != NULL; argptr=argptr->next)
02697 {
02698
02699
02700 if((gs_expr_i(argptr->rowexp, &(argptr->rows), symtab) < 0) ||
02701 (gs_expr_i(argptr->colexp, &(argptr->cols), symtab) < 0))
02702 {
02703 ERRPRINTF("Error parsing dimension expression\n");
02704 return -1;
02705 }
02706
02707
02708 if(argptr->objecttype == GS_SPARSEMATRIX) {
02709 if((gs_expr_i(argptr->sparse_attr.nnzexp, &(argptr->sparse_attr.nnz), symtab) < 0)) {
02710 ERRPRINTF("Error parsing NNZ expression\n");
02711 return -1;
02712 }
02713
02714 argptr->sparse_attr.rows_val_saved = argptr->rows;
02715 argptr->sparse_attr.cols_val_saved = argptr->cols;
02716 argptr->rows = argptr->sparse_attr.nnz;
02717 argptr->cols = 1;
02718 }
02719
02720 if(argptr->rows > row_max)
02721 row_max = argptr->rows;
02722 if(argptr->cols > col_max)
02723 col_max = argptr->cols;
02724 }
02725
02726 worksz = (row_max+col_max)/2;
02727
02728
02729
02730
02731
02732 if(!prob_desc->work) {
02733 prob_desc->work = (int *)malloc(worksz *sizeof(int));
02734 prob_desc->worksize = worksz;
02735 }
02736 else if(prob_desc->worksize < worksz) {
02737 prob_desc->work = (int *)realloc(prob_desc->work, worksz *sizeof(int));
02738 prob_desc->worksize = worksz;
02739 }
02740
02741 if(!prob_desc->work)
02742 prob_desc->worksize = 0;
02743
02744 return 0;
02745 }
02746
02747
02748
02749
02750
02751
02752 #ifdef GS_SMART_GRIDSOLVE
02753
02754
02766 int gs_smart_send_tg(SOCKET sock, gs_smart_tg * task_graph){
02767
02768 gs_smart_tg_task_node * task_node;
02769 gs_smart_tg_remote_task_node * tg_rem_task_node;
02770 char * task_node_str=NULL;
02771 char * problem_str=NULL;
02772 char * server_str=NULL;
02773 int i, j;
02774 if(!task_graph) return -1;
02775
02776 if(gs_send_int(sock, task_graph->nb_nodes) < 0) {
02777 DBGPRINTF("failed to send number of task nodes\n");
02778 return -1;
02779 }
02780
02781 if(gs_send_int(sock, task_graph->nb_rem_nodes) < 0) {
02782 DBGPRINTF("failed to send number of task nodes\n");
02783 return -1;
02784 }
02785
02786
02787 for(i=0; i< task_graph->nb_nodes;i++){
02788 if(!task_graph->task_nodes[i]) return -1;
02789
02790 task_node=task_graph->task_nodes[i];
02791
02792 if(gs_send_int(sock,task_node->node_type) < 0) {
02793 ERRPRINTF("SMART: Failed to send task local\n");
02794 return -1;
02795 }
02796 if(gs_send_int(sock,task_node->id) < 0) {
02797 ERRPRINTF("SMART: Failed to send task local\n");
02798 return -1;
02799 }
02800
02801 if(task_node->node_type==GS_SMART_TG_REM_TASK_NODE){
02802
02803 if(!task_node->tg_rem_task_node) return -1;
02804
02805 tg_rem_task_node=task_node->tg_rem_task_node;
02806 if(gs_smart_encode_tg_rem_task_node(&task_node_str, task_node)<0){
02807 ERRPRINTF("SMART: Error encoding remote task node\n");
02808 if(task_node_str) free(task_node_str);
02809 return -1;
02810 }
02811 if(!task_node_str) return -1;
02812
02813 if(gs_send_string(sock, task_node_str) < 0) {
02814 ERRPRINTF("Unsuccessful (Sending Rem Task Node to Client)\n");
02815 if(task_node_str) free(task_node_str);
02816 return -1;
02817 }
02818 if(task_node_str) free(task_node_str);
02819
02820 if(!tg_rem_task_node->problem) return -1;
02821
02822 if(gs_encode_problem(&problem_str, tg_rem_task_node->problem)<0){
02823 ERRPRINTF("SMART: Error encoding problem\n");
02824 if(problem_str) free(problem_str);
02825 return -1;
02826 }
02827 if(!problem_str) return -1;
02828
02829 if(gs_send_string(sock, problem_str) < 0) {
02830 ERRPRINTF("Unsuccessful (Sending Rem Problem STring to Client)\n");
02831 free(problem_str);
02832 return -1;
02833 }
02834
02835 if(problem_str) free(problem_str);
02836
02837 for(j=0;j<tg_rem_task_node->nb_servers;j++){
02838 if(gs_encode_server(&server_str, tg_rem_task_node->avail_servers[j])<0){
02839 ERRPRINTF("SMART: Error encoding server\n");
02840 return -1;
02841 }
02842 if(gs_send_string(sock, server_str) < 0) {
02843 ERRPRINTF("Unsuccessful (Sending Server String to Client)\n");
02844 free(server_str);
02845 return -1;
02846 }
02847 if(server_str) free(server_str);
02848 if(gs_send_string(sock, tg_rem_task_node->avail_servers[j]->my_ping_str) < 0) {
02849 ERRPRINTF("Unsuccessful (Sending Server String to Client)\n");
02850 return -1;
02851 }
02852 }
02853 }
02854 else if(task_node->node_type==GS_SMART_TG_CLIENT_NODE){
02855 if(gs_smart_encode_tg_client_node(&task_node_str, task_node)<0){
02856 ERRPRINTF("SMART: Error encoding client task node\n");
02857 return -1;
02858 }
02859 if(!task_node_str) return -1;
02860
02861 if(gs_send_string(sock, task_node_str) < 0) {
02862 ERRPRINTF("Unsuccessful (Sending Rem Task Node to Client)\n");
02863 free(task_node_str);
02864 return -1;
02865 }
02866
02867 }
02868
02869 }
02870 return 0;
02871
02872 }
02873
02874
02886 int gs_smart_recv_tg(SOCKET sock, gs_smart_tg * task_graph){
02887 int i;
02888 gs_smart_tg_task_node * task_node;
02889 gs_smart_tg_remote_task_node * tg_rem_task_node;
02890 char * task_node_str=NULL;
02891 char * problem_str=NULL;
02892 int nb_servers;
02893 int j;
02894 char * server_str=NULL;
02895 if(!task_graph) return -1;
02896
02897 if(gs_recv_int(sock, &task_graph->nb_nodes) < 0){
02898 return -1;
02899 }
02900
02901 if(gs_recv_int(sock, &task_graph->nb_rem_nodes) < 0){
02902 return -1;
02903 }
02904
02905 task_graph->task_nodes = (gs_smart_tg_task_node **)
02906 calloc(task_graph->nb_nodes, sizeof(gs_smart_tg_task_node *));
02907
02908 if(!task_graph->task_nodes) return -1;
02909
02910 for(i=0; i< task_graph->nb_nodes;i++){
02911 task_graph->task_nodes[i]= (gs_smart_tg_task_node *)
02912 calloc(1, sizeof(gs_smart_tg_task_node ));
02913
02914 if(!task_graph->task_nodes[i]) return -1;
02915
02916 task_node=task_graph->task_nodes[i];
02917 if(gs_recv_int(sock, &task_node->node_type) < 0){
02918 return -1;
02919 }
02920
02921 if(gs_recv_int(sock, &task_node->id) < 0){
02922 return -1;
02923 }
02924
02925 if(task_node->node_type==GS_SMART_TG_REM_TASK_NODE){
02926 task_node->tg_rem_task_node=(gs_smart_tg_remote_task_node *)
02927 calloc(1, sizeof(gs_smart_tg_remote_task_node));
02928
02929 if(!task_node->tg_rem_task_node) return -1;
02930
02931 tg_rem_task_node=task_node->tg_rem_task_node;
02932
02933 if(gs_recv_string(sock, &(task_node_str)) < 0) {
02934 ERRPRINTF("SMART: gs_recv_string task_node_str\n");
02935 return -1;
02936 }
02937 if(gs_smart_decode_tg_rem_task_node(task_node_str, task_graph, task_node)<0){
02938 free(task_node_str);
02939 ERRPRINTF("SMART: gs_decode_rem_task_node error\n");
02940 return -1;
02941 }
02942
02943
02944 if(task_node_str) free(task_node_str);
02945 if(gs_recv_string(sock, &(problem_str)) < 0) {
02946 DBGPRINTF("SMART: gs_recv_string problem_str\n");
02947 return -1;
02948 }
02949 tg_rem_task_node->problem = (gs_problem_t *)
02950 CALLOC(1, sizeof(gs_problem_t));
02951
02952 if(!tg_rem_task_node->problem) return -1;
02953
02954 if(gs_decode_problem(problem_str, tg_rem_task_node->problem)<0){
02955 free(problem_str);
02956 DBGPRINTF("gs_decode_problem error\n");
02957 return -1;
02958 }
02959 free(problem_str);
02960 nb_servers=tg_rem_task_node->nb_servers;
02961 tg_rem_task_node->avail_servers=(gs_server_t **)
02962 CALLOC(nb_servers, sizeof(gs_server_t *));
02963
02964 for(j=0;j<nb_servers;j++){
02965 if(gs_recv_string(sock, &(server_str)) < 0) {
02966 ERRPRINTF("SMART Error: Unsuccessful (Receiving Server String)\n");
02967 free(server_str);
02968 return -1;
02969 }
02970
02971 tg_rem_task_node->avail_servers[j]= (gs_server_t *)
02972 CALLOC(1, sizeof(gs_server_t));
02973
02974 if(gs_decode_server(server_str, tg_rem_task_node->avail_servers[j])){
02975 ERRPRINTF("SMART Error: Unsuccessful (Decoding Server)\n");
02976 free(server_str);
02977 return -1;
02978 }
02979 free(server_str);
02980 if(gs_recv_string(sock, &(tg_rem_task_node->avail_servers[j]->my_ping_str)) < 0) {
02981 ERRPRINTF("SMART : Unsuccessful (Receiving Server String)\n");
02982 return -1;
02983 }
02984 }
02985 }
02986 else if(task_graph->task_nodes[i]->node_type==GS_SMART_TG_CLIENT_NODE){
02987 task_graph->task_nodes[i]->tg_client_node=(gs_smart_tg_client_node *)
02988 calloc(1, sizeof(gs_smart_tg_client_node));
02989
02990 if(!task_graph->task_nodes[i]->tg_client_node){
02991 fprintf(stderr, "Smart Error : Malloc client task node %d\n", i);
02992 return -1;
02993 }
02994
02995 if(gs_recv_string(sock, &(task_node_str)) < 0) {
02996 DBGPRINTF("gs_recv_string task_node_str\n");
02997 return -1;
02998 }
02999 if(gs_smart_decode_tg_client_node(task_node_str, task_graph, task_graph->task_nodes[i])<0){
03000 free(task_node_str);
03001 ERRPRINTF("SMART: gs_decode_loc_task_node error\n");
03002 return -1;
03003 }
03004 if(task_node_str) free(task_node_str);
03005
03006 }
03007
03008 }
03009 return 0;
03010 }
03011
03012
03013
03014
03015
03016
03017
03018
03019
03020
03042 int
03043 gs_smart_send_all_servers(SOCKET sock, gs_server_t **server_list, int count)
03044 {
03045 int i;
03046 if(gs_send_int(sock, count) < 0) {
03047 DBGPRINTF("failed to send number of servers\n");
03048 return -1;
03049 }
03050
03051 for(i = 0; i < count; i++) {
03052 char *srv = NULL;
03053 DBGPRINTF("Encoding server: %s.\n", server_list[i]->hostname);
03054 if(gs_encode_server(&srv, server_list[i]) < 0){
03055 FREE(srv);
03056 DBGPRINTF("Failed to send server list \n");
03057 return -1;
03058 }
03059 if(gs_send_string(sock, srv) < 0) {
03060 FREE(srv);
03061 DBGPRINTF("Failed to send server list \n");
03062 return -1;
03063 }
03064
03065
03066 if(server_list[i]->smart==1){
03067 if(gs_send_string(sock, server_list[i]->my_ping_str) < 0) {
03068 FREE(srv);
03069 DBGPRINTF("Failed to send server list \n");
03070 return -1;
03071 }
03072 }
03073 FREE(srv);
03074 }
03075
03076
03077
03078
03079
03080 return 0;
03081 }
03082
03083
03084
03085
03086
03087
03109 int gs_smart_recv_all_servers(SOCKET sock, gs_server_t *** _server_list,
03110 int *_num_servers){
03111 char *msg=NULL;
03112 int i,num_servers;
03113 if(gs_recv_int(sock, &num_servers) < 0){
03114 ERRPRINTF("SMART: Error receiving number of servers");
03115 return -1;
03116 }
03117 if(num_servers <= 0) {
03118 ERRPRINTF("SMART: Error no servers are currently registered\n");
03119 return -1;
03120 }
03121 gs_server_t ** server_list;
03122 server_list = (gs_server_t **) calloc(num_servers, sizeof(gs_server_t *));
03123 if(!server_list){
03124 ERRPRINTF("SMART: Error allocating memory for server list\n");
03125 return -1;
03126 }
03127 int smart_id=0;;
03128 for(i=0;i<num_servers;i++) {
03129 server_list[i] = (gs_server_t *) CALLOC(1,sizeof(gs_server_t));
03130 if(gs_recv_string(sock, &msg) < 0) {
03131 ERRPRINTF("SMART: Error receiving encoded server string\n");
03132 if(msg )free(msg);
03133 return -1;
03134 }
03135
03136 if(gs_decode_server(msg, server_list[i]) < 0) {
03137 ERRPRINTF("SMART: Error decoding server\n");
03138 if(msg) free(msg);
03139 return -1;
03140 }
03141 if(msg) free(msg);
03142 if(server_list[i]->smart==1){
03143 server_list[i]->smart_id=smart_id;
03144 smart_id++;
03145 if(gs_recv_string(sock, &server_list[i]->my_ping_str) < 0) {
03146 ERRPRINTF("SMART : Error recving ping string from agent\n");
03147 return -1;
03148 }
03149 }
03150
03151 }
03152 *_num_servers=num_servers;
03153 *_server_list=server_list;
03154 return 0;
03155 }
03156
03157
03158
03159
03160
03161
03162
03163
03164
03165
03166
03167
03168
03169
03170
03171
03172
03173
03174
03175
03176
03177
03178
03179
03180
03181 int gs_smart_send_map_info(SOCKET sock, gs_problem_t * problem){
03182
03183 gs_argument_t * argptr;
03184 char * enc_arg_map_info;
03185 int i;
03186 if(!problem || !problem->arglist) return -1;
03187 for(argptr=problem->arglist;argptr!=NULL;argptr=argptr->next) {
03188 if(argptr->objecttype!=GS_SCALAR) {
03189 if(!argptr){
03190 ERRPRINTF("SMART: Error, Arg NULL\n");
03191 }
03192 if(gs_smart_encode_arg_map_info(&enc_arg_map_info,argptr)<0){
03193 ERRPRINTF("SMART: failed to encode smart arg info\n");
03194 return -1;
03195 }
03196
03197 if(gs_send_string(sock, enc_arg_map_info) < 0) {
03198 ERRPRINTF("SMART: failed to send encoded smart arg info\n");
03199 return -1;
03200 }
03201
03202 if((argptr->input_arg_stored) || (argptr->input_arg_received_remotely)){
03203 if(gs_send_string(sock, argptr->input_arg_file) < 0) {
03204 ERRPRINTF("SMART: failed to send encoded smart arg info\n");
03205 return -1;
03206 }
03207 }
03208
03209 if(argptr->output_arg_sent_remotely){
03210 if(gs_send_string(sock, argptr->output_arg_file) < 0) {
03211 ERRPRINTF("SMART: failed to send encoded smart arg info\n");
03212 return -1;
03213 }
03214 for(i=0;i<argptr->output_nb_dest_servers;i++){
03215 if(gs_send_string(sock, argptr->enc_output_dest_servers[i]) < 0) {
03216 ERRPRINTF("SMART: failed to send encoded smart arg info\n");
03217 return -1;
03218 }
03219
03220 }
03221
03222 }
03223
03224 }
03225 }
03226
03227
03228 return 0;
03229 }
03230
03231
03232
03233
03234
03235
03236
03237
03238
03239
03240
03241
03242
03243
03244
03245
03246
03247
03248
03249
03250
03251
03252
03253
03254
03255
03256 int gs_smart_recv_map_info(SOCKET sock, gs_problem_t * problem){
03257 char * enc_arg_map_info;
03258 gs_argument_t * argptr;
03259 int i;
03260 if(!problem || !problem->arglist) return -1;
03261 for(argptr=problem->arglist;argptr!=NULL;argptr=argptr->next) {
03262 if(argptr->objecttype!=GS_SCALAR){
03263 if(!argptr){
03264 ERRPRINTF("SMART: Error, Arg NULL\n");
03265 }
03266
03267 if(gs_recv_string(sock, &enc_arg_map_info)<0){
03268 ERRPRINTF("SMART: Error receiving encoded smart arg info.\n");
03269 return -1;
03270 }
03271
03272 if(gs_smart_decode_arg_map_info(enc_arg_map_info, argptr)<0){
03273 ERRPRINTF("SMART: Error decoding smart arg info.\n");
03274 return -1;
03275 }
03276
03277 if((argptr->input_arg_stored) || (argptr->input_arg_received_remotely)){
03278 if(gs_recv_string(sock, &argptr->input_arg_file) < 0) {
03279 ERRPRINTF("SMART: failed to recv encoded smart arg info\n");
03280 return -1;
03281 }
03282 }
03283
03284 argptr->enc_output_dest_servers=(char **)calloc(argptr->output_nb_dest_servers, sizeof(char *));
03285
03286 if(argptr->output_arg_sent_remotely){
03287 if(gs_recv_string(sock, &argptr->output_arg_file) < 0) {
03288 ERRPRINTF("SMART: failed to recv encoded smart arg info\n");
03289 return -1;
03290 }
03291 for(i=0;i<argptr->output_nb_dest_servers;i++){
03292 if(gs_recv_string(sock, &argptr->enc_output_dest_servers[i]) < 0) {
03293 ERRPRINTF("SMART: failed to recv encoded smart arg info\n");
03294 return -1;
03295 }
03296
03297 }
03298
03299 }
03300 }
03301 }
03302 return 0;
03303 }
03304
03305
03306
03307
03308
03309
03310
03311
03312
03313
03314
03315
03316
03317
03318
03319
03320
03321
03322
03323
03324
03325
03326
03327
03328
03329
03330
03331
03332
03333
03334
03335
03336
03337 int
03338 gs_smart_send_input_args(gs_va_list *args, void **argstack, SOCKET sock,
03339 gs_problem_t *problem, int my_dsig, int lang, int major)
03340 {
03341
03342 if(!problem)
03343 return -1;
03344
03345
03346 if(gs_send_input_scalar_args(args, argstack, sock, problem,
03347 my_dsig, lang, major) < 0)
03348 return -1;
03349
03350
03351
03352 if(gs_smart_send_input_nonscalar_args(sock, problem, my_dsig) < 0)
03353 return -1;
03354
03355 return 0;
03356 }
03357
03358
03380 int
03381 gs_smart_recv_input_args(SOCKET sock, gs_server_t * src_server, gs_problem_t *problem, int sender_dsig,
03382 int my_dsig){
03383
03384 gs_argument_t *argptr;
03385 int sender_major;
03386 int arg_count=0;
03387
03388 if(!problem)
03389 return -1;
03390
03391 if (gs_recv_input_scalar_args(sock, problem, sender_dsig,
03392 my_dsig, &sender_major) < 0)
03393 {
03394 ERRPRINTF("Error in receiving input scalar args\n");
03395 return -1;
03396 }
03397
03398 if(gs_receiver_compute_arg_sizes(problem, GS_IN) < 0)
03399 {
03400 ERRPRINTF("error computing argument sizes.\n");
03401 return -1;
03402 }
03403
03404 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03405 {
03406 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
03407 (argptr->objecttype != GS_SCALAR))
03408 {
03409 argptr->data = NULL;
03410 if(!argptr->input_arg_received_remotely){
03411 if(gs_recv_arg(sock, argptr, sender_major, sender_dsig, my_dsig,
03412 GS_SERVER_SIDE) < 0){
03413 ERRPRINTF("error receiving input non-scalar arguments\n");
03414 return -1;
03415 }
03416 }
03417 else{
03418 int arg_in_mem;
03419 LOGPRINTF("SMART : Problem %s receiving arg %s from server memory\n", problem->name, argptr->name);
03420 if(gs_smart_recv_arg_from_s_mem(src_server, argptr, sender_major, my_dsig,
03421 GS_SERVER_SIDE, &arg_in_mem)<0){
03422 ERRPRINTF("SMART : Error receiving arg from server memory\n");
03423 return -1;
03424 }
03425 if(arg_in_mem==0){
03426 LOGPRINTF("SMART : Problem %s receiving arg %s from file %s\n", problem->name, argptr->name, argptr->input_arg_file);
03427 if(gs_smart_read_arg_from_file(src_server, argptr->input_arg_file, argptr, sender_major, my_dsig)<0){
03428 ERRPRINTF("SMART : Error reading arg from file\n");
03429 return -1;
03430 }
03431 }
03432
03433
03434 }
03435
03436
03437 if(argptr->input_arg_stored==1){
03438 int s_mem_full=0;
03439 argptr->output_arg_file=strdup(argptr->input_arg_file);
03440 LOGPRINTF("SMART : Problem %s storing input arg %s %s to local server memory\n", problem->name, argptr->name, argptr->output_arg_file);
03441 if(gs_smart_send_arg_remotely_to_s_mem(src_server, argptr, problem->major,
03442 my_dsig, &s_mem_full) < 0) {
03443 ERRPRINTF("error in sending output arguments\n");
03444 return -1;
03445 }
03446 if(s_mem_full==1){
03447 LOGPRINTF("SMART : Problem %s storing input arg %s to local file %s as server memory full\n", problem->name, argptr->name, argptr->output_arg_file);
03448 if(gs_smart_write_arg_to_file(argptr->output_arg_file, argptr)<0){
03449 ERRPRINTF("SMART : Error writing arg to file\n");
03450 return -1;
03451 }
03452 }
03453 }
03454 arg_count++;
03455 }
03456
03457 if((argptr->objecttype == GS_FILE) && (argptr->inout == GS_OUT)) {
03458 argptr->data = strdup(argptr->name);
03459
03460
03461 if(!argptr->data) {
03462 ERRPRINTF("error allocating space for output arg\n");
03463 return -1;
03464 }
03465 }
03466 else if((argptr->objecttype == GS_PACKEDFILE) && (argptr->inout == GS_OUT)) {
03467 char **data;
03468 int i;
03469
03470 argptr->data = malloc(argptr->rows * sizeof(char *));
03471 if(!argptr->data) {
03472 ERRPRINTF("error allocating memory for filenames.\n");
03473 return -1;
03474 }
03475
03476 data = (char **)argptr->data;
03477
03478 for(i = 0; i < argptr->rows; i++) {
03479 data[i] = dstring_sprintf("%s_%d", argptr->name, i);
03480 if(!data[i]) {
03481 ERRPRINTF("error allocating memory for filename.\n");
03482 return -1;
03483 }
03484 }
03485 }
03486 else if((argptr->inout == GS_OUT) || (argptr->inout == GS_WORKSPACE)) {
03487 int elsize;
03488
03489 elsize = gs_get_element_size(argptr->datatype, my_dsig);
03490
03491 argptr->data = (char *)CALLOC(argptr->rows * argptr->cols, elsize);
03492
03493
03494
03495 if(!argptr->data) {
03496 ERRPRINTF("error allocating space for output arg\n");
03497 return -1;
03498 }
03499 }
03500 else if((argptr->objecttype == GS_PACKEDFILE) && (argptr->inout == GS_OUT)) {
03501 char **data;
03502 int i;
03503
03504 argptr->data = malloc(argptr->rows * sizeof(char *));
03505 if(!argptr->data) {
03506 ERRPRINTF("error allocating memory for filenames.\n");
03507 return -1;
03508 }
03509
03510 data = (char **)argptr->data;
03511
03512 for(i = 0; i < argptr->rows; i++) {
03513 data[i] = dstring_sprintf("%s_%d", argptr->name, i);
03514 if(!data[i]) {
03515 ERRPRINTF("error allocating memory for filename.\n");
03516 return -1;
03517 }
03518 }
03519 }
03520 else if((argptr->inout == GS_OUT) || (argptr->inout == GS_WORKSPACE)) {
03521 int elsize;
03522
03523 elsize = gs_get_element_size(argptr->datatype, my_dsig);
03524
03525 argptr->data = (char *)CALLOC(argptr->rows * argptr->cols, elsize);
03526
03527 if(!argptr->data) {
03528 ERRPRINTF("error allocating space for output arg\n");
03529 return -1;
03530 }
03531 }
03532 else if(argptr->inout == GS_VAROUT)
03533 {
03534 char **foo = (char **)malloc(sizeof(char*));
03535 argptr->data = foo;
03536 }
03537 }
03538 return 0;
03539 }
03540
03541
03542
03543
03544
03545
03546
03547
03548
03549
03550
03551
03552
03553
03554
03555
03556
03557
03558
03559
03560
03561
03562 int
03563 gs_smart_send_input_nonscalar_args(SOCKET sock, gs_problem_t *problem, int my_dsig)
03564 {
03565 gs_argument_t *argptr;
03566 int arg_count=0;
03567 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03568 {
03569 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT)) &&
03570 (argptr->objecttype != GS_SCALAR))
03571 {
03572 if(argptr->dsi) {
03573 if(gs_send_dsi_input_arg(sock, argptr, my_dsig) < 0) {
03574 ERRPRINTF("error in sending input DSI argument\n");
03575 return -1;
03576 }
03577 }
03578 else {
03579 if(!argptr->input_arg_received_remotely){
03580 if(argptr->input_arg_stored){
03581 printf("SMART : Sending input arg %s from client to server and stored locally on server\n", argptr->name);
03582 }
03583 else{
03584 printf("SMART : Sending input arg %s from client to server\n", argptr->name);
03585 }
03586 if(gs_send_arg(sock, argptr, my_dsig) < 0) {
03587 ERRPRINTF("error in sending input argument\n");
03588 return -1;
03589 }
03590 }
03591 else{
03592 printf("SMART : Input arg %s will be received remotely\n", argptr->name);
03593
03594 }
03595 }
03596 }
03597 arg_count++;
03598 }
03599
03600 return 0;
03601 }
03602
03603
03604
03605
03606
03607
03623 int gs_smart_send_input_args_remotely(SOCKET sock, gs_problem_t *problem, int my_dsig){
03624 gs_argument_t *argptr;
03625 int s_mem_full;
03626 if(!problem)
03627 return -1;
03628 if(gs_send_tag(sock, problem->major) < 0) {
03629 ERRPRINTF("failed to send major\n");
03630 return -1;
03631 }
03632
03633 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
03634 {
03635 ERRPRINTF("could not compute argument sizes.\n");
03636 return -1;
03637 }
03638
03639 int arg_count=0;
03640 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03641 {
03642 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_IN) ||
03643 (argptr->inout == GS_VAROUT))
03644 {
03645 if(argptr->dsi) {
03646 if(gs_send_dsi_input_arg(sock, argptr, my_dsig) < 0) {
03647 ERRPRINTF("error in sending input DSI argument\n");
03648 return -1;
03649 }
03650 }
03651 else {
03652 if(argptr->input_arg_sent_remotely==1){
03653 gs_server_t ** dest_srvrs;
03654 int nb_dest_servers;
03655 nb_dest_servers=argptr->input_nb_dest_servers;
03656 dest_srvrs=(gs_server_t **) CALLOC(nb_dest_servers,sizeof(gs_server_t *));
03657
03658 argptr->output_arg_file=strdup(argptr->input_arg_file);
03659
03660 int i;
03661 for(i=0;i<nb_dest_servers;i++){
03662 dest_srvrs[i]=(gs_server_t *) CALLOC(1,sizeof(gs_server_t));
03663 if(gs_decode_server(argptr->enc_input_dest_servers[i], dest_srvrs[i]) < 0) {
03664 ERRPRINTF("Error decoding server string\n");
03665 FREE(argptr->enc_input_dest_servers[i]);
03666 return -1;
03667 }
03668 printf("SMART : Problem %s sending input arg %s remotely to server %s to store in server memory\n", problem->name, argptr->name, dest_srvrs[i]->hostname);
03669 if(gs_smart_send_arg_remotely_to_s_mem(dest_srvrs[i], argptr, problem->major, my_dsig, &s_mem_full) < 0) {
03670 ERRPRINTF("SMART : Error in sending input arguments to server memory\n");
03671 return -1;
03672 }
03673
03674 if(s_mem_full==1){
03675 printf("SMART : Problem %s sending arg %s remotely to server %s store in file %s\n", problem->name, argptr->name, dest_srvrs[i]->hostname, argptr->output_arg_file);
03676
03677 if(gs_smart_send_arg_remotely_to_file(dest_srvrs[i], argptr, problem->major,
03678 my_dsig) < 0) {
03679 ERRPRINTF("SMART : Error in sending input arguments remotely to file\n");
03680 return -1;
03681 }
03682 }
03683
03684 if(dest_srvrs[i]) gs_server_free(dest_srvrs[i]);
03685 }
03686 if(dest_srvrs) free(dest_srvrs);
03687 }
03688 }
03689 arg_count++;
03690 }
03691 }
03692 return 0;
03693 }
03694
03695
03696
03697
03698
03699
03700
03701
03716 int gs_smart_send_output_args_remotely(SOCKET sock, gs_server_t * src_server, gs_problem_t *problem, int my_dsig){
03717 gs_argument_t *argptr;
03718 int s_mem_full;
03719 if(!problem)
03720 return -1;
03721
03722 if(gs_send_tag(sock, problem->major) < 0) {
03723 ERRPRINTF("failed to send major\n");
03724 return -1;
03725 }
03726
03727 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
03728 {
03729 ERRPRINTF("could not compute argument sizes.\n");
03730 return -1;
03731 }
03732
03733 int arg_count=0;
03734 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03735 {
03736 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
03737 (argptr->inout == GS_VAROUT))
03738 {
03739 if(argptr->dsi) {
03740 if(gs_send_dsi_output_arg(sock, argptr, my_dsig) < 0) {
03741 ERRPRINTF("error in sending output DSI argument\n");
03742 return -1;
03743 }
03744 }
03745 else {
03746 if(argptr->output_arg_sent_remotely==1){
03747 if(argptr->output_cached){
03748 LOGPRINTF("SMART : Problem %s storing arg %s in local server memory\n", problem->name, argptr->name);
03749 if(gs_smart_send_arg_remotely_to_s_mem(src_server, argptr, problem->major, my_dsig, &s_mem_full) < 0) {
03750 ERRPRINTF("error in sending output arguments\n");
03751 return -1;
03752 }
03753 if(s_mem_full==1){
03754 LOGPRINTF("SMART : Problem %s storing arg %s in local file %s as server memory full\n", problem->name, argptr->name, argptr->output_arg_file);
03755 if(gs_smart_write_arg_to_file(argptr->output_arg_file, argptr)<0){
03756 ERRPRINTF("SMART : Error writing arg to file\n");
03757 return -1;
03758 }
03759 }
03760
03761 }
03762
03763
03764 gs_server_t ** dest_srvrs;
03765 int nb_dest_servers;
03766 nb_dest_servers=argptr->output_nb_dest_servers;
03767
03768 dest_srvrs=(gs_server_t **) CALLOC(nb_dest_servers,sizeof(gs_server_t *));
03769
03770 int i;
03771 for(i=0;i<nb_dest_servers;i++){
03772
03773 dest_srvrs[i]=(gs_server_t *) CALLOC(1,sizeof(gs_server_t));
03774 if(gs_decode_server(argptr->enc_output_dest_servers[i], dest_srvrs[i]) < 0) {
03775 ERRPRINTF("Error decoding server string\n");
03776 FREE(argptr->enc_output_dest_servers[i]);
03777 return -1;
03778 }
03779
03780 LOGPRINTF("SMART : Problem %s sending arg %s remotely to server %s to store in server memory\n", problem->name, argptr->name, dest_srvrs[i]->hostname);
03781 if(gs_smart_send_arg_remotely_to_s_mem(dest_srvrs[i], argptr, problem->major, my_dsig, &s_mem_full) < 0) {
03782 ERRPRINTF("error in sending output arguments\n");
03783 return -1;
03784 }
03785
03786 if(s_mem_full==1){
03787 LOGPRINTF("SMART : Problem %s sending arg %s remotely to server %s store in file %s\n", problem->name, argptr->name, dest_srvrs[i]->hostname, argptr->output_arg_file);
03788
03789 if(gs_smart_send_arg_remotely_to_file(dest_srvrs[i], argptr, problem->major,
03790 my_dsig) < 0) {
03791 ERRPRINTF("error in sending output arguments\n");
03792 return -1;
03793 }
03794 }
03795
03796 if(dest_srvrs[i]) gs_server_free(dest_srvrs[i]);
03797 }
03798 if(dest_srvrs) free(dest_srvrs);
03799 }
03800 }
03801 arg_count++;
03802 }
03803 }
03804 return 0;
03805 }
03806
03807
03808
03824 int
03825 gs_smart_send_output_args_to_client(SOCKET sock, gs_problem_t *problem, int my_dsig)
03826 {
03827 gs_argument_t *argptr;
03828
03829 if(!problem)
03830 return -1;
03831
03832 if(gs_send_tag(sock, problem->major) < 0) {
03833 ERRPRINTF("failed to send major\n");
03834 return -1;
03835 }
03836
03837 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
03838 {
03839 ERRPRINTF("could not compute argument sizes.\n");
03840 return -1;
03841 }
03842
03843 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03844 {
03845 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
03846 (argptr->inout == GS_VAROUT))
03847 {
03848 if(argptr->dsi) {
03849 if(gs_send_dsi_output_arg(sock, argptr, my_dsig) < 0) {
03850 ERRPRINTF("error in sending output DSI argument\n");
03851 return -1;
03852 }
03853 }
03854 else {
03855
03856 if(!argptr->output_arg_sent_remotely){
03857 if(gs_send_arg(sock, argptr, my_dsig) < 0) {
03858 ERRPRINTF("error in sending output arguments\n");
03859 return -1;
03860 }
03861
03862 }
03863 }
03864 }
03865 }
03866
03867 return 0;
03868 }
03869
03870
03871
03891 int
03892 gs_smart_recv_output_args_from_server(SOCKET sock, gs_problem_t *req_problem, gs_problem_t *handle_problem,int sender_dsig,
03893 int my_dsig)
03894 {
03895 gs_argument_t *req_argptr;
03896 gs_argument_t *handle_argptr;
03897
03898 int sender_major;
03899 if(!req_problem)
03900 return -1;
03901 if(gs_wait_for_output(sock) < 0) {
03902 ERRPRINTF("error waiting for output to be ready\n");
03903 return -1;
03904 }
03905
03906 if(gs_recv_tag(sock, &sender_major) < 0) {
03907 ERRPRINTF("error in receiving major\n");
03908 return -1;
03909 }
03910
03911 handle_argptr=handle_problem->arglist;
03912
03913 for(req_argptr = req_problem->arglist; req_argptr != NULL; req_argptr = req_argptr->next)
03914 {
03915 if((req_argptr->inout == GS_INOUT) || (req_argptr->inout == GS_OUT) ||
03916 (req_argptr->inout == GS_VAROUT))
03917 {
03918 if(!handle_argptr->output_arg_sent_remotely){
03919 if(gs_recv_arg(sock, req_argptr, sender_major, sender_dsig, my_dsig,
03920 GS_CLIENT_SIDE) < 0)
03921 {
03922 ERRPRINTF("error in receiving output arguments\n");
03923 return -1;
03924 }
03925 }
03926 }
03927 if(handle_argptr != NULL){
03928 handle_argptr = handle_argptr->next;
03929 }
03930 else{
03931 ERRPRINTF("Smart: Error receiving output arguments\n");
03932 return -1;
03933 }
03934 }
03935 return 0;
03936 }
03937
03938
03959 int
03960 gs_smart_recv_output_args(SOCKET sock, gs_problem_t *problem, int sender_dsig,
03961 int my_dsig)
03962 {
03963 gs_argument_t *argptr;
03964 int sender_major;
03965 if(!problem)
03966 return -1;
03967
03968 if(gs_wait_for_output(sock) < 0) {
03969 ERRPRINTF("error waiting for output to be ready\n");
03970 return -1;
03971 }
03972
03973 if(gs_recv_tag(sock, &sender_major) < 0) {
03974 ERRPRINTF("error in receiving major\n");
03975 return -1;
03976 }
03977 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
03978 {
03979 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
03980 (argptr->inout == GS_VAROUT))
03981 {
03982 if(!argptr->output_arg_sent_remotely){
03983 printf("SMART : Receive output arg %s from server\n", argptr->name);
03984 if(gs_recv_arg(sock, argptr, sender_major, sender_dsig, my_dsig,
03985 GS_CLIENT_SIDE) < 0)
03986 {
03987 ERRPRINTF("error in receiving output arguments\n");
03988 return -1;
03989 }
03990 printf("SMART : Received output arg %s from server\n", argptr->name);
03991 }
03992 else{
03993 printf("SMART : Output arg %s was sent remotely\n", argptr->name);
03994 }
03995
03996 }
03997 }
03998 return 0;
03999 }
04000
04001
04002
04003
04021 int gs_smart_send_arg_remotely(SOCKET dest_srv_sock, gs_argument_t *arg,
04022 int major, int my_dsig){
04023
04024
04025 my_dsig = pvmgetdsig();
04026 if(gs_send_string(dest_srv_sock, arg->name) < 0) {
04027 ERRPRINTF("SMART : Error sending arg name\n");
04028 return -1;
04029 }
04030
04031 if(gs_send_int(dest_srv_sock, (int)arg->objecttype) < 0) {
04032 ERRPRINTF("SMART : Error sending objecttype\n");
04033 return -1;
04034 }
04035
04036 if(gs_send_int(dest_srv_sock,(int)arg->inout) < 0) {
04037 ERRPRINTF("SMART : Error sending inout\n");
04038 return -1;
04039 }
04040
04041 if(gs_send_int(dest_srv_sock, (int)arg->datatype) < 0) {
04042 ERRPRINTF("SMART : Error sending datatype\n");
04043 return -1;
04044 }
04045
04046 if(gs_send_int(dest_srv_sock, (int)arg->prob->major) < 0) {
04047 ERRPRINTF("SMART : Error sending major\n");
04048 return -1;
04049 }
04050
04051 if(gs_send_int(dest_srv_sock, (int)arg->rows) < 0) {
04052 ERRPRINTF("SMART : Error sending rows\n");
04053 return -1;
04054 }
04055
04056 if(gs_send_int(dest_srv_sock, (int)arg->cols) < 0) {
04057 ERRPRINTF("SMART : Error sending cols\n");
04058 return -1;
04059 }
04060
04061 if(gs_send_string(dest_srv_sock, arg->colexp) < 0) {
04062 ERRPRINTF("SMART : Error sending colexp\n");
04063 return -1;
04064 }
04065
04066 if(gs_send_string(dest_srv_sock, arg->rowexp) < 0) {
04067 ERRPRINTF("SMART : Error sending rowexp\n");
04068 return -1;
04069 }
04070
04071 if(gs_send_int(dest_srv_sock, major) <0){
04072 ERRPRINTF("SMART : Error sending major\n");
04073 return -1;
04074 }
04075
04076 if(gs_send_int(dest_srv_sock, my_dsig) <0){
04077 ERRPRINTF("SMART : Error sending my_dsig\n");
04078 return -1;
04079 }
04080
04081 int sender_elsize = gs_get_element_size(arg->datatype, my_dsig);
04082 int data_size = arg->rows * arg->cols * sender_elsize;
04083
04084 arg->byte_size= data_size;
04085
04086 if(gs_send_int(dest_srv_sock, arg->byte_size) <0){
04087 ERRPRINTF("SMART : Error sending byte size\n");
04088 return -1;
04089 }
04090
04091 if(gs_send_string(dest_srv_sock, arg->output_arg_file)<0){
04092 ERRPRINTF("SMART: Error sending arg file loc\n");
04093 return -1;
04094 }
04095
04096 if(gs_send_arg(dest_srv_sock, arg, my_dsig)<0){
04097 ERRPRINTF("SMART : Error sending arg\n");
04098 return -1;
04099 }
04100 return 0;
04101 }
04102
04103
04104
04105
04106
04122 int gs_smart_recv_arg_remotely(SOCKET sock, gs_argument_t *arg, int my_dsig){
04123
04124 int sender_major;
04125 int sender_dsig;
04126 int recv_int;
04127
04128 my_dsig = pvmgetdsig();
04129 if(gs_recv_string(sock, &arg->name) < 0) {
04130 proxy_close(sock);
04131 return -1;
04132 }
04133
04134 if(gs_recv_int(sock, &recv_int) < 0) {
04135 proxy_close(sock);
04136 return -1;
04137 }
04138
04139 arg->objecttype=recv_int;
04140
04141 if(gs_recv_int(sock, &recv_int) < 0) {
04142 proxy_close(sock);
04143 return -1;
04144 }
04145 arg->inout=recv_int;
04146
04147 if(gs_recv_int(sock, &recv_int) < 0) {
04148 proxy_close(sock);
04149 return -1;
04150 }
04151 arg->datatype=recv_int;
04152
04153 if(gs_recv_int(sock, &recv_int) < 0) {
04154 proxy_close(sock);
04155 return -1;
04156 }
04157 arg->prob->major=recv_int;
04158
04159 if(gs_recv_int(sock, &recv_int) < 0) {
04160 proxy_close(sock);
04161 return -1;
04162 }
04163 arg->rows=recv_int;
04164
04165 if(gs_recv_int(sock, &recv_int) < 0) {
04166 proxy_close(sock);
04167 return -1;
04168 }
04169 arg->cols=recv_int;
04170
04171 if(gs_recv_string(sock, &arg->colexp) < 0) {
04172 proxy_close(sock);
04173 return -1;
04174 }
04175
04176 if(gs_recv_string(sock, &arg->rowexp) < 0) {
04177 proxy_close(sock);
04178 return -1;
04179 }
04180
04181 if(gs_recv_int(sock, &sender_major) < 0) {
04182 proxy_close(sock);
04183 return -1;
04184 }
04185
04186
04187 if(gs_recv_int(sock, &sender_dsig) < 0) {
04188 proxy_close(sock);
04189 return -1;
04190 }
04191
04192 if(gs_recv_int(sock, &arg->byte_size) < 0) {
04193 proxy_close(sock);
04194 return -1;
04195 }
04196
04197 if(gs_recv_string(sock, &arg->output_arg_file)<0){
04198 ERRPRINTF("SMART: Error receiving arg file loc\n");
04199 return -1;
04200 }
04201
04202 if(gs_recv_arg(sock, arg, sender_major, sender_dsig, my_dsig, GS_SERVER_SIDE)<0){
04203 ERRPRINTF("SMART : Error receiving arg\n");
04204 return -1;
04205 }
04206
04207 return 0;
04208
04209 }
04210
04211
04212
04213
04214
04246 int gs_smart_send_arg_remotely_to_s_mem(gs_server_t * dest_srvr, gs_argument_t *arg,
04247 int major, int my_dsig, int * s_mem_full){
04248
04249 SOCKET dest_srv_sock;
04250 int tag;
04251
04252 dest_srv_sock = gs_connect_to_host(dest_srvr->componentid, dest_srvr->ipaddress, dest_srvr->port, dest_srvr->proxyip, dest_srvr->proxyport);
04253
04254 if(dest_srv_sock == INVALID_SOCKET){
04255 ERRPRINTF("Smart : Invalid connection with server socket\n");
04256 proxy_close(dest_srv_sock);
04257 return -1;
04258 }
04259
04260 if(gs_send_tag(dest_srv_sock, GS_SMART_STORE_ARG_TO_MEMORY) < 0){
04261 ERRPRINTF("SMART : Error sending tag\n");
04262 return -1;
04263 }
04264
04265 arg->byte_size=gs_smart_get_argument_size(arg);
04266
04267 if(gs_send_int(dest_srv_sock, arg->byte_size)<0){
04268 ERRPRINTF("SMART : Error sending byte size\n");
04269 return -1;
04270 }
04271
04272 if(gs_send_string(dest_srv_sock, arg->output_arg_file)<0){
04273 ERRPRINTF("SMART: Error sending arg file loc\n");
04274 return -1;
04275 }
04276
04277 if(gs_recv_tag(dest_srv_sock, &tag) < 0) {
04278 ERRPRINTF("SMART : Error receiving tag\n");
04279 return -1;
04280 }
04281
04282
04283 if(tag==GS_SMART_ARG_BUFF_FULL){
04284 *s_mem_full=1;
04285 return 0;
04286 }
04287 else{
04288 *s_mem_full=0;
04289 }
04290 if(gs_smart_send_arg_remotely(dest_srv_sock, arg, major,
04291 my_dsig) < 0) {
04292 ERRPRINTF("error in sending output arguments\n");
04293 return -1;
04294 }
04295
04296 if(gs_recv_tag(dest_srv_sock, &tag) < 0) {
04297 ERRPRINTF("SMART : Error receiving tag\n");
04298 return -1;
04299 }
04300
04301 if(tag!=GS_PROT_OK){
04302 ERRPRINTF("SMART : Error sending remote arg\n");
04303 return -1;
04304 }
04305
04306
04307 gs_close_socket(dest_srv_sock);
04308 return 0;
04309 }
04310
04311
04312
04313
04314
04315
04350 int gs_smart_recv_arg_from_s_mem(gs_server_t * src_server, gs_argument_t *arg, int sender_major,
04351 int my_dsig, gs_side_t side, int * arg_in_mem){
04352
04353 SOCKET src_srv_sock;
04354 int tag;
04355 int arg_recvd=0;
04356 double total_wait_time, wait_time;
04357 int sender_dsig;
04358 wait_time=100;
04359 total_wait_time=1;
04360 *arg_in_mem=1;
04361 tag=GS_SMART_ARG_NOT_READY;
04362
04363 while(tag==GS_SMART_ARG_NOT_READY){
04364 src_srv_sock = gs_connect_to_host(src_server->componentid, src_server->ipaddress, src_server->port, src_server->proxyip, src_server->proxyport);
04365
04366 if(src_srv_sock == INVALID_SOCKET){
04367 ERRPRINTF("Smart : Invalid connection with server socket\n");
04368 proxy_close(src_srv_sock);
04369 return -1;
04370 }
04371 if(gs_send_tag(src_srv_sock, GS_SMART_RECV_ARG_FROM_MEMORY) < 0){
04372 ERRPRINTF("SMART : Error sending tag\n");
04373 return -1;
04374 }
04375 if(gs_send_string(src_srv_sock, arg->input_arg_file)<0){
04376 ERRPRINTF("SMART : Error sending arg file\n");
04377 return -1;
04378 }
04379
04380 if(gs_recv_tag(src_srv_sock, &tag)<0){
04381 ERRPRINTF("SMART : Error sending tag\n");
04382 return -1;
04383 }
04384 if(tag==GS_SMART_ARG_NOT_READY){
04385 arg_recvd=0;
04386 gs_close_socket(src_srv_sock);
04387 }
04388
04389
04390
04391 if(tag==GS_SMART_ARG_STORED_IN_S_MEM){
04392 *arg_in_mem=1;
04393 if(gs_recv_int(src_srv_sock, &sender_dsig)<0){
04394 ERRPRINTF("SMART: Error receiving data signature\n");
04395 return -1;
04396 }
04397
04398 if(gs_recv_arg(src_srv_sock, arg, sender_major, sender_dsig, my_dsig,
04399 GS_SERVER_SIDE) < 0){
04400 ERRPRINTF("error receiving input non-scalar arguments\n");
04401 return -1;
04402 }
04403 arg_recvd=1;
04404 gs_close_socket(src_srv_sock);
04405 }
04406 if(arg_recvd==0){
04407 usleep(wait_time);
04408 total_wait_time+=wait_time;
04409 wait_time=total_wait_time/10+1;
04410 if(wait_time>GS_MAX_ARG_POLL)
04411 wait_time=GS_MAX_ARG_POLL;
04412
04413 }
04414 }
04415 if(tag==GS_SMART_ARG_STORED_IN_FILE){
04416
04417 *arg_in_mem=0;
04418 return 0;
04419 }
04420
04421
04422
04423 SOCKET _src_srv_sock;
04424 if(arg->input_arg_deleted){
04425
04426 _src_srv_sock = gs_connect_to_host(src_server->componentid, src_server->ipaddress, src_server->port, src_server->proxyip, src_server->proxyport);
04427
04428 if(_src_srv_sock == INVALID_SOCKET){
04429 ERRPRINTF("Smart : Invalid connection with server socket\n");
04430 proxy_close(src_srv_sock);
04431 return -1;
04432 }
04433 if(gs_send_tag(_src_srv_sock, GS_SMART_DELETE_ARG_FROM_MEMORY) < 0){
04434 ERRPRINTF("SMART : Error sending tag\n");
04435 return -1;
04436 }
04437 if(gs_send_string(_src_srv_sock, arg->input_arg_file)<0){
04438 ERRPRINTF("SMART : Error sending arg file\n");
04439 return -1;
04440 }
04441 if(gs_recv_tag(_src_srv_sock, &tag) < 0){
04442 ERRPRINTF("Smart: Error sending tag\n");
04443 return -1;
04444 }
04445 gs_close_socket(_src_srv_sock);
04446
04447 }
04448
04449 return 0;
04450 }
04451
04452
04453
04454
04455
04456
04457
04487 int
04488 gs_smart_save_output_args_to_file(SOCKET sock, gs_server_t * src_server,
04489 int fd, gs_problem_t *problem, int my_dsig)
04490 {
04491 gs_argument_t *argptr;
04492 int s_mem_full;
04493 if(!problem)
04494 return -1;
04495
04496 if(gs_send_tag(fd, problem->major) < 0) {
04497 ERRPRINTF("failed to send major\n");
04498 return -1;
04499 }
04500
04501
04502 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
04503 {
04504 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT) ||
04505 (argptr->inout == GS_OUT)) && (argptr->objecttype == GS_SCALAR))
04506 {
04507 if(gs_send_arg(fd, argptr, my_dsig) < 0) {
04508 ERRPRINTF("error in writing input argument\n");
04509 return -1;
04510 }
04511 }
04512 }
04513
04514 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
04515 {
04516 ERRPRINTF("could not compute argument sizes.\n");
04517 return -1;
04518 }
04519
04520
04521
04522 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
04523 {
04524 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
04525 (argptr->inout == GS_VAROUT))
04526 {
04527 if(argptr->objecttype == GS_FILE) {
04528 if(gs_send_string(fd, argptr->data) < 0) {
04529 ERRPRINTF("error in writing file name\n");
04530 return -1;
04531 }
04532 }
04533 else if(argptr->objecttype == GS_PACKEDFILE) {
04534 int i;
04535
04536 if(gs_send_int(fd, argptr->rows) < 0) {
04537 ERRPRINTF("Error sending string length\n");
04538 return -1;
04539 }
04540
04541 for(i = 0; i < argptr->rows; i++) {
04542 if(gs_send_string(fd, ((char **)(argptr->data))[i]) < 0) {
04543 ERRPRINTF("error in writing file name\n");
04544 return -1;
04545 }
04546 }
04547 }
04548 else {
04549
04550 if(argptr->output_arg_sent_remotely==1){
04551 if(argptr->output_cached){
04552 LOGPRINTF("SMART : Problem %s storing arg %s to local server memory\n", problem->name, argptr->name);
04553 if(gs_smart_send_arg_remotely_to_s_mem(src_server, argptr, problem->major,
04554 my_dsig, &s_mem_full) < 0) {
04555 ERRPRINTF("error in sending output arguments\n");
04556 return -1;
04557 }
04558 if(s_mem_full==1){
04559 LOGPRINTF("SMART : Problem %s storing arg %s to local file %s as server memory full\n", problem->name, argptr->name, argptr->output_arg_file);
04560 if(gs_smart_write_arg_to_file(argptr->output_arg_file, argptr)<0){
04561 ERRPRINTF("SMART : Error writing arg to file\n");
04562 return -1;
04563 }
04564 }
04565
04566 }
04567
04568 gs_server_t ** dest_srvrs;
04569 int nb_dest_servers;
04570 nb_dest_servers=argptr->output_nb_dest_servers;
04571
04572 dest_srvrs=(gs_server_t **) CALLOC(nb_dest_servers,sizeof(gs_server_t *));
04573 int i;
04574 for(i=0;i<nb_dest_servers;i++){
04575
04576 dest_srvrs[i]=(gs_server_t *) CALLOC(1,sizeof(gs_server_t));
04577 if(gs_decode_server(argptr->enc_output_dest_servers[i], dest_srvrs[i]) < 0) {
04578 ERRPRINTF("Error decoding server string\n");
04579 FREE(argptr->enc_output_dest_servers[i]);
04580 return -1;
04581 }
04582 LOGPRINTF("SMART : Problem %s sending arg %s remotely to server %s to store in memory of server\n", problem->name, argptr->name, dest_srvrs[i]->hostname);
04583 if(gs_smart_send_arg_remotely_to_s_mem(dest_srvrs[i],argptr, problem->major,
04584 my_dsig, &s_mem_full) < 0) {
04585 ERRPRINTF("error in sending output arguments\n");
04586 return -1;
04587 }
04588 if(s_mem_full==1){
04589 LOGPRINTF("SMART : Server buffer full : Problem %s sending arg %s remotely to server %s to store in file %s\n", problem->name, argptr->name, dest_srvrs[i]->hostname, argptr->output_arg_file);
04590 if(gs_smart_send_arg_remotely_to_file(dest_srvrs[i],argptr, problem->major,
04591 my_dsig) < 0) {
04592 ERRPRINTF("error in sending output arguments\n");
04593 return -1;
04594 }
04595 }
04596
04597 if(dest_srvrs[i]) gs_server_free(dest_srvrs[i]);
04598 }
04599
04600 if(dest_srvrs) free(dest_srvrs);
04601 }
04602 else{
04603 if(gs_send_arg(fd, argptr, my_dsig) < 0) {
04604 ERRPRINTF("error in writing input argument\n");
04605 return -1;
04606 }
04607 }
04608 }
04609 }
04610 }
04611 return 0;
04612 }
04613
04614
04615
04629 int
04630 gs_smart_restore_output_args_from_file(int fd, gs_problem_t *problem,
04631 int my_dsig)
04632 {
04633 gs_argument_t *argptr;
04634 int sender_major;
04635
04636 if(!problem)
04637 return -1;
04638
04639 if(gs_recv_tag(fd, &sender_major) < 0) {
04640 ERRPRINTF("error in receiving major\n");
04641 return -1;
04642 }
04643
04644 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
04645 {
04646 if(((argptr->inout == GS_IN) || (argptr->inout == GS_INOUT) ||
04647 (argptr->inout == GS_OUT)) && (argptr->objecttype == GS_SCALAR))
04648 {
04649 argptr->rows = argptr->cols = 1;
04650 argptr->data = NULL;
04651
04652 if(gs_recv_arg(fd, argptr, sender_major, my_dsig, my_dsig,
04653 GS_CLIENT_SIDE) < 0)
04654 {
04655 ERRPRINTF("error in reading input argument\n");
04656 return -1;
04657 }
04658 }
04659 }
04660
04661 if(gs_receiver_compute_arg_sizes(problem, GS_OUT) < 0)
04662 {
04663 ERRPRINTF("could not compute argument sizes.\n");
04664 return -1;
04665 }
04666
04667 for(argptr = problem->arglist; argptr != NULL; argptr = argptr->next)
04668 {
04669 if((argptr->inout == GS_INOUT) || (argptr->inout == GS_OUT) ||
04670 (argptr->inout == GS_VAROUT))
04671 {
04672 if(argptr->objecttype == GS_FILE) {
04673 char *fname;
04674
04675 if(gs_recv_string(fd, &fname) < 0) {
04676 ERRPRINTF("error in reading file name\n");
04677 return -1;
04678 }
04679
04680 argptr->data = fname;
04681 }
04682 else if(argptr->objecttype == GS_PACKEDFILE) {
04683 char *fname;
04684 int i, n;
04685
04686 if(gs_recv_int(fd, &n) < 0) {
04687 ERRPRINTF("error in reading number of packed files\n");
04688 return -1;
04689 }
04690
04691 argptr->data = malloc(n * sizeof(char *));
04692
04693 for(i = 0; i < n; i++) {
04694 if(gs_recv_string(fd, &fname) < 0) {
04695 ERRPRINTF("error in reading file name\n");
04696 return -1;
04697 }
04698
04699 ((char **)(argptr->data))[i] = fname;
04700 }
04701 }
04702 else {
04703 if(!argptr->output_arg_sent_remotely){
04704 if(gs_recv_arg(fd, argptr, sender_major, my_dsig, my_dsig,
04705 GS_CLIENT_SIDE) < 0)
04706 {
04707 ERRPRINTF("error in reading input argument\n");
04708 return -1;
04709 }
04710 }
04711 }
04712 }
04713 }
04714
04715 return 0;
04716 }
04717
04718
04719
04720
04721
04741 int gs_smart_send_arg_remotely_to_file(gs_server_t * dest_srvr, gs_argument_t *arg,
04742 int major, int my_dsig){
04743
04744 SOCKET dest_srv_sock;
04745 int tag;
04746
04747 dest_srv_sock = gs_connect_to_host(dest_srvr->componentid, dest_srvr->ipaddress, dest_srvr->port, dest_srvr->proxyip, dest_srvr->proxyport);
04748
04749 if(dest_srv_sock == INVALID_SOCKET){
04750 ERRPRINTF("Smart : Invalid connection with server socket\n");
04751 proxy_close(dest_srv_sock);
04752 return -1;
04753 }
04754
04755 if(gs_send_tag(dest_srv_sock, GS_SMART_STORE_ARG_TO_FILE) < 0){
04756 ERRPRINTF("SMART : Error sending tag\n");
04757 return -1;
04758 }
04759
04760 if(gs_smart_send_arg_remotely(dest_srv_sock, arg, major,
04761 my_dsig) < 0) {
04762 ERRPRINTF("error in sending output arguments\n");
04763 return -1;
04764 }
04765 if(gs_recv_tag(dest_srv_sock, &tag) < 0) {
04766 ERRPRINTF("SMART : Error receiving tag\n");
04767 return -1;
04768 }
04769
04770 if(tag!=GS_PROT_OK){
04771 ERRPRINTF("SMART : Error sending remote arg\n");
04772 return -1;
04773 }
04774 gs_close_socket(dest_srv_sock);
04775 return 0;
04776 }
04777
04778
04779
04780
04781
04782
04783
04784
04785
04786
04787
04788
04789 int gs_smart_write_arg_to_file(char * filename, gs_argument_t * argptr){
04790 int my_dsig = pvmgetdsig();
04791
04792 int fd;
04793
04794 if((fd = gs_open_locked_file(filename, F_WRLCK, O_WRONLY | O_CREAT)) < 0) {
04795 ERRPRINTF("SMART : Could not open file to write argument\n");
04796 return -1;
04797 }
04798
04799 if(gs_send_arg(fd, argptr, my_dsig )<0){
04800 ERRPRINTF("SMART : Error writing argument to file\n");
04801 return -1;
04802 }
04803
04804 gs_unlock_file(fd);
04805 close(fd);
04806 return 0;
04807 }
04808
04809
04810
04811
04812
04813
04814
04815
04816
04817
04818
04819
04820
04821
04822
04823
04824
04825 int gs_smart_read_arg_from_file(gs_server_t * src_server, char * filename, gs_argument_t * argptr, int sender_major, int my_dsig){
04826
04827 struct stat stbuf;
04828 int fd;
04829
04830 double total_wait_time, wait_time;
04831 wait_time=100;
04832 total_wait_time=1;
04833
04834 do{
04835 usleep(wait_time);
04836 total_wait_time+=wait_time;
04837 wait_time=total_wait_time/10+1;
04838 if(wait_time>GS_MAX_ARG_POLL)
04839 wait_time=GS_MAX_ARG_POLL;
04840 }while(stat(filename, &stbuf)!=0);
04841
04842 if((fd = gs_open_locked_file(filename, F_RDLCK, O_RDONLY))==-1){
04843 ERRPRINTF("SMART : Error opening argument file to read\n");
04844 return -1;
04845 }
04846
04847
04848
04849
04850
04851 if(gs_recv_arg(fd, argptr, sender_major, my_dsig, my_dsig, GS_CLIENT_SIDE)<0){
04852 ERRPRINTF("SMART : Error receiving argument from file\n");
04853 return -1;
04854 }
04855
04856 if(argptr->input_arg_deleted==1){
04857
04858 SOCKET _src_srv_sock;
04859 int tag;
04860 _src_srv_sock = gs_connect_to_host(src_server->componentid, src_server->ipaddress, src_server->port, src_server->proxyip, src_server->proxyport);
04861
04862 if(_src_srv_sock == INVALID_SOCKET){
04863 ERRPRINTF("Smart : Invalid connection with server socket\n");
04864 proxy_close(_src_srv_sock);
04865 return -1;
04866 }
04867 if(gs_send_tag(_src_srv_sock, GS_SMART_DELETE_ARG_FROM_FILE) < 0){
04868 ERRPRINTF("SMART : Error sending tag\n");
04869 return -1;
04870 }
04871 if(gs_send_string(_src_srv_sock, filename)<0){
04872 ERRPRINTF("SMART : Error sending arg file\n");
04873 return -1;
04874 }
04875 if(gs_recv_tag(_src_srv_sock, &tag) < 0){
04876 ERRPRINTF("Smart: Error sending tag\n");
04877 return -1;
04878 }
04879 gs_close_socket(_src_srv_sock);
04880
04881 if(remove(filename)<0){
04882 ERRPRINTF("SMART : Error removing file %s\n", argptr->name);
04883 return -1;
04884 }
04885 }
04886 gs_unlock_file(fd);
04887 close(fd);
04888 return 0;
04889 }
04890
04891
04892
04893
04894
04895
04896
04897
04898
04899
04900
04901
04902
04903
04904
04905
04906
04907
04908
04909 int gs_smart_get_obj_file_name(char ** _arg_file_name, char * arg_file_prefix ,int app_id, int arg_id){
04910 char * int_string= "%d";
04911 char * arg_id_str=(char *)calloc(sizeof(int_string),sizeof(char) );
04912 char * app_id_str=(char *)calloc(sizeof(int_string),sizeof(char) );
04913 char * arg_file_name;
04914 sprintf(arg_id_str,"%d" , arg_id);
04915 sprintf(app_id_str,"%d" , app_id);
04916 arg_file_name = (char *)calloc(strlen(arg_file_prefix) + strlen("__app_id__") + strlen(app_id_str) + strlen("__arg_id__") + strlen(arg_id_str)+3, sizeof(char));
04917 strcat(arg_file_name, arg_file_prefix);
04918 strcat(arg_file_name, "__app_id__");
04919 strcat(arg_file_name, app_id_str);
04920 strcat(arg_file_name, "__arg_id__");
04921 strcat(arg_file_name, arg_id_str);
04922 strcat(arg_file_name, "\0");
04923 *_arg_file_name=arg_file_name;
04924 return 0;
04925 }
04926
04927
04928
04929
04930
04931
04932
04933
04934
04935
04936
04937
04938
04939
04940
04941
04942
04943
04944
04945
04946
04947
04948
04949
04950
04951 int gs_smart_read_update_app_no_file(char * file, int * app_num){
04952 int fd, count, n, exists;
04953 struct stat stbuf;
04954 exists=1;
04955 count=0;
04956 if(stat(file, &stbuf) != 0){
04957 count=0;
04958 exists=0;
04959 }
04960
04961 if((fd = gs_open_locked_file(file, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
04962 ERRPRINTF("Failed to open/lock file '%s' for job count\n", file);
04963 return -1;
04964 }
04965 if(exists==1){
04966 if((n = read(fd, &count, sizeof(count))) < 0) {
04967 ERRPRINTF("read failure getting job count\n");
04968 gs_unlock_file(fd);
04969 close(fd);
04970 return -1;
04971 }
04972 }
04973 *app_num=count;
04974 count++;
04975 lseek(fd, 0, SEEK_SET);
04976 write(fd, &count, sizeof(count));
04977 gs_unlock_file(fd);
04978 close(fd);
04979 return 0;
04980 }
04981
04982
04983
04984
04985
04986
04987
04988
04989
04990
04991
04992
04993
04994
04995 int gs_smart_read_app_no_file(char * file, int * app_num){
04996 int fd, count, n, exists;
04997 struct stat stbuf;
04998 exists=1;
04999 if(stat(file, &stbuf) != 0){
05000 count=0;
05001 exists=0;
05002 }
05003 if((fd = gs_open_locked_file(file, F_WRLCK, O_RDWR | O_CREAT)) < 0) {
05004 ERRPRINTF("Failed to open/lock file '%s' for job count\n", file);
05005 return -1;
05006 }
05007 if(exists==1){
05008 if((n = read(fd, &count, sizeof(count))) < 0) {
05009 ERRPRINTF("read failure getting job count\n");
05010 gs_unlock_file(fd);
05011 close(fd);
05012 return -1;
05013 }
05014 }
05015 *app_num=count;
05016 gs_unlock_file(fd);
05017 close(fd);
05018 return 0;
05019 }
05020
05021
05022 #endif
05023
05024