- package it.tidalwave.bluemarine2.persistence.impl;
- import it.tidalwave.bluemarine2.util.SortingRDFHandler;
- import javax.annotation.Nonnull;
- import javax.inject.Inject;
- import java.util.Optional;
- import java.util.concurrent.CountDownLatch;
- import;
- import;
- import;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import;
- import;
- import org.eclipse.rdf4j.repository.Repository;
- import org.eclipse.rdf4j.repository.RepositoryConnection;
- import org.eclipse.rdf4j.repository.RepositoryException;
- import org.eclipse.rdf4j.repository.sail.SailRepository;
- import org.eclipse.rdf4j.sail.Sail;
- import org.eclipse.rdf4j.sail.memory.MemoryStore;
- import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
- import;
- import;
- import;
- import;
- import;
- import it.tidalwave.util.TypeSafeMap;
- import it.tidalwave.messagebus.MessageBus;
- import it.tidalwave.messagebus.annotation.ListensTo;
- import it.tidalwave.messagebus.annotation.SimpleMessageSubscriber;
- import it.tidalwave.bluemarine2.message.PersistenceInitializedNotification;
- import it.tidalwave.bluemarine2.message.PowerOffNotification;
- import it.tidalwave.bluemarine2.message.PowerOnNotification;
- import it.tidalwave.bluemarine2.persistence.Persistence;
- import lombok.extern.slf4j.Slf4j;
- import static java.util.concurrent.TimeUnit.SECONDS;
- import static java.nio.charset.StandardCharsets.UTF_8;
- import static it.tidalwave.bluemarine2.persistence.PersistencePropertyNames.*;
- /***********************************************************************************************************************
- *
- * @author Fabrizio Giudici
- *
- **********************************************************************************************************************/
- @SimpleMessageSubscriber @Slf4j
- public class DefaultPersistence implements Persistence
- {
- @Inject
- private MessageBus messageBus;
- private final CountDownLatch initialized = new CountDownLatch(1);
- private Repository repository;
- @VisibleForTesting Sail sail;
- /*******************************************************************************************************************
- *
- * {@inheritDoc}
- *
- ******************************************************************************************************************/
- @Override @Nonnull
- public Repository getRepository()
- {
- waitForPowerOn();
- return repository;
- }
- /*******************************************************************************************************************
- *
- *
- ******************************************************************************************************************/
- @VisibleForTesting void onPowerOnNotification (@ListensTo @Nonnull final PowerOnNotification notification)
- throws RepositoryException, IOException, RDFParseException
- {
-"onPowerOnNotification({})", notification);
- final TypeSafeMap properties = notification.getProperties();
- final Optional<Path> importFile = properties.getOptional(IMPORT_FILE);
- final Optional<Path> storageFolder = properties.getOptional(STORAGE_FOLDER);
- if (storageFolder.isEmpty())
- {
- log.warn("No storage path: working in memory");
- sail = new MemoryStore();
- }
- else
- {
-"Disk storage at {}", storageFolder);
- if (importFile.isPresent() && Files.exists(importFile.get()))
- {
- log.warn("Scratching store ...");
- FileUtils.deleteDirectory(storageFolder.get().toFile()); // FIXME: rename to backup folder with timestamp
- }
- sail = new NativeStore(storageFolder.get().toFile());
- }
- repository = new SailRepository(sail);
- repository.initialize();
- if (importFile.isPresent() && Files.exists(importFile.get()))
- {
- importFromFile(importFile.get());
- if (properties.getOptional(RENAME_IMPORT_FILE).orElse(false))
- {
- Files.move(importFile.get(), Paths.get(importFile.get().toString() + "~"));
- }
- }
- initialized.countDown();
- messageBus.publish(new PersistenceInitializedNotification());
- }
- /*******************************************************************************************************************
- *
- *
- ******************************************************************************************************************/
- @VisibleForTesting void onPowerOffNotification (@ListensTo @Nonnull final PowerOffNotification notification)
- throws RepositoryException, IOException, RDFParseException
- {
-"onPowerOffNotification({})", notification);
- if (repository != null)
- {
- repository.shutDown();
- }
- }
- /*******************************************************************************************************************
- *
- * Exports the repository to the given file.
- *
- * @param path where to export the data to
- * @throws RDFHandlerException
- * @throws IOException
- * @throws RepositoryException
- *
- ******************************************************************************************************************/
- @Override
- public void exportToFile (@Nonnull final Path path)
- throws RDFHandlerException, IOException, RepositoryException
- {
-"exportToFile({})", path);
- Files.createDirectories(path.getParent());
- try (final PrintWriter pw = new PrintWriter(Files.newBufferedWriter(path, UTF_8));
- final RepositoryConnection connection = repository.getConnection())
- {
- final RDFHandler writer = new SortingRDFHandler(new N3Writer(pw));
- // FIXME: use Iterations - and sort
- // for (final Namespace namespace : connection.getNamespaces().asList())
- // {
- // writer.handleNamespace(namespace.getPrefix(), namespace.getName());
- // }
- writer.handleNamespace("bio", "");
- writer.handleNamespace("bmmo", "");
- writer.handleNamespace("dc", "");
- writer.handleNamespace("foaf", "");
- writer.handleNamespace("owl", "");
- writer.handleNamespace("mo", "");
- writer.handleNamespace("rdfs", "");
- writer.handleNamespace("rel", "");
- writer.handleNamespace("vocab", "");
- writer.handleNamespace("xs", "");
- connection.export(writer);
- }
- }
- /*******************************************************************************************************************
- *
- *
- ******************************************************************************************************************/
- @Override
- public <E extends Exception> void runInTransaction (@Nonnull final TransactionalTask<E> task)
- throws E, RepositoryException
- {
-"runInTransaction({})", task);
- waitForPowerOn();
- final long baseTime = System.nanoTime();
- try (final RepositoryConnection connection = repository.getConnection()) // TODO: pool?
- {
- connection.commit();
- }
- catch (Exception e)
- {
- log.error("Transaction failed: {}", e.toString());
- }
- if (log.isDebugEnabled())
- {
- log.debug(">>>> done in {} ms", (System.nanoTime() - baseTime) * 1E-6);
- }
- }
- /*******************************************************************************************************************
- *
- * Imports the repository from the given file.
- *
- * @param path where to import the data from
- * @throws RDFHandlerException
- * @throws IOException
- * @throws RepositoryException
- *
- ******************************************************************************************************************/
- private void importFromFile (@Nonnull final Path path)
- throws IOException, RepositoryException, RDFParseException
- {
- try (final RepositoryConnection connection = repository.getConnection();
- final Reader reader = Files.newBufferedReader(path, UTF_8))
- {
-"Importing repository from {} ...", path);
- connection.add(reader, path.toUri().toString(), RDFFormat.N3);
- connection.commit();
- }
- }
- /*******************************************************************************************************************
- *
- *
- ******************************************************************************************************************/
- private void waitForPowerOn()
- {
- try
- {
- if (!initialized.await(10, SECONDS))
- {
- throw new IllegalStateException("Did not receive PowerOnNotification");
- }
- }
- catch (InterruptedException ex)
- {
- throw new IllegalStateException("Interrupted while waiting for PowerOnNotification");
- }
- }
- }