#include "gs_dag.h"
#include "gs_sequence.h"
Go to the source code of this file.
Functions | |
| int | analyze_dep_GS_DAG (GS_DAG_t *dag) |
| int | map_to_servers_GS_DAG (GS_DAG_t *dag, int scheduler) |
| int | post_analysis_GS_DAG (GS_DAG_t *dag, va_list arg_list, int n) |
| int | wait_level_finish_GS_DAG (grpc_sessionid_t *req_ids, int num_sched) |
| int | execute_GS_DAG (GS_DAG_t *dag) |
| int analyze_dep_GS_DAG | ( | GS_DAG_t * | dag | ) |
Analyze data dependencies among DAG nodes
| dag | -- the DAG to be analyzed |
Definition at line 19 of file gs_dag_scheduler.c.
{
    /*
     * Analyze data dependencies among the nodes of a DAG.
     *
     * Every node is compared with every node that follows it in the node
     * list, argument by argument.  Whenever a dependency is found, the
     * scheduling level of the later node is raised to at least one more
     * than that of the earlier node, the dependency is recorded through
     * insert_dep_GS_DAG(), and dag->max_sched_level is updated.
     *
     * dag -- the DAG to be analyzed
     *
     * Returns 0 on success (dag->analyzed is set to 1), or -1 if dag is
     * NULL.
     */
    GS_DAG_Node_t *lptr, *rptr;
    gs_argument_t *largp, *rargp;
    int mode;
    int dep_type;
    int has_dep;
    int max_level = 0;   /* highest level assigned during this pass */
    int seen_dep = 0;    /* becomes 1 once max_level holds a real value */
    int i, j;

    if (dag == NULL) return -1;

    /* a single node cannot depend on anything: every non-input argument
       is passed back to the client as plain data */
    if (dag->num_nodes == 1) {
        for (largp = dag->head_node->handle->problem_desc->arglist;
             largp != NULL; largp = largp->next) {
            if (largp->inout != GS_IN) {
                largp->pass_back = 1;
                largp->data_handle = 0;
            }
        }
        dag->analyzed = 1;
        return 0;
    }

    /* compare each node with the subsequent nodes in the list */
    for (lptr = dag->head_node; lptr != NULL; lptr = lptr->next) {
        for (rptr = lptr->next; rptr != NULL; rptr = rptr->next) {
            /* compare each pair of arguments of the two nodes */
            for (largp = lptr->handle->problem_desc->arglist, i = 0;
                 largp != NULL;
                 largp = largp->next, i++) {
                for (rargp = rptr->handle->problem_desc->arglist, j = 0;
                     rargp != NULL;
                     rargp = rargp->next, j++) {
                    /* set default value to each field in each
                       argument that is used in sequencing */
                    largp->pass_back = 1;
                    rargp->pass_back = 1;
                    largp->data_handle = 0;
                    rargp->data_handle = 0;
                    largp->num_targets = 0;
                    rargp->num_targets = 0;
                    largp->target_server_list = NULL;
                    rargp->target_server_list = NULL;
                    largp->source = NULL;
                    rargp->source = NULL;
                    largp->index = i;
                    rargp->index = j;

                    /* ---- scalar arguments in conservative mode ---- */
                    mode = grpc_get_sequence_mode();
                    /* two scalars of the same data type that are not
                       both inputs are treated as dependent, identified
                       as CONSERVATIVE_SCALAR_DEPENDENCY */
                    if (mode == CONSERVATIVE_MODE &&
                        largp->objecttype == GS_SCALAR &&
                        rargp->objecttype == GS_SCALAR &&
                        largp->datatype == rargp->datatype &&
                        (largp->inout != GS_IN || rargp->inout != GS_IN)) {
                        /* adjust the scheduling level according
                           to the dependency relationship */
                        if (rptr->sched_level < lptr->sched_level + 1)
                            rptr->sched_level = lptr->sched_level + 1;
                        /* keep a running maximum: a later, shallower
                           dependency must not lower the recorded value */
                        if (!seen_dep || rptr->sched_level > max_level) {
                            max_level = rptr->sched_level;
                            seen_dep = 1;
                        }
                        dag->max_sched_level = max_level;
                        /* insert the dependency into the DAG */
                        insert_dep_GS_DAG(dag, lptr, rptr,
                                          CONSERVATIVE_SCALAR_DEPENDENCY,
                                          largp, rargp);
                    }

                    /* ---- classify the pair by transfer direction ----
                       IN <-> IN is considered to be independent */
                    has_dep = 0;
                    dep_type = 0;
                    if ((largp->inout == GS_INOUT || largp->inout == GS_OUT) &&
                        (rargp->inout == GS_IN || rargp->inout == GS_INOUT)) {
                        /* INOUT -> IN, OUT -> IN, INOUT -> INOUT, OUT -> INOUT */
                        dep_type = INPUT_AFTER_OUTPUT_DEPENDENCY;
                        has_dep = 1;
                    }
                    else if (largp->inout == GS_IN &&
                             (rargp->inout == GS_OUT || rargp->inout == GS_INOUT)) {
                        /* IN -> OUT, IN -> INOUT */
                        dep_type = OUTPUT_AFTER_INPUT_DEPENDENCY;
                        has_dep = 1;
                    }
                    else if ((largp->inout == GS_OUT || largp->inout == GS_INOUT) &&
                             rargp->inout == GS_OUT) {
                        /* OUT -> OUT, INOUT -> OUT */
                        dep_type = OUTPUT_AFTER_OUTPUT_DEPENDENCY;
                        has_dep = 1;
                    }

                    /* the direction only matters if both arguments
                       reference the same object */
                    if (has_dep &&
                        verify_object_type(largp) == 1 &&
                        verify_object_type(rargp) == 1 &&
                        compare_object(largp, rargp) == 0) {
                        /* adjust the scheduling level according
                           to the dependency relationship */
                        if (rptr->sched_level < lptr->sched_level + 1)
                            rptr->sched_level = lptr->sched_level + 1;
                        /* keep a running maximum (see above) */
                        if (!seen_dep || rptr->sched_level > max_level) {
                            max_level = rptr->sched_level;
                            seen_dep = 1;
                        }
                        dag->max_sched_level = max_level;
                        /* insert the dependency into the DAG */
                        insert_dep_GS_DAG(dag, lptr, rptr, dep_type,
                                          largp, rargp);
                    }
                } /* inner-most for */
            } /* third-level for */
        } /* second-level for */
    } /* outer-most for */

    dag->analyzed = 1;
    return 0;
}


