audio chan pos rework, batchrender deadlock, titler/crikey grab_event tweak, thread...
[goodguy/history.git] / cinelerra-5.1 / cinelerra / renderfarm.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "bcsignals.h"
24 #include "brender.h"
25 #include "clip.h"
26 #include "condition.h"
27 #include "bchash.h"
28 #include "edl.h"
29 #include "filesystem.h"
30 #include "filexml.h"
31 #include "language.h"
32 #include "mutex.h"
33 #include "mwindow.h"
34 #include "packagedispatcher.h"
35 #include "preferences.h"
36 #include "render.h"
37 #include "renderfarm.h"
38 #include "renderfarmclient.h"
39 #include "bctimer.h"
40 #include "transportque.h"
41
42
43 #include <arpa/inet.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <netdb.h>
47 #include <netinet/in.h>
48 #include <signal.h>
49 #include <stdio.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/types.h>
53 #include <sys/un.h>
54 #include <unistd.h>
55
56
57
58
59 RenderFarmServer::RenderFarmServer(
60         MWindow *mwindow,
61         PackageDispatcher *packages,
62         Preferences *preferences,
63         int use_local_rate,
64         int *result_return,
65         int64_t *total_return,
66         Mutex *total_return_lock,
67         Asset *default_asset,
68         EDL *edl,
69         BRender *brender)
70 {
71         this->mwindow = mwindow;
72         this->packages = packages;
73         this->preferences = preferences;
74         this->use_local_rate = use_local_rate;
75         this->result_return = result_return;
76         this->total_return = total_return;
77         this->total_return_lock = total_return_lock;
78         this->default_asset = default_asset;
79         this->edl = edl;
80         this->brender = brender;
81         client_lock = new Mutex("RenderFarmServer::client_lock");
82 }
83
84 RenderFarmServer::~RenderFarmServer()
85 {
86         clients.remove_all_objects();
87         delete client_lock;
88 }
89
90 // Open connections to clients.
91 int RenderFarmServer::start_clients()
92 {
93         int result = 0;
94
95         for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
96         {
97                 client_lock->lock("RenderFarmServer::start_clients");
98                 RenderFarmServerThread *client = new RenderFarmServerThread(this,
99                         i);
100                 clients.append(client);
101
102                 result = client->start_loop();
103                 client_lock->unlock();
104         }
105
106         return result;
107 }
108
109 // The render farm must wait for all the clients to finish.
110 int RenderFarmServer::wait_clients()
111 {
112 //printf("RenderFarmServer::wait_clients 1\n");
113         clients.remove_all_objects();
114 //printf("RenderFarmServer::wait_clients 2\n");
115         return 0;
116 }
117
118 int RenderFarmServer::active_clients()
119 {
120         int n = 0;
121         for( int i=0; i<clients.size(); ++i )
122                 if( clients[i]->running() ) ++n;
123         return n;
124 }
125
126 // Waits for requests from every client.
127 // Joins when the client is finished.
128 RenderFarmServerThread::RenderFarmServerThread(RenderFarmServer *server,
129         int number)
130  : Thread(1, 0, 0)
131 {
132         this->server = server;
133         this->number = number;
134         socket_fd = -1;
135         frames_per_second = 0;
136         watchdog = 0;
137         buffer = 0;
138         datagram = 0;
139         Thread::set_synchronous(1);
140 }
141
142
143
144 RenderFarmServerThread::~RenderFarmServerThread()
145 {
146 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
147         Thread::join();
148 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
149         if(socket_fd >= 0) close(socket_fd);
150         if(watchdog) delete watchdog;
151         if(buffer) delete [] buffer;
152         if(datagram) delete [] datagram;
153 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
154 }
155
156
157 int RenderFarmServerThread::open_client(const char *hostname, int port)
158 {
159         int socket_fd = -1;
160         int result = 0;
161
162 // Open file for master node
163         if(hostname[0] == '/')
164         {
165                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
166                 {
167                         perror(_("RenderFarmServerThread::start_loop: socket\n"));
168                         result = 1;
169                 }
170                 else
171                 {
172                         struct sockaddr_un addr;
173                         addr.sun_family = AF_FILE;
174                         strcpy(addr.sun_path, hostname);
175                         int size = (offsetof(struct sockaddr_un, sun_path) +
176                                 strlen(hostname) + 1);
177
178 // The master node is always created by BRender.  Keep trying for 30 seconds.
179
180 #define ATTEMPT_DELAY 100000
181                         int done = 0;
182                         int attempt = 0;
183
184                         do
185                         {
186                                 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
187                                 {
188                                         attempt++;
189                                         if(attempt > 30000000 / ATTEMPT_DELAY)
190                                         {
191                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
192                                                         hostname,
193                                                         strerror(errno));
194                                                 result = 1;
195                                         }
196                                         else
197                                                 usleep(ATTEMPT_DELAY);
198                                 }
199                                 else
200                                         done = 1;
201                         }while(!result && !done);
202                 }
203         }
204         else
205 // Open socket
206         {
207                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
208                 {
209                         perror(_("RenderFarmServerThread::start_loop: socket"));
210                         result = 1;
211                 }
212                 else
213                 {
214 // Open port
215                         struct sockaddr_in addr;
216                         struct hostent *hostinfo;
217                         addr.sin_family = AF_INET;
218                         addr.sin_port = htons(port);
219                         hostinfo = gethostbyname(hostname);
220                         if(hostinfo == NULL)
221                 {
222                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
223                                         hostname);
224                         result = 1;
225                 }
226                         else
227                         {
228                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
229
230                                 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
231                                 {
232                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
233                                                 hostname,
234                                                 strerror(errno));
235                                         result = 1;
236                                 }
237                         }
238                 }
239         }
240
241         if(result) socket_fd = -1;
242
243         return socket_fd;
244 }
245
246 int RenderFarmServerThread::start_loop()
247 {
248         int result = 0;
249
250         socket_fd = open_client(server->preferences->get_node_hostname(number),
251                 server->preferences->get_node_port(number));
252
253         if(socket_fd < 0) result = 1;
254
255         if(!result)
256         {
257                 watchdog = new RenderFarmWatchdog(this, 0);
258                 watchdog->start();
259         }
260
261         if(!result) Thread::start();
262
263         return result;
264 }
265
266
267
268
269
270
271
272
273
274
275
276 int64_t RenderFarmServerThread::read_int64(int *error)
277 {
278         int temp = 0;
279         if(!error) error = &temp;
280
281         unsigned char data[sizeof(int64_t)];
282         *error = (read_socket((char*)data, sizeof(int64_t)) !=
283                 sizeof(int64_t));
284
285 // Make it return 1 if error so it can be used to read a result code from the
286 // server.
287         int64_t result = 1;
288         if(!*error)
289         {
290                 result = (((int64_t)data[0]) << 56) |
291                         (((uint64_t)data[1]) << 48) |
292                         (((uint64_t)data[2]) << 40) |
293                         (((uint64_t)data[3]) << 32) |
294                         (((uint64_t)data[4]) << 24) |
295                         (((uint64_t)data[5]) << 16) |
296                         (((uint64_t)data[6]) << 8)  |
297                         data[7];
298         }
299         return result;
300 }
301
302 int RenderFarmServerThread::write_int64(int64_t value)
303 {
304         unsigned char data[sizeof(int64_t)];
305         data[0] = (value >> 56) & 0xff;
306         data[1] = (value >> 48) & 0xff;
307         data[2] = (value >> 40) & 0xff;
308         data[3] = (value >> 32) & 0xff;
309         data[4] = (value >> 24) & 0xff;
310         data[5] = (value >> 16) & 0xff;
311         data[6] = (value >> 8) & 0xff;
312         data[7] = value & 0xff;
313         return (write_socket((char*)data, sizeof(int64_t)) !=
314                 sizeof(int64_t));
315 }
316
317
318
319 int RenderFarmServerThread::read_socket(char *data, int len)
320 {
321         int bytes_read = 0;
322         int offset = 0;
323 //printf("RenderFarmServerThread::read_socket 1\n");
324         watchdog->begin_request();
325         while(len > 0 && bytes_read >= 0)
326         {
327                 enable_cancel();
328                 bytes_read = read(socket_fd, data + offset, len);
329                 disable_cancel();
330                 if(bytes_read > 0)
331                 {
332                         len -= bytes_read;
333                         offset += bytes_read;
334                 }
335                 else
336                 if(bytes_read < 0)
337                         break;
338         }
339         watchdog->end_request();
340 //printf("RenderFarmServerThread::read_socket 10\n");
341
342         return offset;
343 }
344
345 int RenderFarmServerThread::write_socket(char *data, int len)
346 {
347 //printf("RenderFarmServerThread::write_socket 1\n");
348         int result = write(socket_fd, data, len);
349 //printf("RenderFarmServerThread::write_socket 10\n");
350
351         return result;
352 }
353
354 void RenderFarmServerThread::reallocate_buffer(int size)
355 {
356         if(buffer && buffer_allocated < size)
357         {
358                 delete [] buffer;
359                 buffer = 0;
360         }
361
362         if(!buffer && size)
363         {
364                 buffer = new unsigned char[size];
365                 buffer_allocated = size;
366         }
367 }
368
369 void RenderFarmServerThread::run()
370 {
371 // Wait for requests
372         unsigned char header[5];
373         int done = 0;
374         int bytes_read = 0;
375
376
377         buffer = 0;
378         buffer_allocated = 0;
379 //      fs_server = new RenderFarmFSServer(this);
380 //      fs_server->initialize();
381
382
383 // Send command to run package renderer.
384         write_int64(RENDERFARM_PACKAGES);
385
386
387
388         while(!done)
389         {
390
391 // Wait for requests.
392 // Requests consist of request ID's and accompanying buffers.
393 // Get request ID.
394                 bytes_read = read_socket((char*)header, 5);
395 //printf("RenderFarmServerThread::run 1\n");
396                 if(bytes_read != 5)
397                 {
398                         done = 1;
399                         continue;
400                 }
401
402                 int request_id = header[0];
403                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
404                                                         (((u_int32_t)header[2]) << 16) |
405                                                         (((u_int32_t)header[3]) << 8)  |
406                                                         (u_int32_t)header[4];
407
408                 reallocate_buffer(request_size);
409
410 // Get accompanying buffer
411                 bytes_read = read_socket((char*)buffer, request_size);
412
413 //printf("RenderFarmServerThread::run 2 %d %jd %d\n", request_id, request_size, bytes_read);
414                 if(bytes_read != request_size)
415                 {
416                         done = 1;
417                         continue;
418                 }
419 //printf("RenderFarmServerThread::run 3\n");
420
421                 switch(request_id)
422                 {
423                         case RENDERFARM_PREFERENCES:
424                                 send_preferences();
425                                 break;
426
427                         case RENDERFARM_ASSET:
428                                 send_asset();
429                                 break;
430
431                         case RENDERFARM_EDL:
432                                 send_edl();
433                                 break;
434
435                         case RENDERFARM_PACKAGE:
436                                 send_package(buffer);
437                                 break;
438
439                         case RENDERFARM_PROGRESS:
440                                 set_progress(buffer);
441                                 break;
442
443                         case RENDERFARM_SET_RESULT:
444                                 set_result(buffer);
445                                 break;
446
447                         case RENDERFARM_SET_VMAP:
448                                 set_video_map(buffer);
449                                 break;
450
451                         case RENDERFARM_GET_RESULT:
452                                 get_result();
453                                 break;
454
455                         case RENDERFARM_DONE:
456 //printf("RenderFarmServerThread::run 10\n");
457                                 done = 1;
458                                 break;
459
460                         case RENDERFARM_KEEPALIVE:
461                                 break;
462
463                         default:
464 //                              if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
465                                 {
466                                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
467                                 }
468                                 break;
469                 }
470 //printf("RenderFarmServerThread::run 10 %d %jd\n", request_id, request_size);
471         }
472
473 // Don't let watchdog kill the entire renderfarm when a client finishes
474 // normally.
475         if(watchdog)
476         {
477 //printf("RenderFarmServerThread::run 20\n");
478                 delete watchdog;
479                 watchdog = 0;
480         }
481
482 //      delete fs_server;
483 }
484
485 void RenderFarmServerThread::write_string(char *string)
486 {
487         int i, len;
488         i = 0;
489
490         len = strlen(string) + 1;
491         datagram = new char[len + 4];
492         STORE_INT32(len);
493         memcpy(datagram + i, string, len);
494         write_socket((char*)datagram, len + 4);
495 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
496 //      datagram[0], datagram[1], datagram[2], datagram[3]);
497
498         delete [] datagram;
499         datagram = 0;
500 }
501
502 void RenderFarmServerThread::send_preferences()
503 {
504         BC_Hash defaults;
505         char *string;
506
507         server->preferences->save_defaults(&defaults);
508         defaults.save_string(string);
509         write_string(string);
510         free(string);
511 }
512
513 void RenderFarmServerThread::send_asset()
514 {
515         BC_Hash defaults;
516         char *string1;
517
518 // The asset must be sent in two segments.
519 // One segment is stored in the EDL and contains decoding information.
520 // One segment is stored in the asset and contains encoding information.
521         server->default_asset->save_defaults(&defaults,
522                 0,
523                 1,
524                 1,
525                 1,
526                 1,
527                 1);
528         defaults.save_string(string1);
529         FileXML file;
530         server->default_asset->write(&file, 0, 0);
531         file.terminate_string();
532
533         write_string(string1);
534         write_string(file.string());
535         free(string1);
536 }
537
538
539 void RenderFarmServerThread::send_edl()
540 {
541         FileXML file;
542
543 // Save the XML
544         server->edl->save_xml(&file,
545                 0,
546                 0,
547                 0);
548         file.terminate_string();
549 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
550
551         write_string(file.string());
552 //printf("RenderFarmServerThread::send_edl 2\n");
553 }
554
555
556 void RenderFarmServerThread::send_package(unsigned char *buffer)
557 {
558         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
559                 (((u_int32_t)buffer[1]) << 16) |
560                 (((u_int32_t)buffer[2]) << 8)  |
561                 ((u_int32_t)buffer[3])) /
562                 65536.0;
563
564 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
565         RenderPackage *package =
566                 server->packages->get_package(frames_per_second,
567                         number,
568                         server->use_local_rate);
569
570 //printf("RenderFarmServerThread::send_package 2\n");
571         datagram = new char[BCTEXTLEN];
572
573 // No more packages
574         if(!package)
575         {
576 //printf("RenderFarmServerThread::send_package 1\n");
577                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
578                 write_socket(datagram, 4);
579         }
580         else
581 // Encode package
582         {
583 //printf("RenderFarmServerThread::send_package 10\n");
584                 int i = 4;
585                 strcpy(&datagram[i], package->path);
586                 i += strlen(package->path);
587                 datagram[i++] = 0;
588
589                 STORE_INT32(package->audio_start);
590                 STORE_INT32(package->audio_end);
591                 STORE_INT32(package->video_start);
592                 STORE_INT32(package->video_end);
593                 int use_brender = (server->brender ? 1 : 0);
594                 STORE_INT32(use_brender);
595                 STORE_INT32(package->audio_do);
596                 STORE_INT32(package->video_do);
597
598                 int len = i;
599                 i = 0;
600                 STORE_INT32(len - 4);
601
602                 write_socket(datagram, len);
603         }
604         delete [] datagram;
605         datagram = 0;
606 }
607
608
609 void RenderFarmServerThread::set_progress(unsigned char *buffer)
610 {
611         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
612         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
613                 (((u_int32_t)buffer[1]) << 16) |
614                 (((u_int32_t)buffer[2]) << 8)  |
615                 ((u_int32_t)buffer[3]);
616         frames_per_second = (double)((((u_int32_t)buffer[4]) << 24) |
617                 (((u_int32_t)buffer[5]) << 16) |
618                 (((u_int32_t)buffer[6]) << 8)  |
619                 ((u_int32_t)buffer[7])) /
620                 65536.0;
621         server->total_return_lock->unlock();
622
623         server->preferences->set_rate(frames_per_second, number);
624
625 // This locks the preferences
626         if(server->mwindow) server->mwindow->preferences->copy_rates_from(
627                 server->preferences);
628 }
629
630 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
631 {
632         if(server->brender)
633         {
634                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
635                                                         (((u_int32_t)buffer[1]) << 16) |
636                                                         (((u_int32_t)buffer[2]) << 8)  |
637                                                         ((u_int32_t)buffer[3]),
638                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
639                                                         (((u_int32_t)buffer[5]) << 16) |
640                                                         (((u_int32_t)buffer[6]) << 8)  |
641                                                         ((u_int32_t)buffer[7]));
642                 char return_value[1];
643                 return_value[0] = 0;
644                 write_socket(return_value, 1);
645                 return 0;
646         }
647         return 1;
648 }
649
650
651 void RenderFarmServerThread::set_result(unsigned char *buffer)
652 {
653 //printf("RenderFarmServerThread::set_result %p\n", buffer);
654         if(!*server->result_return)
655                 *server->result_return = buffer[0];
656 }
657
658
659 void RenderFarmServerThread::get_result()
660 {
661         unsigned char data[1];
662         data[0] = *server->result_return;
663         write_socket((char*)data, 1);
664 }
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679 RenderFarmWatchdog::RenderFarmWatchdog(
680         RenderFarmServerThread *server,
681         RenderFarmClientThread *client)
682  : Thread(1, 0, 0)
683 {
684         this->server = server;
685         this->client = client;
686         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
687         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
688         done = 0;
689 }
690
691 RenderFarmWatchdog::~RenderFarmWatchdog()
692 {
693         done = 1;
694         next_request->unlock();
695         request_complete->unlock();
696         join();
697         delete next_request;
698         delete request_complete;
699 }
700
701 void RenderFarmWatchdog::begin_request()
702 {
703         next_request->unlock();
704 }
705
706 void RenderFarmWatchdog::end_request()
707 {
708         request_complete->unlock();
709 }
710
711 void RenderFarmWatchdog::run()
712 {
713         while(!done)
714         {
715                 next_request->lock("RenderFarmWatchdog::run");
716
717                 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000,
718                         "RenderFarmWatchdog::run");
719 //printf("RenderFarmWatchdog::run 1 %d\n", result);
720
721                 if(result)
722                 {
723                         if(client)
724                         {
725                                 printf("RenderFarmWatchdog::run 1 killing client pid %d\n", client->pid);
726                                 kill(client->pid, SIGKILL);
727                         }
728                         else
729                         if(server)
730                         {
731                                 printf("RenderFarmWatchdog::run 1 killing server thread %p\n", server);
732                                 server->cancel();
733                                 unsigned char buffer[4];
734                                 buffer[0] = 1;
735                                 server->set_result(buffer);
736                         }
737
738                         done = 1;
739                 }
740         }
741 }
742
743
744
745