Merge CV, ver=5.1; ops/methods from HV, and interface from CV where possible
[goodguy/history.git] / cinelerra-5.1 / cinelerra / renderfarm.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "bcsignals.h"
24 #include "brender.h"
25 #include "clip.h"
26 #include "condition.h"
27 #include "bchash.h"
28 #include "edl.h"
29 #include "filesystem.h"
30 #include "filexml.h"
31 #include "language.h"
32 #include "mutex.h"
33 #include "mwindow.h"
34 #include "packagedispatcher.h"
35 #include "preferences.h"
36 #include "render.h"
37 #include "renderfarm.h"
38 #include "renderfarmclient.h"
39 #include "bctimer.h"
40 #include "transportque.h"
41
42
43 #include <arpa/inet.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <signal.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/types.h>
53 #include <sys/un.h>
54 #include <unistd.h>
55
56
57
58
59 RenderFarmServer::RenderFarmServer(
60         MWindow *mwindow,
61         PackageDispatcher *packages,
62         Preferences *preferences,
63         int use_local_rate,
64         int *result_return,
65         int64_t *total_return,
66         Mutex *total_return_lock,
67         Asset *default_asset,
68         EDL *edl,
69         BRender *brender)
70 {
71         this->mwindow = mwindow;
72         this->packages = packages;
73         this->preferences = preferences;
74         this->use_local_rate = use_local_rate;
75         this->result_return = result_return;
76         this->total_return = total_return;
77         this->total_return_lock = total_return_lock;
78         this->default_asset = default_asset;
79         this->edl = edl;
80         this->brender = brender;
81         client_lock = new Mutex("RenderFarmServer::client_lock");
82 }
83
84 RenderFarmServer::~RenderFarmServer()
85 {
86         clients.remove_all_objects();
87         delete client_lock;
88 }
89
90 // Open connections to clients.
91 int RenderFarmServer::start_clients()
92 {
93         int result = 0;
94
95         for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
96         {
97                 client_lock->lock("RenderFarmServer::start_clients");
98                 RenderFarmServerThread *client = new RenderFarmServerThread(this,
99                         i);
100                 clients.append(client);
101
102                 result = client->start_loop();
103                 client_lock->unlock();
104         }
105
106         return result;
107 }
108
109 // The render farm must wait for all the clients to finish.
110 int RenderFarmServer::wait_clients()
111 {
112 //printf("RenderFarmServer::wait_clients 1\n");
113         clients.remove_all_objects();
114 //printf("RenderFarmServer::wait_clients 2\n");
115         return 0;
116 }
117
118
119 // Waits for requests from every client.
120 // Joins when the client is finished.
121 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
122         int number)
123  : Thread(1, 0, 0)
124 {
125         this->server = server;
126         this->number = number;
127         socket_fd = -1;
128         frames_per_second = 0;
129         watchdog = 0;
130         buffer = 0;
131         datagram = 0;
132         Thread::set_synchronous(1);
133 }
134
135
136
137 RenderFarmServerThread::~RenderFarmServerThread()
138 {
139 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
140         Thread::join();
141 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
142         if(socket_fd >= 0) close(socket_fd);
143         if(watchdog) delete watchdog;
144         if(buffer) delete [] buffer;
145         if(datagram) delete [] datagram;
146 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
147 }
148
149
150 int RenderFarmServerThread::open_client(const char *hostname, int port)
151 {
152         int socket_fd = -1;
153         int result = 0;
154
155 // Open file for master node
156         if(hostname[0] == '/')
157         {
158                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
159                 {
160                         perror(_("RenderFarmServerThread::start_loop: socket\n"));
161                         result = 1;
162                 }
163                 else
164                 {
165                         struct sockaddr_un addr;
166                         addr.sun_family = AF_FILE;
167                         strcpy(addr.sun_path, hostname);
168                         int size = (offsetof(struct sockaddr_un, sun_path) +
169                                 strlen(hostname) + 1);
170
171 // The master node is always created by BRender.  Keep trying for 30 seconds.
172
173 #define ATTEMPT_DELAY 100000
174                         int done = 0;
175                         int attempt = 0;
176
177                         do
178                         {
179                                 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
180                                 {
181                                         attempt++;
182                                         if(attempt > 30000000 / ATTEMPT_DELAY)
183                                         {
184                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
185                                                         hostname,
186                                                         strerror(errno));
187                                                 result = 1;
188                                         }
189                                         else
190                                                 usleep(ATTEMPT_DELAY);
191                                 }
192                                 else
193                                         done = 1;
194                         }while(!result && !done);
195                 }
196         }
197         else
198 // Open socket
199         {
200                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
201                 {
202                         perror(_("RenderFarmServerThread::start_loop: socket"));
203                         result = 1;
204                 }
205                 else
206                 {
207 // Open port
208                         struct sockaddr_in addr;
209                         struct hostent *hostinfo;
210                         addr.sin_family = AF_INET;
211                         addr.sin_port = htons(port);
212                         hostinfo = gethostbyname(hostname);
213                         if(hostinfo == NULL)
214                 {
215                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
216                                         hostname);
217                         result = 1;
218                 }
219                         else
220                         {
221                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
222
223                                 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
224                                 {
225                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
226                                                 hostname,
227                                                 strerror(errno));
228                                         result = 1;
229                                 }
230                         }
231                 }
232         }
233
234         if(result) socket_fd = -1;
235
236         return socket_fd;
237 }
238
239 int RenderFarmServerThread::start_loop()
240 {
241         int result = 0;
242
243         socket_fd = open_client(server->preferences->get_node_hostname(number),
244                 server->preferences->get_node_port(number));
245
246         if(socket_fd < 0) result = 1;
247
248         if(!result)
249         {
250                 watchdog = new RenderFarmWatchdog(this, 0);
251                 watchdog->start();
252         }
253
254         if(!result) Thread::start();
255
256         return result;
257 }
258
259
260
261
262
263
264
265
266
267
268
269 int64_t RenderFarmServerThread::read_int64(int *error)
270 {
271         int temp = 0;
272         if(!error) error = &temp;
273
274         unsigned char data[sizeof(int64_t)];
275         *error = (read_socket((char*)data, sizeof(int64_t)) !=
276                 sizeof(int64_t));
277
278 // Make it return 1 if error so it can be used to read a result code from the
279 // server.
280         int64_t result = 1;
281         if(!*error)
282         {
283                 result = (((int64_t)data[0]) << 56) |
284                         (((uint64_t)data[1]) << 48) |
285                         (((uint64_t)data[2]) << 40) |
286                         (((uint64_t)data[3]) << 32) |
287                         (((uint64_t)data[4]) << 24) |
288                         (((uint64_t)data[5]) << 16) |
289                         (((uint64_t)data[6]) << 8)  |
290                         data[7];
291         }
292         return result;
293 }
294
295 int RenderFarmServerThread::write_int64(int64_t value)
296 {
297         unsigned char data[sizeof(int64_t)];
298         data[0] = (value >> 56) & 0xff;
299         data[1] = (value >> 48) & 0xff;
300         data[2] = (value >> 40) & 0xff;
301         data[3] = (value >> 32) & 0xff;
302         data[4] = (value >> 24) & 0xff;
303         data[5] = (value >> 16) & 0xff;
304         data[6] = (value >> 8) & 0xff;
305         data[7] = value & 0xff;
306         return (write_socket((char*)data, sizeof(int64_t)) !=
307                 sizeof(int64_t));
308 }
309
310
311
312 int RenderFarmServerThread::read_socket(char *data, int len)
313 {
314         int bytes_read = 0;
315         int offset = 0;
316 //printf("RenderFarmServerThread::read_socket 1\n");
317         watchdog->begin_request();
318         while(len > 0 && bytes_read >= 0)
319         {
320                 enable_cancel();
321                 bytes_read = read(socket_fd, data + offset, len);
322                 disable_cancel();
323                 if(bytes_read > 0)
324                 {
325                         len -= bytes_read;
326                         offset += bytes_read;
327                 }
328                 else
329                 if(bytes_read < 0)
330                         break;
331         }
332         watchdog->end_request();
333 //printf("RenderFarmServerThread::read_socket 10\n");
334
335         return offset;
336 }
337
338 int RenderFarmServerThread::write_socket(char *data, int len)
339 {
340 //printf("RenderFarmServerThread::write_socket 1\n");
341         int result = write(socket_fd, data, len);
342 //printf("RenderFarmServerThread::write_socket 10\n");
343
344         return result;
345 }
346
347 void RenderFarmServerThread::reallocate_buffer(int size)
348 {
349         if(buffer && buffer_allocated < size)
350         {
351                 delete [] buffer;
352                 buffer = 0;
353         }
354
355         if(!buffer && size)
356         {
357                 buffer = new unsigned char[size];
358                 buffer_allocated = size;
359         }
360 }
361
362 void RenderFarmServerThread::run()
363 {
364 // Wait for requests
365         unsigned char header[5];
366         int done = 0;
367         int bytes_read = 0;
368
369
370         buffer = 0;
371         buffer_allocated = 0;
372 //      fs_server = new RenderFarmFSServer(this);
373 //      fs_server->initialize();
374
375
376 // Send command to run package renderer.
377         write_int64(RENDERFARM_PACKAGES);
378
379
380
381         while(!done)
382         {
383
384 // Wait for requests.
385 // Requests consist of request ID's and accompanying buffers.
386 // Get request ID.
387                 bytes_read = read_socket((char*)header, 5);
388 //printf("RenderFarmServerThread::run 1\n");
389                 if(bytes_read != 5)
390                 {
391                         done = 1;
392                         continue;
393                 }
394
395                 int request_id = header[0];
396                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
397                                                         (((u_int32_t)header[2]) << 16) |
398                                                         (((u_int32_t)header[3]) << 8)  |
399                                                         (u_int32_t)header[4];
400
401                 reallocate_buffer(request_size);
402
403 // Get accompanying buffer
404                 bytes_read = read_socket((char*)buffer, request_size);
405
406 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
407                 if(bytes_read != request_size)
408                 {
409                         done = 1;
410                         continue;
411                 }
412 //printf("RenderFarmServerThread::run 3\n");
413
414                 switch(request_id)
415                 {
416                         case RENDERFARM_PREFERENCES:
417                                 send_preferences();
418                                 break;
419
420                         case RENDERFARM_ASSET:
421                                 send_asset();
422                                 break;
423
424                         case RENDERFARM_EDL:
425                                 send_edl();
426                                 break;
427
428                         case RENDERFARM_PACKAGE:
429                                 send_package(buffer);
430                                 break;
431
432                         case RENDERFARM_PROGRESS:
433                                 set_progress(buffer);
434                                 break;
435
436                         case RENDERFARM_SET_RESULT:
437                                 set_result(buffer);
438                                 break;
439
440                         case RENDERFARM_SET_VMAP:
441                                 set_video_map(buffer);
442                                 break;
443
444                         case RENDERFARM_GET_RESULT:
445                                 get_result();
446                                 break;
447
448                         case RENDERFARM_DONE:
449 //printf("RenderFarmServerThread::run 10\n");
450                                 done = 1;
451                                 break;
452
453                         case RENDERFARM_KEEPALIVE:
454                                 break;
455
456                         default:
457 //                              if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
458                                 {
459                                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
460                                 }
461                                 break;
462                 }
463 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
464         }
465
466 // Don't let watchdog kill the entire renderfarm when a client finishes
467 // normally.
468         if(watchdog)
469         {
470 //printf("RenderFarmServerThread::run 20\n");
471                 delete watchdog;
472                 watchdog = 0;
473         }
474
475 //      delete fs_server;
476 }
477
478 void RenderFarmServerThread::write_string(char *string)
479 {
480         int i, len;
481         i = 0;
482
483         len = strlen(string) + 1;
484         datagram = new char[len + 4];
485         STORE_INT32(len);
486         memcpy(datagram + i, string, len);
487         write_socket((char*)datagram, len + 4);
488 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
489 //      datagram[0], datagram[1], datagram[2], datagram[3]);
490
491         delete [] datagram;
492         datagram = 0;
493 }
494
495 void RenderFarmServerThread::send_preferences()
496 {
497         BC_Hash defaults;
498         char *string;
499
500         server->preferences->save_defaults(&defaults);
501         defaults.save_string(string);
502         write_string(string);
503         free(string);
504 }
505
506 void RenderFarmServerThread::send_asset()
507 {
508         BC_Hash defaults;
509         char *string1;
510
511 // The asset must be sent in two segments.
512 // One segment is stored in the EDL and contains decoding information.
513 // One segment is stored in the asset and contains encoding information.
514         server->default_asset->save_defaults(&defaults,
515                 0,
516                 1,
517                 1,
518                 1,
519                 1,
520                 1);
521         defaults.save_string(string1);
522         FileXML file;
523         server->default_asset->write(&file, 0, 0);
524         file.terminate_string();
525
526         write_string(string1);
527         write_string(file.string());
528         free(string1);
529 }
530
531
532 void RenderFarmServerThread::send_edl()
533 {
534         FileXML file;
535
536 // Save the XML
537         server->edl->save_xml(&file,
538                 0,
539                 0,
540                 0);
541         file.terminate_string();
542 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
543
544         write_string(file.string());
545 //printf("RenderFarmServerThread::send_edl 2\n");
546 }
547
548
549 void RenderFarmServerThread::send_package(unsigned char *buffer)
550 {
551         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
552                 (((u_int32_t)buffer[1]) << 16) |
553                 (((u_int32_t)buffer[2]) << 8)  |
554                 ((u_int32_t)buffer[3])) /
555                 65536.0;
556
557 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
558         RenderPackage *package =
559                 server->packages->get_package(frames_per_second,
560                         number,
561                         server->use_local_rate);
562
563 //printf("RenderFarmServerThread::send_package 2\n");
564         datagram = new char[BCTEXTLEN];
565
566 // No more packages
567         if(!package)
568         {
569 //printf("RenderFarmServerThread::send_package 1\n");
570                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
571                 write_socket(datagram, 4);
572         }
573         else
574 // Encode package
575         {
576 //printf("RenderFarmServerThread::send_package 10\n");
577                 int i = 4;
578                 strcpy(&datagram[i], package->path);
579                 i += strlen(package->path);
580                 datagram[i++] = 0;
581
582                 STORE_INT32(package->audio_start);
583                 STORE_INT32(package->audio_end);
584                 STORE_INT32(package->video_start);
585                 STORE_INT32(package->video_end);
586                 int use_brender = (server->brender ? 1 : 0);
587                 STORE_INT32(use_brender);
588                 STORE_INT32(package->audio_do);
589                 STORE_INT32(package->video_do);
590
591                 int len = i;
592                 i = 0;
593                 STORE_INT32(len - 4);
594
595                 write_socket(datagram, len);
596         }
597         delete [] datagram;
598         datagram = 0;
599 }
600
601
602 void RenderFarmServerThread::set_progress(unsigned char *buffer)
603 {
604         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
605         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
606                 (((u_int32_t)buffer[1]) << 16) |
607                 (((u_int32_t)buffer[2]) << 8)  |
608                 ((u_int32_t)buffer[3]);
609         frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
610                 (((u_int32_t)buffer[5]) << 16) |
611                 (((u_int32_t)buffer[6]) << 8)  |
612                 ((u_int32_t)buffer[7])) /
613                 65536.0;
614         server->total_return_lock->unlock();
615
616         server->preferences->set_rate(frames_per_second, number);
617
618 // This locks the preferences
619         if(server->mwindow) server->mwindow->preferences->copy_rates_from(
620                 server->preferences);
621 }
622
623 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
624 {
625         if(server->brender)
626         {
627                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
628                                                         (((u_int32_t)buffer[1]) << 16) |
629                                                         (((u_int32_t)buffer[2]) << 8)  |
630                                                         ((u_int32_t)buffer[3]),
631                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
632                                                         (((u_int32_t)buffer[5]) << 16) |
633                                                         (((u_int32_t)buffer[6]) << 8)  |
634                                                         ((u_int32_t)buffer[7]));
635                 char return_value[1];
636                 return_value[0] = 0;
637                 write_socket(return_value, 1);
638                 return 0;
639         }
640         return 1;
641 }
642
643
644 void RenderFarmServerThread::set_result(unsigned char *buffer)
645 {
646 //printf("RenderFarmServerThread::set_result %p\n", buffer);
647         if(!*server->result_return)
648                 *server->result_return = buffer[0];
649 }
650
651
652 void RenderFarmServerThread::get_result()
653 {
654         unsigned char data[1];
655         data[0] = *server->result_return;
656         write_socket((char*)data, 1);
657 }
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672 RenderFarmWatchdog::RenderFarmWatchdog(
673         RenderFarmServerThread *server,
674         RenderFarmClientThread *client)
675  : Thread(1, 0, 0)
676 {
677         this->server = server;
678         this->client = client;
679         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
680         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
681         done = 0;
682 }
683
684 RenderFarmWatchdog::~RenderFarmWatchdog()
685 {
686         done = 1;
687         next_request->unlock();
688         request_complete->unlock();
689         join();
690         delete next_request;
691         delete request_complete;
692 }
693
694 void RenderFarmWatchdog::begin_request()
695 {
696         next_request->unlock();
697 }
698
699 void RenderFarmWatchdog::end_request()
700 {
701         request_complete->unlock();
702 }
703
704 void RenderFarmWatchdog::run()
705 {
706         while(!done)
707         {
708                 next_request->lock("RenderFarmWatchdog::run");
709
710                 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000,
711                         "RenderFarmWatchdog::run");
712 //printf("RenderFarmWatchdog::run 1 %d\n", result);
713
714                 if(result)
715                 {
716                         if(client)
717                         {
718                                 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
719                                 kill(client->pid, SIGKILL);
720                         }
721                         else
722                         if(server)
723                         {
724                                 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
725                                 server->cancel();
726                                 unsigned char buffer[4];
727                                 buffer[0] = 1;
728                                 server->set_result(buffer);
729                         }
730
731                         done = 1;
732                 }
733         }
734 }
735
736
737
738