| int execute_GS_DAG | ( | GS_DAG_t * | dag | ) |
Definition at line 443 of file gs_dag_scheduler.c.
{
    /*
     * Execute a DAG one scheduling level at a time.
     *
     * All nodes whose sched_level equals the current level are submitted
     * asynchronously; the function then polls until the whole level has
     * completed, collects the results with grpc_wait(), and moves on to
     * the next level until every node has been scheduled.
     *
     * dag -- the DAG to be executed; analyzed first if it has not been
     *
     * Returns 0 on success and -1 on any failure (NULL dag, allocation
     * failure, a node without exactly one of arg_list/arg_stack, or a
     * submission/probe/wait error).  Jobs already submitted are not
     * cancelled when an error is returned.
     */
    grpc_error_t status;
    grpc_sessionid_t *req_ids;
    GS_DAG_Node_t *node;
    int total_num_sched, num_sched, sched_level, finished;
    int rc = -1;
    int i;

    if (dag == NULL) return -1;

    /* if the DAG has not yet been analyzed,
       analyze it first before executing */
    if (!dag->analyzed) {
        if (analyze_dep_GS_DAG(dag) != 0) return -1;
    }

    /* nothing to run; also avoids malloc(0), whose result is
       implementation-defined and could be mistaken for a failure */
    if (dag->num_nodes == 0) return 0;

    req_ids = (grpc_sessionid_t *)
        malloc(sizeof(grpc_sessionid_t) * dag->num_nodes);
    if (!req_ids) {
        perror("malloc");
        return -1;
    }

    total_num_sched = 0;
    num_sched = 0;
    sched_level = 0;

    /* while there is a node that has not been executed */
    while (total_num_sched < dag->num_nodes) {
        printf("\nCurrent scheduling level: %d\n", sched_level);

        /* execute the DAG in a level-by-level fashion */
        for (node = dag->head_node; node != NULL; node = node->next) {
            if (node->sched_level != sched_level)
                continue;

            if (node->arg_list && !node->arg_stack) {
                /* async variable argument list call */
                status = grpc_call_arg_list_async(
                    node->handle, req_ids + num_sched, node->arg_list);
            }
            else if (!node->arg_list && node->arg_stack) {
                /* async argument stack call */
                status = grpc_call_arg_stack_async(
                    node->handle, req_ids + num_sched, node->arg_stack);
            }
            else {
                /* neither or both argument sources are set: no job can be
                   submitted, so there is no valid request id to track */
                fprintf(stderr,
                        "error: DAG node must have exactly one of an "
                        "argument list or an argument stack\n");
                goto done;
            }

            if (status != GRPC_NO_ERROR) {
                fprintf(stderr,
                        "error submitting job with request id (%d)\n",
                        req_ids[num_sched]);
                goto done;
            }
            printf("Submitting job with request id (%d) onto server (%d)\n",
                   req_ids[num_sched],
                   node->handle->srv_idx);
            num_sched++;
        }

        /* all jobs of the current level are submitted; poll until the
           level finishes (busy-wait, as in the original code) */
        do {
            finished = wait_level_finish_GS_DAG(req_ids, num_sched);
        } while (finished == 0);
        if (finished < 0)
            goto done;   /* probing reported an error */

        /* get back the results of the current scheduling level */
        for (i = 0; i < num_sched; i++) {
            status = grpc_wait(req_ids[i]);
            if (status != GRPC_NO_ERROR) {
                fprintf(stderr,
                        "error finishing job with request id (%d)\n",
                        req_ids[i]);
                goto done;
            }
        }

        /* the total number of nodes scheduled at this point */
        total_num_sched += num_sched;
        num_sched = 0;
        /* proceed to the next scheduling level */
        sched_level++;
    }
    rc = 0;

done:
    /* single exit point so the request-id array is always released */
    free(req_ids);
    return rc;
}


