Note: To run the examples, you need to clone the ActiveJ project from GitHub:
$ git clone https://github.com/activej/activej
Then import it as a Maven project and check out tag v4.1. Build the project before running the examples.
These examples are located at activej -> examples -> cloud -> crdt.

These examples use other ActiveJ technologies, particularly the ActiveJ platform and ActiveJ Serializer.

Simple CRDT

This example illustrates some basic CRDT functionality. We will create a ‘remote’ CRDT storage that contains some key-value pairs. There is also a CrdtServer for this storage.

// create the 'remote' storage
CrdtStorageMap<String, TimestampContainer<Integer>> remoteStorage = CrdtStorageMap.create(eventloop, CRDT_FUNCTION);

// put some default data into that storage
remoteStorage.put("mx", TimestampContainer.now(2));
remoteStorage.put("test", TimestampContainer.now(3));
remoteStorage.put("test", TimestampContainer.now(5));
remoteStorage.put("only_remote", TimestampContainer.now(35));
remoteStorage.put("only_remote", TimestampContainer.now(4));

// and also output it for later comparison
System.out.println("Data at 'remote' storage:");
remoteStorage.iterator().forEachRemaining(System.out::println);
System.out.println();

// create and run a server for the 'remote' storage
CrdtServer<String, TimestampContainer<Integer>> server = CrdtServer.create(eventloop, remoteStorage, INTEGER_SERIALIZER)
		.withListenAddress(ADDRESS);
server.listen();

To resolve possible conflicts when adding new data to the remote storage, we define a CrdtFunction. If two entries have the same key, the function compares their values and returns the larger one, which is then written to the storage.

private static final CrdtFunction<TimestampContainer<Integer>> CRDT_FUNCTION =
		TimestampContainer.createCrdtFunction(Integer::max);
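
The conflict rule above can be sketched in plain Java without any ActiveJ classes. This is a minimal illustration, not the library's implementation: the class names Stamped and MergeFunctionSketch are hypothetical, and the rule of keeping the later timestamp while combining the states with max is an assumption. The point is that such a merge gives the same result regardless of argument order, and that merging a value with itself changes nothing.

```java
import java.util.function.BinaryOperator;

class MergeFunctionSketch {
    // Hypothetical stand-in for a timestamped value; not ActiveJ's TimestampContainer.
    static final class Stamped {
        final long timestamp;
        final int state;

        Stamped(long timestamp, int state) {
            this.timestamp = timestamp;
            this.state = state;
        }

        @Override
        public String toString() {
            return "Stamped{timestamp=" + timestamp + ", state=" + state + "}";
        }
    }

    // Assumed merge rule: keep the later timestamp and combine the states with max.
    static final BinaryOperator<Stamped> MERGE = (a, b) ->
            new Stamped(Math.max(a.timestamp, b.timestamp), Math.max(a.state, b.state));

    public static void main(String[] args) {
        Stamped a = new Stamped(100L, 3);
        Stamped b = new Stamped(200L, 5);

        // The order of the arguments does not matter.
        System.out.println(MERGE.apply(a, b));
        System.out.println(MERGE.apply(b, a));

        // Merging a value with itself leaves it unchanged.
        System.out.println(MERGE.apply(a, a));
    }
}
```

Because the result does not depend on the order in which values arrive, replicas that see the same updates in different orders still end up with the same state.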

We also need a CRDT client, which is a CrdtStorageClient, and a local storage created the same way as the remote one.

// now create the client for that 'remote' storage
CrdtStorage<String, TimestampContainer<Integer>> client =
		CrdtStorageClient.create(eventloop, ADDRESS, INTEGER_SERIALIZER);

// and also create the local storage
CrdtStorageMap<String, TimestampContainer<Integer>> localStorage =
		CrdtStorageMap.create(eventloop, CRDT_FUNCTION);

// and fill it with some other values
localStorage.put("mx", TimestampContainer.now(22));
// conflicting keys will be resolved with the crdt function
localStorage.put("mx", TimestampContainer.now(2));
// so the actual value will be the max of all values of that key
localStorage.put("mx", TimestampContainer.now(23));
localStorage.put("test", TimestampContainer.now(1));
localStorage.put("test", TimestampContainer.now(2));
localStorage.put("test", TimestampContainer.now(4));
localStorage.put("test", TimestampContainer.now(3));
localStorage.put("only_local", TimestampContainer.now(47));
localStorage.put("only_local", TimestampContainer.now(12));

// and output it too for later comparison
System.out.println("Data at the local storage:");
localStorage.iterator().forEachRemaining(System.out::println);
System.out.println("\n");

Now we can test uploading data to the remote storage through our client and then downloading it back. Some keys conflict, and those conflicts are resolved by the provided CRDT function.

// now stream the local storage into the remote one through the TCP client-server pair
StreamSupplier.ofPromise(localStorage.download())
		.streamTo(StreamConsumer.ofPromise(client.upload()))
		.whenComplete(() -> {

			// check what is now at the 'remote' storage, the output should differ
			System.out.println("Synced data at 'remote' storage:");
			remoteStorage.iterator().forEachRemaining(System.out::println);
			System.out.println();

			// and now do the reverse process
			StreamSupplier.ofPromise(client.download())
					.streamTo(StreamConsumer.ofPromise(localStorage.upload()))
					.whenComplete(() -> {
						// now output the local storage, should be identical to the remote one
						System.out.println("Synced data at the local storage:");
						localStorage.iterator().forEachRemaining(System.out::println);
						System.out.println();

						// also stop the server to let the program finish
						server.close();
					});
		});

eventloop.run();
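
To make the expected outcome of this round trip easier to follow, here is a plain-Java sketch of the same two-way sync using ordinary maps instead of CRDT storages. It is only an illustration: TwoWaySyncSketch and its sync method are hypothetical names, timestamps are left out, and the starting values are the ones each key reduces to once the repeated puts above are merged with max.

```java
import java.util.Map;
import java.util.TreeMap;

class TwoWaySyncSketch {
    // Copies every entry of 'from' into 'to', resolving conflicting keys with Integer::max.
    static void sync(Map<String, Integer> from, Map<String, Integer> to) {
        from.forEach((key, value) -> to.merge(key, value, Integer::max));
    }

    public static void main(String[] args) {
        Map<String, Integer> remote = new TreeMap<>();
        remote.put("mx", 2);
        remote.put("test", 5);
        remote.put("only_remote", 35);

        Map<String, Integer> local = new TreeMap<>();
        local.put("mx", 23);
        local.put("test", 4);
        local.put("only_local", 47);

        sync(local, remote); // corresponds to the upload step
        sync(remote, local); // corresponds to the download step

        System.out.println("remote: " + remote);
        System.out.println("local:  " + local);
        System.out.println("identical: " + remote.equals(local));
    }
}
```

After both steps each map holds all four keys, with "mx" taken from the local side (23) and "test" from the remote side (5).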

CRDT clusters

One of the core CRDT features is clustering. It allows clients to upload data to and download data from several servers.

// we create a map of 10 local partitions with string partition ids and string keys
// normally all of them would be network clients for remote partitions
Map<String, CrdtStorage<String, LWWSet<String>>> clients = new HashMap<>();
for (int i = 0; i < 10; i++) {
	String id = "partition" + i;
	Path storage = Files.createTempDirectory("storage_"+ id);
	Files.createDirectories(storage.resolve(LocalActiveFs.DEFAULT_TEMP_DIR));
	ActiveFs fs = LocalActiveFs.create(eventloop, executor, storage);
	clients.put(id, CrdtStorageFs.create(eventloop, fs, SERIALIZER));
}

// grab a couple of them to work with
CrdtStorage<String, LWWSet<String>> partition3 = clients.get("partition3");
CrdtStorage<String, LWWSet<String>> partition6 = clients.get("partition6");

// create a cluster with string keys, string partition ids,
// and with replication count of 5 meaning that uploading items to the
// cluster will make 5 copies of them across known partitions
CrdtStorageCluster<String, String, LWWSet<String>> cluster = CrdtStorageCluster.create(eventloop, clients)
		.withReplicationCount(5);
// sets on partition3
CrdtData<String, LWWSet<String>> firstOn3 = new CrdtData<>("first", LWWSet.of("#1", "#2", "#3", "#4"));
CrdtData<String, LWWSet<String>> secondOn3 = new CrdtData<>("second", LWWSet.of("#3", "#4", "#5", "#6"));

// sets on partition6
CrdtData<String, LWWSet<String>> firstOn6 = new CrdtData<>("first", LWWSet.of("#3", "#4", "#5", "#6"));

// the current implementation of LWWSet depends on system time,
// so we wait briefly to make sure the removes below get a higher timestamp
try {
	Thread.sleep(1);
} catch (InterruptedException ignored) {
}

LWWSet<String> set = LWWSet.of("#2", "#4");
set.remove("#5");
set.remove("#6");
CrdtData<String, LWWSet<String>> secondOn6 = new CrdtData<>("second", set);
// then upload these sets to both partition3 and partition6
Promise<Void> uploadTo3 = StreamSupplier.of(firstOn3, secondOn3).streamTo(StreamConsumer.ofPromise(partition3.upload()));
Promise<Void> uploadTo6 = StreamSupplier.of(firstOn6, secondOn6).streamTo(StreamConsumer.ofPromise(partition6.upload()));

// wait for both uploads to finish
Promises.all(uploadTo3, uploadTo6)
		// and then download items from the cluster, and wait for result
		.then(() -> cluster.download())
		// also collecting it to list
		.then(StreamSupplier::toList)
		// and then print the resulting list of items, it should match the expectation from above
		// (remember that sets are unordered, so you may not see it exactly as above)
		.whenComplete((list, $) -> System.out.println(list + "\n"));

// actually run the eventloop and then shutdown the executor allowing the program to finish
eventloop.run();
executor.shutdown();
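
The role of the timestamps in the example above can be illustrated with a small last-write-wins set written in plain Java. This is a sketch under stated assumptions, not ActiveJ's LWWSet: the class LwwSetSketch is hypothetical, timestamps are passed in explicitly instead of being read from the system clock, and ties are assumed to favor the add, which is why a remove needs a strictly later timestamp to take effect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class LwwSetSketch {
    // Per element: the latest add timestamp and the latest remove timestamp seen so far.
    final Map<String, Long> added = new HashMap<>();
    final Map<String, Long> removed = new HashMap<>();

    void add(String element, long timestamp) {
        added.merge(element, timestamp, Math::max);
    }

    void remove(String element, long timestamp) {
        removed.merge(element, timestamp, Math::max);
    }

    // Assumed tie-break: on equal timestamps the add wins.
    boolean contains(String element) {
        Long addTs = added.get(element);
        if (addTs == null) return false;
        Long removeTs = removed.get(element);
        return removeTs == null || addTs >= removeTs;
    }

    // Merging keeps the latest add and the latest remove of every element.
    LwwSetSketch merge(LwwSetSketch other) {
        LwwSetSketch result = new LwwSetSketch();
        added.forEach(result::add);
        other.added.forEach(result::add);
        removed.forEach(result::remove);
        other.removed.forEach(result::remove);
        return result;
    }

    Set<String> elements() {
        Set<String> result = new TreeSet<>();
        for (String element : added.keySet()) {
            if (contains(element)) result.add(element);
        }
        return result;
    }

    public static void main(String[] args) {
        // the "second" set as written to partition3, at time 1
        LwwSetSketch on3 = new LwwSetSketch();
        for (String e : new String[]{"#3", "#4", "#5", "#6"}) on3.add(e, 1L);

        // the "second" set as written to partition6, at the later time 2
        LwwSetSketch on6 = new LwwSetSketch();
        on6.add("#2", 2L);
        on6.add("#4", 2L);
        on6.remove("#5", 2L);
        on6.remove("#6", 2L);

        System.out.println(on3.merge(on6).elements()); // [#2, #3, #4]
    }
}
```

In this sketch "#5" and "#6" disappear from the merged set only because their removes carry a later timestamp than their adds, which mirrors the short sleep in the example.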

CRDT consolidation

This example illustrates the consolidation process. The ActiveJ CRDT file system creates a new file for each upload. This approach is faster and more efficient than writing everything to a single file. However, once there are too many files, it becomes inefficient as well. To resolve this issue, ActiveJ CRDT has a special consolidation process that merges several files into a single one:

// create our storage dir and an fs client which operates on that dir
Path storage = Files.createTempDirectory("storage");
Files.createDirectories(storage.resolve(LocalActiveFs.DEFAULT_TEMP_DIR));
LocalActiveFs fsClient = LocalActiveFs.create(eventloop, executor, storage);

// our item is a set of integers, so we create a CRDT function for it
// also each CRDT item needs to have a timestamp, so we wrap the sets
// and the function using the TimestampContainer
CrdtFunction<TimestampContainer<Set<Integer>>> crdtFunction = TimestampContainer.createCrdtFunction(CrdtFsConsolidationExample::union);

// same with serializer for the timestamp container of the set of integers
CrdtDataSerializer<String, TimestampContainer<Set<Integer>>> serializer =
		new CrdtDataSerializer<>(UTF8_SERIALIZER, TimestampContainer.createSerializer(ofSet(INT_SERIALIZER)));

// create an FS-based CRDT client
CrdtStorageFs<String, TimestampContainer<Set<Integer>>> client =
		CrdtStorageFs.create(eventloop, fsClient, serializer, crdtFunction);
// upload two streams of items to it in parallel
Promise<Void> firstUpload =
		StreamSupplier.ofStream(Stream.of(
				new CrdtData<>("1_test_1", TimestampContainer.now(set(1, 2, 3))),
				new CrdtData<>("1_test_2", TimestampContainer.now(set(2, 3, 7))),
				new CrdtData<>("1_test_3", TimestampContainer.now(set(78, 2, 3))),
				new CrdtData<>("12_test_1", TimestampContainer.now(set(123, 124, 125))),
				new CrdtData<>("12_test_2", TimestampContainer.now(set(12)))).sorted())
				.streamTo(StreamConsumer.ofPromise(client.upload()));

Promise<Void> secondUpload =
		StreamSupplier.ofStream(Stream.of(
				new CrdtData<>("2_test_1", TimestampContainer.now(set(1, 2, 3))),
				new CrdtData<>("2_test_2", TimestampContainer.now(set(2, 3, 4))),
				new CrdtData<>("2_test_3", TimestampContainer.now(set(0, 1, 2))),
				new CrdtData<>("12_test_1", TimestampContainer.now(set(123, 542, 125, 2))),
				new CrdtData<>("12_test_2", TimestampContainer.now(set(12, 13)))).sorted())
				.streamTo(StreamConsumer.ofPromise(client.upload()));
// and wait for both uploads to finish
Promises.all(firstUpload, secondUpload)
		.whenComplete(() -> {

			// all the operations are async, but we run them sequentially
			// because we need to see the file list right before and right after
			// the consolidation process
			Promises.sequence(
					// here we can see that two files were created, one for each upload
					() -> fsClient.list("**")
							.whenResult(res -> System.out.println("\n" + res + "\n"))
							.toVoid(),

					// run the consolidation process
					client::consolidate,

					// now we can see that there is only one file left, and its size is
					// less than the sum of the sizes of the two files from above
					() -> fsClient.list("**")
							.whenResult(res -> System.out.println("\n" + res + "\n"))
							.toVoid()
			);
		});

// all of the above will not run until we actually start the eventloop
eventloop.run();
// shutdown the executor after the eventloop finishes (meaning there is no more work to do)
// because executor waits for 60 seconds of being idle until it shuts down on its own
executor.shutdown();
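
Why the consolidated file ends up smaller than the two original files combined can be seen in a plain-Java sketch. It is an illustration only: ConsolidationSketch and its methods are hypothetical, each "file" is modeled as an in-memory map, timestamps are omitted, and only some of the keys from the example are used. Keys that occur in both files are stored once, with the union of their sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ConsolidationSketch {
    // Merges several "files" (each a key -> set map) into one, taking the union for duplicate keys.
    static Map<String, Set<Integer>> consolidate(List<Map<String, Set<Integer>>> files) {
        Map<String, Set<Integer>> merged = new TreeMap<>();
        for (Map<String, Set<Integer>> file : files) {
            file.forEach((key, value) ->
                    merged.computeIfAbsent(key, k -> new TreeSet<>()).addAll(value));
        }
        return merged;
    }

    static Set<Integer> set(Integer... items) {
        return new TreeSet<>(Arrays.asList(items));
    }

    public static void main(String[] args) {
        // entries written by the first upload
        Map<String, Set<Integer>> first = new TreeMap<>();
        first.put("1_test_1", set(1, 2, 3));
        first.put("12_test_1", set(123, 124, 125));
        first.put("12_test_2", set(12));

        // entries written by the second upload
        Map<String, Set<Integer>> second = new TreeMap<>();
        second.put("2_test_1", set(1, 2, 3));
        second.put("12_test_1", set(123, 542, 125, 2));
        second.put("12_test_2", set(12, 13));

        List<Map<String, Set<Integer>>> files = new ArrayList<>();
        files.add(first);
        files.add(second);
        Map<String, Set<Integer>> merged = consolidate(files);

        System.out.println("entries before: " + (first.size() + second.size())
                + ", after: " + merged.size());
        System.out.println("12_test_1 -> " + merged.get("12_test_1"));
    }
}
```

Here six entries across two files become four entries in one, and the overlapping elements of "12_test_1" and "12_test_2" are stored only once.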