-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaggregation.cpp
More file actions
138 lines (113 loc) · 3.5 KB
/
Copy pathaggregation.cpp
File metadata and controls
138 lines (113 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "interface.h"
#include "ucp/api/ucp.h"
#include <cuda_runtime.h>
#include <iostream>
#include <ostream>
#include <vector>
#define FLOW_ID 0
// Reports how many buffers are needed on each target node.
// The figure is a fixed placeholder of one for now; a real application would
// derive it from its own requirements.
int get_get_buffers_count() {
  const int buffers_per_target = 1;
  return buffers_per_target;
}
// Record layout that main() casts channel data to on the target node:
// a single integer holding a partial sum.
struct Tuple {
  int age_partial_sum;  // partial sum accumulated into the printed total
};
int main(int argc, char *argv[]) {
ucp_worker_attr_t worker;
// Setup node
//
node_t node;
node.worker_addr = (uint64_t)worker.address;
init_node(&node);
if (node.id == 0) { // We assume the node.id == 0 is the master node and
// initializes the channel.
// Setup the udf args and pipeline
//
udf_arg_t filter1_arg;
int *filter1_arg_values = new int[3];
filter1_arg_values[0] = 0; // column key to filter on
filter1_arg_values[1] = UDF_FILTER_OP_EQ; // filter operation to run
filter1_arg_values[2] = 123; // filtered value
filter1_arg.values = filter1_arg_values;
filter1_arg.length = 3 * sizeof(int);
udf_arg_t filter2_arg;
int *filter2_arg_values = new int[3];
filter2_arg_values[0] = 0; // column key to filter on
filter2_arg_values[1] = UDF_FILTER_OP_LE; // filter operation to run
filter2_arg_values[2] = 93; // filtered value
filter2_arg.values = filter2_arg_values;
filter2_arg.length = 3 * sizeof(int);
udf_arg_t sum_arg;
int *sum_arg_values = new int[1];
sum_arg_values[0] = 1; // column key to run sum on
sum_arg.values = sum_arg_values;
sum_arg.length = sizeof(int);
udf_t *udfs = new udf_t[3];
udfs[0] = UDF_FILTER;
udfs[1] = UDF_FILTER;
udfs[2] = UDF_SUM;
udf_arg_t *udfs_args = new udf_arg_t[3];
udfs_args[0] = filter1_arg;
udfs_args[1] = filter2_arg;
udfs_args[2] = sum_arg;
udf_chain_t udf_chain;
udf_chain.udfs = udfs;
udf_chain.args = udfs_args;
// Setup schema for tuples
//
column_t column1 = {.name = "id", .size = 4};
column_t column2 = {.name = "age", .size = 4};
column_t *columns = new column_t[2];
schema_t schema = {.columns = columns};
// Setup nodes
//
uuid_t *sources = new uuid_t[4];
uuid_t *targets = new uuid_t[1];
sources[0] = 0;
sources[1] = 1;
sources[2] = 2;
sources[3] = 3;
targets[0] = 4;
// Setup the channel
//
channel_id_t channel_id = FLOW_ID;
partition_key_t partition_key = 0;
channel_create(channel_id, sources, targets, schema, udf_chain, partition_key);
}
// Application code (put and get functions should be called within CUDA
// kernels)
//
// Any node that it is not the target will put its data into the channel
if (node.id == 0 || node.id == 1) {
int data_size = 1024 * 1024;
int data[data_size];
for (int i = 0; i < data_size; i++) {
put(FLOW_ID, &data[i]);
}
}
// Target node sums up the partial sums
if (node.id == 4) {
channel_handle_t channel_handle;
get_channel_info(&channel_handle);
int total_sum = 0;
void *data;
while (get(FLOW_ID, &data)) {
Tuple *tuple = (Tuple *)data;
total_sum += tuple->age_partial_sum;
}
std::cout << "Total Sum is: " << total_sum << std::endl;
}
return 0;
}
// __global__ void partition(int *data) {
// put(FLOW_ID, &data[threadIdx.x]);
// }
//
// __global__ void sum(int *results) {
// void *data;
// get(FLOW_ID, &data);
// Tuple *tuple = (Tuple *)data;
//
// atomicAdd(results, tuple->age_partial_sum);
// }