| int map_to_servers_GS_DAG | ( | GS_DAG_t * | dag, | |
| int | scheduler | |||
| ) |
Map a DAG to a set of servers
| dag | -- the DAG to be mapped |
Definition at line 194 of file gs_dag_scheduler.c.
{
    /*
     * Map the nodes of a DAG onto servers by setting handle->srv_idx of
     * every node, one scheduling level at a time.
     *
     * dag       -- the DAG to be mapped; analyzed first if it has not been
     * scheduler -- mapping policy:
     *                ROUND_ROBIN  nodes of a level are assigned to servers
     *                             0, 1, 2, ... wrapping around
     *                AVERAGE      nodes of a level are split into
     *                             contiguous, nearly equal shares per
     *                             server, in list order
     *                OPT_COMM, OPT_COMP, AGENT  leave the mapping untouched
     *
     * Returns 0 on success, -1 if dag is NULL, if the node list is
     * missing while num_nodes is non-zero, if no server is available for
     * a policy that needs one, or on allocation failure.
     */
    int num_servers;
    int num_reqs_mapped;
    int sched_level;
    int num_reqs_per_level;
    int num_reqs_per_server_level;
    int *assignments;
    int serv_idx;
    int i, j;
    GS_DAG_Node_t *node;

    if (dag == NULL) return -1;

    /* if the DAG has not yet been analyzed,
       analyze it first before mapping */
    if (!dag->analyzed) {
        analyze_dep_GS_DAG(dag);
    }

    /* an empty DAG has nothing to map; a missing list with a non-zero
       node count is inconsistent */
    if (dag->head_node == NULL)
        return (dag->num_nodes == 0) ? 0 : -1;

    /* number of servers available; we assume
       that all servers have identical services */
    num_servers = dag->head_node->handle->num_servers;

    if (scheduler == ROUND_ROBIN) {
        /* the modulo below needs at least one server */
        if (num_servers <= 0) return -1;

        num_reqs_mapped = 0;
        sched_level = 0;
        while (num_reqs_mapped < dag->num_nodes) {
            num_reqs_per_level = 0;
            serv_idx = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level == sched_level) {
                    num_reqs_per_level++;
                    node->handle->srv_idx = serv_idx % num_servers;
                    serv_idx++;
                }
            }
            num_reqs_mapped += num_reqs_per_level;
            sched_level++;
        }
    }
    else if (scheduler == AVERAGE) {
        /* the division below needs at least one server */
        if (num_servers <= 0) return -1;

        assignments = (int *) malloc(sizeof(int) * num_servers);
        if (assignments == NULL) {
            perror("malloc");
            return -1;
        }

        num_reqs_mapped = 0;
        sched_level = 0;
        while (num_reqs_mapped < dag->num_nodes) {
            /* count the requests at this level */
            num_reqs_per_level = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level == sched_level)
                    num_reqs_per_level++;
            }

            /* every server gets the integer share ... */
            num_reqs_per_server_level = num_reqs_per_level / num_servers;
            for (i = 0; i < num_servers; i++) {
                assignments[i] = num_reqs_per_server_level;
            }
            /* ... and the remainder (always < num_servers) goes one
               each to the first servers */
            j = 0;
            for (i = num_reqs_per_server_level * num_servers;
                 i < num_reqs_per_level; i++) {
                assignments[j++]++;
            }

            /* hand out nodes in list order, moving to the next server
               once the current one has used up its share */
            i = 0;
            for (node = dag->head_node; node != NULL; node = node->next) {
                if (node->sched_level == sched_level) {
                    if (assignments[i] <= 0) {
                        i++;
                    }
                    node->handle->srv_idx = i;
                    assignments[i]--;
                }
            }

            num_reqs_mapped += num_reqs_per_level;
            sched_level++;
        }

        /* the per-level share table is only needed while mapping */
        free(assignments);
    }
    else if (scheduler == OPT_COMM) {
        /* no mapping is performed for this policy */
    }
    else if (scheduler == OPT_COMP) {
        /* no mapping is performed for this policy */
    }
    else if (scheduler == AGENT) {
        /* use the default mapping obtained from the agent */
    }

    return 0;
}


