Remove usage of AsynchronousExecutor library in favor or a simpler implementation.
This commit is contained in:
parent
d8249b7886
commit
3177d55579
5 changed files with 198 additions and 577 deletions
|
@ -1,32 +1,130 @@
|
||||||
package net.minecraftforge.common.chunkio;
|
package net.minecraftforge.common.chunkio;
|
||||||
|
|
||||||
import net.minecraftforge.common.util.AsynchronousExecutor;
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class ChunkIOExecutor {
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import net.minecraft.world.World;
|
||||||
|
import net.minecraft.world.chunk.Chunk;
|
||||||
|
import net.minecraft.world.chunk.storage.AnvilChunkLoader;
|
||||||
|
import net.minecraft.world.gen.ChunkProviderServer;
|
||||||
|
import net.minecraftforge.fml.common.FMLLog;
|
||||||
|
|
||||||
|
public class ChunkIOExecutor
|
||||||
|
{
|
||||||
static final int BASE_THREADS = 1;
|
static final int BASE_THREADS = 1;
|
||||||
static final int PLAYERS_PER_THREAD = 50;
|
static final int PLAYERS_PER_THREAD = 50;
|
||||||
|
|
||||||
private static final AsynchronousExecutor<QueuedChunk, net.minecraft.world.chunk.Chunk, Runnable, RuntimeException> instance = new AsynchronousExecutor<QueuedChunk, net.minecraft.world.chunk.Chunk, Runnable, RuntimeException>(new ChunkIOProvider(), BASE_THREADS);
|
private static final Map<QueuedChunk, ChunkIOProvider> tasks = Maps.newConcurrentMap();
|
||||||
|
private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(BASE_THREADS, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
|
||||||
|
new LinkedBlockingQueue<Runnable>(),
|
||||||
|
new ThreadFactory()
|
||||||
|
{
|
||||||
|
private AtomicInteger count = new AtomicInteger(1);
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r)
|
||||||
|
{
|
||||||
|
Thread thread = new Thread(r, "Chunk I/O Executor Thread-" + count.getAndIncrement());
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
public static net.minecraft.world.chunk.Chunk syncChunkLoad(net.minecraft.world.World world, net.minecraft.world.chunk.storage.AnvilChunkLoader loader, net.minecraft.world.gen.ChunkProviderServer provider, int x, int z) {
|
//Load the chunk completely in this thread. Dequeue as needed...
|
||||||
return instance.getSkipQueue(new QueuedChunk(x, z, loader, world, provider));
|
public static Chunk syncChunkLoad(World world, AnvilChunkLoader loader, ChunkProviderServer provider, int x, int z)
|
||||||
|
{
|
||||||
|
QueuedChunk key = new QueuedChunk(x, z, loader, world, provider);
|
||||||
|
ChunkIOProvider task = tasks.get(key);
|
||||||
|
if (task != null)
|
||||||
|
{
|
||||||
|
if (!pool.remove(task) && !task.runFinished()) // If it wasn't in the pool, and run hasn't finished, then wait for the async thread.
|
||||||
|
{
|
||||||
|
synchronized(task)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
task.wait();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace(); // Something happened? Log it?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
task = new ChunkIOProvider(key);
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
task.syncCallback();
|
||||||
|
return task.getChunk();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void queueChunkLoad(net.minecraft.world.World world, net.minecraft.world.chunk.storage.AnvilChunkLoader loader, net.minecraft.world.gen.ChunkProviderServer provider, int x, int z, Runnable runnable) {
|
//Queue the chunk to be loaded, and call the runnable when finished
|
||||||
instance.add(new QueuedChunk(x, z, loader, world, provider), runnable);
|
public static void queueChunkLoad(World world, AnvilChunkLoader loader, ChunkProviderServer provider, int x, int z, Runnable runnable)
|
||||||
|
{
|
||||||
|
QueuedChunk key = new QueuedChunk(x, z, loader, world, provider);
|
||||||
|
ChunkIOProvider task = tasks.get(key);
|
||||||
|
if (task == null)
|
||||||
|
{
|
||||||
|
task = new ChunkIOProvider(key);
|
||||||
|
task.addCallback(runnable); // Add before calling execute for thread safety
|
||||||
|
tasks.put(key, task);
|
||||||
|
pool.execute(task);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
task.addCallback(runnable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abuses the fact that hashCode and equals for QueuedChunk only use world and coords
|
// Abuses the fact that hashCode and equals for QueuedChunk only use world and coords
|
||||||
public static void dropQueuedChunkLoad(net.minecraft.world.World world, int x, int z, Runnable runnable) {
|
// Remove the chunk from the queue if it's in the list.
|
||||||
instance.drop(new QueuedChunk(x, z, null, world, null), runnable);
|
public static void dropQueuedChunkLoad(World world, int x, int z, Runnable runnable)
|
||||||
|
{
|
||||||
|
QueuedChunk key = new QueuedChunk(x, z, null, world, null);
|
||||||
|
ChunkIOProvider task = tasks.get(key);
|
||||||
|
if (task == null)
|
||||||
|
{
|
||||||
|
FMLLog.warning("Attempted to dequeue chunk that wasn't queued? %d @ (%d, %d)", world.provider.getDimension(), x, z);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void adjustPoolSize(int players) {
|
task.removeCallback(runnable);
|
||||||
|
|
||||||
|
if (!task.hasCallback())
|
||||||
|
{
|
||||||
|
tasks.remove(key);
|
||||||
|
pool.remove(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void adjustPoolSize(int players)
|
||||||
|
{
|
||||||
int size = Math.max(BASE_THREADS, (int) Math.ceil(players / PLAYERS_PER_THREAD));
|
int size = Math.max(BASE_THREADS, (int) Math.ceil(players / PLAYERS_PER_THREAD));
|
||||||
instance.setActiveThreads(size);
|
pool.setCorePoolSize(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void tick() {
|
public static void tick()
|
||||||
instance.finishActive();
|
{
|
||||||
|
Iterator<ChunkIOProvider> itr = tasks.values().iterator();
|
||||||
|
while (itr.hasNext())
|
||||||
|
{
|
||||||
|
ChunkIOProvider task = itr.next();
|
||||||
|
if (task.runFinished() && task.hasCallback())
|
||||||
|
{
|
||||||
|
task.syncCallback();
|
||||||
|
}
|
||||||
|
itr.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,64 +1,113 @@
|
||||||
package net.minecraftforge.common.chunkio;
|
package net.minecraftforge.common.chunkio;
|
||||||
|
|
||||||
|
|
||||||
import net.minecraft.world.ChunkCoordIntPair;
|
import net.minecraft.world.ChunkCoordIntPair;
|
||||||
|
import net.minecraft.world.chunk.Chunk;
|
||||||
|
import net.minecraft.world.chunk.storage.AnvilChunkLoader;
|
||||||
|
import net.minecraft.world.gen.ChunkProviderServer;
|
||||||
|
import net.minecraft.nbt.NBTTagCompound;
|
||||||
import net.minecraftforge.common.MinecraftForge;
|
import net.minecraftforge.common.MinecraftForge;
|
||||||
import net.minecraftforge.common.util.AsynchronousExecutor;
|
|
||||||
import net.minecraftforge.event.world.ChunkDataEvent;
|
import net.minecraftforge.event.world.ChunkDataEvent;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
class ChunkIOProvider implements AsynchronousExecutor.CallBackProvider<QueuedChunk, net.minecraft.world.chunk.Chunk, Runnable, RuntimeException> {
|
class ChunkIOProvider implements Runnable
|
||||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
{
|
||||||
|
private QueuedChunk chunkInfo;
|
||||||
|
private Chunk chunk;
|
||||||
|
private NBTTagCompound nbt;
|
||||||
|
private ConcurrentLinkedQueue<Runnable> callbacks = new ConcurrentLinkedQueue<Runnable>();
|
||||||
|
private boolean ran = false;
|
||||||
|
|
||||||
// async stuff
|
ChunkIOProvider(QueuedChunk chunk)
|
||||||
public net.minecraft.world.chunk.Chunk callStage1(QueuedChunk queuedChunk) throws RuntimeException {
|
{
|
||||||
net.minecraft.world.chunk.storage.AnvilChunkLoader loader = queuedChunk.loader;
|
this.chunkInfo = chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCallback(Runnable callback)
|
||||||
|
{
|
||||||
|
this.callbacks.add(callback);
|
||||||
|
}
|
||||||
|
public void removeCallback(Runnable callback)
|
||||||
|
{
|
||||||
|
this.callbacks.remove(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() // async stuff
|
||||||
|
{
|
||||||
|
synchronized(this)
|
||||||
|
{
|
||||||
|
AnvilChunkLoader loader = chunkInfo.loader;
|
||||||
Object[] data = null;
|
Object[] data = null;
|
||||||
try {
|
try
|
||||||
data = loader.loadChunk__Async(queuedChunk.world, queuedChunk.x, queuedChunk.z);
|
{
|
||||||
} catch (IOException e) {
|
data = loader.loadChunk__Async(chunkInfo.world, chunkInfo.x, chunkInfo.z);
|
||||||
|
}
|
||||||
|
catch (IOException e)
|
||||||
|
{
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data != null) {
|
if (data != null)
|
||||||
queuedChunk.compound = (net.minecraft.nbt.NBTTagCompound) data[1];
|
{
|
||||||
return (net.minecraft.world.chunk.Chunk) data[0];
|
this.nbt = (NBTTagCompound)data[1];
|
||||||
|
this.chunk = (Chunk)data[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
this.ran = true;
|
||||||
|
this.notifyAll();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync stuff
|
// sync stuff
|
||||||
public void callStage2(QueuedChunk queuedChunk, net.minecraft.world.chunk.Chunk chunk) throws RuntimeException {
|
public void syncCallback()
|
||||||
if(chunk == null) {
|
{
|
||||||
|
ChunkProviderServer provider = this.chunkInfo.provider;
|
||||||
|
if (chunk == null)
|
||||||
|
{
|
||||||
// If the chunk loading failed just do it synchronously (may generate)
|
// If the chunk loading failed just do it synchronously (may generate)
|
||||||
queuedChunk.provider.originalLoadChunk(queuedChunk.x, queuedChunk.z);
|
provider.originalLoadChunk(this.chunkInfo.x, this.chunkInfo.z);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
queuedChunk.loader.loadEntities(queuedChunk.world, queuedChunk.compound.getCompoundTag("Level"), chunk);
|
// Load Entities
|
||||||
MinecraftForge.EVENT_BUS.post(new ChunkDataEvent.Load(chunk, queuedChunk.compound)); // Don't call ChunkDataEvent.Load async
|
this.chunkInfo.loader.loadEntities(this.chunkInfo.world, this.nbt.getCompoundTag("Level"), this.chunk);
|
||||||
chunk.setLastSaveTime(queuedChunk.provider.worldObj.getTotalWorldTime());
|
|
||||||
queuedChunk.provider.id2ChunkMap.add(ChunkCoordIntPair.chunkXZ2Int(queuedChunk.x, queuedChunk.z), chunk);
|
|
||||||
queuedChunk.provider.loadedChunks.add(chunk);
|
|
||||||
chunk.onChunkLoad();
|
|
||||||
|
|
||||||
if (queuedChunk.provider.chunkGenerator != null) {
|
MinecraftForge.EVENT_BUS.post(new ChunkDataEvent.Load(this.chunk, this.nbt)); // Don't call ChunkDataEvent.Load async
|
||||||
queuedChunk.provider.chunkGenerator.recreateStructures(chunk, queuedChunk.x, queuedChunk.z);
|
|
||||||
|
this.chunk.setLastSaveTime(provider.worldObj.getTotalWorldTime());
|
||||||
|
provider.id2ChunkMap.add(ChunkCoordIntPair.chunkXZ2Int(this.chunkInfo.x, this.chunkInfo.z), this.chunk);
|
||||||
|
provider.loadedChunks.add(this.chunk);
|
||||||
|
this.chunk.onChunkLoad();
|
||||||
|
|
||||||
|
if (provider.chunkGenerator != null)
|
||||||
|
{
|
||||||
|
provider.chunkGenerator.recreateStructures(this.chunk, this.chunkInfo.x, this.chunkInfo.z);
|
||||||
}
|
}
|
||||||
|
|
||||||
chunk.populateChunk(queuedChunk.provider, queuedChunk.provider.chunkGenerator);
|
this.chunk.populateChunk(provider, provider.chunkGenerator);
|
||||||
|
|
||||||
|
for (Runnable r : this.callbacks)
|
||||||
|
{
|
||||||
|
r.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void callStage3(QueuedChunk queuedChunk, net.minecraft.world.chunk.Chunk chunk, Runnable runnable) throws RuntimeException {
|
this.callbacks.clear();
|
||||||
runnable.run();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Thread newThread(Runnable runnable) {
|
public Chunk getChunk()
|
||||||
Thread thread = new Thread(runnable, "Chunk I/O Executor Thread-" + threadNumber.getAndIncrement());
|
{
|
||||||
thread.setDaemon(true);
|
return this.chunk;
|
||||||
return thread;
|
}
|
||||||
|
|
||||||
|
public boolean runFinished()
|
||||||
|
{
|
||||||
|
return this.ran;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasCallback()
|
||||||
|
{
|
||||||
|
return this.callbacks.size() > 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -7,7 +7,6 @@ class QueuedChunk {
|
||||||
final net.minecraft.world.chunk.storage.AnvilChunkLoader loader;
|
final net.minecraft.world.chunk.storage.AnvilChunkLoader loader;
|
||||||
final net.minecraft.world.World world;
|
final net.minecraft.world.World world;
|
||||||
final net.minecraft.world.gen.ChunkProviderServer provider;
|
final net.minecraft.world.gen.ChunkProviderServer provider;
|
||||||
net.minecraft.nbt.NBTTagCompound compound;
|
|
||||||
|
|
||||||
public QueuedChunk(int x, int z, net.minecraft.world.chunk.storage.AnvilChunkLoader loader, net.minecraft.world.World world, net.minecraft.world.gen.ChunkProviderServer provider) {
|
public QueuedChunk(int x, int z, net.minecraft.world.chunk.storage.AnvilChunkLoader loader, net.minecraft.world.World world, net.minecraft.world.gen.ChunkProviderServer provider) {
|
||||||
this.x = x;
|
this.x = x;
|
||||||
|
|
|
@ -1,360 +0,0 @@
|
||||||
package net.minecraftforge.common.util;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
|
||||||
|
|
||||||
import net.minecraftforge.fml.common.FMLLog;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes tasks using a multi-stage process executor. Synchronous executions are via {@link AsynchronousExecutor#finishActive()} or the {@link AsynchronousExecutor#get(Object)} methods.
|
|
||||||
* <li \> Stage 1 creates the object from a parameter, and is usually called asynchronously.
|
|
||||||
* <li \> Stage 2 takes the parameter and object from stage 1 and does any synchronous processing to prepare it.
|
|
||||||
* <li \> Stage 3 takes the parameter and object from stage 1, as well as a callback that was registered, and performs any synchronous calculations.
|
|
||||||
*
|
|
||||||
* @param <P> The type of parameter you provide to make the object that will be created. It should implement {@link Object#hashCode()} and {@link Object#equals(Object)} if you want to get the value early.
|
|
||||||
* @param <T> The type of object you provide. This is created in stage 1, and passed to stage 2, 3, and returned if get() is called.
|
|
||||||
* @param <C> The type of callback you provide. You may register many of these to be passed to the provider in stage 3, one at a time.
|
|
||||||
* @param <E> A type of exception you may throw and expect to be handled by the main thread
|
|
||||||
* @author Wesley Wolfe (c) 2012, 2014
|
|
||||||
*/
|
|
||||||
public final class AsynchronousExecutor<P, T, C, E extends Throwable> {
|
|
||||||
|
|
||||||
public static interface CallBackProvider<P, T, C, E extends Throwable> extends ThreadFactory {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Normally an asynchronous call, but can be synchronous
|
|
||||||
*
|
|
||||||
* @param parameter parameter object provided
|
|
||||||
* @return the created object
|
|
||||||
*/
|
|
||||||
T callStage1(P parameter) throws E;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronous call
|
|
||||||
*
|
|
||||||
* @param parameter parameter object provided
|
|
||||||
* @param object the previously created object
|
|
||||||
*/
|
|
||||||
void callStage2(P parameter, T object) throws E;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronous call, called multiple times, once per registered callback
|
|
||||||
*
|
|
||||||
* @param parameter parameter object provided
|
|
||||||
* @param object the previously created object
|
|
||||||
* @param callback the current callback to execute
|
|
||||||
*/
|
|
||||||
void callStage3(P parameter, T object, C callback) throws E;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class StateHolder {
|
|
||||||
|
|
||||||
protected static final int PENDING = 0x0;
|
|
||||||
protected static final int STAGE_1_ASYNC = PENDING + 1;
|
|
||||||
protected static final int STAGE_1_SYNC = STAGE_1_ASYNC + 1;
|
|
||||||
protected static final int STAGE_1_COMPLETE = STAGE_1_SYNC + 1;
|
|
||||||
protected static final int FINISHED = STAGE_1_COMPLETE + 1;
|
|
||||||
|
|
||||||
protected volatile int state = PENDING;
|
|
||||||
|
|
||||||
protected static final AtomicIntegerFieldUpdater<StateHolder> STATE = AtomicIntegerFieldUpdater.newUpdater(StateHolder.class, "state");
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Task extends StateHolder implements Runnable {
|
|
||||||
|
|
||||||
private final P parameter;
|
|
||||||
private T object;
|
|
||||||
private final List<C> callbacks = new LinkedList<C>();
|
|
||||||
private E t = null;
|
|
||||||
|
|
||||||
private Task(final P parameter) {
|
|
||||||
this.parameter = parameter;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
if (initAsync()) {
|
|
||||||
finished.add(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean initAsync() {
|
|
||||||
if (STATE.compareAndSet(this, PENDING, STAGE_1_ASYNC)) {
|
|
||||||
boolean ret = true;
|
|
||||||
|
|
||||||
try {
|
|
||||||
init();
|
|
||||||
} finally {
|
|
||||||
if (STATE.compareAndSet(this, STAGE_1_ASYNC, STAGE_1_COMPLETE)) {
|
|
||||||
// No one is/will be waiting
|
|
||||||
} else {
|
|
||||||
// We know that the sync thread will be waiting
|
|
||||||
synchronized (this) {
|
|
||||||
if (state != STAGE_1_SYNC) {
|
|
||||||
// They beat us to the synchronized block
|
|
||||||
this.notifyAll();
|
|
||||||
} else {
|
|
||||||
// We beat them to the synchronized block
|
|
||||||
}
|
|
||||||
state = STAGE_1_COMPLETE; // They're already synchronized, atomic locks are not needed
|
|
||||||
}
|
|
||||||
// We want to return false, because we know a synchronous task already handled the finish()
|
|
||||||
ret = false; // Don't return inside finally; VERY bad practice.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initSync() {
|
|
||||||
if (STATE.compareAndSet(this, PENDING, STAGE_1_COMPLETE)) {
|
|
||||||
// If we succeed that variable switch, good as done
|
|
||||||
init();
|
|
||||||
} else if (STATE.compareAndSet(this, STAGE_1_ASYNC, STAGE_1_SYNC)) {
|
|
||||||
// Async thread is running, but this shouldn't be likely; we need to sync to wait on them because of it.
|
|
||||||
synchronized (this) {
|
|
||||||
if (STATE.compareAndSet(this, STAGE_1_SYNC, PENDING)) { // They might NOT synchronized yet, atomic lock IS needed
|
|
||||||
// We are the first into the lock
|
|
||||||
while (state != STAGE_1_COMPLETE) {
|
|
||||||
try {
|
|
||||||
this.wait();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException("Unable to handle interruption on " + parameter, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// They beat us to the synchronized block
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Async thread is not pending, the more likely situation for a task not pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void init() {
|
|
||||||
try {
|
|
||||||
object = provider.callStage1(parameter);
|
|
||||||
} catch (final Throwable t) {
|
|
||||||
this.t = (E) t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private T get() throws E {
|
|
||||||
initSync();
|
|
||||||
if (callbacks.isEmpty()) {
|
|
||||||
// 'this' is a placeholder to prevent callbacks from being empty during finish call
|
|
||||||
// See get method below
|
|
||||||
callbacks.add((C) this);
|
|
||||||
}
|
|
||||||
finish();
|
|
||||||
return object;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void finish() throws E {
|
|
||||||
switch (state) {
|
|
||||||
default:
|
|
||||||
case PENDING:
|
|
||||||
case STAGE_1_ASYNC:
|
|
||||||
case STAGE_1_SYNC:
|
|
||||||
throw new IllegalStateException("Attempting to finish unprepared(" + state + ") task(" + parameter + ")");
|
|
||||||
case STAGE_1_COMPLETE:
|
|
||||||
try {
|
|
||||||
if (t != null) {
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
if (callbacks.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final CallBackProvider<P, T, C, E> provider = AsynchronousExecutor.this.provider;
|
|
||||||
final P parameter = this.parameter;
|
|
||||||
final T object = this.object;
|
|
||||||
|
|
||||||
provider.callStage2(parameter, object);
|
|
||||||
for (C callback : callbacks) {
|
|
||||||
if (callback == this) {
|
|
||||||
// 'this' is a placeholder to prevent callbacks from being empty on a get() call
|
|
||||||
// See get method above
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
provider.callStage3(parameter, object, callback);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
tasks.remove(parameter);
|
|
||||||
state = FINISHED;
|
|
||||||
}
|
|
||||||
case FINISHED:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean drop() {
|
|
||||||
if (STATE.compareAndSet(this, PENDING, FINISHED)) {
|
|
||||||
// If we succeed that variable switch, good as forgotten
|
|
||||||
tasks.remove(parameter);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
// We need the async thread to finish normally to properly dispose of the task
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final CallBackProvider<P, T, C, E> provider;
|
|
||||||
private final Queue<Task> finished = new ConcurrentLinkedQueue<Task>();
|
|
||||||
private final Map<P, Task> tasks = new HashMap<P, Task>();
|
|
||||||
private final ThreadPoolExecutor pool;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Uses a thread pool to pass executions to the provider.
|
|
||||||
* @see AsynchronousExecutor
|
|
||||||
*/
|
|
||||||
public AsynchronousExecutor(final CallBackProvider<P, T, C, E> provider, final int coreSize) {
|
|
||||||
if (provider == null) {
|
|
||||||
throw new IllegalArgumentException("Provider cannot be null");
|
|
||||||
}
|
|
||||||
this.provider = provider;
|
|
||||||
|
|
||||||
// We have an unbound queue size so do not need a max thread size
|
|
||||||
pool = new ThreadPoolExecutor(coreSize, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), provider);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a callback to the parameter provided, adding parameter to the queue if needed.
|
|
||||||
* <p>
|
|
||||||
* This should always be synchronous.
|
|
||||||
*/
|
|
||||||
public void add(P parameter, C callback) {
|
|
||||||
Task task = tasks.get(parameter);
|
|
||||||
if (task == null) {
|
|
||||||
tasks.put(parameter, task = new Task(parameter));
|
|
||||||
pool.execute(task);
|
|
||||||
}
|
|
||||||
task.callbacks.add(callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This removes a particular callback from the specified parameter.
|
|
||||||
* <p>
|
|
||||||
* If no callbacks remain for a given parameter, then the {@link CallBackProvider CallBackProvider's} stages may be omitted from execution.
|
|
||||||
* Stage 3 will have no callbacks, stage 2 will be skipped unless a {@link #get(Object)} is used, and stage 1 will be avoided on a best-effort basis.
|
|
||||||
* <p>
|
|
||||||
* Subsequent calls to {@link #getSkipQueue(Object)} will always work.
|
|
||||||
* <p>
|
|
||||||
* Subsequent calls to {@link #get(Object)} might work.
|
|
||||||
* <p>
|
|
||||||
* This should always be synchronous
|
|
||||||
* @return true if no further execution for the parameter is possible, such that, no exceptions will be thrown in {@link #finishActive()} for the parameter, and {@link #get(Object)} will throw an {@link IllegalStateException}, false otherwise
|
|
||||||
* @throws IllegalStateException if parameter is not in the queue anymore
|
|
||||||
* @throws IllegalStateException if the callback was not specified for given parameter
|
|
||||||
*/
|
|
||||||
public boolean drop(P parameter, C callback) throws IllegalStateException {
|
|
||||||
final Task task = tasks.get(parameter);
|
|
||||||
if (task == null) {
|
|
||||||
// Print debug info for QueuedChunk and avoid crash
|
|
||||||
//throw new IllegalStateException("Unknown " + parameter);
|
|
||||||
FMLLog.info("Unknown %s", parameter);
|
|
||||||
FMLLog.info("This should not happen. Please report this error to Forge.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!task.callbacks.remove(callback)) {
|
|
||||||
throw new IllegalStateException("Unknown " + callback + " for " + parameter);
|
|
||||||
}
|
|
||||||
if (task.callbacks.isEmpty()) {
|
|
||||||
return task.drop();
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method attempts to skip the waiting period for said parameter.
|
|
||||||
* <p>
|
|
||||||
* This should always be synchronous.
|
|
||||||
* @throws IllegalStateException if the parameter is not in the queue anymore, or sometimes if called from asynchronous thread
|
|
||||||
*/
|
|
||||||
public T get(P parameter) throws E, IllegalStateException {
|
|
||||||
final Task task = tasks.get(parameter);
|
|
||||||
if (task == null) {
|
|
||||||
throw new IllegalStateException("Unknown " + parameter);
|
|
||||||
}
|
|
||||||
return task.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Processes a parameter as if it was in the queue, without ever passing to another thread.
|
|
||||||
*/
|
|
||||||
public T getSkipQueue(P parameter) throws E {
|
|
||||||
return skipQueue(parameter);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Processes a parameter as if it was in the queue, without ever passing to another thread.
|
|
||||||
*/
|
|
||||||
public T getSkipQueue(P parameter, C callback) throws E {
|
|
||||||
final T object = skipQueue(parameter);
|
|
||||||
provider.callStage3(parameter, object, callback);
|
|
||||||
return object;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Processes a parameter as if it was in the queue, without ever passing to another thread.
|
|
||||||
*/
|
|
||||||
public T getSkipQueue(P parameter, C... callbacks) throws E {
|
|
||||||
final CallBackProvider<P, T, C, E> provider = this.provider;
|
|
||||||
final T object = skipQueue(parameter);
|
|
||||||
for (C callback : callbacks) {
|
|
||||||
provider.callStage3(parameter, object, callback);
|
|
||||||
}
|
|
||||||
return object;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Processes a parameter as if it was in the queue, without ever passing to another thread.
|
|
||||||
*/
|
|
||||||
public T getSkipQueue(P parameter, Iterable<C> callbacks) throws E {
|
|
||||||
final CallBackProvider<P, T, C, E> provider = this.provider;
|
|
||||||
final T object = skipQueue(parameter);
|
|
||||||
for (C callback : callbacks) {
|
|
||||||
provider.callStage3(parameter, object, callback);
|
|
||||||
}
|
|
||||||
return object;
|
|
||||||
}
|
|
||||||
|
|
||||||
private T skipQueue(P parameter) throws E {
|
|
||||||
Task task = tasks.get(parameter);
|
|
||||||
if (task != null) {
|
|
||||||
return task.get();
|
|
||||||
}
|
|
||||||
T object = provider.callStage1(parameter);
|
|
||||||
provider.callStage2(parameter, object);
|
|
||||||
return object;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the 'heartbeat' that should be called synchronously to finish any pending tasks
|
|
||||||
*/
|
|
||||||
public void finishActive() throws E {
|
|
||||||
final Queue<Task> finished = this.finished;
|
|
||||||
while (!finished.isEmpty()) {
|
|
||||||
finished.poll().finish();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setActiveThreads(final int coreSize) {
|
|
||||||
pool.setCorePoolSize(coreSize);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,165 +0,0 @@
|
||||||
GNU LESSER GENERAL PUBLIC LICENSE
|
|
||||||
Version 3, 29 June 2007
|
|
||||||
|
|
||||||
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
|
|
||||||
Everyone is permitted to copy and distribute verbatim copies
|
|
||||||
of this license document, but changing it is not allowed.
|
|
||||||
|
|
||||||
|
|
||||||
This version of the GNU Lesser General Public License incorporates
|
|
||||||
the terms and conditions of version 3 of the GNU General Public
|
|
||||||
License, supplemented by the additional permissions listed below.
|
|
||||||
|
|
||||||
0. Additional Definitions.
|
|
||||||
|
|
||||||
As used herein, "this License" refers to version 3 of the GNU Lesser
|
|
||||||
General Public License, and the "GNU GPL" refers to version 3 of the GNU
|
|
||||||
General Public License.
|
|
||||||
|
|
||||||
"The Library" refers to a covered work governed by this License,
|
|
||||||
other than an Application or a Combined Work as defined below.
|
|
||||||
|
|
||||||
An "Application" is any work that makes use of an interface provided
|
|
||||||
by the Library, but which is not otherwise based on the Library.
|
|
||||||
Defining a subclass of a class defined by the Library is deemed a mode
|
|
||||||
of using an interface provided by the Library.
|
|
||||||
|
|
||||||
A "Combined Work" is a work produced by combining or linking an
|
|
||||||
Application with the Library. The particular version of the Library
|
|
||||||
with which the Combined Work was made is also called the "Linked
|
|
||||||
Version".
|
|
||||||
|
|
||||||
The "Minimal Corresponding Source" for a Combined Work means the
|
|
||||||
Corresponding Source for the Combined Work, excluding any source code
|
|
||||||
for portions of the Combined Work that, considered in isolation, are
|
|
||||||
based on the Application, and not on the Linked Version.
|
|
||||||
|
|
||||||
The "Corresponding Application Code" for a Combined Work means the
|
|
||||||
object code and/or source code for the Application, including any data
|
|
||||||
and utility programs needed for reproducing the Combined Work from the
|
|
||||||
Application, but excluding the System Libraries of the Combined Work.
|
|
||||||
|
|
||||||
1. Exception to Section 3 of the GNU GPL.
|
|
||||||
|
|
||||||
You may convey a covered work under sections 3 and 4 of this License
|
|
||||||
without being bound by section 3 of the GNU GPL.
|
|
||||||
|
|
||||||
2. Conveying Modified Versions.
|
|
||||||
|
|
||||||
If you modify a copy of the Library, and, in your modifications, a
|
|
||||||
facility refers to a function or data to be supplied by an Application
|
|
||||||
that uses the facility (other than as an argument passed when the
|
|
||||||
facility is invoked), then you may convey a copy of the modified
|
|
||||||
version:
|
|
||||||
|
|
||||||
a) under this License, provided that you make a good faith effort to
|
|
||||||
ensure that, in the event an Application does not supply the
|
|
||||||
function or data, the facility still operates, and performs
|
|
||||||
whatever part of its purpose remains meaningful, or
|
|
||||||
|
|
||||||
b) under the GNU GPL, with none of the additional permissions of
|
|
||||||
this License applicable to that copy.
|
|
||||||
|
|
||||||
3. Object Code Incorporating Material from Library Header Files.
|
|
||||||
|
|
||||||
The object code form of an Application may incorporate material from
|
|
||||||
a header file that is part of the Library. You may convey such object
|
|
||||||
code under terms of your choice, provided that, if the incorporated
|
|
||||||
material is not limited to numerical parameters, data structure
|
|
||||||
layouts and accessors, or small macros, inline functions and templates
|
|
||||||
(ten or fewer lines in length), you do both of the following:
|
|
||||||
|
|
||||||
a) Give prominent notice with each copy of the object code that the
|
|
||||||
Library is used in it and that the Library and its use are
|
|
||||||
covered by this License.
|
|
||||||
|
|
||||||
b) Accompany the object code with a copy of the GNU GPL and this license
|
|
||||||
document.
|
|
||||||
|
|
||||||
4. Combined Works.
|
|
||||||
|
|
||||||
You may convey a Combined Work under terms of your choice that,
|
|
||||||
taken together, effectively do not restrict modification of the
|
|
||||||
portions of the Library contained in the Combined Work and reverse
|
|
||||||
engineering for debugging such modifications, if you also do each of
|
|
||||||
the following:
|
|
||||||
|
|
||||||
a) Give prominent notice with each copy of the Combined Work that
|
|
||||||
the Library is used in it and that the Library and its use are
|
|
||||||
covered by this License.
|
|
||||||
|
|
||||||
b) Accompany the Combined Work with a copy of the GNU GPL and this license
|
|
||||||
document.
|
|
||||||
|
|
||||||
c) For a Combined Work that displays copyright notices during
|
|
||||||
execution, include the copyright notice for the Library among
|
|
||||||
these notices, as well as a reference directing the user to the
|
|
||||||
copies of the GNU GPL and this license document.
|
|
||||||
|
|
||||||
d) Do one of the following:
|
|
||||||
|
|
||||||
0) Convey the Minimal Corresponding Source under the terms of this
|
|
||||||
License, and the Corresponding Application Code in a form
|
|
||||||
suitable for, and under terms that permit, the user to
|
|
||||||
recombine or relink the Application with a modified version of
|
|
||||||
the Linked Version to produce a modified Combined Work, in the
|
|
||||||
manner specified by section 6 of the GNU GPL for conveying
|
|
||||||
Corresponding Source.
|
|
||||||
|
|
||||||
1) Use a suitable shared library mechanism for linking with the
|
|
||||||
Library. A suitable mechanism is one that (a) uses at run time
|
|
||||||
a copy of the Library already present on the user's computer
|
|
||||||
system, and (b) will operate properly with a modified version
|
|
||||||
of the Library that is interface-compatible with the Linked
|
|
||||||
Version.
|
|
||||||
|
|
||||||
e) Provide Installation Information, but only if you would otherwise
|
|
||||||
be required to provide such information under section 6 of the
|
|
||||||
GNU GPL, and only to the extent that such information is
|
|
||||||
necessary to install and execute a modified version of the
|
|
||||||
Combined Work produced by recombining or relinking the
|
|
||||||
Application with a modified version of the Linked Version. (If
|
|
||||||
you use option 4d0, the Installation Information must accompany
|
|
||||||
the Minimal Corresponding Source and Corresponding Application
|
|
||||||
Code. If you use option 4d1, you must provide the Installation
|
|
||||||
Information in the manner specified by section 6 of the GNU GPL
|
|
||||||
for conveying Corresponding Source.)
|
|
||||||
|
|
||||||
5. Combined Libraries.
|
|
||||||
|
|
||||||
You may place library facilities that are a work based on the
|
|
||||||
Library side by side in a single library together with other library
|
|
||||||
facilities that are not Applications and are not covered by this
|
|
||||||
License, and convey such a combined library under terms of your
|
|
||||||
choice, if you do both of the following:
|
|
||||||
|
|
||||||
a) Accompany the combined library with a copy of the same work based
|
|
||||||
on the Library, uncombined with any other library facilities,
|
|
||||||
conveyed under the terms of this License.
|
|
||||||
|
|
||||||
b) Give prominent notice with the combined library that part of it
|
|
||||||
is a work based on the Library, and explaining where to find the
|
|
||||||
accompanying uncombined form of the same work.
|
|
||||||
|
|
||||||
6. Revised Versions of the GNU Lesser General Public License.
|
|
||||||
|
|
||||||
The Free Software Foundation may publish revised and/or new versions
|
|
||||||
of the GNU Lesser General Public License from time to time. Such new
|
|
||||||
versions will be similar in spirit to the present version, but may
|
|
||||||
differ in detail to address new problems or concerns.
|
|
||||||
|
|
||||||
Each version is given a distinguishing version number. If the
|
|
||||||
Library as you received it specifies that a certain numbered version
|
|
||||||
of the GNU Lesser General Public License "or any later version"
|
|
||||||
applies to it, you have the option of following the terms and
|
|
||||||
conditions either of that published version or of any later version
|
|
||||||
published by the Free Software Foundation. If the Library as you
|
|
||||||
received it does not specify a version number of the GNU Lesser
|
|
||||||
General Public License, you may choose any version of the GNU Lesser
|
|
||||||
General Public License ever published by the Free Software Foundation.
|
|
||||||
|
|
||||||
If the Library as you received it specifies that a proxy can decide
|
|
||||||
whether future versions of the GNU Lesser General Public License shall
|
|
||||||
apply, that proxy's public statement of acceptance of any version is
|
|
||||||
permanent authorization for you to choose that version for the
|
|
||||||
Library.
|
|
Loading…
Reference in a new issue