00001
00002
00003
00004
00005
00006
00007
00008 #include "gs_dag.h"
00009 #include "gs_sequence.h"
00010
00011
00019 int analyze_dep_GS_DAG(GS_DAG_t *dag) {
00020 GS_DAG_Node_t *lptr, *rptr;
00021 gs_argument_t *largp, *rargp;
00022
00023 int mode;
00024 int i, j;
00025
00026
00027 if (dag == NULL) return -1;
00028
00029 if (dag->num_nodes == 1) {
00030 for (largp = dag->head_node->handle->problem_desc->arglist;
00031 largp != NULL; largp = largp->next) {
00032 if (largp->inout != GS_IN) {
00033 largp->pass_back = 1;
00034 largp->data_handle = 0;
00035 }
00036 }
00037
00038 dag->analyzed = 1;
00039
00040 return 0;
00041 }
00042
00043
00044
00045
00046 for (lptr = dag->head_node; lptr != NULL; lptr = lptr->next) {
00047
00048 for (rptr = lptr->next; rptr != NULL; rptr = rptr->next) {
00049
00050
00051 for (largp = lptr->handle->problem_desc->arglist, i = 0;
00052 largp != NULL;
00053 largp = largp->next, i++) {
00054
00055 for (rargp = rptr->handle->problem_desc->arglist, j = 0;
00056 rargp != NULL;
00057 rargp = rargp->next, j++) {
00058
00059
00060
00061 largp->pass_back = 1;
00062 rargp->pass_back = 1;
00063 largp->data_handle = 0;
00064 rargp->data_handle = 0;
00065 largp->num_targets = 0;
00066 rargp->num_targets = 0;
00067 largp->target_server_list = NULL;
00068 rargp->target_server_list = NULL;
00069 largp->source = NULL;
00070 rargp->source = NULL;
00071 largp->index = i;
00072 rargp->index = j;
00073
00074
00075
00076 mode = grpc_get_sequence_mode();
00077
00078
00079 if (mode == CONSERVATIVE_MODE) {
00080
00081
00082
00083
00084
00085 if ((largp->objecttype == GS_SCALAR &&
00086 rargp->objecttype == GS_SCALAR) &&
00087 (largp->datatype == rargp->datatype) &&
00088 (largp->inout != GS_IN ||
00089 rargp->inout != GS_IN)) {
00090
00091
00092 if (rptr->sched_level < lptr->sched_level + 1)
00093 rptr->sched_level = lptr->sched_level + 1;
00094
00095 dag->max_sched_level = rptr->sched_level;
00096
00097 insert_dep_GS_DAG(dag, lptr, rptr,
00098 CONSERVATIVE_SCALAR_DEPENDENCY,
00099 largp, rargp);
00100 }
00101 }
00102
00103
00104
00105
00106
00107
00108
00109
00110 if ((largp->inout == GS_INOUT || largp->inout == GS_OUT) &&
00111 (rargp->inout == GS_IN || rargp->inout == GS_INOUT)) {
00112
00113 if (verify_object_type(largp) == 1 &&
00114 verify_object_type(rargp) == 1 &&
00115 compare_object(largp, rargp) == 0) {
00116
00117
00118 if (rptr->sched_level < lptr->sched_level + 1)
00119 rptr->sched_level = lptr->sched_level + 1;
00120
00121 dag->max_sched_level = rptr->sched_level;
00122
00123 insert_dep_GS_DAG(dag, lptr, rptr,
00124 INPUT_AFTER_OUTPUT_DEPENDENCY,
00125 largp, rargp);
00126 }
00127 }
00128
00129
00130
00131
00132 else if ((largp->inout == GS_IN && rargp->inout == GS_OUT) ||
00133 (largp->inout == GS_IN && rargp->inout == GS_INOUT)) {
00134
00135 if (verify_object_type(largp) == 1 &&
00136 verify_object_type(rargp) == 1 &&
00137 compare_object(largp, rargp) == 0) {
00138
00139
00140 if (rptr->sched_level < lptr->sched_level + 1)
00141 rptr->sched_level = lptr->sched_level + 1;
00142
00143 dag->max_sched_level = rptr->sched_level;
00144
00145 insert_dep_GS_DAG(dag, lptr, rptr,
00146 OUTPUT_AFTER_INPUT_DEPENDENCY,
00147 largp, rargp);
00148 }
00149 }
00150
00151
00152
00153
00154 else if ((largp->inout == GS_OUT && rargp->inout == GS_OUT) ||
00155 (largp->inout == GS_INOUT && rargp->inout == GS_OUT)) {
00156
00157 if (verify_object_type(largp) == 1 &&
00158 verify_object_type(rargp) == 1 &&
00159 compare_object(largp, rargp) == 0) {
00160
00161
00162 if (rptr->sched_level < lptr->sched_level + 1)
00163 rptr->sched_level = lptr->sched_level + 1;
00164
00165 dag->max_sched_level = rptr->sched_level;
00166
00167 insert_dep_GS_DAG(dag, lptr, rptr,
00168 OUTPUT_AFTER_OUTPUT_DEPENDENCY,
00169 largp, rargp);
00170 }
00171 }
00172
00173 }
00174
00175 }
00176
00177 }
00178
00179 }
00180
00181 dag->analyzed = 1;
00182
00183 return 0;
00184 }
00185
00186
00194 int map_to_servers_GS_DAG(GS_DAG_t *dag, int scheduler) {
00195 int num_servers;
00196 int num_reqs_mapped;
00197 int sched_level;
00198 int num_reqs_per_level;
00199 int num_reqs_per_server_level;
00200 int *assignments;
00201 int serv_idx;
00202 int i, j;
00203 GS_DAG_Node_t *node;
00204
00205
00206 if (dag == NULL) return -1;
00207
00208
00209
00210 if (!dag->analyzed) {
00211 analyze_dep_GS_DAG(dag);
00212 }
00213
00214
00215
00216 num_servers = dag->head_node->handle->num_servers;
00217
00218 if (scheduler == ROUND_ROBIN) {
00219 num_reqs_mapped = 0;
00220 sched_level = 0;
00221 while (num_reqs_mapped < dag->num_nodes) {
00222 num_reqs_per_level = 0;
00223 for (node = dag->head_node; node != NULL; node = node->next) {
00224 if (node->sched_level == sched_level)
00225 num_reqs_per_level++;
00226 }
00227
00228 serv_idx = 0;
00229 for (node = dag->head_node; node != NULL; node = node->next) {
00230 if (node->sched_level == sched_level) {
00231 node->handle->srv_idx = serv_idx % num_servers;
00232 serv_idx++;
00233 }
00234 }
00235
00236 num_reqs_mapped += num_reqs_per_level;
00237 sched_level++;
00238 }
00239 }
00240 else if (scheduler == AVERAGE) {
00241 assignments = (int *) malloc(sizeof(int) * num_servers);
00242 num_reqs_mapped = 0;
00243 sched_level = 0;
00244 while (num_reqs_mapped < dag->num_nodes) {
00245 num_reqs_per_level = 0;
00246 for (node = dag->head_node; node != NULL; node = node->next) {
00247 if (node->sched_level == sched_level)
00248 num_reqs_per_level++;
00249 }
00250
00251 num_reqs_per_server_level = num_reqs_per_level / num_servers;
00252 for (i = 0; i < num_servers; i++) {
00253 assignments[i] = num_reqs_per_server_level;
00254 }
00255
00256 j = 0;
00257 for (i = num_reqs_per_server_level * num_servers;
00258 i < num_reqs_per_level; i++) {
00259 assignments[j++]++;
00260 }
00261
00262 i = 0;
00263 for (node = dag->head_node; node != NULL; node = node->next) {
00264 if (node->sched_level == sched_level) {
00265 if (assignments[i] > 0) {
00266 node->handle->srv_idx = i;
00267 } else {
00268 i++;
00269 node->handle->srv_idx = i;
00270 }
00271 assignments[i]--;
00272 }
00273 }
00274
00275 num_reqs_mapped += num_reqs_per_level;
00276 sched_level++;
00277 }
00278 }
00279 else if (scheduler == OPT_COMM) {
00280 }
00281 else if (scheduler == OPT_COMP) {
00282 }
00283 else if (scheduler == AGENT) {
00284
00285 }
00286
00287 return 0;
00288 }
00289
00290
00299 int post_analysis_GS_DAG(GS_DAG_t *dag, va_list arg_list, int n) {
00300 GS_DAG_Dep_t *dep;
00301 int i;
00302
00303
00304 if (dag == NULL) return -1;
00305
00306
00307
00308 for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00309 if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY)
00310 dep->largp->num_targets++;
00311 }
00312
00313
00314
00315
00316
00317
00318
00319 for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00320
00321 if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY &&
00322 dep->largp->num_targets > 0 &&
00323 dep->largp->target_server_list == NULL) {
00324
00325 dep->largp->target_server_list = (gs_server_t **)
00326 malloc(sizeof(gs_server_t *) * dep->largp->num_targets);
00327 if (dep->largp->target_server_list == NULL) {
00328 perror("malloc");
00329 exit(1);
00330 }
00331
00332
00333 for (i = 0; i < dep->largp->num_targets; i++) {
00334 dep->largp->target_server_list[i] = NULL;
00335 }
00336 }
00337 }
00338
00339
00340
00341 for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00342 if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
00343
00344
00345 for (i = 0; i < dep->largp->num_targets; i++) {
00346 if (dep->largp->target_server_list[i] == NULL)
00347 break;
00348 }
00349
00350
00351 dep->largp->target_server_list[i] = dep->cnode->server;
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361 dep->rargp->source = dep->pnode->server;
00362 }
00363 }
00364
00365
00366
00367
00368
00369 for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00370
00371
00372
00373 if (dep->rargp->source != dep->pnode->server) {
00374 for (i = 0; i < dep->largp->num_targets; i++) {
00375
00376 if (dep->largp->target_server_list[i] ==
00377 dep->rargp->source)
00378 dep->largp->target_server_list[i] = NULL;
00379 }
00380
00381 delete_dep_GS_DAG(dag, dep);
00382 }
00383 }
00384
00385
00386
00387
00388 for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
00389
00390
00391 if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
00392 if (!if_passed_back(arg_list, n, dep->largp)) {
00393
00394
00395 dep->largp->pass_back = 0;
00396
00397 dep->rargp->data_handle = 1;
00398 }
00399 }
00400 else if (dep->dep_type == OUTPUT_AFTER_OUTPUT_DEPENDENCY) {
00401
00402 dep->largp->pass_back = 0;
00403 }
00404 }
00405
00406 return 0;
00407 }
00408
00409
00410 int wait_level_finish_GS_DAG(grpc_sessionid_t *req_ids, int num_sched) {
00411 int i;
00412 grpc_error_t status;
00413
00414
00415 for (i = 0; i < num_sched; i++) {
00416
00417
00418 status = grpc_probe(req_ids[i]);
00419
00420
00421 if (status == GRPC_NOT_COMPLETED) {
00422 return 0;
00423 }
00424
00425 else if (status == GRPC_NO_ERROR) {
00426 continue;
00427 }
00428
00429 else {
00430 fprintf(stderr,
00431 "error probing job with request id (%d)\n", req_ids[i]);
00432 return -1;
00433 }
00434 }
00435
00436 return 1;
00437 }
00438
00439
00443 int execute_GS_DAG(GS_DAG_t *dag) {
00444 grpc_error_t status;
00445 grpc_sessionid_t *req_ids;
00446 GS_DAG_Node_t *node;
00447 int total_num_sched, num_sched, sched_level, finished;
00448 int i;
00449
00450
00451
00452
00453 if (!dag->analyzed) {
00454 analyze_dep_GS_DAG(dag);
00455 }
00456
00457 req_ids = (grpc_sessionid_t *)
00458 malloc(sizeof(grpc_sessionid_t) * dag->num_nodes);
00459 if (!req_ids) {
00460 perror("malloc");
00461 return -1;
00462 }
00463
00464 total_num_sched = 0;
00465 num_sched = 0;
00466 sched_level = 0;
00467
00468 while (total_num_sched < dag->num_nodes) {
00469 printf("\nCurrent scheduling level: %d\n", sched_level);
00470
00471 for (node = dag->head_node; node != NULL; node = node->next) {
00472
00473 if (node->sched_level == sched_level) {
00474
00475
00476 if (node->arg_list && !node->arg_stack) {
00477
00478 status = grpc_call_arg_list_async(
00479 node->handle, req_ids + num_sched, node->arg_list);
00480 if (status != GRPC_NO_ERROR) {
00481 fprintf(stderr,
00482 "error submitting job with request id (%d)\n",
00483 req_ids[num_sched]);
00484 return -1;
00485 } else {
00486 printf("Submitting job with request id (%d) onto server (%d)\n",
00487 req_ids[num_sched],
00488 node->handle->srv_idx);
00489
00490 }
00491 }
00492 else if (!node->arg_list && node->arg_stack) {
00493
00494 status = grpc_call_arg_stack_async(
00495 node->handle, req_ids + num_sched, node->arg_stack);
00496 if (status != GRPC_NO_ERROR) {
00497 fprintf(stderr,
00498 "error submitting job with request id (%d)\n",
00499 req_ids[num_sched]);
00500 return -1;
00501 } else {
00502 printf("Submitting job with request id (%d) onto server (%d)\n",
00503 req_ids[num_sched],
00504 node->handle->srv_idx);
00505
00506
00507 }
00508 }
00509
00510 num_sched++;
00511 }
00512 }
00513
00514
00515
00516 while ((finished = wait_level_finish_GS_DAG(req_ids, num_sched)) == 0);
00517
00518
00519 for (i = 0; i < num_sched; i++) {
00520 status = grpc_wait(req_ids[i]);
00521 if (status != GRPC_NO_ERROR) {
00522 fprintf(stderr,
00523 "error finishing job with request id (%d)\n",
00524 req_ids[i]);
00525 return -1;
00526 }
00527 }
00528
00529
00530 total_num_sched += num_sched;
00531 num_sched = 0;
00532
00533
00534 sched_level++;
00535 }
00536
00537 return 0;
00538 }