DefaultPersistence.java
/*
* *********************************************************************************************************************
*
* blueMarine II: Semantic Media Centre
* http://tidalwave.it/projects/bluemarine2
*
* Copyright (C) 2015 - 2021 by Tidalwave s.a.s. (http://tidalwave.it)
*
* *********************************************************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* *********************************************************************************************************************
*
* git clone https://bitbucket.org/tidalwave/bluemarine2-src
* git clone https://github.com/tidalwave-it/bluemarine2-src
*
* *********************************************************************************************************************
*/
package it.tidalwave.bluemarine2.persistence.impl;
import it.tidalwave.bluemarine2.util.SortingRDFHandler;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import com.google.common.annotations.VisibleForTesting;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.n3.N3Writer;
import it.tidalwave.util.TypeSafeMap;
import it.tidalwave.messagebus.MessageBus;
import it.tidalwave.messagebus.annotation.ListensTo;
import it.tidalwave.messagebus.annotation.SimpleMessageSubscriber;
import it.tidalwave.bluemarine2.message.PersistenceInitializedNotification;
import it.tidalwave.bluemarine2.message.PowerOffNotification;
import it.tidalwave.bluemarine2.message.PowerOnNotification;
import it.tidalwave.bluemarine2.persistence.Persistence;
import lombok.extern.slf4j.Slf4j;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.nio.charset.StandardCharsets.UTF_8;
import static it.tidalwave.bluemarine2.persistence.PersistencePropertyNames.*;
/***********************************************************************************************************************
*
* @author Fabrizio Giudici
*
**********************************************************************************************************************/
@SimpleMessageSubscriber @Slf4j
public class DefaultPersistence implements Persistence
{
@Inject
private MessageBus messageBus;
private final CountDownLatch initialized = new CountDownLatch(1);
private Repository repository;
@VisibleForTesting Sail sail;
/*******************************************************************************************************************
*
* {@inheritDoc}
*
******************************************************************************************************************/
@Override @Nonnull
public Repository getRepository()
{
waitForPowerOn();
return repository;
}
/*******************************************************************************************************************
*
*
******************************************************************************************************************/
@VisibleForTesting void onPowerOnNotification (@ListensTo @Nonnull final PowerOnNotification notification)
throws RepositoryException, IOException, RDFParseException
{
log.info("onPowerOnNotification({})", notification);
final TypeSafeMap properties = notification.getProperties();
final Optional<Path> importFile = properties.getOptional(IMPORT_FILE);
final Optional<Path> storageFolder = properties.getOptional(STORAGE_FOLDER);
if (storageFolder.isEmpty())
{
log.warn("No storage path: working in memory");
sail = new MemoryStore();
}
else
{
log.info("Disk storage at {}", storageFolder);
if (importFile.isPresent() && Files.exists(importFile.get()))
{
log.warn("Scratching store ...");
FileUtils.deleteDirectory(storageFolder.get().toFile()); // FIXME: rename to backup folder with timestamp
}
sail = new NativeStore(storageFolder.get().toFile());
}
repository = new SailRepository(sail);
repository.initialize();
if (importFile.isPresent() && Files.exists(importFile.get()))
{
importFromFile(importFile.get());
if (properties.getOptional(RENAME_IMPORT_FILE).orElse(false))
{
Files.move(importFile.get(), Paths.get(importFile.get().toString() + "~"));
}
}
initialized.countDown();
messageBus.publish(new PersistenceInitializedNotification());
}
/*******************************************************************************************************************
*
*
******************************************************************************************************************/
@VisibleForTesting void onPowerOffNotification (@ListensTo @Nonnull final PowerOffNotification notification)
throws RepositoryException, IOException, RDFParseException
{
log.info("onPowerOffNotification({})", notification);
if (repository != null)
{
repository.shutDown();
}
}
/*******************************************************************************************************************
*
* Exports the repository to the given file.
*
* @param path where to export the data to
* @throws RDFHandlerException
* @throws IOException
* @throws RepositoryException
*
******************************************************************************************************************/
@Override
public void exportToFile (@Nonnull final Path path)
throws RDFHandlerException, IOException, RepositoryException
{
log.info("exportToFile({})", path);
Files.createDirectories(path.getParent());
try (final PrintWriter pw = new PrintWriter(Files.newBufferedWriter(path, UTF_8));
final RepositoryConnection connection = repository.getConnection())
{
final RDFHandler writer = new SortingRDFHandler(new N3Writer(pw));
// FIXME: use Iterations - and sort
// for (final Namespace namespace : connection.getNamespaces().asList())
// {
// writer.handleNamespace(namespace.getPrefix(), namespace.getName());
// }
writer.handleNamespace("bio", "http://purl.org/vocab/bio/0.1/");
writer.handleNamespace("bmmo", "http://bluemarine.tidalwave.it/2015/04/mo/");
writer.handleNamespace("dc", "http://purl.org/dc/elements/1.1/");
writer.handleNamespace("foaf", "http://xmlns.com/foaf/0.1/");
writer.handleNamespace("owl", "http://www.w3.org/2002/07/owl#");
writer.handleNamespace("mo", "http://purl.org/ontology/mo/");
writer.handleNamespace("rdfs", "http://www.w3.org/2000/01/rdf-schema#");
writer.handleNamespace("rel", "http://purl.org/vocab/relationship/");
writer.handleNamespace("vocab", "http://dbtune.org/musicbrainz/resource/vocab/");
writer.handleNamespace("xs", "http://www.w3.org/2001/XMLSchema#");
connection.export(writer);
}
}
/*******************************************************************************************************************
*
*
******************************************************************************************************************/
@Override
public <E extends Exception> void runInTransaction (@Nonnull final TransactionalTask<E> task)
throws E, RepositoryException
{
log.info("runInTransaction({})", task);
waitForPowerOn();
final long baseTime = System.nanoTime();
try (final RepositoryConnection connection = repository.getConnection()) // TODO: pool?
{
task.run(connection);
connection.commit();
}
catch (Exception e)
{
log.error("Transaction failed: {}", e.toString());
}
if (log.isDebugEnabled())
{
log.debug(">>>> done in {} ms", (System.nanoTime() - baseTime) * 1E-6);
}
}
/*******************************************************************************************************************
*
* Imports the repository from the given file.
*
* @param path where to import the data from
* @throws RDFHandlerException
* @throws IOException
* @throws RepositoryException
*
******************************************************************************************************************/
private void importFromFile (@Nonnull final Path path)
throws IOException, RepositoryException, RDFParseException
{
try (final RepositoryConnection connection = repository.getConnection();
final Reader reader = Files.newBufferedReader(path, UTF_8))
{
log.info("Importing repository from {} ...", path);
connection.add(reader, path.toUri().toString(), RDFFormat.N3);
connection.commit();
}
}
/*******************************************************************************************************************
*
*
******************************************************************************************************************/
private void waitForPowerOn()
{
try
{
if (!initialized.await(10, SECONDS))
{
throw new IllegalStateException("Did not receive PowerOnNotification");
}
}
catch (InterruptedException ex)
{
throw new IllegalStateException("Interrupted while waiting for PowerOnNotification");
}
}
}