65 #define inline __inline
68 #if defined( _WIN32 ) || defined( _WIN64 )
69 # define fopen(ppfile, name, mode) fopen_s(ppfile, name, mode)
70 # define strdup _strdup
73 # define fopen(ppfile, name, mode) *ppfile = fopen(name, mode)
99 # define ULLONG_MAX 18446744073709551615ULL
129 #define tasklevel_width_max_level 5000
233 TAILQ_HEAD( completed_tasks_head_s, completed_tasks_node_s );
241 RB_ENTRY( task_priority_tree_node_s ) n_entry;
243 RB_HEAD( task_priority_tree_head_s, task_priority_tree_node_s );
247 int diff = n2->priority - n1->priority;
251 RB_GENERATE( task_priority_tree_head_s, task_priority_tree_node_s, n_entry, compare_task_priority_tree_nodes );
259 static Task *quark_task_new();
260 static void *quark_task_delete(
Quark *quark,
Task *task);
261 static Worker *quark_worker_new(
Quark *quark,
int rank);
262 static void quark_worker_delete(
Worker *worker);
263 static inline int quark_worker_find_next_assignable(
Quark *quark );
264 static void quark_insert_task_dependencies(
Quark * quark,
Task * task);
265 static void quark_check_and_queue_ready_task(
Quark *quark,
Task *task,
int worker_rank );
266 static void quark_work_set_affinity_and_call_main_loop(
Worker *worker);
267 static long long quark_work_main_loop(
Worker *worker);
268 static Scratch *quark_scratch_new(
void *arg_ptr,
int arg_size,
icl_list_t *task_args_list_node_ptr);
269 static void quark_scratch_allocate(
Task *task );
270 static void quark_scratch_deallocate(
Task *task );
271 static void quark_worker_remove_completed_task_enqueue_for_later_processing(
Quark *quark,
Task *task,
int worker_rank);
272 static void quark_remove_completed_task_and_check_for_ready(
Quark *quark,
Task *task,
int worker_rank);
273 static void quark_process_completed_tasks(
Quark *quark);
274 static void quark_address_set_node_free(
void* data );
275 static inline void quark_fatal_error(
const char *func_name,
char* msg_text);
/*
 * Checked wrappers around pthread_mutex_lock/pthread_mutex_unlock and
 * pthread_cond_wait.
 *
 * Each wrapper has a distinct name so that a lock failure is reported
 * (via quark_fatal_error) with the exact lock class involved — address
 * set, ready list, task, atomic counters, completed-tasks queue, or the
 * generic "wrap" class — and so the classes can be told apart in
 * profiles.  Every wrapper returns the raw pthread error code (0 on
 * success); on a non-zero code it first calls quark_fatal_error() with
 * its own name and strerror() of the code.
 *
 * The original code spelled out fifteen nearly identical bodies; they
 * are generated from a single macro here to remove the duplication.
 * #fname stringizes the wrapper name, reproducing the exact error
 * strings the hand-written versions used.
 */
#define QUARK_CHECKED_MUTEX_OP( fname, op )                         \
    inline static int fname( pthread_mutex_t *mtx )                 \
    {                                                               \
        int rv = op( mtx );                                         \
        if ( rv != 0 )                                              \
            quark_fatal_error( #fname, strerror(rv) );              \
        return rv;                                                  \
    }

QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_address_set,       pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_address_set,     pthread_mutex_unlock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_ready_list,        pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_ready_list,      pthread_mutex_unlock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_task,              pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_task,            pthread_mutex_unlock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_atomic_add,        pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_atomic_set,        pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_atomic_get,        pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_atomic,          pthread_mutex_unlock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_wrap,              pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_wrap,            pthread_mutex_unlock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_lock_completed_tasks,   pthread_mutex_lock )
QUARK_CHECKED_MUTEX_OP( pthread_mutex_unlock_completed_tasks, pthread_mutex_unlock )

#undef QUARK_CHECKED_MUTEX_OP

