minor improvements from Andrew
[goodguy/cinelerra.git] / cinelerra-5.1 / cinelerra / renderfarmclient.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "assets.h"
24 #include "clip.h"
25 #include "bchash.h"
26 #include "dvbtune.h"
27 #include "edl.h"
28 #include "filesystem.h"
29 #include "filexml.h"
30 #include "language.h"
31 #include "mutex.h"
32 #include "mwindow.h"
33 #include "pluginserver.h"
34 #include "preferences.h"
35 #include "renderfarm.h"
36 #include "renderfarmclient.h"
37 //#include "renderfarmfsclient.h"
38 #include "sighandler.h"
39
40 #include <arpa/inet.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <sys/wait.h>
51 #include <unistd.h>
52
53
54 #ifndef AF_FILE
55 #define AF_FILE AF_LOCAL
56 #endif
57
58 // The render client waits for connections from the server.
59 // Then it starts a thread for each connection.
60 RenderFarmClient::RenderFarmClient(int port,
61         char *deamon_path,
62         int nice_value,
63         char *config_path)
64 {
65         char string[BCTEXTLEN];
66
67         this->port = port;
68         this->deamon_path = deamon_path;
69         SigHandler *signals = new SigHandler;
70         signals->initialize("/tmp/cinelerra_farm%d.dmp");
71
72         this_pid = getpid();
73         (void)nice(nice_value);
74
75         boot_defaults = 0;
76         MWindow::init_defaults(boot_defaults, config_path);
77         boot_preferences = new Preferences;
78         boot_preferences->load_defaults(boot_defaults);
79         MWindow::init_plugins(0, boot_preferences);
80         MWindow::init_ladspa_plugins(0, boot_preferences);  
81         BC_Signals::set_catch_segv(boot_preferences->trap_sigsegv);
82         BC_Signals::set_catch_intr(0);
83         if( boot_preferences->trap_sigsegv ) {
84                 BC_Trace::enable_locks();
85         }
86         else {
87                 BC_Trace::disable_locks();
88         }
89
90         strcpy(string, boot_preferences->plugin_dir);
91         strcat(string, "/" FONT_SEARCHPATH);
92         BC_Resources::init_fontconfig(string);
93 }
94
95
96
97
98 RenderFarmClient::~RenderFarmClient()
99 {
100         delete boot_defaults;
101         delete boot_preferences;
102 }
103
104
105 void RenderFarmClient::main_loop()
106 {
107         int socket_fd;
108         BC_WindowBase::get_resources()->vframe_shm = 1;
109
110
111 // Open listening port
112
113         if(!deamon_path)
114         {
115                 struct sockaddr_in addr;
116
117                 addr.sin_family = AF_INET;
118                 addr.sin_port = htons((unsigned short)port);
119                 addr.sin_addr.s_addr = htonl(INADDR_ANY);
120
121                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
122                 {
123                         perror(_("RenderFarmClient::main_loop: socket"));
124                         return;
125                 }
126                 struct linger lgr;
127                 lgr.l_onoff = 0;
128                 lgr.l_linger = 0;
129                 if( setsockopt(socket_fd, SOL_SOCKET, SO_LINGER, &lgr, sizeof(lgr)) < 0 )
130                         perror("RenderFarmClient::setsockopt:setlinger 0");
131
132                 if(bind(socket_fd,
133                         (struct sockaddr*)&addr,
134                         sizeof(addr)) < 0)
135                 {
136                         fprintf(stderr,
137                                 _("RenderFarmClient::main_loop: bind port %d: %s"),
138                                 port,
139                                 strerror(errno));
140                         return;
141                 }
142         }
143         else
144         {
145                 struct sockaddr_un addr;
146                 addr.sun_family = AF_FILE;
147                 strcpy(addr.sun_path, deamon_path);
148                 int size = (offsetof(struct sockaddr_un, sun_path) +
149                         strlen(deamon_path) + 1);
150
151                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
152                 {
153                         perror(_("RenderFarmClient::main_loop: socket"));
154                         return;
155                 }
156                 struct linger lgr;
157                 lgr.l_onoff = 0;
158                 lgr.l_linger = 0;
159                 if( setsockopt(socket_fd, SOL_SOCKET, SO_LINGER, &lgr, sizeof(lgr)) < 0 )
160                         perror("RenderFarmClient::setsockopt:setlinger 1");
161
162                 if(bind(socket_fd,
163                         (struct sockaddr*)&addr,
164                         size) < 0)
165                 {
166                         fprintf(stderr,
167                                 _("RenderFarmClient::main_loop: bind path %s: %s\n"),
168                                 deamon_path,
169                                 strerror(errno));
170                         return;
171                 }
172         }
173
174 // Wait for connections
175         printf("RenderFarmClient::main_loop: client started\n");
176         while(1)
177         {
178                 if(listen(socket_fd, 256) < 0) {
179                         perror(_("RenderFarmClient::main_loop: listen"));
180                         return;
181                 }
182
183                 int new_socket_fd;
184
185                 if(!deamon_path)
186                 {
187                         struct sockaddr_in clientname;
188                         socklen_t size = sizeof(clientname);
189                         if( (new_socket_fd = accept(socket_fd,
190                                 (struct sockaddr*)&clientname, &size) ) < 0 ) {
191                                 perror(_("RenderFarmClient::main_loop: accept"));
192                                 return;
193                         }
194 printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr));
195                         RenderFarmClientThread *thread =
196                                 new RenderFarmClientThread(this);
197                         thread->main_loop(new_socket_fd);
198                 }
199                 else
200                 {
201                         struct sockaddr_un clientname;
202                         socklen_t size = sizeof(clientname);
203                         if( (new_socket_fd = accept(socket_fd,
204                                         (struct sockaddr*)&clientname, &size)) < 0 ) {
205                                 perror(_("RenderFarmClient::main_loop: accept"));
206                                 return;
207                         }
208 printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path);
209                         RenderFarmClientThread *thread =
210                                 new RenderFarmClientThread(this);
211                         thread->main_loop(new_socket_fd);
212                 }
213         }
214 }
215
216 void RenderFarmClient::kill_client()
217 {
218 printf("RenderFarmClient::kill_client 1\n");
219         if(deamon_path)
220         {
221 printf("RenderFarmClient::kill_client 2\n");
222                 remove(deamon_path);
223                 kill(this_pid, SIGKILL);
224         }
225 }
226
227
228 // The thread requests jobs from the server until the job table is empty
229 // or the server reports an error.  This thread must poll the server
230 // after every frame for the error status.
231 // Detaches when finished.
232 RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client)
233  : Thread(0, 0, 1)
234 {
235         this->client = client;
236         this->edl = 0;
237         frames_per_second = 0;
238         Thread::set_synchronous(0);
239 //      fs_client = 0;
240         mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock");
241         watchdog = 0;
242         keep_alive = 0;
243 }
244
245 RenderFarmClientThread::~RenderFarmClientThread()
246 {
247 //      if(fs_client) delete fs_client;
248         delete mutex_lock;
249         delete watchdog;
250         delete keep_alive;
251 }
252
253
254 int RenderFarmClientThread::send_request_header(int request,
255         int len)
256 {
257         unsigned char datagram[5];
258         datagram[0] = request;
259
260         int i = 1;
261         STORE_INT32(len);
262 // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n",
263 // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]);
264
265         return (write_socket((char*)datagram, 5) != 5);
266 }
267
268 int RenderFarmClientThread::write_socket(char *data, int len)
269 {
270 //printf("RenderFarmClientThread::write_socket 1\n");
271         int result = write(socket_fd, data, len);
272 //printf("RenderFarmClientThread::write_socket 10\n");
273         return result;
274 }
275
276 int RenderFarmClientThread::read_socket(char *data, int len)
277 {
278         int bytes_read = 0;
279         int offset = 0;
280 //printf("RenderFarmClientThread::read_socket 1\n");
281         if( watchdog )
282                 watchdog->begin_request();
283         while(len > 0 && bytes_read >= 0)
284         {
285                 bytes_read = read(socket_fd, data + offset, len);
286                 if(bytes_read > 0)
287                 {
288                         len -= bytes_read;
289                         offset += bytes_read;
290                 }
291                 else
292                 if(bytes_read < 0)
293                 {
294                         break;
295                 }
296         }
297         if( watchdog )
298                 watchdog->end_request();
299 //printf("RenderFarmClientThread::read_socket 10\n");
300
301         return offset;
302 }
303
304 int RenderFarmClientThread::write_int64(int64_t value)
305 {
306         unsigned char data[sizeof(int64_t)];
307         data[0] = (value >> 56) & 0xff;
308         data[1] = (value >> 48) & 0xff;
309         data[2] = (value >> 40) & 0xff;
310         data[3] = (value >> 32) & 0xff;
311         data[4] = (value >> 24) & 0xff;
312         data[5] = (value >> 16) & 0xff;
313         data[6] = (value >> 8) & 0xff;
314         data[7] = value & 0xff;
315         return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
316 }
317
318 int64_t RenderFarmClientThread::read_int64(int *error)
319 {
320         int temp = 0;
321         if(!error) error = &temp;
322
323         unsigned char data[sizeof(int64_t)];
324         *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
325
326 // Make it return 1 if error so it can be used to read a result code from the
327 // server.
328         int64_t result = 1;
329         if(!*error)
330         {
331                 result = (((int64_t)data[0]) << 56) |
332                         (((uint64_t)data[1]) << 48) |
333                         (((uint64_t)data[2]) << 40) |
334                         (((uint64_t)data[3]) << 32) |
335                         (((uint64_t)data[4]) << 24) |
336                         (((uint64_t)data[5]) << 16) |
337                         (((uint64_t)data[6]) << 8)  |
338                         data[7];
339         }
340         return result;
341 }
342
343 void RenderFarmClientThread::read_string(char* &string)
344 {
345         unsigned char header[4];
346         if(read_socket((char*)header, 4) != 4)
347         {
348                 string = 0;
349                 return;
350         }
351
352         int64_t len = (((u_int32_t)header[0]) << 24) |
353                                 (((u_int32_t)header[1]) << 16) |
354                                 (((u_int32_t)header[2]) << 8) |
355                                 ((u_int32_t)header[3]);
356
357         if(len)
358         {
359                 string = new char[len];
360                 if(read_socket(string, len) != len)
361                 {
362                         delete [] string;
363                         string = 0;
364                 }
365         }
366         else
367                 string = 0;
368
369 }
370
371 void RenderFarmClientThread::abort()
372 {
373         send_completion(socket_fd);
374         close(socket_fd);
375         exit(1);
376 }
377
378 void RenderFarmClientThread::lock(const char *location)
379 {
380         mutex_lock->lock(location);
381 }
382
383 void RenderFarmClientThread::unlock()
384 {
385         mutex_lock->unlock();
386 }
387
388 void RenderFarmClientThread::get_command(int socket_fd, int *command)
389 {
390         int error;
391         *command = read_int64(&error);
392         if(error)
393         {
394                 *command = 0;
395                 return;
396         }
397 }
398
399
400 void RenderFarmClientThread::read_preferences(int socket_fd,
401         Preferences *preferences)
402 {
403         lock("RenderFarmClientThread::read_preferences");
404         send_request_header(RENDERFARM_PREFERENCES,
405                 0);
406
407         char *string;
408         read_string(string);
409
410         BC_Hash defaults;
411         defaults.load_string((char*)string);
412         preferences->load_defaults(&defaults);
413
414         delete [] string;
415         unlock();
416 }
417
418
419
420 void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset)
421 {
422         lock("RenderFarmClientThread::read_asset");
423         send_request_header(RENDERFARM_ASSET,
424                 0);
425
426         char *string1;
427         char *string2;
428         read_string(string1);
429         read_string(string2);
430
431
432
433         FileXML file;
434         file.read_from_string((char*)string2);
435         asset->read(&file);
436
437
438
439         BC_Hash defaults;
440         defaults.load_string((char*)string1);
441         asset->load_defaults(&defaults, 0, 1, 1, 1, 1, 1);
442
443 //printf("RenderFarmClientThread::read_asset %d\n", __LINE__);
444 //asset->dump();
445
446         delete [] string1;
447         delete [] string2;
448         unlock();
449 }
450
451 void RenderFarmClientThread::read_edl(int socket_fd,
452                 EDL *edl, Preferences *preferences)
453 {
454         lock("RenderFarmClientThread::read_edl");
455         send_request_header(RENDERFARM_EDL, 0);
456
457         char *string;
458         read_string(string);
459
460         FileXML file;
461         file.read_from_string((char*)string);
462         delete [] string;
463
464         edl->load_xml(&file, LOAD_ALL);
465         unlock();
466 }
467
468 int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package)
469 {
470         lock("RenderFarmClientThread::read_package");
471         send_request_header(RENDERFARM_PACKAGE,
472                 4);
473
474         unsigned char datagram[5];
475         int i = 0;
476
477
478 // Fails if -ieee isn't set.
479         int64_t fixed = !EQUIV(frames_per_second, 0.0) ?
480                 (int64_t)(frames_per_second * 65536.0) : 0;
481         STORE_INT32(fixed);
482         write_socket((char*)datagram, 4);
483
484
485 //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed);
486         char *data;
487         unsigned char *data_ptr;
488         read_string(data);
489 //printf("RenderFarmClientThread::read_package 2 %p\n", data);
490 // Signifies end of session.
491         if(!data)
492         {
493 //              printf(_("RenderFarmClientThread::read_package no output path received.\n"));
494                 unlock();
495                 return 1;
496         }
497
498 //printf("RenderFarmClientThread::read_package 2\n");
499
500
501         data_ptr = (unsigned char*)data;
502         strcpy(package->path, data);
503         data_ptr += strlen(package->path);
504         data_ptr++;
505         package->audio_start = READ_INT32(data_ptr);
506         data_ptr += 4;
507         package->audio_end = READ_INT32(data_ptr);
508         data_ptr += 4;
509         package->video_start = READ_INT32(data_ptr);
510         data_ptr += 4;
511         package->video_end = READ_INT32(data_ptr);
512         data_ptr += 4;
513         package->use_brender = READ_INT32(data_ptr);
514         data_ptr += 4;
515         package->audio_do = READ_INT32(data_ptr);
516         data_ptr += 4;
517         package->video_do = READ_INT32(data_ptr);
518
519         delete [] data;
520         unlock();
521
522         return 0;
523 }
524
525 int RenderFarmClientThread::send_completion(int socket_fd)
526 {
527         lock("RenderFarmClientThread::send_completion");
528         int result = send_request_header(RENDERFARM_DONE, 0);
529         unlock();
530         return result;
531 }
532
533
534 void RenderFarmClientThread::ping_server()
535 {
536         lock("RenderFarmClientThread::ping_server");
537         send_request_header(RENDERFARM_KEEPALIVE, 0);
538         unlock();
539 }
540
541
542
543 void RenderFarmClientThread::main_loop(int socket_fd)
544 {
545         this->socket_fd = socket_fd;
546
547         Thread::start();
548 }
549
550 void RenderFarmClientThread::run()
551 {
552 // Create new memory space
553         pid = fork();
554         if(pid != 0)
555         {
556                 int return_value;
557                 waitpid(pid, &return_value, 0);
558                 return;
559         }
560
561 // Get the pid of the fork if inside the fork
562         pid = getpid();
563         BC_Signals::set_trap_hook(trap_hook, this);
564         TheList::reset();
565
566         int socket_fd = this->socket_fd;
567         int watchdog_timeout = client->boot_preferences->renderfarm_watchdog_timeout;
568         if( watchdog_timeout > 0 )
569                 init_client_keepalive(watchdog_timeout);
570
571 // Get command to run
572         int command;
573         lock("RenderFarmClientThread::run");
574         get_command(socket_fd, &command);
575         unlock();
576
577 //printf("RenderFarmClientThread::run command=%d\n", command);
578
579         switch(command)
580         {
581                 case RENDERFARM_TUNER:
582                         do_tuner(socket_fd);
583                         break;
584                 case RENDERFARM_PACKAGES:
585                         do_packages(socket_fd);
586                         break;
587         }
588
589         _exit(0);
590 }
591
592
593 void RenderFarmClientThread::init_client_keepalive(int timeout_secs)
594 {
595         keep_alive = new RenderFarmKeepalive(this);
596         keep_alive->start();
597         watchdog = new RenderFarmWatchdog(timeout_secs, 0, this);
598         watchdog->start();
599 }
600
601
602
603 void RenderFarmClientThread::do_tuner(int socket_fd)
604 {
605 // Currently only 1 tuner driver.  Maybe more someday.
606 //      DVBTune server(this);
607 //      server.main_loop();
608 //      ::close(socket_fd);
609 }
610
611
612 void RenderFarmClientThread::do_packages(int socket_fd)
613 {
614         RenderPackage *package;
615         Asset *default_asset;
616         Preferences *preferences;
617
618
619
620         FarmPackageRenderer package_renderer(this, socket_fd);
621         int result = 0;
622
623
624
625 //printf("RenderFarmClientThread::run 2\n");
626 // Read settings
627         preferences = new Preferences;
628         default_asset = new Asset;
629         package = new RenderPackage;
630         edl = new EDL;
631         edl->create_objects();
632
633 //printf("RenderFarmClientThread::run 3\n");
634         read_preferences(socket_fd, preferences);
635 //printf("RenderFarmClientThread::run 4\n");
636         read_asset(socket_fd, default_asset);
637 //printf("RenderFarmClientThread::run 5\n");
638         read_edl(socket_fd, edl, preferences);
639 //edl->dump();
640 //printf("RenderFarmClientThread::run 6\n");
641
642         package_renderer.initialize(0, edl, preferences, default_asset);
643
644 // Read packages
645         while(1)
646         {
647 //printf("RenderFarmClientThread::run 5\n");
648                 result = read_package(socket_fd, package);
649 //printf("RenderFarmClientThread::run 6 %d\n", result);
650
651
652 // Finished list
653                 if(result)
654                 {
655 //printf("RenderFarmClientThread::run 7\n");
656
657                         result = send_completion(socket_fd);
658                         break;
659                 }
660
661 //              Timer timer;
662 //              timer.update();
663
664 // Error
665                 if(package_renderer.render_package(package))
666                 {
667 //printf("RenderFarmClientThread::run 8\n");
668                         result = send_completion(socket_fd);
669                         break;
670                 }
671
672
673                 frames_per_second = package_renderer.frames_per_second;
674 //              frames_per_second = (double)(package->video_end - package->video_start) /
675 //                      ((double)timer.get_difference() / 1000);
676
677 //printf("RenderFarmClientThread::run 9\n");
678
679
680
681         }
682
683
684 //printf("RenderFarmClientThread::run 9\n");
685         default_asset->Garbage::remove_user();
686 //printf("RenderFarmClientThread::run 10\n");
687         edl->Garbage::remove_user();
688         edl = 0;
689 //printf("RenderFarmClientThread::run 11\n");
690         delete preferences;
691 printf(_("RenderFarmClientThread::run: Session finished.\n"));
692 }
693
694 void RenderFarmClientThread::trap_hook(FILE *fp, void *vp)
695 {
696         RenderFarmClientThread *thread = (RenderFarmClientThread*)vp;
697         fprintf(fp, "\nEDL:\n");
698         if( thread->edl ) thread->edl->dump(fp);
699 }
700
701
702
703 RenderFarmKeepalive::RenderFarmKeepalive(
704         RenderFarmClientThread *client_thread)
705  : Thread(1, 0, 0)
706 {
707         this->client_thread = client_thread;
708         done = 0;
709 }
710
711 RenderFarmKeepalive::~RenderFarmKeepalive()
712 {
713         done = 1;
714         cancel();
715         join();
716 }
717
718 void RenderFarmKeepalive::run()
719 {
720         while(!done)
721         {
722                 enable_cancel();
723                 sleep(5);
724                 disable_cancel();
725                 if( !done ) {
726 //printf("RenderFarmKeepalive::run 1\n");
727 // watchdog thread kills this if it gets stuck
728                         client_thread->ping_server();
729 //printf("RenderFarmKeepalive::run 10\n");
730                 }
731         }
732 }
733
734
735 FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread,
736                 int socket_fd)
737  : PackageRenderer()
738 {
739         this->thread = thread;
740         this->socket_fd = socket_fd;
741 }
742
743
744 FarmPackageRenderer::~FarmPackageRenderer()
745 {
746 }
747
748
749 int FarmPackageRenderer::get_result()
750 {
751         thread->lock("FarmPackageRenderer::get_result");
752         thread->send_request_header(RENDERFARM_GET_RESULT,
753                 0);
754         unsigned char data[1];
755         data[0] = 1;
756         if(thread->read_socket((char*)data, 1) != 1)
757         {
758                 thread->unlock();
759                 return 1;
760         }
761         thread->unlock();
762         return data[0];
763 }
764
765 void FarmPackageRenderer::set_result(int value)
766 {
767         thread->lock("FarmPackageRenderer::set_result");
768         thread->send_request_header(RENDERFARM_SET_RESULT,
769                 1);
770         unsigned char data[1];
771         data[0] = value;
772         thread->write_socket((char*)data, 1);
773         thread->unlock();
774 }
775
776 void FarmPackageRenderer::set_progress(int64_t total_samples)
777 {
778         thread->lock("FarmPackageRenderer::set_progress");
779         thread->send_request_header(RENDERFARM_PROGRESS,
780                 8);
781         unsigned char datagram[8];
782         int i = 0;
783         STORE_INT32(total_samples);
784
785
786         int64_t fixed = (!EQUIV(frames_per_second, 0.0)) ?
787                 (int64_t)(frames_per_second * 65536.0) : 0;
788         STORE_INT32(fixed);
789
790
791         thread->write_socket((char*)datagram, 8);
792         thread->unlock();
793 }
794
795 int FarmPackageRenderer::set_video_map(int64_t position, int value)
796 {
797         int result = 0;
798         unsigned char datagram[8];
799         char return_value[1];
800         int i = 0;
801
802         thread->lock("FarmPackageRenderer::set_video_map");
803         thread->send_request_header(RENDERFARM_SET_VMAP,
804                 8);
805         STORE_INT32(position);
806         STORE_INT32(value);
807         thread->write_socket((char*)datagram, 8);
808
809 // Get completion since the GUI may be locked for a long time.
810         if(!thread->read_socket(return_value, 1))
811         {
812                 result = 1;
813         }
814
815         thread->unlock();
816         return result;
817 }
818
819