24eee2a6af6cf970fc5ff6d99e4278dd2885cc83
[goodguy/history.git] / cinelerra-5.1 / cinelerra / renderfarmclient.C
1
2 /*
3  * CINELERRA
4  * Copyright (C) 2008 Adam Williams <broadcast at earthling dot net>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 #include "asset.h"
23 #include "assets.h"
24 #include "clip.h"
25 #include "bchash.h"
26 #include "dvbtune.h"
27 #include "edl.h"
28 #include "filesystem.h"
29 #include "filexml.h"
30 #include "language.h"
31 #include "mutex.h"
32 #include "mwindow.h"
33 #include "pluginserver.h"
34 #include "preferences.h"
35 #include "renderfarm.h"
36 #include "renderfarmclient.h"
37 //#include "renderfarmfsclient.h"
38 #include "sighandler.h"
39
40 #include <arpa/inet.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <netinet/in.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <sys/socket.h>
48 #include <sys/types.h>
49 #include <sys/un.h>
50 #include <sys/wait.h>
51 #include <unistd.h>
52
53
54
55
56 // The render client waits for connections from the server.
57 // Then it starts a thread for each connection.
58 RenderFarmClient::RenderFarmClient(int port,
59         char *deamon_path,
60         int nice_value,
61         char *config_path)
62 {
63         char string[BCTEXTLEN];
64
65         this->port = port;
66         this->deamon_path = deamon_path;
67         SigHandler *signals = new SigHandler;
68         signals->initialize("/tmp/cinelerra_farm%d.dmp");
69
70         this_pid = getpid();
71         (void)nice(nice_value);
72
73
74         MWindow::init_defaults(boot_defaults, config_path);
75         boot_preferences = new Preferences;
76         boot_preferences->load_defaults(boot_defaults);
77         MWindow::init_plugins(0, boot_preferences);
78         BC_Signals::set_catch_segv(boot_preferences->trap_sigsegv);
79         BC_Signals::set_catch_intr(0);
80         if( boot_preferences->trap_sigsegv ) {
81                 BC_Trace::enable_locks();
82         }
83         else {
84                 BC_Trace::disable_locks();
85         }
86
87         strcpy(string, boot_preferences->plugin_dir);
88         strcat(string, "/" FONT_SEARCHPATH);
89         BC_Resources::init_fontconfig(string);
90 }
91
92
93
94
95 RenderFarmClient::~RenderFarmClient()
96 {
97         delete boot_defaults;
98         delete boot_preferences;
99 }
100
101
102 void RenderFarmClient::main_loop()
103 {
104         int socket_fd;
105         BC_WindowBase::get_resources()->vframe_shm = 1;
106
107
108 // Open listening port
109
110         if(!deamon_path)
111         {
112                 struct sockaddr_in addr;
113
114                 addr.sin_family = AF_INET;
115                 addr.sin_port = htons((unsigned short)port);
116                 addr.sin_addr.s_addr = htonl(INADDR_ANY);
117
118                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
119                 {
120                         perror(_("RenderFarmClient::main_loop: socket"));
121                         return;
122                 }
123
124                 if(bind(socket_fd,
125                         (struct sockaddr*)&addr,
126                         sizeof(addr)) < 0)
127                 {
128                         fprintf(stderr,
129                                 _("RenderFarmClient::main_loop: bind port %d: %s"),
130                                 port,
131                                 strerror(errno));
132                         return;
133                 }
134         }
135         else
136         {
137                 struct sockaddr_un addr;
138                 addr.sun_family = AF_FILE;
139                 strcpy(addr.sun_path, deamon_path);
140                 int size = (offsetof(struct sockaddr_un, sun_path) +
141                         strlen(deamon_path) + 1);
142
143                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
144                 {
145                         perror(_("RenderFarmClient::main_loop: socket"));
146                         return;
147                 }
148
149                 if(bind(socket_fd,
150                         (struct sockaddr*)&addr,
151                         size) < 0)
152                 {
153                         fprintf(stderr,
154                                 _("RenderFarmClient::main_loop: bind path %s: %s\n"),
155                                 deamon_path,
156                                 strerror(errno));
157                         return;
158                 }
159         }
160
161 // Wait for connections
162         printf("RenderFarmClient::main_loop: client started\n");
163         while(1)
164         {
165                 if(listen(socket_fd, 256) < 0) {
166                         perror(_("RenderFarmClient::main_loop: listen"));
167                         return;
168                 }
169
170                 int new_socket_fd;
171
172                 if(!deamon_path)
173                 {
174                         struct sockaddr_in clientname;
175                         socklen_t size = sizeof(clientname);
176                         if( (new_socket_fd = accept(socket_fd,
177                                 (struct sockaddr*)&clientname, &size) ) < 0 ) {
178                                 perror(_("RenderFarmClient::main_loop: accept"));
179                                 return;
180                         }
181 printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr));
182                         RenderFarmClientThread *thread =
183                                 new RenderFarmClientThread(this);
184                         thread->main_loop(new_socket_fd);
185                 }
186                 else
187                 {
188                         struct sockaddr_un clientname;
189                         socklen_t size = sizeof(clientname);
190                         if( (new_socket_fd = accept(socket_fd,
191                                         (struct sockaddr*)&clientname, &size)) < 0 ) {
192                                 perror(_("RenderFarmClient::main_loop: accept"));
193                                 return;
194                         }
195 printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path);
196                         RenderFarmClientThread *thread =
197                                 new RenderFarmClientThread(this);
198                         thread->main_loop(new_socket_fd);
199                 }
200         }
201 }
202
203 void RenderFarmClient::kill_client()
204 {
205 printf("RenderFarmClient::kill_client 1\n");
206         if(deamon_path)
207         {
208 printf("RenderFarmClient::kill_client 2\n");
209                 remove(deamon_path);
210                 kill(this_pid, SIGKILL);
211         }
212 }
213
214
215 // The thread requests jobs from the server until the job table is empty
216 // or the server reports an error.  This thread must poll the server
217 // after every frame for the error status.
218 // Detaches when finished.
219 RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client)
220  : Thread(0, 0, 1)
221 {
222         this->client = client;
223         this->edl = 0;
224         frames_per_second = 0;
225         Thread::set_synchronous(0);
226 //      fs_client = 0;
227         mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock");
228         watchdog = 0;
229         keep_alive = 0;
230 }
231
232 RenderFarmClientThread::~RenderFarmClientThread()
233 {
234 //      if(fs_client) delete fs_client;
235         delete mutex_lock;
236         delete watchdog;
237         delete keep_alive;
238 }
239
240
241 int RenderFarmClientThread::send_request_header(int request,
242         int len)
243 {
244         unsigned char datagram[5];
245         datagram[0] = request;
246
247         int i = 1;
248         STORE_INT32(len);
249 // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n",
250 // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]);
251
252         return (write_socket((char*)datagram, 5) != 5);
253 }
254
255 int RenderFarmClientThread::write_socket(char *data, int len)
256 {
257 //printf("RenderFarmClientThread::write_socket 1\n");
258         int result = write(socket_fd, data, len);
259 //printf("RenderFarmClientThread::write_socket 10\n");
260         return result;
261 }
262
263 int RenderFarmClientThread::read_socket(char *data, int len)
264 {
265         int bytes_read = 0;
266         int offset = 0;
267 //printf("RenderFarmClientThread::read_socket 1\n");
268         watchdog->begin_request();
269         while(len > 0 && bytes_read >= 0)
270         {
271                 bytes_read = read(socket_fd, data + offset, len);
272                 if(bytes_read > 0)
273                 {
274                         len -= bytes_read;
275                         offset += bytes_read;
276                 }
277                 else
278                 if(bytes_read < 0)
279                 {
280                         break;
281                 }
282         }
283         watchdog->end_request();
284 //printf("RenderFarmClientThread::read_socket 10\n");
285
286         return offset;
287 }
288
289 int RenderFarmClientThread::write_int64(int64_t value)
290 {
291         unsigned char data[sizeof(int64_t)];
292         data[0] = (value >> 56) & 0xff;
293         data[1] = (value >> 48) & 0xff;
294         data[2] = (value >> 40) & 0xff;
295         data[3] = (value >> 32) & 0xff;
296         data[4] = (value >> 24) & 0xff;
297         data[5] = (value >> 16) & 0xff;
298         data[6] = (value >> 8) & 0xff;
299         data[7] = value & 0xff;
300         return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
301 }
302
303 int64_t RenderFarmClientThread::read_int64(int *error)
304 {
305         int temp = 0;
306         if(!error) error = &temp;
307
308         unsigned char data[sizeof(int64_t)];
309         *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t));
310
311 // Make it return 1 if error so it can be used to read a result code from the
312 // server.
313         int64_t result = 1;
314         if(!*error)
315         {
316                 result = (((int64_t)data[0]) << 56) |
317                         (((uint64_t)data[1]) << 48) |
318                         (((uint64_t)data[2]) << 40) |
319                         (((uint64_t)data[3]) << 32) |
320                         (((uint64_t)data[4]) << 24) |
321                         (((uint64_t)data[5]) << 16) |
322                         (((uint64_t)data[6]) << 8)  |
323                         data[7];
324         }
325         return result;
326 }
327
328 void RenderFarmClientThread::read_string(char* &string)
329 {
330         unsigned char header[4];
331         if(read_socket((char*)header, 4) != 4)
332         {
333                 string = 0;
334                 return;
335         }
336
337         int64_t len = (((u_int32_t)header[0]) << 24) |
338                                 (((u_int32_t)header[1]) << 16) |
339                                 (((u_int32_t)header[2]) << 8) |
340                                 ((u_int32_t)header[3]);
341
342         if(len)
343         {
344                 string = new char[len];
345                 if(read_socket(string, len) != len)
346                 {
347                         delete [] string;
348                         string = 0;
349                 }
350         }
351         else
352                 string = 0;
353
354 }
355
356 void RenderFarmClientThread::abort()
357 {
358         send_completion(socket_fd);
359         close(socket_fd);
360         exit(1);
361 }
362
363 void RenderFarmClientThread::lock(const char *location)
364 {
365         mutex_lock->lock(location);
366 }
367
368 void RenderFarmClientThread::unlock()
369 {
370         mutex_lock->unlock();
371 }
372
373 void RenderFarmClientThread::get_command(int socket_fd, int *command)
374 {
375         int error;
376         *command = read_int64(&error);
377         if(error)
378         {
379                 *command = 0;
380                 return;
381         }
382 }
383
384
385 void RenderFarmClientThread::read_preferences(int socket_fd,
386         Preferences *preferences)
387 {
388         lock("RenderFarmClientThread::read_preferences");
389         send_request_header(RENDERFARM_PREFERENCES,
390                 0);
391
392         char *string;
393         read_string(string);
394
395         BC_Hash defaults;
396         defaults.load_string((char*)string);
397         preferences->load_defaults(&defaults);
398
399         delete [] string;
400         unlock();
401 }
402
403
404
405 void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset)
406 {
407         lock("RenderFarmClientThread::read_asset");
408         send_request_header(RENDERFARM_ASSET,
409                 0);
410
411         char *string1;
412         char *string2;
413         read_string(string1);
414         read_string(string2);
415
416
417
418         FileXML file;
419         file.read_from_string((char*)string2);
420         asset->read(&file);
421
422
423
424         BC_Hash defaults;
425         defaults.load_string((char*)string1);
426         asset->load_defaults(&defaults,
427                 0,
428                 1,
429                 1,
430                 1,
431                 1,
432                 1);
433
434 //printf("RenderFarmClientThread::read_asset %d\n", __LINE__);
435 //asset->dump();
436
437         delete [] string1;
438         delete [] string2;
439         unlock();
440 }
441
442 void RenderFarmClientThread::read_edl(int socket_fd,
443                 EDL *edl, Preferences *preferences)
444 {
445         lock("RenderFarmClientThread::read_edl");
446         send_request_header(RENDERFARM_EDL, 0);
447
448         char *string;
449         read_string(string);
450
451         FileXML file;
452         file.read_from_string((char*)string);
453         delete [] string;
454
455         edl->load_xml(&file, LOAD_ALL);
456         unlock();
457 }
458
459 int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package)
460 {
461         lock("RenderFarmClientThread::read_package");
462         send_request_header(RENDERFARM_PACKAGE,
463                 4);
464
465         unsigned char datagram[5];
466         int i = 0;
467
468
469 // Fails if -ieee isn't set.
470         int64_t fixed = !EQUIV(frames_per_second, 0.0) ?
471                 (int64_t)(frames_per_second * 65536.0) : 0;
472         STORE_INT32(fixed);
473         write_socket((char*)datagram, 4);
474
475
476 //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed);
477         char *data;
478         unsigned char *data_ptr;
479         read_string(data);
480 //printf("RenderFarmClientThread::read_package 2 %p\n", data);
481 // Signifies end of session.
482         if(!data)
483         {
484 //              printf(_("RenderFarmClientThread::read_package no output path recieved.\n"));
485                 unlock();
486                 return 1;
487         }
488
489 //printf("RenderFarmClientThread::read_package 2\n");
490
491
492         data_ptr = (unsigned char*)data;
493         strcpy(package->path, data);
494         data_ptr += strlen(package->path);
495         data_ptr++;
496         package->audio_start = READ_INT32(data_ptr);
497         data_ptr += 4;
498         package->audio_end = READ_INT32(data_ptr);
499         data_ptr += 4;
500         package->video_start = READ_INT32(data_ptr);
501         data_ptr += 4;
502         package->video_end = READ_INT32(data_ptr);
503         data_ptr += 4;
504         package->use_brender = READ_INT32(data_ptr);
505         data_ptr += 4;
506         package->audio_do = READ_INT32(data_ptr);
507         data_ptr += 4;
508         package->video_do = READ_INT32(data_ptr);
509
510         delete [] data;
511         unlock();
512
513         return 0;
514 }
515
516 int RenderFarmClientThread::send_completion(int socket_fd)
517 {
518         lock("RenderFarmClientThread::send_completion");
519         int result = send_request_header(RENDERFARM_DONE, 0);
520         unlock();
521         return result;
522 }
523
524
525 void RenderFarmClientThread::ping_server()
526 {
527         lock("RenderFarmClientThread::ping_server");
528         send_request_header(RENDERFARM_KEEPALIVE, 0);
529         unlock();
530 }
531
532
533
534 void RenderFarmClientThread::main_loop(int socket_fd)
535 {
536         this->socket_fd = socket_fd;
537
538         Thread::start();
539 }
540
541 void RenderFarmClientThread::run()
542 {
543 // Create new memory space
544         pid = fork();
545         if(pid != 0)
546         {
547                 int return_value;
548                 waitpid(pid, &return_value, 0);
549                 return;
550         }
551
552 // Get the pid of the fork if inside the fork
553         pid = getpid();
554         BC_Signals::set_trap_hook(trap_hook, this);
555         TheList::reset();
556
557         int socket_fd = this->socket_fd;
558         int watchdog_timeout = client->boot_preferences->renderfarm_watchdog_timeout;
559         if( watchdog_timeout > 0 )
560                 init_client_keepalive(watchdog_timeout);
561
562 // Get command to run
563         int command;
564         lock("RenderFarmClientThread::run");
565         get_command(socket_fd, &command);
566         unlock();
567
568 //printf("RenderFarmClientThread::run command=%d\n", command);
569
570         switch(command)
571         {
572                 case RENDERFARM_TUNER:
573                         do_tuner(socket_fd);
574                         break;
575                 case RENDERFARM_PACKAGES:
576                         do_packages(socket_fd);
577                         break;
578         }
579
580         _exit(0);
581 }
582
583
584 void RenderFarmClientThread::init_client_keepalive(int timeout_secs)
585 {
586         keep_alive = new RenderFarmKeepalive(this);
587         keep_alive->start();
588         watchdog = new RenderFarmWatchdog(timeout_secs, 0, this);
589         watchdog->start();
590 }
591
592
593
594 void RenderFarmClientThread::do_tuner(int socket_fd)
595 {
596 // Currently only 1 tuner driver.  Maybe more someday.
597         DVBTune server(this);
598         server.main_loop();
599         ::close(socket_fd);
600 }
601
602
603 void RenderFarmClientThread::do_packages(int socket_fd)
604 {
605         RenderPackage *package;
606         Asset *default_asset;
607         Preferences *preferences;
608
609
610
611         FarmPackageRenderer package_renderer(this, socket_fd);
612         int result = 0;
613
614
615
616 //printf("RenderFarmClientThread::run 2\n");
617 // Read settings
618         preferences = new Preferences;
619         default_asset = new Asset;
620         package = new RenderPackage;
621         edl = new EDL;
622         edl->create_objects();
623
624 //printf("RenderFarmClientThread::run 3\n");
625         read_preferences(socket_fd, preferences);
626 //printf("RenderFarmClientThread::run 4\n");
627         read_asset(socket_fd, default_asset);
628 //printf("RenderFarmClientThread::run 5\n");
629         read_edl(socket_fd, edl, preferences);
630 //edl->dump();
631 //printf("RenderFarmClientThread::run 6\n");
632
633         package_renderer.initialize(0, edl, preferences, default_asset);
634
635 // Read packages
636         while(1)
637         {
638 //printf("RenderFarmClientThread::run 5\n");
639                 result = read_package(socket_fd, package);
640 //printf("RenderFarmClientThread::run 6 %d\n", result);
641
642
643 // Finished list
644                 if(result)
645                 {
646 //printf("RenderFarmClientThread::run 7\n");
647
648                         result = send_completion(socket_fd);
649                         break;
650                 }
651
652 //              Timer timer;
653 //              timer.update();
654
655 // Error
656                 if(package_renderer.render_package(package))
657                 {
658 //printf("RenderFarmClientThread::run 8\n");
659                         result = send_completion(socket_fd);
660                         break;
661                 }
662
663
664                 frames_per_second = package_renderer.frames_per_second;
665 //              frames_per_second = (double)(package->video_end - package->video_start) /
666 //                      ((double)timer.get_difference() / 1000);
667
668 //printf("RenderFarmClientThread::run 9\n");
669
670
671
672         }
673
674
675 //printf("RenderFarmClientThread::run 9\n");
676         default_asset->Garbage::remove_user();
677 //printf("RenderFarmClientThread::run 10\n");
678         edl->Garbage::remove_user();
679         edl = 0;
680 //printf("RenderFarmClientThread::run 11\n");
681         delete preferences;
682 printf(_("RenderFarmClientThread::run: Session finished.\n"));
683 }
684
685 void RenderFarmClientThread::trap_hook(FILE *fp, void *vp)
686 {
687         RenderFarmClientThread *thread = (RenderFarmClientThread*)vp;
688         fprintf(fp, "\nEDL:\n");
689         if( thread->edl ) thread->edl->dump(fp);
690 }
691
692
693
694 RenderFarmKeepalive::RenderFarmKeepalive(
695         RenderFarmClientThread *client_thread)
696  : Thread(1, 0, 0)
697 {
698         this->client_thread = client_thread;
699         done = 0;
700 }
701
702 RenderFarmKeepalive::~RenderFarmKeepalive()
703 {
704         done = 1;
705         cancel();
706         join();
707 }
708
709 void RenderFarmKeepalive::run()
710 {
711         while(!done)
712         {
713                 enable_cancel();
714                 sleep(5);
715                 disable_cancel();
716                 if( !done ) {
717 //printf("RenderFarmKeepalive::run 1\n");
718 // watchdog thread kills this if it gets stuck
719                         client_thread->ping_server();
720 //printf("RenderFarmKeepalive::run 10\n");
721                 }
722         }
723 }
724
725
726 FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread,
727                 int socket_fd)
728  : PackageRenderer()
729 {
730         this->thread = thread;
731         this->socket_fd = socket_fd;
732 }
733
734
735 FarmPackageRenderer::~FarmPackageRenderer()
736 {
737 }
738
739
740 int FarmPackageRenderer::get_result()
741 {
742         thread->lock("FarmPackageRenderer::get_result");
743         thread->send_request_header(RENDERFARM_GET_RESULT,
744                 0);
745         unsigned char data[1];
746         data[0] = 1;
747         if(thread->read_socket((char*)data, 1) != 1)
748         {
749                 thread->unlock();
750                 return 1;
751         }
752         thread->unlock();
753         return data[0];
754 }
755
756 void FarmPackageRenderer::set_result(int value)
757 {
758         thread->lock("FarmPackageRenderer::set_result");
759         thread->send_request_header(RENDERFARM_SET_RESULT,
760                 1);
761         unsigned char data[1];
762         data[0] = value;
763         thread->write_socket((char*)data, 1);
764         thread->unlock();
765 }
766
767 void FarmPackageRenderer::set_progress(int64_t total_samples)
768 {
769         thread->lock("FarmPackageRenderer::set_progress");
770         thread->send_request_header(RENDERFARM_PROGRESS,
771                 8);
772         unsigned char datagram[8];
773         int i = 0;
774         STORE_INT32(total_samples);
775
776
777         int64_t fixed = (!EQUIV(frames_per_second, 0.0)) ?
778                 (int64_t)(frames_per_second * 65536.0) : 0;
779         STORE_INT32(fixed);
780
781
782         thread->write_socket((char*)datagram, 8);
783         thread->unlock();
784 }
785
786 int FarmPackageRenderer::set_video_map(int64_t position, int value)
787 {
788         int result = 0;
789         unsigned char datagram[8];
790         char return_value[1];
791         int i = 0;
792
793         thread->lock("FarmPackageRenderer::set_video_map");
794         thread->send_request_header(RENDERFARM_SET_VMAP,
795                 8);
796         STORE_INT32(position);
797         STORE_INT32(value);
798         thread->write_socket((char*)datagram, 8);
799
800 // Get completion since the GUI may be locked for a long time.
801         if(!thread->read_socket(return_value, 1))
802         {
803                 result = 1;
804         }
805
806         thread->unlock();
807         return result;
808 }
809
810