Class BookKeeper
- java.lang.Object
-
- org.apache.sling.distribution.journal.bookkeeper.BookKeeper
-
- All Implemented Interfaces:
Closeable
,AutoCloseable
public class BookKeeper extends Object implements Closeable
Keeps track of offset and processed status and manages coordinates the import/retry handling. The offset store is identified by the agentName only. With non clustered publish instances deployment, each instance stores the offset in its own node store, thus avoiding mix ups. Moreover, when cloning an instance from a node store, the cloned instance will implicitly recover the offsets and start from the last processed offset. With clustered publish instances deployment, only one Subscriber agent must run on the cluster in order to avoid mix ups. The clustered and non clustered publish instances use cases can be supported by only running the Subscriber agent on the leader instance.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
BookKeeper.PackageStatus
-
Field Summary
Fields Modifier and Type Field Description static int
COMMIT_AFTER_NUM_SKIPPED
static String
KEY_OFFSET
static String
STORE_TYPE_STATUS
-
Constructor Summary
Constructors Constructor Description BookKeeper(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler, org.osgi.service.event.EventAdmin eventAdmin, Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender, Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender, BookKeeperConfig config)
BookKeeper(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler, org.osgi.service.event.EventAdmin eventAdmin, Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender, Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender, BookKeeperConfig config, org.apache.sling.distribution.ImportPostProcessor importPostProcessor, org.apache.sling.distribution.InvalidationProcessor invalidationProcessor)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
clearPackageRetriesOnSuccess(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg)
This method clears the packageRetries storage for a given package and emits metrics on the success of the retry.void
close()
PackageRetries
getPackageRetries()
int
getRetries(String pubAgentName)
void
importPackage(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset, long createdTime)
We aim at processing the packages exactly once.void
invalidateCache(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset)
long
loadOffset()
void
markStatusSent()
void
removePackage(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset)
boolean
sendStoredStatus(int retry)
boolean
shouldCommitSkipped()
void
skipPackage(long offset)
-
-
-
Field Detail
-
STORE_TYPE_STATUS
public static final String STORE_TYPE_STATUS
- See Also:
- Constant Field Values
-
KEY_OFFSET
public static final String KEY_OFFSET
- See Also:
- Constant Field Values
-
COMMIT_AFTER_NUM_SKIPPED
public static final int COMMIT_AFTER_NUM_SKIPPED
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
BookKeeper
public BookKeeper(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler, org.osgi.service.event.EventAdmin eventAdmin, Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender, Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender, BookKeeperConfig config)
-
BookKeeper
public BookKeeper(org.apache.sling.api.resource.ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, org.apache.sling.distribution.journal.bookkeeper.PackageHandler packageHandler, org.osgi.service.event.EventAdmin eventAdmin, Consumer<org.apache.sling.distribution.journal.messages.PackageStatusMessage> sender, Consumer<org.apache.sling.distribution.journal.messages.LogMessage> logSender, BookKeeperConfig config, org.apache.sling.distribution.ImportPostProcessor importPostProcessor, org.apache.sling.distribution.InvalidationProcessor invalidationProcessor)
-
-
Method Detail
-
importPackage
public void importPackage(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset, long createdTime) throws org.apache.sling.distribution.common.DistributionException
We aim at processing the packages exactly once. Processing the packages exactly once is possible with the following conditions I. The package importer is configured to disable auto-committing changes. II. A single commit aggregates three content updates C1. install the package C2. store the processing status C3. store the offset processed Some package importers require auto-saving or issue partial commits before failing. For those packages importers, we aim at processing packages at least once, thanks to the order in which the content updates are applied.- Throws:
org.apache.sling.distribution.common.DistributionException
-
invalidateCache
public void invalidateCache(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset) throws org.apache.sling.distribution.common.DistributionException
- Throws:
org.apache.sling.distribution.common.DistributionException
-
removePackage
public void removePackage(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg, long offset) throws org.apache.sling.api.resource.LoginException, org.apache.sling.api.resource.PersistenceException
- Throws:
org.apache.sling.api.resource.LoginException
org.apache.sling.api.resource.PersistenceException
-
skipPackage
public void skipPackage(long offset) throws org.apache.sling.api.resource.LoginException, org.apache.sling.api.resource.PersistenceException
- Throws:
org.apache.sling.api.resource.LoginException
org.apache.sling.api.resource.PersistenceException
-
shouldCommitSkipped
public boolean shouldCommitSkipped()
-
sendStoredStatus
public boolean sendStoredStatus(int retry)
- Returns:
true
if the status has been sent ;false
otherwise.
-
markStatusSent
public void markStatusSent()
-
loadOffset
public long loadOffset()
-
getRetries
public int getRetries(String pubAgentName)
-
getPackageRetries
public PackageRetries getPackageRetries()
-
clearPackageRetriesOnSuccess
public void clearPackageRetriesOnSuccess(org.apache.sling.distribution.journal.messages.PackageMessage pkgMsg)
This method clears the packageRetries storage for a given package and emits metrics on the success of the retry.- Parameters:
pkgMsg
- : package distributed
-
close
public void close() throws IOException
- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceCloseable
- Throws:
IOException
-
-