Precompute ancestor hierarchy to speed up term indexing #279

base: develop
Changes from all commits: 69ff5c6, bedd767, 96438b9, e8b6262
```diff
@@ -15,6 +15,14 @@ class Class < LinkedData::Models::Base
   include LinkedData::Concerns::Concept::InScheme
   include LinkedData::Concerns::Concept::InCollection

+  class << self
+    # When set, index_doc uses this precomputed map instead of per-class
+    # SPARQL ancestor traversal. Keyed by class URI string, values are
+    # Sets of ancestor URI strings. Set by OntologySubmissionIndexer
+    # during bulk indexing and cleared after completion.
+    attr_accessor :ancestors_cache
+  end
+
   model :class, name_with: :id, collection: :submission,
         namespace: :owl, :schemaless => :true,
         rdf_type: lambda { |*x| self.class_rdf_type(x) }
```
```diff
@@ -254,15 +262,17 @@ def index_doc(to_set=nil)
     end

     begin
-      # paths_to_root = self.paths_to_root
-      # paths_to_root.each do |paths|
-      #   path_ids += paths.map { |p| p.id.to_s }
+      # TODO: do we ever need per-class ancestor lookup outside of bulk indexing?
+      # If so, uncomment the fallback below.
+      # if self.class.ancestors_cache
+      #   path_ids = (self.class.ancestors_cache[class_id] || Set.new).dup
+      # else
+      #   path_ids = retrieve_hierarchy_ids(:ancestors)
+      # end
+      # path_ids.delete(class_id)
-      path_ids = retrieve_hierarchy_ids(:ancestors)
+      path_ids = (self.class.ancestors_cache[class_id] || Set.new).dup
```
> **Review comment (Member):** Hard dependency on cache, no fallback (High)
>
> `path_ids = (self.class.ancestors_cache[class_id] || Set.new).dup` assumes the cache is always populated. If `index_doc` is ever called outside bulk indexing, `ancestors_cache` is `nil` and this line raises. Suggested guard:
>
> ```ruby
> if self.class.ancestors_cache
>   path_ids = (self.class.ancestors_cache[class_id] || Set.new).dup
> else
>   path_ids = retrieve_hierarchy_ids(:ancestors)
> end
> ```

> **Author reply (Member):** I was thinking about it, but is there a case where `index_doc` is called outside of bulk indexing?
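The guard the reviewer suggests can be sketched as a standalone helper. In this sketch, `cache` stands in for `Class.ancestors_cache` and the block stands in for the per-class `retrieve_hierarchy_ids(:ancestors)` SPARQL call; `ancestor_ids` itself is a hypothetical name, not part of the PR:

```ruby
require 'set'

# Hypothetical helper illustrating the reviewer's fallback: use the
# precomputed cache when present, otherwise run the per-class lookup.
def ancestor_ids(class_id, cache)
  if cache
    # dup: index_doc mutates the result (select!, delete), so hand out a copy
    (cache[class_id] || Set.new).dup
  else
    yield # no cache set: fall back to the per-class SPARQL traversal
  end
end

cache = { "http://example.org/C" => Set.new(["http://example.org/B"]) }
hit  = ancestor_ids("http://example.org/C", cache) { raise "not reached" }
miss = ancestor_ids("http://example.org/Z", cache) { raise "not reached" }
cold = ancestor_ids("http://example.org/C", nil)   { Set.new(["from-sparql"]) }
```

The `.dup` matters because `index_doc` mutates `path_ids` in place; without it a cache hit would corrupt the shared Set for every later class.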
```diff
       path_ids.select! { |x| !x["owl#Thing"] }
       doc[:parents] = path_ids
-    rescue Exception => e
+    rescue StandardError => e
       doc[:parents] = []
       puts "Exception getting paths to root for search for #{class_id}: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
     end
```
```diff
@@ -36,6 +36,8 @@ def index(logger, commit = true, optimize = true)
       csv_writer = LinkedData::Utils::OntologyCSVWriter.new
       csv_writer.open(@submission.ontology, @submission.csv_path)

+      LinkedData::Models::Class.ancestors_cache = compute_ancestors_map(logger)
+
       begin
         logger.info("Indexing ontology terms: #{@submission.ontology.acronym}...")
         t0 = Time.now
@@ -144,6 +146,9 @@ def index(logger, commit = true, optimize = true)
           end
         end

+        # TODO: Remove once precomputed ancestors are validated against production data
+        validate_class_ancestors(c, logger) if ENV['OP_VALIDATE_ANCESTORS']
+
         @submission.synchronize do
           csv_writer.write_class(c)
         end
@@ -180,6 +185,8 @@ def index(logger, commit = true, optimize = true)
         logger.error("\n\n#{e.class}: #{e.message}\n")
         logger.error(e.backtrace)
         raise e
+      ensure
+        LinkedData::Models::Class.ancestors_cache = nil
       end
     end
     logger.info("Completed indexing ontology terms: #{@submission.ontology.acronym} in #{time} sec. #{count_classes} classes.")
@@ -194,6 +201,104 @@ def index(logger, commit = true, optimize = true)
     end
   end
```
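The set-then-clear lifecycle in the diff relies on `ensure` so a failed run cannot leave a stale cache behind. A minimal standalone sketch of the same shape, with `FakeModel` and `index_with_cache` as hypothetical stand-ins for `LinkedData::Models::Class` and the indexer's `index` method:

```ruby
require 'set'

# Hypothetical stand-in for the model's class-level cache.
class FakeModel
  class << self
    attr_accessor :ancestors_cache
  end
end

# Mirrors the indexer's lifecycle: populate the cache up front, then clear
# it in an ensure block that runs on success and on error alike.
def index_with_cache(raise_error: false)
  FakeModel.ancestors_cache = { "http://example.org/C" => Set.new }
  begin
    raise "indexing failed" if raise_error
    :ok
  ensure
    FakeModel.ancestors_cache = nil
  end
end

index_with_cache
after_success = FakeModel.ancestors_cache

begin
  index_with_cache(raise_error: true)
rescue RuntimeError
end
after_failure = FakeModel.ancestors_cache
```

In both paths the cache ends up `nil`, which is what makes the class-level state tolerable for sequential indexing.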
```diff
+    def compute_ancestors_map(logger)
+      @submission.bring(:hasOntologyLanguage) unless @submission.loaded_attributes.include?(:hasOntologyLanguage)
+      tree_property = LinkedData::Models::Class.tree_view_property(@submission)
+      graph = @submission.id.to_s
+
+      logger.info("Precomputing ancestor hierarchy for indexing...")
+      t0 = Time.now
+
+      direct_parents = fetch_all_parent_edges(graph, tree_property)
+      edge_count = direct_parents.values.sum(&:length)
+      logger.info("Fetched #{edge_count} parent-child edges for #{direct_parents.size} classes in #{Time.now - t0}s")
+
+      ancestors_map = {}
+      direct_parents.each_key do |cls|
+        compute_ancestors_for(cls, direct_parents, ancestors_map)
+      end
+
+      logger.info("Computed ancestor map for #{ancestors_map.size} classes in #{Time.now - t0}s")
+      ancestors_map
+    end
+
+    def fetch_all_parent_edges(graph, tree_property)
+      direct_parents = {}
+      page_size = 50_000
+      offset = 0
+
+      loop do
+        query = "SELECT ?child ?parent WHERE { " \
+                "GRAPH <#{graph}> { " \
+                "?child <#{tree_property}> ?parent . " \
+                "FILTER(isIRI(?parent)) " \
+                "} } LIMIT #{page_size} OFFSET #{offset}"
+
+        count = 0
+        Goo.sparql_query_client.query(query, query_options: { rules: :NONE }, graphs: [graph]).each do |sol|
+          child = sol[:child].to_s
+          parent = sol[:parent].to_s
+          next unless child.start_with?("http") && parent.start_with?("http")
+          (direct_parents[child] ||= []) << parent
+          count += 1
+        end
+
+        break if count < page_size
+        offset += page_size
+      end
+
+      direct_parents
+    end
```
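The paging loop in `fetch_all_parent_edges` terminates when a page comes back shorter than `page_size`. The same loop can be exercised against an in-memory Array instead of `Goo.sparql_query_client`; the helper name and edge data below are hypothetical:

```ruby
# Sketch of the LIMIT/OFFSET paging loop, with an Array slice standing in
# for one paged SPARQL query. The part under illustration is the
# termination rule: stop as soon as a page is shorter than page_size.
def fetch_edges_paged(all_rows, page_size)
  direct_parents = {}
  offset = 0
  loop do
    page = all_rows[offset, page_size] || []  # one "query" worth of rows
    page.each do |child, parent|
      (direct_parents[child] ||= []) << parent
    end
    break if page.length < page_size          # short page: no more results
    offset += page_size
  end
  direct_parents
end

rows = [["C", "B"], ["B", "A"], ["D", "B"]]
edges = fetch_edges_paged(rows, 2)  # two pages: a full one, then a short one
```

One caveat with this style of paging: SPARQL leaves result order undefined without `ORDER BY`, so LIMIT/OFFSET windows are only guaranteed stable on endpoints that happen to return deterministic order; whether that matters here depends on the triplestore.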
```diff
+    def compute_ancestors_for(cls, direct_parents, ancestors_map)
+      return ancestors_map[cls] if ancestors_map.key?(cls)
+
+      visited = Set.new
+      queue = (direct_parents[cls] || []).dup
+
+      while queue.any?
+        parent = queue.shift
+        next if visited.include?(parent)
+        visited.add(parent)
+
+        if ancestors_map.key?(parent)
+          visited.merge(ancestors_map[parent])
+        else
+          (direct_parents[parent] || []).each do |grandparent|
+            queue.push(grandparent) unless visited.include?(grandparent)
+          end
+        end
+      end
+
+      ancestors_map[cls] = visited
+    end
```
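For illustration, the traversal above can be run standalone. This is a copy of the algorithm (duplicated here, not the PR's file) exercised on a small diamond hierarchy:

```ruby
require 'set'

# Standalone copy of the compute_ancestors_for traversal: a BFS over the
# child->parents map that folds in any already-memoized ancestor Set
# instead of re-walking that branch.
def ancestors_for(cls, direct_parents, ancestors_map)
  return ancestors_map[cls] if ancestors_map.key?(cls)

  visited = Set.new
  queue = (direct_parents[cls] || []).dup
  while queue.any?
    parent = queue.shift
    next if visited.include?(parent)
    visited.add(parent)
    if ancestors_map.key?(parent)
      visited.merge(ancestors_map[parent])  # reuse the memoized result
    else
      (direct_parents[parent] || []).each do |gp|
        queue.push(gp) unless visited.include?(gp)
      end
    end
  end
  ancestors_map[cls] = visited
end

# Diamond: D -> {B, C}, B -> A, C -> A. A is reachable along both branches
# but the visited set ensures each node is expanded at most once.
parents = { "D" => %w[B C], "B" => %w[A], "C" => %w[A] }
memo = {}
d_ancestors = ancestors_for("D", parents, memo)
```

The `visited` set doubles as the cycle guard, which is what the `test_cycle` case in the PR's test file relies on.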
```diff
+    # TODO: Remove once precomputed ancestors are validated against production data
+    def validate_class_ancestors(cls, logger)
```

> **Review comment (Member):** Instance variables leaked from `validate_class_ancestors`: `@old_ancestors_result` and `@new_ancestors_result` are set on the indexer and persist after each call; locals would suffice here.
```diff
+      cls_id = cls.id.to_s
+      ancestors_cache = LinkedData::Models::Class.ancestors_cache
+      return unless ancestors_cache
+
+      old_time = Benchmark.realtime do
+        @old_ancestors_result = cls.retrieve_hierarchy_ids(:ancestors)
+        @old_ancestors_result.select! { |x| !x["owl#Thing"] }
+      end
+
+      new_time = Benchmark.realtime do
+        @new_ancestors_result = (ancestors_cache[cls_id] || Set.new).reject { |x| x["owl#Thing"] }.to_set
+      end
+
+      if @old_ancestors_result == @new_ancestors_result
+        logger.info("Ancestor OK for #{cls_id}: #{@old_ancestors_result.size} ancestors, old=#{old_time.round(4)}s new=#{new_time.round(4)}s")
+      else
+        only_old = @old_ancestors_result - @new_ancestors_result
+        only_new = @new_ancestors_result - @old_ancestors_result
+        logger.warn("Ancestor MISMATCH for #{cls_id}: old=#{@old_ancestors_result.size} (#{old_time.round(4)}s) new=#{@new_ancestors_result.size} (#{new_time.round(4)}s) only_in_old=#{only_old.to_a.first(5)} only_in_new=#{only_new.to_a.first(5)}")
+      end
+    rescue StandardError => e
+      logger.warn("Ancestor validation failed for #{cls_id}: #{e.class}: #{e.message}")
+    end
   end
 end
```
New test file (`@@ -0,0 +1,166 @@`):

```ruby
require_relative '../test_case'

class TestAncestorsPrecompute < LinkedData::TestCase

  def setup
    @indexer = LinkedData::Services::OntologySubmissionIndexer.new(nil)
  end

  # A -> B -> C (linear chain)
  def test_linear_chain
    direct_parents = {
      "http://example.org/C" => ["http://example.org/B"],
      "http://example.org/B" => ["http://example.org/A"]
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal Set.new(["http://example.org/B", "http://example.org/A"]),
                 ancestors_map["http://example.org/C"]
    assert_equal Set.new(["http://example.org/A"]),
                 ancestors_map["http://example.org/B"]
  end

  # Root node with no parents
  def test_root_node
    direct_parents = {
      "http://example.org/A" => []
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal Set.new, ancestors_map["http://example.org/A"]
  end

  #   A
  #  / \
  # B   C
  #  \ /
  #   D
  def test_diamond_inheritance
    direct_parents = {
      "http://example.org/D" => ["http://example.org/B", "http://example.org/C"],
      "http://example.org/B" => ["http://example.org/A"],
      "http://example.org/C" => ["http://example.org/A"]
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal Set.new(["http://example.org/B", "http://example.org/C", "http://example.org/A"]),
                 ancestors_map["http://example.org/D"]
    assert_equal Set.new(["http://example.org/A"]),
                 ancestors_map["http://example.org/B"]
    assert_equal Set.new(["http://example.org/A"]),
                 ancestors_map["http://example.org/C"]
  end

  # A   B
  # |   |
  # C   D
  def test_multiple_roots
    direct_parents = {
      "http://example.org/C" => ["http://example.org/A"],
      "http://example.org/D" => ["http://example.org/B"]
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal Set.new(["http://example.org/A"]),
                 ancestors_map["http://example.org/C"]
    assert_equal Set.new(["http://example.org/B"]),
                 ancestors_map["http://example.org/D"]
  end

  # A -> B -> A (cycle)
  def test_cycle
    direct_parents = {
      "http://example.org/A" => ["http://example.org/B"],
      "http://example.org/B" => ["http://example.org/A"]
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_includes ancestors_map["http://example.org/A"], "http://example.org/B"
    assert_includes ancestors_map["http://example.org/B"], "http://example.org/A"
  end

  # Class not in direct_parents (leaf with no edges)
  def test_class_not_in_map
    direct_parents = {}
    ancestors_map = {}

    @indexer.send(:compute_ancestors_for, "http://example.org/X", direct_parents, ancestors_map)

    assert_equal Set.new, ancestors_map["http://example.org/X"]
  end

  # Memoization: computing ancestors for a child reuses already-computed parent ancestors
  def test_memoization
    direct_parents = {
      "http://example.org/C" => ["http://example.org/B"],
      "http://example.org/B" => ["http://example.org/A"]
    }
    ancestors_map = {}

    # Compute B first
    @indexer.send(:compute_ancestors_for, "http://example.org/B", direct_parents, ancestors_map)
    assert ancestors_map.key?("http://example.org/B")
    refute ancestors_map.key?("http://example.org/C")

    # Now compute C, which should reuse B's cached result
    @indexer.send(:compute_ancestors_for, "http://example.org/C", direct_parents, ancestors_map)
    assert_equal Set.new(["http://example.org/B", "http://example.org/A"]),
                 ancestors_map["http://example.org/C"]
  end

  #     A
  #    / \
  #   B   C
  #  / \   \
  # D   E   F
  #      \ /
  #       G
  def test_complex_dag
    direct_parents = {
      "http://example.org/D" => ["http://example.org/B"],
      "http://example.org/E" => ["http://example.org/B"],
      "http://example.org/F" => ["http://example.org/C"],
      "http://example.org/G" => ["http://example.org/E", "http://example.org/F"],
      "http://example.org/B" => ["http://example.org/A"],
      "http://example.org/C" => ["http://example.org/A"]
    }
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal Set.new(["http://example.org/E", "http://example.org/F",
                          "http://example.org/B", "http://example.org/C",
                          "http://example.org/A"]),
                 ancestors_map["http://example.org/G"]

    assert_equal Set.new(["http://example.org/B", "http://example.org/A"]),
                 ancestors_map["http://example.org/D"]
  end

  def test_empty_ontology
    direct_parents = {}
    ancestors_map = {}

    compute_all(direct_parents, ancestors_map)

    assert_equal({}, ancestors_map)
  end

  private

  def compute_all(direct_parents, ancestors_map)
    direct_parents.each_key do |cls|
      @indexer.send(:compute_ancestors_for, cls, direct_parents, ancestors_map)
    end
  end
end
```
> **Review comment:** Thread safety of class-level mutable state (High)
>
> `ancestors_cache` is a class-level `attr_accessor` on `LinkedData::Models::Class`. If two ontologies are indexed concurrently (separate threads/workers), one worker could overwrite or `nil` out the cache while another is still reading it.

> **Author reply:** The `ancestors_cache` is computed once before worker threads are spawned. During indexing, threads only read from it (hash lookups plus `.dup` on the Sets); no writes occur until all threads complete and the cache is cleared.
>
> The class-level state concern would apply if different ontologies were indexed concurrently in the same process. Currently ncbo_cron processes ontologies sequentially, and if we ever move to concurrent indexing, we would likely use separate worker processes subscribing to a queue, each with its own isolated class-level state. In the unlikely event we needed concurrency within a shared process, we could scope the cache via `Thread.current` (the same pattern already used for `RequestStore.store[:requested_lang]` in this file).
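The `Thread.current` scoping the reply mentions could look like the following sketch. The module and method names are invented for illustration; only the idea (each thread reads and writes its own cache slot) comes from the discussion above:

```ruby
require 'set'

# Hypothetical thread-scoped cache: each thread gets its own slot, so
# concurrently indexed submissions in one process cannot clobber each
# other's ancestor maps.
module AncestorsCacheStore
  KEY = :ancestors_cache

  def self.set(map)
    Thread.current[KEY] = map
  end

  def self.get(class_id)
    map = Thread.current[KEY]
    map && map[class_id]
  end

  def self.clear
    Thread.current[KEY] = nil
  end
end

AncestorsCacheStore.set({ "http://example.org/C" => Set.new(["http://example.org/B"]) })
main_hit = AncestorsCacheStore.get("http://example.org/C")

# A freshly spawned thread sees no cache, even while the main thread's is set.
other_thread_hit = Thread.new { AncestorsCacheStore.get("http://example.org/C") }.value

AncestorsCacheStore.clear
after_clear = Thread.current[AncestorsCacheStore::KEY]
```

Note this isolation cuts both ways: with the current design the cache is populated once and shared read-only across the indexer's worker threads, which a per-thread slot would not support without populating it in each worker.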