Credit Andrew - improve in-tree documentation
[goodguy/cinelerra.git] / cinelerra / loadbalance.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "condition.h"
23 #include "mutex.h"
24 #include "loadbalance.h"
25
26
27
28
29 LoadPackage::LoadPackage()
30 {
31         completion_lock = new Condition(0, "LoadPackage::completion_lock");
32 }
33 LoadPackage::~LoadPackage()
34 {
35         delete completion_lock;
36 }
37
38
39
40
41
42
43
44
45
46 LoadClient::LoadClient(LoadServer *server)
47  : Thread(1, 0, 0)
48 {
49         this->server = server;
50         done = 0;
51         package_number = 0;
52         input_lock = new Condition(0, "LoadClient::input_lock");
53         completion_lock = new Condition(0, "LoadClient::completion_lock");
54 }
55
56 LoadClient::LoadClient()
57  : Thread(1, 0, 0)
58 {
59         server = 0;
60         done = 0;
61         package_number = 0;
62         input_lock = new Condition(0, "LoadClient::input_lock");
63         completion_lock = new Condition(0, "LoadClient::completion_lock");
64 }
65
66 LoadClient::~LoadClient()
67 {
68         done = 1;
69         input_lock->unlock();
70         Thread::join();
71         delete input_lock;
72         delete completion_lock;
73 }
74
75 int LoadClient::get_package_number()
76 {
77         return package_number;
78 }
79
80 LoadServer* LoadClient::get_server()
81 {
82         return server;
83 }
84
85
86 void LoadClient::run()
87 {
88         while(!done)
89         {
90                 input_lock->lock("LoadClient::run");
91
92                 if(!done)
93                 {
94 // Read packet
95                         LoadPackage *package;
96
97
98                         server->client_lock->lock("LoadClient::run");
99                         if(server->current_package < server->total_packages)
100                         {
101                                 package_number = server->current_package;
102                                 package = server->packages[server->current_package++];
103                                 server->client_lock->unlock();
104                                 input_lock->unlock();
105
106                                 process_package(package);
107
108                                 package->completion_lock->unlock();
109                         }
110                         else
111                         {
112                                 server->client_lock->unlock();
113                                 completion_lock->unlock();
114                         }
115                 }
116         }
117 }
118
119 void LoadClient::run_single()
120 {
121         if(server->total_packages)
122         {
123                 for(int i = 0; i < server->total_packages; i++)
124                 {
125                         process_package(server->packages[i]);
126                 }
127         }
128 }
129
130 void LoadClient::process_package(LoadPackage *package)
131 {
132         printf("LoadClient::process_package\n");
133 }
134
135
136
137
138
139 LoadServer::LoadServer(int total_clients, int total_packages)
140 {
141         if(total_clients <= 0)
142                 printf("LoadServer::LoadServer total_clients == %d\n", total_clients);
143         this->total_clients = total_clients;
144         this->total_packages = total_packages;
145         current_package = 0;
146         clients = 0;
147         packages = 0;
148         client_lock = new Mutex("LoadServer::client_lock");
149         is_single = 0;
150         single_client = 0;
151 }
152
153 LoadServer::~LoadServer()
154 {
155         delete_clients();
156         delete_packages();
157         delete client_lock;
158 }
159
160 void LoadServer::delete_clients()
161 {
162         if(clients)
163         {
164                 for(int i = 0; i < total_clients; i++)
165                         delete clients[i];
166                 delete [] clients;
167         }
168
169         if(single_client) delete single_client;
170
171         clients = 0;
172         single_client = 0;
173 }
174
175 void LoadServer::delete_packages()
176 {
177         if(packages)
178         {
179                 for(int i = 0; i < total_packages; i++)
180                         delete packages[i];
181                 delete [] packages;
182         }
183         packages = 0;
184 }
185
186 void LoadServer::set_package_count(int total_packages)
187 {
188         delete_packages();
189         this->total_packages = total_packages;
190         create_packages();
191 }
192
193
194 void LoadServer::create_clients()
195 {
196         if(!is_single && !clients)
197         {
198                 clients = new LoadClient*[total_clients];
199                 for(int i = 0; i < total_clients; i++)
200                 {
201                         clients[i] = new_client();
202                         clients[i]->server = this;
203                         clients[i]->start();
204                 }
205         }
206
207         if(is_single && !single_client)
208         {
209                 single_client = new_client();
210                 single_client->server = this;
211         }
212 }
213
214 void LoadServer::create_packages()
215 {
216         if(!packages)
217         {
218                 packages = new LoadPackage*[total_packages];
219                 for(int i = 0; i < total_packages; i++)
220                         packages[i] = new_package();
221         }
222 }
223
224 LoadPackage* LoadServer::get_package(int number)
225 {
226         return packages[number];
227 }
228
229 LoadClient* LoadServer::get_client(int number)
230 {
231         return clients[number];
232 }
233
234 int LoadServer::get_total_packages()
235 {
236 //      if(is_single) return 1;
237         return total_packages;
238 }
239
240 int LoadServer::get_total_clients()
241 {
242         if(is_single) return 1;
243         return total_clients;
244 }
245
246 void LoadServer::process_packages()
247 {
248         if(total_clients == 1)
249         {
250                 process_single();
251                 return;
252         }
253
254         is_single = 0;
255         create_clients();
256         create_packages();
257
258
259
260 // Set up packages
261         init_packages();
262
263         current_package = 0;
264 // Start all clients
265         for(int i = 0; i < total_clients; i++)
266         {
267                 clients[i]->input_lock->unlock();
268         }
269
270 // Wait for packages to get finished
271         for(int i = 0; i < total_packages; i++)
272         {
273                 packages[i]->completion_lock->lock("LoadServer::process_packages 1");
274         }
275
276 // Wait for clients to finish before allowing changes to packages
277         for(int i = 0; i < total_clients; i++)
278         {
279                 clients[i]->completion_lock->lock("LoadServer::process_packages 2");
280         }
281 }
282
283 void LoadServer::process_single()
284 {
285         is_single = 1;
286         create_clients();
287         create_packages();
288         init_packages();
289         current_package = 0;
290         single_client->run_single();
291 }
292