Credit Andrew - improve in-tree documentation
[goodguy/cinelerra.git] / cinelerra / renderfarmclient.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "assets.h"
24 #include "clip.h"
25 #include "bchash.h"
26 #include "dvbtune.h"
27 #include "edl.h"
28 #include "filesystem.h"
29 #include "filexml.h"
30 #include "language.h"
31 #include "mutex.h"
32 #include "mwindow.h"
33 #include "pluginserver.h"
34 #include "preferences.h"
35 #include "renderfarm.h"
36 #include "renderfarmclient.h"
37 //#include "renderfarmfsclient.h"
38 #include "sighandler.h"
39
40 #include <arpa/inet.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <sys/wait.h>
51 #include <unistd.h>
52
53
54 #ifndef AF_FILE
55 #define AF_FILE AF_LOCAL
56 #endif
57
58 // The render client waits for connections from the server.
59 // Then it starts a thread for each connection.
60 RenderFarmClient::RenderFarmClient(int port,
61         char *deamon_path,
62         int nice_value,
63         char *config_path)
64 {
65         char string[BCTEXTLEN];
66
67         this->port = port;
68         this->deamon_path = deamon_path;
69         SigHandler *signals = new SigHandler;
70         signals->initialize("/tmp/cinelerra_farm%d.dmp");
71
72         this_pid = getpid();
73         (void)nice(nice_value);
74
75         boot_defaults = 0;
76         MWindow::init_defaults(boot_defaults, config_path);
77         boot_preferences = new Preferences;
78         boot_preferences->load_defaults(boot_defaults);
79         MWindow::init_plugins(0, boot_preferences);
80         BC_Signals::set_catch_segv(boot_preferences->trap_sigsegv);
81         BC_Signals::set_catch_intr(0);
82         if( boot_preferences->trap_sigsegv ) {
83                 BC_Trace::enable_locks();
84         }
85         else {
86                 BC_Trace::disable_locks();
87         }
88
89         strcpy(string, boot_preferences->plugin_dir);
90         strcat(string, "/" FONT_SEARCHPATH);
91         BC_Resources::init_fontconfig(string);
92 }
93
94
95
96
97 RenderFarmClient::~RenderFarmClient()
98 {
99         delete boot_defaults;
100         delete boot_preferences;
101 }
102
103
104 void RenderFarmClient::main_loop()
105 {
106         int socket_fd;
107         BC_WindowBase::get_resources()->vframe_shm = 1;
108
109
110 // Open listening port
111
112         if(!deamon_path)
113         {
114                 struct sockaddr_in addr;
115
116                 addr.sin_family = AF_INET;
117                 addr.sin_port = htons((unsigned short)port);
118                 addr.sin_addr.s_addr = htonl(INADDR_ANY);
119
120                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
121                 {
122                         perror(_("RenderFarmClient::main_loop: socket"));
123                         return;
124                 }
125
126                 if(bind(socket_fd,
127                         (struct sockaddr*)&addr,
128                         sizeof(addr)) < 0)
129                 {
130                         fprintf(stderr,
131                                 _("RenderFarmClient::main_loop: bind port %d: %s"),
132                                 port,
133                                 strerror(errno));
134                         return;
135                 }
136         }
137         else
138         {
139                 struct sockaddr_un addr;
140                 addr.sun_family = AF_FILE;
141                 strcpy(addr.sun_path, deamon_path);
142                 int size = (offsetof(struct sockaddr_un, sun_path) +
143                         strlen(deamon_path) + 1);
144
145                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
146                 {
147                         perror(_("RenderFarmClient::main_loop: socket"));
148                         return;
149                 }
150
151                 if(bind(socket_fd,
152                         (struct sockaddr*)&addr,
153                         size) < 0)
154                 {
155                         fprintf(stderr,
156                                 _("RenderFarmClient::main_loop: bind path %s: %s\n"),
157                                 deamon_path,
158                                 strerror(errno));
159                         return;
160                 }
161         }
162
163 // Wait for connections
164         printf("RenderFarmClient::main_loop: client started\n");
165         while(1)
166         {
167                 if(listen(socket_fd, 256) < 0) {
168                         perror(_("RenderFarmClient::main_loop: listen"));
169                         return;
170                 }
171
172                 int new_socket_fd;
173
174                 if(!deamon_path)
175                 {
176                         struct sockaddr_in clientname;
177                         socklen_t size = sizeof(clientname);
178                         if( (new_socket_fd = accept(socket_fd,
179                                 (struct sockaddr*)&clientname, &size) ) < 0 ) {
180                                 perror(_("RenderFarmClient::main_loop: accept"));
181                                 return;
182                         }
183 printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr));
184                         RenderFarmClientThread *thread =
185                                 new RenderFarmClientThread(this);
186                         thread->main_loop(new_socket_fd);
187                 }
188                 else
189                 {
190                         struct sockaddr_un clientname;
191                         socklen_t size = sizeof(clientname);
192                         if( (new_socket_fd = accept(socket_fd,
193                                         (struct sockaddr*)&clientname, &size)) < 0 ) {
194                                 perror(_("RenderFarmClient::main_loop: accept"));
195                                 return;
196                         }
197 printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path);
198                         RenderFarmClientThread *thread =
199                                 new RenderFarmClientThread(this);
200                         thread->main_loop(new_socket_fd);
201                 }
202         }
203 }
204
205 void RenderFarmClient::kill_client()
206 {
207 printf("RenderFarmClient::kill_client 1\n");
208         if(deamon_path)
209         {
210 printf("RenderFarmClient::kill_client 2\n");
211                 remove(deamon_path);
212                 kill(this_pid, SIGKILL);
213         }
214 }
215
216
217 // The thread requests jobs from the server until the job table is empty
218 // or the server reports an error.  This thread must poll the server
219 // after every frame for the error status.
220 // Detaches when finished.
221 RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client)
222  : Thread(0, 0, 1)
223 {
224         this->client = client;
225         this->edl = 0;
226         frames_per_second = 0;
227         Thread::set_synchronous(0);
228 //      fs_client = 0;
229         mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock");
230         watchdog = 0;
231         keep_alive = 0;
232 }
233
234 RenderFarmClientThread::~RenderFarmClientThread()
235 {
236 //      if(fs_client) delete fs_client;
237         delete mutex_lock;
238         delete watchdog;
239         delete keep_alive;
240 }
241
242
243 int RenderFarmClientThread::send_request_header(int request,
244         int len)
245 {
246         unsigned char datagram[5];
247         datagram[0] = request;
248
249         int i = 1;
250         STORE_INT32(len);
251 // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n",
252 // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]);
253
254         return (write_socket((char*)datagram, 5) != 5);
255 }
256
257 int RenderFarmClientThread::write_socket(char *data, int len)
258 {
259 //printf("RenderFarmClientThread::write_socket 1\n");
260         int result = write(socket_fd, data, len);
261 //printf("RenderFarmClientThread::write_socket 10\n");
262         return result;
263 }
264
265 int RenderFarmClientThread::read_socket(char *data, int len)
266 {
267         int bytes_read = 0;
268         int offset = 0;
269 //printf("RenderFarmClientThread::read_socket 1\n");
270         watchdog->begin_request();
271         while(len > 0 && bytes_read >= 0)
272         {
273                 bytes_read = read(socket_fd, data + offset, len);
274                 if(bytes_read > 0)
275                 {
276                         len -= bytes_read;
277                         offset += bytes_read;
278                 }
279                 else
280                 if(bytes_read < 0)
281                 {
282                         break;
283                 }
284         }
285         watchdog->end_request();
286 //printf("RenderFarmClientThread::read_socket 10\n");
287
288         return offset;
289 }
290
291 int RenderFarmClientThread::write_int64(int64_t value)
292 {
293         unsigned char data[sizeof(int64_t)];
294         data[0] = (value >> 56) & 0xff;
295         data[1] = (value >> 48) & 0xff;
296         data[2] = (value >> 40) & 0xff;
297         data[3] = (value >> 32) & 0xff;
298         data[4] = (value >> 24) & 0xff;
299         data[5] = (value >> 16) & 0xff;
300         data[6] = (value >> 8) & 0xff;
301         data[7] = value & 0xff;
302         return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
303 }
304
305 int64_t RenderFarmClientThread::read_int64(int *error)
306 {
307         int temp = 0;
308         if(!error) error = &temp;
309
310         unsigned char data[sizeof(int64_t)];
311         *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
312
313 // Make it return 1 if error so it can be used to read a result code from the
314 // server.
315         int64_t result = 1;
316         if(!*error)
317         {
318                 result = (((int64_t)data[0]) << 56) |
319                         (((uint64_t)data[1]) << 48) |
320                         (((uint64_t)data[2]) << 40) |
321                         (((uint64_t)data[3]) << 32) |
322                         (((uint64_t)data[4]) << 24) |
323                         (((uint64_t)data[5]) << 16) |
324                         (((uint64_t)data[6]) << 8)  |
325                         data[7];
326         }
327         return result;
328 }
329
330 void RenderFarmClientThread::read_string(char* &string)
331 {
332         unsigned char header[4];
333         if(read_socket((char*)header, 4) != 4)
334         {
335                 string = 0;
336                 return;
337         }
338
339         int64_t len = (((u_int32_t)header[0]) << 24) |
340                                 (((u_int32_t)header[1]) << 16) |
341                                 (((u_int32_t)header[2]) << 8) |
342                                 ((u_int32_t)header[3]);
343
344         if(len)
345         {
346                 string = new char[len];
347                 if(read_socket(string, len) != len)
348                 {
349                         delete [] string;
350                         string = 0;
351                 }
352         }
353         else
354                 string = 0;
355
356 }
357
358 void RenderFarmClientThread::abort()
359 {
360         send_completion(socket_fd);
361         close(socket_fd);
362         exit(1);
363 }
364
365 void RenderFarmClientThread::lock(const char *location)
366 {
367         mutex_lock->lock(location);
368 }
369
370 void RenderFarmClientThread::unlock()
371 {
372         mutex_lock->unlock();
373 }
374
375 void RenderFarmClientThread::get_command(int socket_fd, int *command)
376 {
377         int error;
378         *command = read_int64(&error);
379         if(error)
380         {
381                 *command = 0;
382                 return;
383         }
384 }
385
386
387 void RenderFarmClientThread::read_preferences(int socket_fd,
388         Preferences *preferences)
389 {
390         lock("RenderFarmClientThread::read_preferences");
391         send_request_header(RENDERFARM_PREFERENCES,
392                 0);
393
394         char *string;
395         read_string(string);
396
397         BC_Hash defaults;
398         defaults.load_string((char*)string);
399         preferences->load_defaults(&defaults);
400
401         delete [] string;
402         unlock();
403 }
404
405
406
407 void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset)
408 {
409         lock("RenderFarmClientThread::read_asset");
410         send_request_header(RENDERFARM_ASSET,
411                 0);
412
413         char *string1;
414         char *string2;
415         read_string(string1);
416         read_string(string2);
417
418
419
420         FileXML file;
421         file.read_from_string((char*)string2);
422         asset->read(&file);
423
424
425
426         BC_Hash defaults;
427         defaults.load_string((char*)string1);
428         asset->load_defaults(&defaults,
429                 0,
430                 1,
431                 1,
432                 1,
433                 1,
434                 1);
435
436 //printf("RenderFarmClientThread::read_asset %d\n", __LINE__);
437 //asset->dump();
438
439         delete [] string1;
440         delete [] string2;
441         unlock();
442 }
443
444 void RenderFarmClientThread::read_edl(int socket_fd,
445                 EDL *edl, Preferences *preferences)
446 {
447         lock("RenderFarmClientThread::read_edl");
448         send_request_header(RENDERFARM_EDL, 0);
449
450         char *string;
451         read_string(string);
452
453         FileXML file;
454         file.read_from_string((char*)string);
455         delete [] string;
456
457         edl->load_xml(&file, LOAD_ALL);
458         unlock();
459 }
460
461 int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package)
462 {
463         lock("RenderFarmClientThread::read_package");
464         send_request_header(RENDERFARM_PACKAGE,
465                 4);
466
467         unsigned char datagram[5];
468         int i = 0;
469
470
471 // Fails if -ieee isn't set.
472         int64_t fixed = !EQUIV(frames_per_second, 0.0) ?
473                 (int64_t)(frames_per_second * 65536.0) : 0;
474         STORE_INT32(fixed);
475         write_socket((char*)datagram, 4);
476
477
478 //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed);
479         char *data;
480         unsigned char *data_ptr;
481         read_string(data);
482 //printf("RenderFarmClientThread::read_package 2 %p\n", data);
483 // Signifies end of session.
484         if(!data)
485         {
486 //              printf(_("RenderFarmClientThread::read_package no output path received.\n"));
487                 unlock();
488                 return 1;
489         }
490
491 //printf("RenderFarmClientThread::read_package 2\n");
492
493
494         data_ptr = (unsigned char*)data;
495         strcpy(package->path, data);
496         data_ptr += strlen(package->path);
497         data_ptr++;
498         package->audio_start = READ_INT32(data_ptr);
499         data_ptr += 4;
500         package->audio_end = READ_INT32(data_ptr);
501         data_ptr += 4;
502         package->video_start = READ_INT32(data_ptr);
503         data_ptr += 4;
504         package->video_end = READ_INT32(data_ptr);
505         data_ptr += 4;
506         package->use_brender = READ_INT32(data_ptr);
507         data_ptr += 4;
508         package->audio_do = READ_INT32(data_ptr);
509         data_ptr += 4;
510         package->video_do = READ_INT32(data_ptr);
511
512         delete [] data;
513         unlock();
514
515         return 0;
516 }
517
518 int RenderFarmClientThread::send_completion(int socket_fd)
519 {
520         lock("RenderFarmClientThread::send_completion");
521         int result = send_request_header(RENDERFARM_DONE, 0);
522         unlock();
523         return result;
524 }
525
526
527 void RenderFarmClientThread::ping_server()
528 {
529         lock("RenderFarmClientThread::ping_server");
530         send_request_header(RENDERFARM_KEEPALIVE, 0);
531         unlock();
532 }
533
534
535
536 void RenderFarmClientThread::main_loop(int socket_fd)
537 {
538         this->socket_fd = socket_fd;
539
540         Thread::start();
541 }
542
543 void RenderFarmClientThread::run()
544 {
545 // Create new memory space
546         pid = fork();
547         if(pid != 0)
548         {
549                 int return_value;
550                 waitpid(pid, &return_value, 0);
551                 return;
552         }
553
554 // Get the pid of the fork if inside the fork
555         pid = getpid();
556         BC_Signals::set_trap_hook(trap_hook, this);
557         TheList::reset();
558
559         int socket_fd = this->socket_fd;
560         int watchdog_timeout = client->boot_preferences->renderfarm_watchdog_timeout;
561         if( watchdog_timeout > 0 )
562                 init_client_keepalive(watchdog_timeout);
563
564 // Get command to run
565         int command;
566         lock("RenderFarmClientThread::run");
567         get_command(socket_fd, &command);
568         unlock();
569
570 //printf("RenderFarmClientThread::run command=%d\n", command);
571
572         switch(command)
573         {
574                 case RENDERFARM_TUNER:
575                         do_tuner(socket_fd);
576                         break;
577                 case RENDERFARM_PACKAGES:
578                         do_packages(socket_fd);
579                         break;
580         }
581
582         _exit(0);
583 }
584
585
586 void RenderFarmClientThread::init_client_keepalive(int timeout_secs)
587 {
588         keep_alive = new RenderFarmKeepalive(this);
589         keep_alive->start();
590         watchdog = new RenderFarmWatchdog(timeout_secs, 0, this);
591         watchdog->start();
592 }
593
594
595
596 void RenderFarmClientThread::do_tuner(int socket_fd)
597 {
598 // Currently only 1 tuner driver.  Maybe more someday.
599 //      DVBTune server(this);
600 //      server.main_loop();
601 //      ::close(socket_fd);
602 }
603
604
605 void RenderFarmClientThread::do_packages(int socket_fd)
606 {
607         RenderPackage *package;
608         Asset *default_asset;
609         Preferences *preferences;
610
611
612
613         FarmPackageRenderer package_renderer(this, socket_fd);
614         int result = 0;
615
616
617
618 //printf("RenderFarmClientThread::run 2\n");
619 // Read settings
620         preferences = new Preferences;
621         default_asset = new Asset;
622         package = new RenderPackage;
623         edl = new EDL;
624         edl->create_objects();
625
626 //printf("RenderFarmClientThread::run 3\n");
627         read_preferences(socket_fd, preferences);
628 //printf("RenderFarmClientThread::run 4\n");
629         read_asset(socket_fd, default_asset);
630 //printf("RenderFarmClientThread::run 5\n");
631         read_edl(socket_fd, edl, preferences);
632 //edl->dump();
633 //printf("RenderFarmClientThread::run 6\n");
634
635         package_renderer.initialize(0, edl, preferences, default_asset);
636
637 // Read packages
638         while(1)
639         {
640 //printf("RenderFarmClientThread::run 5\n");
641                 result = read_package(socket_fd, package);
642 //printf("RenderFarmClientThread::run 6 %d\n", result);
643
644
645 // Finished list
646                 if(result)
647                 {
648 //printf("RenderFarmClientThread::run 7\n");
649
650                         result = send_completion(socket_fd);
651                         break;
652                 }
653
654 //              Timer timer;
655 //              timer.update();
656
657 // Error
658                 if(package_renderer.render_package(package))
659                 {
660 //printf("RenderFarmClientThread::run 8\n");
661                         result = send_completion(socket_fd);
662                         break;
663                 }
664
665
666                 frames_per_second = package_renderer.frames_per_second;
667 //              frames_per_second = (double)(package->video_end - package->video_start) /
668 //                      ((double)timer.get_difference() / 1000);
669
670 //printf("RenderFarmClientThread::run 9\n");
671
672
673
674         }
675
676
677 //printf("RenderFarmClientThread::run 9\n");
678         default_asset->Garbage::remove_user();
679 //printf("RenderFarmClientThread::run 10\n");
680         edl->Garbage::remove_user();
681         edl = 0;
682 //printf("RenderFarmClientThread::run 11\n");
683         delete preferences;
684 printf(_("RenderFarmClientThread::run: Session finished.\n"));
685 }
686
687 void RenderFarmClientThread::trap_hook(FILE *fp, void *vp)
688 {
689         RenderFarmClientThread *thread = (RenderFarmClientThread*)vp;
690         fprintf(fp, "\nEDL:\n");
691         if( thread->edl ) thread->edl->dump(fp);
692 }
693
694
695
696 RenderFarmKeepalive::RenderFarmKeepalive(
697         RenderFarmClientThread *client_thread)
698  : Thread(1, 0, 0)
699 {
700         this->client_thread = client_thread;
701         done = 0;
702 }
703
704 RenderFarmKeepalive::~RenderFarmKeepalive()
705 {
706         done = 1;
707         cancel();
708         join();
709 }
710
711 void RenderFarmKeepalive::run()
712 {
713         while(!done)
714         {
715                 enable_cancel();
716                 sleep(5);
717                 disable_cancel();
718                 if( !done ) {
719 //printf("RenderFarmKeepalive::run 1\n");
720 // watchdog thread kills this if it gets stuck
721                         client_thread->ping_server();
722 //printf("RenderFarmKeepalive::run 10\n");
723                 }
724         }
725 }
726
727
728 FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread,
729                 int socket_fd)
730  : PackageRenderer()
731 {
732         this->thread = thread;
733         this->socket_fd = socket_fd;
734 }
735
736
737 FarmPackageRenderer::~FarmPackageRenderer()
738 {
739 }
740
741
742 int FarmPackageRenderer::get_result()
743 {
744         thread->lock("FarmPackageRenderer::get_result");
745         thread->send_request_header(RENDERFARM_GET_RESULT,
746                 0);
747         unsigned char data[1];
748         data[0] = 1;
749         if(thread->read_socket((char*)data, 1) != 1)
750         {
751                 thread->unlock();
752                 return 1;
753         }
754         thread->unlock();
755         return data[0];
756 }
757
758 void FarmPackageRenderer::set_result(int value)
759 {
760         thread->lock("FarmPackageRenderer::set_result");
761         thread->send_request_header(RENDERFARM_SET_RESULT,
762                 1);
763         unsigned char data[1];
764         data[0] = value;
765         thread->write_socket((char*)data, 1);
766         thread->unlock();
767 }
768
769 void FarmPackageRenderer::set_progress(int64_t total_samples)
770 {
771         thread->lock("FarmPackageRenderer::set_progress");
772         thread->send_request_header(RENDERFARM_PROGRESS,
773                 8);
774         unsigned char datagram[8];
775         int i = 0;
776         STORE_INT32(total_samples);
777
778
779         int64_t fixed = (!EQUIV(frames_per_second, 0.0)) ?
780                 (int64_t)(frames_per_second * 65536.0) : 0;
781         STORE_INT32(fixed);
782
783
784         thread->write_socket((char*)datagram, 8);
785         thread->unlock();
786 }
787
788 int FarmPackageRenderer::set_video_map(int64_t position, int value)
789 {
790         int result = 0;
791         unsigned char datagram[8];
792         char return_value[1];
793         int i = 0;
794
795         thread->lock("FarmPackageRenderer::set_video_map");
796         thread->send_request_header(RENDERFARM_SET_VMAP,
797                 8);
798         STORE_INT32(position);
799         STORE_INT32(value);
800         thread->write_socket((char*)datagram, 8);
801
802 // Get completion since the GUI may be locked for a long time.
803         if(!thread->read_socket(return_value, 1))
804         {
805                 result = 1;
806         }
807
808         thread->unlock();
809         return result;
810 }
811
812