| int post_analysis_GS_DAG | ( | GS_DAG_t * | dag, | |
| va_list | arg_list, | |||
| int | n | |||
| ) |
Perform post-analysis steps that optimize the DAG after server mapping
| dag | -- the DAG to be marked |
Definition at line 299 of file gs_dag_scheduler.c.
{
    /*
     * Post-process the dependency list of a DAG after server mapping.
     *
     * Steps, each a pass over dag->head_dep:
     *   1. count, per depended argument, how many
     *      INPUT_AFTER_OUTPUT_DEPENDENCY entries reference it
     *   2. allocate a NULL-filled target server list for such arguments
     *   3. record the child node's server as a target and the parent
     *      node's server as the source of the dependent argument
     *   4. delete dependencies whose dependent argument does not take
     *      its data from the parent node's server
     *   5. clear pass_back on intermediate arguments and request a data
     *      handle on the dependent side where appropriate
     *
     * dag      -- the DAG to be marked
     * arg_list -- forwarded unchanged to if_passed_back()
     * n        -- forwarded unchanged to if_passed_back()
     *
     * Returns 0 on success, -1 if dag is NULL.  Exits the process if a
     * target server list cannot be allocated.
     *
     * NOTE(review): arg_list is handed to if_passed_back() once per
     * matching dependency; if that helper consumes it with va_arg, each
     * call would need its own va_copy -- confirm against the helper.
     */
    GS_DAG_Dep_t *dep;
    GS_DAG_Dep_t *next_dep;
    int i;

    if (dag == NULL) return -1;

    /* count the number of depending nodes of each
       argument in each RAW dependency in the DAG */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY)
            dep->largp->num_targets++;
    }

    /* create a target server list for each argument
       that is depended on by some other nodes */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY &&
            dep->largp->num_targets > 0 &&
            dep->largp->target_server_list == NULL) {
            dep->largp->target_server_list = (gs_server_t **)
                malloc(sizeof(gs_server_t *) * dep->largp->num_targets);
            if (dep->largp->target_server_list == NULL) {
                perror("malloc");
                exit(1);
            }
            /* initialize to NULL so free slots can be recognised */
            for (i = 0; i < dep->largp->num_targets; i++) {
                dep->largp->target_server_list[i] = NULL;
            }
        }
    }

    /* find out all the target nodes that are depending
       on each depended argument in each dependency */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
            /* find the first unused position in the target
               server list in order to insert a new target */
            for (i = 0; i < dep->largp->num_targets; i++) {
                if (dep->largp->target_server_list[i] == NULL)
                    break;
            }
            /* insert the new target, but never write past the end of
               the list when no free slot was found */
            if (i < dep->largp->num_targets)
                dep->largp->target_server_list[i] = dep->cnode->server;

            /* mark that the input argument of the depending node will
               come from the output argument of the depended node.  The
               assignment is overwritten by later dependencies on the
               same argument, so the last depended node in list order
               ends up as the source. */
            dep->rargp->source = dep->pnode->server;
        }
    }

    /* the above steps set up the target server list and source of each
       argument involved in each dependency; this step removes the
       redundant entries */
    for (dep = dag->head_dep; dep != NULL; dep = next_dep) {
        /* fetch the successor first: dep must not be touched after it
           has been handed to delete_dep_GS_DAG() */
        next_dep = dep->next;

        /* the child node is actually not expecting
           to receive the intermediate result from
           the parent node */
        if (dep->rargp->source != dep->pnode->server) {
            for (i = 0; i < dep->largp->num_targets; i++) {
                /* remove the redundant target */
                if (dep->largp->target_server_list[i] ==
                    dep->rargp->source)
                    dep->largp->target_server_list[i] = NULL;
            }
            delete_dep_GS_DAG(dag, dep);
        }
    }

    /* mark the 'pass_back' field of the depended argument of the parent
       node with 0, meaning this intermediate argument is not passed
       back */
    for (dep = dag->head_dep; dep != NULL; dep = dep->next) {
        if (dep->dep_type == INPUT_AFTER_OUTPUT_DEPENDENCY) {
            if (!if_passed_back(arg_list, n, dep->largp)) {
                /* the depended argument should not be passed back */
                dep->largp->pass_back = 0;
                /* the dependent argument expects to receive data handle */
                dep->rargp->data_handle = 1;
            }
        }
        else if (dep->dep_type == OUTPUT_AFTER_OUTPUT_DEPENDENCY) {
            /* the depended argument should not be passed back */
            dep->largp->pass_back = 0;
        }
    }

    return 0;
}


| int wait_level_finish_GS_DAG | ( | grpc_sessionid_t * | req_ids, | |
| int | num_sched | |||
| ) |
Definition at line 410 of file gs_dag_scheduler.c.
{
    /*
     * Probe every request of the current scheduling level once.
     *
     * req_ids   -- session ids of the submitted requests
     * num_sched -- number of entries in req_ids to probe
     *
     * Returns 1 when every probed request reports GRPC_NO_ERROR, 0 as
     * soon as one reports GRPC_NOT_COMPLETED, and -1 (after printing a
     * message) as soon as one reports any other status.
     */
    int idx = 0;

    while (idx < num_sched) {
        grpc_error_t probe_result = grpc_probe(req_ids[idx]);

        /* still running: the level is not finished yet */
        if (probe_result == GRPC_NOT_COMPLETED)
            return 0;

        /* anything other than success is an error condition */
        if (probe_result != GRPC_NO_ERROR) {
            fprintf(stderr,
                    "error probing job with request id (%d)\n", req_ids[idx]);
            return -1;
        }

        /* this request is done; look at the next one */
        idx++;
    }

    return 1;
}


1.6.3-20100507