34 prt_assert(proxy != NULL,
"malloc failed");
37 proxy->num_agents = num_agents;
38 proxy->max_channel_size = 0;
39 proxy->num_callbacks = 0;
45 prt_assert(proxy->tags_hash != NULL,
"icl_hash_create failed");
49 proxy->sends_requested =
51 prt_assert(proxy->sends_requested != NULL,
"malloc failed");
52 for (i = 0; i < num_agents; i++) {
54 prt_assert(proxy->sends_requested[i] != NULL,
"icl_deque_new failed");
60 prt_assert(proxy->sends_posted != NULL,
"malloc failed");
61 for (i = 0; i < num_agents; i++) {
63 prt_assert(proxy->sends_posted[i] != NULL,
"icl_list_new failed");
68 prt_assert(proxy->recvs_posted != NULL,
"icl_list_new failed");
72 prt_assert(proxy->transfers != NULL,
"icl_deque_new failed");
93 prt_assert(status == 0,
"icl_hash_destroy failed");
98 for (i = 0; i < proxy->num_agents; i++) {
100 prt_assert(size == 0,
"destroying non-empty deque");
102 prt_assert(status == 0,
"icl_deque_destroy failed");
104 free(proxy->sends_requested);
107 for (i = 0; i < proxy->num_agents; i++) {
109 prt_assert(size == 0,
"destroying non-empty list");
111 prt_assert(status == 0,
"icl_list_destroy failed");
113 free(proxy->sends_posted);
117 prt_assert(size == 0,
"destroying non-empty deque");
119 prt_assert(status == 0,
"icl_deque_destroy failed");
139 if (channel->size > proxy->max_channel_size)
140 proxy->max_channel_size = channel->size;
155 retval = MPI_Get_count(&request->status, MPI_BYTE, &count);
156 prt_assert(retval == MPI_SUCCESS,
"MPI_Get_count failed");
158 if (count != request->packet->size)
162 int source = request->status.MPI_SOURCE;
163 int tag = request->status.MPI_TAG;
164 int *source_tag = prt_tuple_new2(source, tag);
169 if (channel->dst_vdp->location == PRT_LOCATION_HOST) {
191 for (i = 0; i < proxy->num_agents; i++) {
207 for (i = 0; i < proxy->num_agents; i++) {
228 packet, proxy->max_channel_size,
229 MPI_ANY_SOURCE, MPI_ANY_TAG);
260 for (dev = 0; dev < proxy->vsa->num_devices; dev++)
268 switch (transfer->direction) {
269 case PRT_HOST_TO_DEVICE:
271 transfer->packet, transfer->channel);
273 case PRT_DEVICE_TO_HOST:
275 transfer->packet, transfer->channel);
277 case PRT_DEVICE_TO_DEVICE:
279 transfer->packet, transfer->channel);
281 case PRT_DEVICE_MPI_TO_HOST:
287 case PRT_DEVICE_MPI_FROM_HOST: {
291 transfer->packet->size,
292 transfer->channel->dst_node,
293 transfer->channel->tag);
295 proxy->sends_requested[transfer->agent], request);
298 case PRT_DEVICE_PACKET_RELEASE:
323 MPI_Barrier(MPI_COMM_WORLD);
328 pthread_barrier_wait(&proxy->vsa->barrier);
334 for (dev = 0; dev < proxy->vsa->num_devices; dev++) {
343 int threads_finished;
344 int devices_finished;
347 if (proxy->vsa->num_nodes > 1)
351 if (proxy->vsa->num_devices > 0)
355 devices_finished = 1;
357 for (i = 0; i < proxy->vsa->num_devices; i++)
358 if (!proxy->vsa->device[i]->finished)
359 devices_finished = 0;
361 threads_finished = 1;
363 for (i = 0; i < proxy->vsa->num_threads; i++)
364 if (!proxy->vsa->thread[i]->finished)
365 threads_finished = 0;
369 for (i = 0; i < proxy->num_agents; i++)
375 for (i = 0; i < proxy->num_agents; i++)
380 !threads_finished || !devices_finished ||
381 sends_requested || sends_posted ||
382 icl_deque_size(proxy->transfers) > 0 || proxy->num_callbacks > 0);
390 for (dev = 0; dev < proxy->vsa->num_devices; dev++) {
392 cudaDeviceSynchronize();
396 pthread_barrier_wait(&proxy->vsa->barrier);
402 MPI_Barrier(MPI_COMM_WORLD);
408 return (stop - start);