From c50525cec736f6325d6a0f2c777b9f19532fcc27 Mon Sep 17 00:00:00 2001
From: Stewart McKee
Date: Tue, 26 Nov 2013 23:27:09 +0000
Subject: [PATCH] fixed some bugs around duplicate queued urls

---
 lib/crawl.rb        | 50 +++++++++++++++++++++++++++++----------------
 lib/crawl_job.rb    |  2 --
 lib/crawl_worker.rb | 19 ++++++++++-------
 3 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/lib/crawl.rb b/lib/crawl.rb
index 8ee3a1a..b2873ff 100644
--- a/lib/crawl.rb
+++ b/lib/crawl.rb
@@ -22,6 +22,15 @@ def already_queued?(link)
       @redis.sismember "queued", link
     end
 
+    def already_running?(link)
+      @redis.sismember "currently_running", link
+    end
+
+    def already_handled?(link)
+      already_crawled?(link) || already_queued?(link) || already_running?(link)
+    end
+
+
     # Returns true if the crawl count is within limits
     def within_crawl_limits?
       @options[:crawl_limit].nil? || crawl_counter < @options[:crawl_limit].to_i
@@ -50,16 +59,19 @@ def within_queue_limits?
     end
 
     def retrieve
-      unless @redis.sismember("currently_running", @options[:url])
-        @redis.sadd("currently_running", @options[:url])
-        unless already_crawled?
+
+      unless already_running? @options[:url]
+        unless already_crawled? @options[:url]
+          @redis.sadd("currently_running", @options[:url])
           if within_crawl_limits?
             @stats.update_status("Retrieving #{@options[:url]}...")
-            @content = Cobweb.new(@options).get(@options[:url], @options)
-            if @options[:url] == @redis.get("original_base_url")
-              @redis.set("crawled_base_url", @content[:base_url])
+            lock("update_queues") do
+              @content = Cobweb.new(@options).get(@options[:url], @options)
+              if @options[:url] == @redis.get("original_base_url")
+                @redis.set("crawled_base_url", @content[:base_url])
+              end
+              update_queues
             end
-            update_queues
 
             if content.permitted_type?
               ## update statistics
@@ -128,7 +140,7 @@ def content
     end
 
     def update_queues
-      lock("update_queues") do
+      #lock("update_queues") do
        #@redis.incr "inprogress"
        # move the url from the queued list to the crawled list - for both the original url, and the content url (to handle redirects)
        @redis.srem "queued", @options[:url]
@@ -146,25 +158,27 @@
          increment_crawl_counter
        end
        decrement_queue_counter
-      end
+      #end
     end
 
     def to_be_processed?
-      !finished? && within_process_limits? && !@redis.sismember("enqueued", @options[:url])
+      !finished? && within_process_limits? && !@redis.sismember("queued", @options[:url])
     end
 
     def process(&block)
-      if @options[:crawl_limit_by_page]
-        if content.mime_type.match("text/html")
+      lock("process") do
+        if @options[:crawl_limit_by_page]
+          if content.mime_type.match("text/html")
+            increment_process_counter
+          end
+        else
           increment_process_counter
         end
-      else
-        increment_process_counter
-      end
-      @redis.sadd "enqueued", @options[:url]
+        #@redis.sadd "queued", @options[:url]
 
-      yield if block_given?
-      @redis.incr("crawl_job_enqueued_count")
+        yield if block_given?
+        @redis.incr("crawl_job_enqueued_count")
+      end
     end
 
     def finished_processing
diff --git a/lib/crawl_job.rb b/lib/crawl_job.rb
index 1e2c8af..59683e2 100644
--- a/lib/crawl_job.rb
+++ b/lib/crawl_job.rb
@@ -31,7 +31,6 @@ def self.perform(content_request)
 
       end
 
-      @crawl.debug_puts "====== @crawl.to_be_processed?: #{@crawl.to_be_processed?}"
       if @crawl.to_be_processed?
 
         @crawl.process do
@@ -61,7 +60,6 @@ def self.perform(content_request)
 
       # let the crawl know we're finished with this object
       @crawl.finished_processing
-
      # test queue and crawl sizes to see if we have completed the crawl
      @crawl.debug_puts "finished? #{@crawl.finished?}"
      if @crawl.finished?
diff --git a/lib/crawl_worker.rb b/lib/crawl_worker.rb
index 75cd586..36f86b1 100644
--- a/lib/crawl_worker.rb
+++ b/lib/crawl_worker.rb
@@ -16,6 +16,7 @@ class CrawlWorker
   sidekiq_options :queue => "crawl_worker", :retry => false if SIDEKIQ_INSTALLED
 
   def perform(content_request)
+    puts "Performing for #{content_request["url"]}"
     # setup the crawl class to manage the crawl of this object
     @crawl = CobwebModule::Crawl.new(content_request)
 
@@ -25,12 +26,17 @@ def perform(content_request)
     # if the crawled object is an object type we are interested
     if @crawl.content.permitted_type?
 
-      # extract links from content and process them if we are still within queue limits (block will not run if we are outwith limits)
-      @crawl.process_links do |link|
+      @crawl.lock("queue_links") do
+        # extract links from content and process them if we are still within queue limits (block will not run if we are outwith limits)
+        @crawl.process_links do |link|
 
-        @crawl.debug_puts "ENQUEUED LINK: #{link}"
-        enqueue_content(content_request, link)
+          if @crawl.within_crawl_limits? && !@crawl.already_handled?(link)
+            # enqueue the links to sidekiq
+            @crawl.debug_puts "QUEUED LINK: #{link}"
+            enqueue_content(content_request, link)
+          end
 
+        end
       end
 
       if @crawl.to_be_processed?
@@ -38,7 +44,7 @@
         @crawl.process do
 
           # enqueue to processing queue
-          @crawl.debug_puts "ENQUEUED [#{@crawl.redis.get("crawl_job_enqueued_count")}] URL: #{@crawl.content.url}"
+          @crawl.debug_puts "SENT FOR PROCESSING [#{@crawl.redis.get("crawl_job_enqueued_count")}] URL: #{@crawl.content.url}"
           send_to_processing_queue(@crawl.content.to_hash, content_request)
 
           #if the enqueue counter has been requested update that
@@ -64,8 +70,7 @@
 
     # test queue and crawl sizes to see if we have completed the crawl
     @crawl.debug_puts "finished? #{@crawl.finished?}"
-    @crawl.debug_puts "first_to_finish? #{@crawl.first_to_finish?}" if @crawl.finished?
-    if @crawl.finished? && @crawl.first_to_finish?
+    if @crawl.finished?
      @crawl.debug_puts "Calling crawl_job finished"
      finished(content_request)
    end
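
Note (not part of the patch): the sketch below illustrates the dedup guard this change introduces, where a link is only enqueued if it is absent from the "crawled", "queued" and "currently_running" Redis sets, mirroring the new already_handled? helper in lib/crawl.rb. The DedupGuard class and enqueue_link method are illustrative names only, assuming the plain redis gem rather than cobweb's own classes.

require "redis"

# Illustrative sketch of the duplicate-URL guard used by the patch.
class DedupGuard
  def initialize(redis = Redis.new)
    @redis = redis
  end

  # True if the link has already been crawled, is queued, or is being fetched.
  def already_handled?(link)
    @redis.sismember("crawled", link) ||
      @redis.sismember("queued", link) ||
      @redis.sismember("currently_running", link)
  end

  # Hypothetical enqueue wrapper: skip links that are already known.
  def enqueue_link(link)
    return false if already_handled?(link)
    @redis.sadd("queued", link)
    true
  end
end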