logbot-consumer 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. #!/usr/bin/perl
  2. # import event json files created by logbot-irc and inserts into network
  3. # database
  4. use local::lib;
  5. use v5.10;
  6. use strict;
  7. use warnings;
  8. use FindBin qw( $RealBin );
  9. use lib "$RealBin/lib";
  10. BEGIN { $ENV{TZ} = 'UTC' }
  11. use Encode qw( encode );
  12. use LogBot::Config qw( find_config load_config reload_config );
  13. use LogBot::Database qw( dbh execute_with_retry );
  14. use LogBot::JobQueue ();
  15. use LogBot::Util qw( event_to_string file_for logbot_init timestamp touch );
  16. my $config = load_config(find_config(shift)) // die "syntax: logbot-consumer <config file>\n";
  17. logbot_init($config);
  18. my $quit = 0;
  19. $SIG{INT} = sub { $quit = 1 };
  20. # init/cache database/queue
  21. dbh($config, read_write => 1, cached => 1);
  22. my $job_queue = LogBot::JobQueue->new($config);
  23. my $last_status = '';
  24. while (!$quit) {
  25. # wait for next job, blocking call
  26. my ($job, $event) = $job_queue->fetch_job();
  27. last unless defined $job;
  28. if ($ENV{DEBUG}) {
  29. say timestamp(), ' << ', encode('UTF-8', event_to_string($event));
  30. }
  31. $config = reload_config($config);
  32. # deal with lag between queue and config
  33. my $ignore = 0;
  34. if (my $channel = $config->{channels}->{ $event->{channel} }) {
  35. $ignore = 1 if $channel->{no_logs};
  36. $ignore = 1 if $channel->{disabled};
  37. } else {
  38. $ignore = 1;
  39. }
  40. if ($ignore) {
  41. say timestamp(), ' -- ignoring event' if $ENV{DEBUG};
  42. $job_queue->delete_job($job);
  43. next;
  44. }
  45. if ($event->{type} == 3) {
  46. # topic
  47. $quit = !execute_with_retry(
  48. $config,
  49. sub {
  50. my ($dbh) = @_;
  51. return 0 if $quit;
  52. my ($id, $time, $topic) =
  53. $dbh->selectrow_array('SELECT id,time,topic FROM topics WHERE channel=?', undef, $event->{channel});
  54. $topic //= '';
  55. if (!$id) {
  56. $dbh->do('INSERT INTO topics(time,channel,topic) VALUES (?,?,?)',
  57. undef, $event->{time}, $event->{channel}, $event->{text});
  58. } elsif ($event->{time} > $time && $event->{text} ne $topic) {
  59. $dbh->do('UPDATE topics SET time=?, topic=? WHERE id=?',
  60. undef, $event->{time}, $event->{text}, $id);
  61. touch(file_for($config, 'topics_lastmod'));
  62. }
  63. return 1;
  64. }
  65. );
  66. } else {
  67. # channel message, action, or notice
  68. $quit = !execute_with_retry(
  69. $config,
  70. sub {
  71. my ($dbh) = @_;
  72. return 0 if $quit;
  73. $dbh->do('INSERT INTO logs(time,channel,nick,type,text) VALUES (?,?,?,?,?)',
  74. undef, $event->{time}, $event->{channel}, $event->{nick}, $event->{type}, $event->{text});
  75. return 1;
  76. }
  77. );
  78. }
  79. $job_queue->delete_job($job);
  80. undef $job;
  81. }