DefaultPersistence.java

  1. /*
  2.  * *********************************************************************************************************************
  3.  *
  4.  * blueMarine II: Semantic Media Centre
  5.  * http://tidalwave.it/projects/bluemarine2
  6.  *
  7.  * Copyright (C) 2015 - 2021 by Tidalwave s.a.s. (http://tidalwave.it)
  8.  *
  9.  * *********************************************************************************************************************
  10.  *
  11.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  12.  * the License. You may obtain a copy of the License at
  13.  *
  14.  *     http://www.apache.org/licenses/LICENSE-2.0
  15.  *
  16.  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  17.  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the
  18.  * specific language governing permissions and limitations under the License.
  19.  *
  20.  * *********************************************************************************************************************
  21.  *
  22.  * git clone https://bitbucket.org/tidalwave/bluemarine2-src
  23.  * git clone https://github.com/tidalwave-it/bluemarine2-src
  24.  *
  25.  * *********************************************************************************************************************
  26.  */
  27. package it.tidalwave.bluemarine2.persistence.impl;

  28. import it.tidalwave.bluemarine2.util.SortingRDFHandler;
  29. import javax.annotation.Nonnull;
  30. import javax.inject.Inject;
  31. import java.util.Optional;
  32. import java.util.concurrent.CountDownLatch;
  33. import java.io.IOException;
  34. import java.io.PrintWriter;
  35. import java.io.Reader;
  36. import java.nio.file.Files;
  37. import java.nio.file.Path;
  38. import java.nio.file.Paths;
  39. import org.apache.commons.io.FileUtils;
  40. import com.google.common.annotations.VisibleForTesting;
  41. import org.eclipse.rdf4j.repository.Repository;
  42. import org.eclipse.rdf4j.repository.RepositoryConnection;
  43. import org.eclipse.rdf4j.repository.RepositoryException;
  44. import org.eclipse.rdf4j.repository.sail.SailRepository;
  45. import org.eclipse.rdf4j.sail.Sail;
  46. import org.eclipse.rdf4j.sail.memory.MemoryStore;
  47. import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
  48. import org.eclipse.rdf4j.rio.RDFFormat;
  49. import org.eclipse.rdf4j.rio.RDFHandler;
  50. import org.eclipse.rdf4j.rio.RDFHandlerException;
  51. import org.eclipse.rdf4j.rio.RDFParseException;
  52. import org.eclipse.rdf4j.rio.n3.N3Writer;
  53. import it.tidalwave.util.TypeSafeMap;
  54. import it.tidalwave.messagebus.MessageBus;
  55. import it.tidalwave.messagebus.annotation.ListensTo;
  56. import it.tidalwave.messagebus.annotation.SimpleMessageSubscriber;
  57. import it.tidalwave.bluemarine2.message.PersistenceInitializedNotification;
  58. import it.tidalwave.bluemarine2.message.PowerOffNotification;
  59. import it.tidalwave.bluemarine2.message.PowerOnNotification;
  60. import it.tidalwave.bluemarine2.persistence.Persistence;
  61. import lombok.extern.slf4j.Slf4j;
  62. import static java.util.concurrent.TimeUnit.SECONDS;
  63. import static java.nio.charset.StandardCharsets.UTF_8;
  64. import static it.tidalwave.bluemarine2.persistence.PersistencePropertyNames.*;

  65. /***********************************************************************************************************************
  66.  *
  67.  * @author  Fabrizio Giudici
  68.  *
  69.  **********************************************************************************************************************/
  70. @SimpleMessageSubscriber @Slf4j
  71. public class DefaultPersistence implements Persistence
  72.   {
  73.     @Inject
  74.     private MessageBus messageBus;

  75.     private final CountDownLatch initialized = new CountDownLatch(1);

  76.     private Repository repository;

  77.     @VisibleForTesting Sail sail;

  78.     /*******************************************************************************************************************
  79.      *
  80.      * {@inheritDoc}
  81.      *
  82.      ******************************************************************************************************************/
  83.     @Override @Nonnull
  84.     public Repository getRepository()
  85.       {
  86.         waitForPowerOn();
  87.         return repository;
  88.       }

  89.     /*******************************************************************************************************************
  90.      *
  91.      *
  92.      ******************************************************************************************************************/
  93.     @VisibleForTesting void onPowerOnNotification (@ListensTo @Nonnull final PowerOnNotification notification)
  94.       throws RepositoryException, IOException, RDFParseException
  95.       {
  96.         log.info("onPowerOnNotification({})", notification);
  97.         final TypeSafeMap properties = notification.getProperties();

  98.         final Optional<Path> importFile = properties.getOptional(IMPORT_FILE);
  99.         final Optional<Path> storageFolder = properties.getOptional(STORAGE_FOLDER);

  100.         if (storageFolder.isEmpty())
  101.           {
  102.             log.warn("No storage path: working in memory");
  103.             sail = new MemoryStore();
  104.           }
  105.         else
  106.           {
  107.             log.info("Disk storage at {}", storageFolder);

  108.             if (importFile.isPresent() && Files.exists(importFile.get()))
  109.               {
  110.                 log.warn("Scratching store ...");
  111.                 FileUtils.deleteDirectory(storageFolder.get().toFile()); // FIXME: rename to backup folder with timestamp
  112.               }

  113.             sail = new NativeStore(storageFolder.get().toFile());
  114.           }

  115.         repository = new SailRepository(sail);
  116.         repository.initialize();

  117.         if (importFile.isPresent() && Files.exists(importFile.get()))
  118.           {
  119.             importFromFile(importFile.get());

  120.             if (properties.getOptional(RENAME_IMPORT_FILE).orElse(false))
  121.               {
  122.                 Files.move(importFile.get(), Paths.get(importFile.get().toString() + "~"));
  123.               }
  124.           }

  125.         initialized.countDown();
  126.         messageBus.publish(new PersistenceInitializedNotification());
  127.       }

  128.     /*******************************************************************************************************************
  129.      *
  130.      *
  131.      ******************************************************************************************************************/
  132.     @VisibleForTesting void onPowerOffNotification (@ListensTo @Nonnull final PowerOffNotification notification)
  133.       throws RepositoryException, IOException, RDFParseException
  134.       {
  135.         log.info("onPowerOffNotification({})", notification);

  136.         if (repository != null)
  137.           {
  138.             repository.shutDown();
  139.           }
  140.       }

  141.     /*******************************************************************************************************************
  142.      *
  143.      * Exports the repository to the given file.
  144.      *
  145.      * @param   path                    where to export the data to
  146.      * @throws  RDFHandlerException
  147.      * @throws  IOException
  148.      * @throws  RepositoryException
  149.      *
  150.      ******************************************************************************************************************/
  151.     @Override
  152.     public void exportToFile (@Nonnull final Path path)
  153.       throws RDFHandlerException, IOException, RepositoryException
  154.       {
  155.         log.info("exportToFile({})", path);
  156.         Files.createDirectories(path.getParent());

  157.         try (final PrintWriter pw = new PrintWriter(Files.newBufferedWriter(path, UTF_8));
  158.              final RepositoryConnection connection = repository.getConnection())
  159.           {
  160.             final RDFHandler writer = new SortingRDFHandler(new N3Writer(pw));

  161. //            FIXME: use Iterations - and sort
  162. //            for (final Namespace namespace : connection.getNamespaces().asList())
  163. //              {
  164. //                writer.handleNamespace(namespace.getPrefix(), namespace.getName());
  165. //              }

  166.             writer.handleNamespace("bio",   "http://purl.org/vocab/bio/0.1/");
  167.             writer.handleNamespace("bmmo",  "http://bluemarine.tidalwave.it/2015/04/mo/");
  168.             writer.handleNamespace("dc",    "http://purl.org/dc/elements/1.1/");
  169.             writer.handleNamespace("foaf",  "http://xmlns.com/foaf/0.1/");
  170.             writer.handleNamespace("owl",   "http://www.w3.org/2002/07/owl#");
  171.             writer.handleNamespace("mo",    "http://purl.org/ontology/mo/");
  172.             writer.handleNamespace("rdfs",  "http://www.w3.org/2000/01/rdf-schema#");
  173.             writer.handleNamespace("rel",   "http://purl.org/vocab/relationship/");
  174.             writer.handleNamespace("vocab", "http://dbtune.org/musicbrainz/resource/vocab/");
  175.             writer.handleNamespace("xs",    "http://www.w3.org/2001/XMLSchema#");

  176.             connection.export(writer);
  177.           }
  178.       }

  179.     /*******************************************************************************************************************
  180.      *
  181.      *
  182.      ******************************************************************************************************************/
  183.     @Override
  184.     public <E extends Exception> void runInTransaction (@Nonnull final TransactionalTask<E> task)
  185.       throws E, RepositoryException
  186.       {
  187.         log.info("runInTransaction({})", task);
  188.         waitForPowerOn();
  189.         final long baseTime = System.nanoTime();

  190.         try (final RepositoryConnection connection = repository.getConnection()) // TODO: pool?
  191.           {
  192.             task.run(connection);
  193.             connection.commit();
  194.           }
  195.         catch (Exception e)
  196.           {
  197.             log.error("Transaction failed: {}", e.toString());
  198.           }

  199.         if (log.isDebugEnabled())
  200.           {
  201.             log.debug(">>>> done in {} ms", (System.nanoTime() - baseTime) * 1E-6);
  202.           }
  203.       }

  204.     /*******************************************************************************************************************
  205.      *
  206.      * Imports the repository from the given file.
  207.      *
  208.      * @param   path                    where to import the data from
  209.      * @throws  RDFHandlerException
  210.      * @throws  IOException
  211.      * @throws  RepositoryException
  212.      *
  213.      ******************************************************************************************************************/
  214.     private void importFromFile (@Nonnull final Path path)
  215.       throws IOException, RepositoryException, RDFParseException
  216.       {
  217.         try (final RepositoryConnection connection = repository.getConnection();
  218.              final Reader reader = Files.newBufferedReader(path, UTF_8))
  219.           {
  220.             log.info("Importing repository from {} ...", path);
  221.             connection.add(reader, path.toUri().toString(), RDFFormat.N3);
  222.             connection.commit();
  223.           }
  224.       }

  225.     /*******************************************************************************************************************
  226.      *
  227.      *
  228.      ******************************************************************************************************************/
  229.     private void waitForPowerOn()
  230.       {
  231.         try
  232.           {
  233.             if (!initialized.await(10, SECONDS))
  234.               {
  235.                 throw new IllegalStateException("Did not receive PowerOnNotification");
  236.               }
  237.           }
  238.         catch (InterruptedException ex)
  239.           {
  240.             throw new IllegalStateException("Interrupted while waiting for PowerOnNotification");
  241.           }
  242.       }
  243.   }