1   package eu.fbk.dkm.premon.util;
2   
3   import java.util.Collection;
4   import java.util.Map;
5   import java.util.Set;
6   
7   import com.google.common.collect.HashMultimap;
8   import com.google.common.collect.ImmutableList;
9   import com.google.common.collect.ImmutableMultimap;
10  import com.google.common.collect.ImmutableSet;
11  import com.google.common.collect.Multimap;
12  import com.google.common.collect.Sets;
13  
14  import org.openrdf.model.Resource;
15  import org.openrdf.model.Statement;
16  import org.openrdf.model.URI;
17  import org.openrdf.model.Value;
18  import org.openrdf.model.vocabulary.RDF;
19  import org.openrdf.model.vocabulary.RDFS;
20  import org.openrdf.rio.RDFHandler;
21  import org.openrdf.rio.RDFHandlerException;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import eu.fbk.rdfpro.Mapper;
26  import eu.fbk.rdfpro.RDFProcessor;
27  import eu.fbk.rdfpro.RDFProcessors;
28  import eu.fbk.rdfpro.RDFSource;
29  import eu.fbk.rdfpro.RDFSources;
30  import eu.fbk.rdfpro.Reducer;
31  import eu.fbk.rdfpro.util.Hash;
32  import eu.fbk.rdfpro.util.Namespaces;
33  import eu.fbk.rdfpro.util.Options;
34  import eu.fbk.rdfpro.util.Statements;
35  import eu.fbk.rdfpro.util.Tracker;
36  
37  public class ProcessorUndoRDFS implements RDFProcessor {
38  
39      private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorUndoRDFS.class);
40  
41      private final Mapper mapper;
42  
43      private final Reducer reducer;
44  
45      static RDFProcessor create(final String name, final String... args) {
46          final Options options = Options.parse("b!|w|+", args);
47          final URI base = (URI) Statements.parseValue(options.getOptionArg("b", String.class),
48                  Namespaces.DEFAULT);
49          final boolean preserveBNodes = !options.hasOption("w");
50          final String[] fileSpecs = options.getPositionalArgs(String.class).toArray(new String[0]);
51          final RDFSource tbox = RDFProcessors.track(
52                  new Tracker(LOGGER, null, "%d TBox triples read (%d tr/s avg)", //
53                          "%d TBox triples read (%d tr/s, %d tr/s avg)")).wrap(
54                  RDFSources.read(true, preserveBNodes, base == null ? null : base.stringValue(),
55                          null, fileSpecs));
56          return new ProcessorUndoRDFS(tbox);
57      }
58  
59      public ProcessorUndoRDFS(final RDFSource tbox) {
60          final Multimap<Resource, Resource> superTypes = HashMultimap.create();
61          final Multimap<URI, URI> superProperties = HashMultimap.create();
62          tbox.forEach((final Statement stmt) -> {
63              final Resource subj = stmt.getSubject();
64              final URI pred = stmt.getPredicate();
65              final Value obj = stmt.getObject();
66              if (RDFS.SUBCLASSOF.equals(pred) && obj instanceof Resource) {
67                  superTypes.put(subj, (Resource) obj);
68              } else if (RDFS.SUBPROPERTYOF.equals(pred) //
69                      && subj instanceof URI && obj instanceof URI) {
70                  superProperties.put((URI) subj, (URI) obj);
71              }
72          });
73          this.mapper = new UndoRDFSMapper(elements(superTypes), elements(superProperties));
74          this.reducer = new UndoRDFSReducer(close(superTypes), close(superProperties));
75      }
76  
77      public ProcessorUndoRDFS(final Multimap<Resource, Resource> superTypes,
78              final Multimap<URI, URI> superProperties) {
79          this.mapper = new UndoRDFSMapper(elements(superTypes), elements(superProperties));
80          this.reducer = new UndoRDFSReducer(close(superTypes), close(superProperties));
81      }
82  
83      @Override
84      public RDFHandler wrap(final RDFHandler handler) {
85          return RDFProcessors.mapReduce(this.mapper, this.reducer, true).wrap(handler);
86      }
87  
88      private static <T> Set<T> elements(final Multimap<T, T> multimap) {
89          final Set<T> set = Sets.newHashSet();
90          for (final Map.Entry<T, Collection<T>> entry : multimap.asMap().entrySet()) {
91              set.add(entry.getKey());
92              set.addAll(entry.getValue());
93          }
94          return ImmutableSet.copyOf(set);
95      }
96  
97      private static <T> Multimap<T, T> close(final Multimap<T, T> multimap) {
98          final ImmutableMultimap.Builder<T, T> builder = ImmutableMultimap.builder();
99          for (final T child : multimap.keySet()) {
100             final Set<T> parents = Sets.newHashSet();
101             closeHelper(multimap, child, parents);
102             parents.remove(child);
103             builder.putAll(child, parents);
104         }
105         return builder.build();
106     }
107 
108     private static <T> void closeHelper(final Multimap<T, T> multimap, final T child,
109             final Set<T> parents) {
110         for (final T parent : multimap.get(child)) {
111             if (parents.add(parent)) {
112                 closeHelper(multimap, parent, parents);
113             }
114         }
115     }
116 
117     private static final class UndoRDFSMapper implements Mapper {
118 
119         private static final Value[] BYPASS = new Value[] { Mapper.BYPASS_KEY };
120 
121         private final Set<Resource> types;
122 
123         private final Set<URI> properties;
124 
125         public UndoRDFSMapper(final Iterable<Resource> types, final Iterable<URI> properties) {
126             this.types = ImmutableSet.copyOf(types);
127             this.properties = ImmutableSet.copyOf(properties);
128         }
129 
130         @Override
131         public Value[] map(final Statement stmt) throws RDFHandlerException {
132 
133             if (this.properties.contains(stmt.getPredicate())) {
134                 final Hash ctxHash = Statements.computeHash(stmt.getContext());
135                 final Hash subjHash = Statements.computeHash(stmt.getSubject());
136                 final Hash objHash = Statements.computeHash(stmt.getObject());
137                 final String key = "p:" + Hash.combine(ctxHash, subjHash, objHash).toString();
138                 return new Value[] { Statements.VALUE_FACTORY.createURI(key) };
139 
140             } else if (RDF.TYPE.equals(stmt.getPredicate())
141                     && this.types.contains(stmt.getObject())) {
142                 final Hash ctxHash = Statements.computeHash(stmt.getContext());
143                 final Hash subjHash = Statements.computeHash(stmt.getSubject());
144                 final String key = "t:" + Hash.combine(ctxHash, subjHash).toString();
145                 return new Value[] { Statements.VALUE_FACTORY.createURI(key) };
146 
147             } else {
148                 return BYPASS;
149             }
150         }
151 
152     }
153 
154     private static final class UndoRDFSReducer implements Reducer {
155 
156         private final Multimap<Resource, Resource> superTypes;
157 
158         private final Multimap<URI, URI> superProperties;
159 
160         public UndoRDFSReducer(final Multimap<Resource, Resource> superTypes,
161                 final Multimap<URI, URI> superProperties) {
162             this.superTypes = ImmutableMultimap.copyOf(superTypes);
163             this.superProperties = ImmutableMultimap.copyOf(superProperties);
164         }
165 
166         @Override
167         public void reduce(final Value key, final Statement[] stmts, final RDFHandler handler)
168                 throws RDFHandlerException {
169 
170             final String keyString = key.stringValue();
171 
172             if (keyString.startsWith("t:")) {
173                 // Emit only the <s rdf:type t ctx> statements where the type cannot be inferred
174                 final Set<Resource> types = Sets.newHashSet();
175                 for (final Statement stmt : stmts) {
176                     types.add((Resource) stmt.getObject());
177                 }
178                 for (final Resource type : ImmutableList.copyOf(types)) {
179                     if (types.contains(type)) {
180                         types.removeAll(this.superTypes.get(type));
181                     }
182                 }
183                 final Resource subj = stmts[0].getSubject();
184                 final Resource ctx = stmts[0].getContext();
185                 for (final Resource type : types) {
186                     handler.handleStatement(Statements.VALUE_FACTORY.createStatement(subj,
187                             RDF.TYPE, type, ctx));
188                 }
189 
190             } else if (keyString.startsWith("p:")) {
191                 // Emit only the <s p o ctx> statements where the property cannot be inferred
192                 final Set<URI> properties = Sets.newHashSet();
193                 for (final Statement stmt : stmts) {
194                     properties.add(stmt.getPredicate());
195                 }
196                 for (final URI property : ImmutableList.copyOf(properties)) {
197                     if (properties.contains(property)) {
198                         properties.removeAll(this.superProperties.get(property));
199                     }
200                 }
201                 final Resource subj = stmts[0].getSubject();
202                 final Value obj = stmts[0].getObject();
203                 final Resource ctx = stmts[0].getContext();
204                 for (final URI property : properties) {
205                     handler.handleStatement(Statements.VALUE_FACTORY.createStatement(subj,
206                             property, obj, ctx));
207                 }
208 
209             } else {
210                 // If the key is unrecognized, emit all the statements of the partition unchanged
211                 for (final Statement stmt : stmts) {
212                     handler.handleStatement(stmt);
213                 }
214             }
215         }
216 
217     }
218 
219 }