/* Same checked pattern for waiting on the ready-list condition variable. */
inline static int pthread_cond_wait_ready_list( pthread_cond_t *cond, pthread_mutex_t *mtx )
{
    int rv = pthread_cond_wait( cond, mtx );
    if ( rv != 0 )
        quark_fatal_error( "pthread_cond_wait_ready_list", strerror(rv) );
    return rv;
}
/* Defaults for a task's label and fill color — presumably used when the
 * caller supplies no TASK_LABEL / TASK_COLOR flag (tasks set those
 * fields from flags elsewhere in this file); the values appear in the
 * DOT graph output below.  TODO(review): confirm where the defaults are
 * assigned — the assignment site is not visible in this chunk. */
322 static char *quark_task_default_label =
" ";
323 static char *quark_task_default_color =
"white";
/* Edge colors for the DOT dependency-graph output (passed as the
 * `color` argument of dot_dag_print_edge).  The suffixes name the
 * dependence kind — RAR/WAW/RAW/WAR presumably the classic
 * read/write-after-read/write hazard pairs.  Only WAR (red) and
 * GATHERV (green) are highlighted; every other kind draws in black. */
324 #define DEPCOLOR "black"
325 #define DEPCOLOR_R_FIRST "black"
326 #define DEPCOLOR_W_FIRST "black"
327 #define DEPCOLOR_RAR "black"
328 #define DEPCOLOR_WAW "black"
329 #define DEPCOLOR_RAW "black"
330 #define DEPCOLOR_WAR "red"
331 #define DEPCOLOR_GATHERV "green"
/* Output file for the DAG when dot output is enabled. */
332 #define DOT_DAG_FILENAME "dot_dag_file.dot"
334 #define dot_dag_print_edge( quark, parentid, parent_level, childid, child_level, color) \
335 if ( quark->dot_dag_enable ) { \
336 pthread_mutex_lock_wrap( &quark->dot_dag_mutex ); \
337 if ( parentid>0 ) fprintf(dot_dag_file, "t%lld->t%lld [color=\"%s\"];\n", parentid, childid, color); \
338 fflush(dot_dag_file); \
339 child_level = (parent_level+1 <= child_level ? child_level : parent_level+1 ); \
340 pthread_mutex_unlock_wrap( &quark->dot_dag_mutex ); \
348 #define quark_atomic_add( pval, addvalue, pmutex ) { \
349 pthread_mutex_lock_atomic_add(pmutex); pval += addvalue; pthread_mutex_unlock_atomic(pmutex); \
359 #define quark_atomic_set( pval, setvalue, pmutex ) { \
360 pthread_mutex_lock_atomic_set(pmutex); pval = setvalue; pthread_mutex_unlock_atomic(pmutex); \
373 #define quark_atomic_get( retval, pval, pmutex ) { \
388 static void quark_fatal_error(
const char *func_name,
char* msg_text)
390 fprintf(stderr,
"QUARK_FATAL_ERROR: %s(): %s\n", func_name, msg_text);
406 fprintf(stderr,
"QUARK_WARNING: %s(): %s\n", func_name, msg_text);
413 static inline void *quark_malloc(
size_t size)
415 void *mem = malloc(size);
416 if ( mem == NULL ) quark_fatal_error(
"malloc",
"memory allocation failed" );
424 static Task *quark_task_new()
426 static unsigned long long taskid = 1;
431 if ( task->
args_list == NULL) quark_fatal_error(
"quark_task_new",
"Allocating arg list" );
433 if ( task->
dependency_list == NULL) quark_fatal_error(
"quark_task_new",
"Allocating dependency list" );
436 if ( task->
scratch_list == NULL) quark_fatal_error(
"quark_task_new",
"Allocating scratch list" );
437 if ( taskid >=
ULLONG_MAX) quark_fatal_error(
"quark_task_new",
"Task id > ULLONG_MAX, too many tasks" );
463 static void *quark_task_delete(
Quark *quark,
Task *task)
486 pthread_mutex_unlock_task( &task->
task_mutex );
550 pthread_mutex_lock_wrap( &curr_task->
task_mutex );
553 pthread_mutex_unlock_wrap( &curr_task->
task_mutex );
578 if ( node == NULL ) {
580 if (node!=NULL) arg = node->
data;
583 if (node!=NULL) arg = node->
data;
593 static inline unsigned int fnv_hash_function(
void *key,
int len )
595 unsigned char *p = key;
596 unsigned int h = 2166136261u;
598 for ( i = 0; i < len; i++ )
599 h = ( h * 16777619 ) ^ p[i];
609 static inline unsigned int address_hash_function(
void *address)
611 int len =
sizeof(
void *);
612 unsigned int hashval = fnv_hash_function( &address, len );
619 static inline int address_key_compare(
void *addr1,
void *addr2)
621 return (addr1 == addr2);
628 static inline unsigned int ullong_hash_function(
void *key )
630 int len =
sizeof(
unsigned long long);
631 unsigned int hashval = fnv_hash_function( key, len );
638 static inline int ullong_key_compare(
void *key1,
void *key2 )
640 return ( *(
unsigned long long*)key1 == *(
unsigned long long*)key2 );
649 static inline int quark_worker_find_next_assignable(
Quark *quark )
660 static inline char *arg_dup(
char *arg,
int size)
662 char *argbuf = (
char *) quark_malloc(size);
663 memcpy(argbuf, arg, size);
700 static Worker *quark_worker_new(
Quark *quark,
int rank)
724 static void quark_worker_delete(
Worker * worker)
728 for ( node =
RB_MIN( task_priority_tree_head_s, worker->
ready_list ); node != NULL; node = nxt) {
743 static Scratch *quark_scratch_new(
void *arg_ptr,
int arg_size,
icl_list_t *task_args_list_node_ptr )
746 scratch->
ptr = arg_ptr;
747 scratch->
size = arg_size;
756 static void quark_scratch_allocate(
Task *task )
760 scr_node != NULL && scr_node->
data != NULL;
763 if ( scratch->
ptr == NULL ) {
765 if ( scratch->
size <= 0 ) quark_fatal_error(
"quark_scratch_allocate",
"scratch->size <= 0 " );
766 void *scratchspace = quark_malloc( scratch->
size );
776 static void quark_scratch_deallocate(
Task *task )
780 scr_node != NULL && scr_node->
data!=NULL;
783 if ( scratch->
ptr == NULL ) {
808 int quark_unroll_tasks_per_thread =
quark_getenv_int(
"QUARK_UNROLL_TASKS_PER_THREAD", 50);
809 int quark_unroll_tasks =
quark_getenv_int(
"QUARK_UNROLL_TASKS", quark_unroll_tasks_per_thread * num_threads);
843 quark->
worker[0] = quark_worker_new(quark, 0);
848 for(i = 1; i < num_threads; i++)
849 quark->
worker[i] = quark_worker_new(quark, i);
875 if ( num_threads < 1 ) {
877 if ( nthrd == -1 ) nthrd = 1;
890 for(i = 1; i < nthrd; i++) {
892 if ( rc != 0 ) quark_fatal_error (
" QUARK_New",
"Could not create threads properly" );
910 long long num_tasks = 1;
915 quark_process_completed_tasks(quark);
916 num_tasks = quark_work_main_loop( quark->
worker[0] );
917 #ifdef QUARK_WITH_VALGRIND
921 }
while ( num_tasks > 0 );
925 unsigned long long tasklevel = 0;
929 tasklevel = tasklevel -1;
936 fprintf(dot_dag_file,
"// QUARK_Barrier reached: level=%llu \n", tasklevel );
961 worker = quark->
worker[i];
989 quark_worker_delete( quark->
worker[i] );
990 quark_worker_delete( quark->
worker[0] );
1011 void *exitcodep = NULL;
1062 Task *task = quark_task_new();
1093 bool arg_locality, accumulator, gatherv;
1099 switch ( arg_direction ) {
1103 if ( value_mask==0 ) {
1106 task->
priority = *((
int *)arg_ptr);
1115 }
else if ( (arg_flags &
TASK_COLOR) != 0 ) {
1117 task->
task_color = arg_dup(arg_ptr, arg_size);
1119 }
else if ( (arg_flags &
TASK_LABEL) != 0 ) {
1121 task->
task_label = arg_dup(arg_ptr, arg_size) ;
1130 scratcharg = quark_scratch_new( arg_ptr, arg_size, task_args_list_node_ptr);
1142 data_region = (arg_flags & QUARK_REGION_BITMASK);
1146 Dependency *dep = dependency_new(arg_ptr, arg_size, arg_direction, arg_locality, task, accumulator, gatherv, data_region, task_args_list_node_ptr);
1149 icl_list_t *task_dependency_list_node_ptr = NULL;
1183 long long num_tasks = -1;
1184 unsigned long long taskid = task->
taskid;
1195 quark_task_delete( quark, task );
1218 quark_insert_task_dependencies( quark, task );
1220 pthread_mutex_lock_task( &task->
task_mutex );
1221 quark_check_and_queue_ready_task( quark, task, -1 );
1222 pthread_mutex_unlock_task( &task->
task_mutex );
1227 quark_process_completed_tasks(quark);
1229 num_tasks = quark_work_main_loop(quark->
worker[0]);
1230 quark_process_completed_tasks(quark);
1268 unsigned long long taskid;
1273 va_start(varg_list, task_flags);
1274 while( (arg_size = va_arg(varg_list,
int)) != 0) {
1275 void *arg_ptr = va_arg(varg_list,
void *);
1276 int arg_flags = va_arg(varg_list,
int);
1320 va_start(varg_list, task_flags);
1322 while( (arg_size = va_arg(varg_list,
int)) != 0) {
1323 void *arg_ptr = va_arg(varg_list,
void *);
1324 int arg_flags = va_arg(varg_list,
int);
1338 quark_scratch_allocate( task );
1340 quark_scratch_deallocate( task );
1377 if ( task == NULL ) {
1381 pthread_mutex_lock_task( &task->
task_mutex );
1383 pthread_mutex_unlock_task( &task->
task_mutex );
1388 pthread_mutex_unlock_task( &task->
task_mutex );
1398 static Address_Set_Node *quark_address_set_node_new(
void* address,
int size )
1401 address_set_node->
address = address;
1402 address_set_node->
size = size;
1406 quark_fatal_error(
"quark_address_set_node_new",
"Problem creating icl_list_new" );
1413 return address_set_node;
1420 static void quark_address_set_node_free(
void* data )
1425 free (address_set_node );
1439 static void quark_check_and_queue_ready_task(
Quark *quark,
Task *task,
int worker_rank )
1441 int worker_thread_id = -1;
1443 int assigned_thread_count = 0;
1444 int first_worker_thread_id_repeated = -1;
1460 if ( worker_thread_id<0 && task->locality_preserving_dep != NULL ) {
1461 int test_thread_id = -1;
1464 if ( address_set_node != NULL )
1469 if (( test_thread_id >= 0 )
1474 worker_thread_id = test_thread_id;
1477 if ( worker_thread_id < 0 ) {
1479 int test_thread_id = quark_worker_find_next_assignable( quark );
1480 if (( test_thread_id >= 0 )
1485 worker_thread_id = test_thread_id;
1492 if ( worker_thread_id < 0 )
1493 quark_fatal_error(
"quark_check_and_queue_ready_task",
"Task could not be assigned to any thread" );
1497 quark_fatal_error(
"quark_check_and_queue_ready_task",
"Task requests more threads than available" );
1500 && ( worker_thread_id == 0 ))
1503 first_worker_thread_id_repeated = worker_thread_id;
1504 while ( assigned_thread_count < task->task_thread_count) {
1505 worker = quark->
worker[worker_thread_id];
1509 new_task_tree_node->
task = task;
1511 if ( pthread_mutex_lock_ready_list( &worker->
worker_mutex )==0 ) {
1514 pthread_mutex_unlock_ready_list(&worker->
worker_mutex );
1517 assigned_thread_count++;
1527 for ( wtid=worker_thread_id; ; ) {
1533 if ( wtid==worker_thread_id )
break;
1537 if ( assigned_thread_count < task->task_thread_count ) {
1539 worker_thread_id = (worker_thread_id+1) % quark->
num_threads;
1540 while ( worker_thread_id != first_worker_thread_id_repeated &&
1542 worker_thread_id = (worker_thread_id+1) % quark->
num_threads;
1546 if ( worker_thread_id == first_worker_thread_id_repeated )
1547 quark_fatal_error(
"quark_check_and_queue_ready_task",
"Not enough workers for task" );
1571 int count_initial_input_deps = 0;
1572 bool output_dep_reached =
FALSE;
1573 int quark_num_queued_tasks;
1575 double avg_queued_tasks_per_thread = (double)quark_num_queued_tasks/(
double)quark->
num_threads;
1581 if ( avg_queued_tasks_per_thread < 0.4 ) min_input_deps = 1;
1582 else if ( avg_queued_tasks_per_thread < 0.75 ) min_input_deps = 6;
1583 else if ( avg_queued_tasks_per_thread < 0.90 ) min_input_deps = 7;
1584 else if ( avg_queued_tasks_per_thread < 1.20 ) min_input_deps = 10;
1585 else if ( avg_queued_tasks_per_thread > 1.80 ) min_input_deps = 2000;
1587 else min_input_deps = (int)(7 + 27 * avg_queued_tasks_per_thread);
1590 min_input_deps =
quark_getenv_int(
"QUARK_AVOID_WAR_WHEN_NUM_WAITING_READS", min_input_deps );
1600 count_initial_input_deps++;
1602 output_dep_reached =
TRUE;
1608 if ( count_initial_input_deps>=min_input_deps && output_dep_reached ) {
1612 void *datacopy = quark_malloc( asn_old->
size );
1615 memcpy( datacopy, asn_old->
address, asn_old->
size );
1617 asn_new = quark_address_set_node_new( datacopy, asn_old->
size );
1623 dep_node_asn_old!=NULL; ) {
1624 icl_list_t *dep_node_asn_old_to_be_deleted = NULL;
1628 dep_node_asn_old_to_be_deleted = dep_node_asn_old;
1638 pthread_mutex_lock_task( &task->
task_mutex );
1640 quark_check_and_queue_ready_task( quark, task, -1 );
1641 pthread_mutex_unlock_task( &task->
task_mutex );
1648 if (dep_node_asn_old_to_be_deleted!=NULL) {
1665 static void quark_address_set_node_initial_gatherv_check_and_launch(
Quark *quark,
Address_Set_Node *address_set_node,
Dependency *completed_dep,
int worker_rank)
1668 Task *completed_task = completed_dep->
task;
1670 next_dep_node!=NULL && next_dep_node->
data != NULL;
1685 pthread_mutex_lock_task( &next_task->
task_mutex );
1688 quark_check_and_queue_ready_task( quark, next_task, worker_rank );
1689 pthread_mutex_unlock_task( &next_task->
task_mutex );
1704 static void quark_address_set_node_accumulator_find_prepend(
Quark *quark,
Address_Set_Node *address_set_node)
1711 int acc_dep_count = 0;
1724 if (first_ready_dep_node==NULL) first_ready_dep_node = dep_node;
1725 last_ready_dep_node = dep_node;
1740 if (acc_dep_count % 2 == 0 ) {
1741 if ( last_ready_dep_node!=NULL ) swap_node = last_ready_dep_node;
1743 if ( first_ready_dep_node != NULL ) swap_node = first_ready_dep_node;
1745 if ( swap_node != NULL ) {
1773 free( address_set_node->
address );
1775 pthread_mutex_unlock_wrap( &address_set_node->
asn_mutex );
1777 free( address_set_node );
1780 pthread_mutex_unlock_wrap( &address_set_node->
asn_mutex );
1791 static void quark_insert_task_dependencies(
Quark * quark,
Task * task)
1804 if ( address_set_node == NULL ) {
1805 address_set_node = quark_address_set_node_new( dep->
address, dep->
size );
1813 if ( pthread_mutex_lock_wrap( &address_set_node->
asn_mutex ) == 0 ) {
1822 if ( prev_dep_node != NULL ) {
1825 if ( prev_task->taskid == task->
taskid ) {
1826 pthread_mutex_lock_task( &task->
task_mutex );
1841 pthread_mutex_unlock_task( &task->
task_mutex );
1855 if ( prev_dep_node != NULL ) {
1863 if ( prev_dep_node != NULL ) {
1880 pthread_mutex_unlock_wrap( &address_set_node->
asn_mutex );
1901 quark_work_main_loop( quark->
worker[thread_rank] );
1914 static void quark_work_set_affinity_and_call_main_loop(
Worker *worker)
1919 quark_work_main_loop( quark->
worker[thread_rank] );
1925 static Task *quark_work_main_loop_check_for_task(
Quark *quark,
Worker *worker,
int worker_rank )
1930 int ready_list_victim = worker_rank;
1931 int worker_finalize =
FALSE;
1932 int completed_tasks_size;
1933 int quark_num_queued_tasks = 0;
1938 while ( task==NULL && !worker->
finalize ) {
1942 if ( worker_rank==0 ) quark_process_completed_tasks(quark);
1945 else if ( completed_tasks_size>1 ) quark_process_completed_tasks(quark);
1948 worker_victim = quark->
worker[ready_list_victim];
1951 if ( worker_rank==ready_list_victim ) {
1952 if ( pthread_mutex_lock_ready_list( &worker_victim->
worker_mutex ) == 0 ) {
1953 task_priority_tree_node =
RB_MIN( task_priority_tree_head_s, worker_victim->
ready_list );
1954 if ( task_priority_tree_node != NULL ) {
1955 task = task_priority_tree_node->
task;
1956 RB_REMOVE( task_priority_tree_head_s, worker_victim->
ready_list, task_priority_tree_node );
1957 free( task_priority_tree_node );
1960 pthread_mutex_unlock_ready_list( &worker_victim->
worker_mutex );
1962 }
else if ( worker_rank!=ready_list_victim ) {
1963 if ( pthread_mutex_trylock_ready_list( &worker_victim->
worker_mutex ) == 0) {
1966 task_priority_tree_node =
RB_MAX( task_priority_tree_head_s, worker_victim->
ready_list );
1967 if ( task_priority_tree_node != NULL ) {
1968 Task *task_to_steal = task_priority_tree_node->
task;
1977 task = task_to_steal;
1979 RB_REMOVE( task_priority_tree_head_s, worker_victim->
ready_list, task_priority_tree_node );
1980 free( task_priority_tree_node );
1987 pthread_mutex_unlock_ready_list( &worker_victim->
worker_mutex );
1991 if ( task == NULL ) {
1995 ready_list_victim = (ready_list_victim + 1) % quark->
num_threads;
2000 if ( worker_rank==0 && ready_list_victim==0 )
return NULL;
2002 if ( worker_rank==0 && quark_num_queued_tasks==0 )
return NULL;
2004 if ( quark_num_queued_tasks==0 && worker_rank!=0 ) {
2007 worker_finalize = worker->
finalize;
2009 while ( quark_num_queued_tasks==0 && !worker_finalize ) {
2013 worker_finalize = worker->
finalize;
2033 static long long quark_work_main_loop(
Worker *worker)
2037 long long num_tasks = -1;
2041 do {}
while ( !quark->
start );
2055 task = quark_work_main_loop_check_for_task( quark, worker, worker_rank );
2060 int sequence_status = 0;
2066 pthread_mutex_lock_task( &task->
task_mutex );
2070 pthread_mutex_unlock_task( &task->
task_mutex );
2074 quark_scratch_allocate( task );
2075 pthread_mutex_unlock_task( &task->
task_mutex );
2079 struct timeval tstart; gettimeofday( &tstart, NULL );
2084 struct timeval tend; gettimeofday( &tend, NULL );
2085 struct timeval tresult; timersub( &tend, &tstart, &tresult );
2088 pthread_mutex_lock_task( &task->
task_mutex );
2089 quark_scratch_deallocate( task );
2092 pthread_mutex_unlock_task( &task->
task_mutex );
2096 quark_worker_remove_completed_task_enqueue_for_later_processing(quark, task, worker_rank);
2099 if ( worker_rank==0 )
break;
2130 ll_list_head_t *head = quark_malloc(
sizeof(ll_list_head_t));
2153 if ( quark==NULL || sequence==NULL )
return QUARK_ERR;
2162 long long int taskid = np->
val;
2190 if ( quark==NULL || sequence==NULL )
return NULL;
2218 if ( quark==NULL || sequence==NULL)
return QUARK_ERR;
2221 quark_process_completed_tasks( quark );
2222 quark_work_main_loop( quark->
worker[myrank] );
2285 static void quark_worker_remove_completed_task_enqueue_for_later_processing(
Quark *quark,
Task *task,
int worker_rank)
2287 int threads_remaining_for_this_task = -1;
2288 pthread_mutex_lock_task( &task->
task_mutex );
2290 pthread_mutex_unlock_task( &task->
task_mutex );
2291 if ( threads_remaining_for_this_task == 0 ) {
2306 static void quark_process_completed_tasks(
Quark *quark )
2308 int completed_tasks_size;
2312 if ( completed_tasks_size==0 )
return;
2317 if ( completed_task_node!= NULL ) {
2320 task = completed_task_node->
task;
2321 workerid = completed_task_node->
workerid;
2322 free( completed_task_node );
2327 quark_remove_completed_task_and_check_for_ready( quark, task, workerid );
2328 }
while ( task!=NULL );
2339 static void quark_address_set_node_initial_check_and_launch(
Quark *quark,
Address_Set_Node *address_set_node,
Dependency *completed_dep,
int worker_rank )
2341 int read_data_region = 0;
2342 int write_data_region = 0;
2344 int keep_processing_more_nodes = 1;
2347 dep_node!=NULL && keep_processing_more_nodes==1;
2355 if ( (dep->
data_region & write_data_region) == 0 ) {
2358 pthread_mutex_lock_task( &task->
task_mutex );
2360 quark_check_and_queue_ready_task( quark, task, worker_rank );
2361 pthread_mutex_unlock_task( &task->
task_mutex );
2365 read_data_region = read_data_region | dep->
data_region;
2372 pthread_mutex_lock_task( &task->
task_mutex );
2374 quark_check_and_queue_ready_task( quark, task, worker_rank );
2375 pthread_mutex_unlock_task( &task->
task_mutex );
2385 write_data_region = write_data_region | dep->
data_region;
2387 keep_processing_more_nodes = 0;
2393 DBGPRINTF(
"Unexpected dependency direction (not INPUT, OUTPUT, INOUT)\n");
2404 static void quark_remove_completed_task_and_check_for_ready(
Quark *quark,
Task *task,
int worker_rank)
2409 fprintf(dot_dag_file,
"t%lld [fillcolor=\"%s\",label=\"%s\",style=filled]; // %lld %d %p %d %lld \n",
2414 fprintf(dot_dag_file,
"{rank=same;%lld;t%lld};\n", task->
tasklevel, task->
taskid );
2421 dep_node != NULL && dep_node->
data!=NULL;
2426 if ( pthread_mutex_lock_wrap( &address_set_node->
asn_mutex )==0 ) {
2449 quark_address_set_node_initial_gatherv_check_and_launch(quark, address_set_node, dep, worker_rank);
2451 quark_address_set_node_accumulator_find_prepend( quark, address_set_node );
2453 quark_address_set_node_initial_check_and_launch( quark, address_set_node, dep, worker_rank );
2454 pthread_mutex_unlock_wrap( &address_set_node->
asn_mutex );
2456 pthread_mutex_unlock_wrap( &address_set_node->
asn_mutex );
2463 task = quark_task_delete(quark, task);
2599 fprintf(dot_dag_file,
"digraph G { size=\"10,7.5\"; center=1; orientation=portrait; \n");
2601 fprintf(dot_dag_file,
"%d [style=\"invis\"]\n", 0);
2624 fprintf(dot_dag_file,
"%d [label=\"%d:%d\"]\n", i, i, quark->
tasklevel_width[i] );
2625 fprintf(dot_dag_file,
"%d->%d [style=\"invis\"];\n", i-1, i );
2627 fprintf(dot_dag_file,
"} // close graph\n");
2628 fprintf(dot_dag_file,
"// ---------------------- \n");
2629 fprintf(dot_dag_file,
"\n\n");
2630 fclose( dot_dag_file );