diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java new file mode 100644 index 000000000..c25d5e2b7 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +/** + * Deletion is a delete operation on a timeseries. + */ +public class Deletion extends Modification { + private long timestamp; + + public Deletion(String path, long versionNum, long timestamp) { + super(Type.DELETION, path, versionNum); + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Modification)) + return false; + Deletion del = (Deletion) obj; + return super.equals(obj) && del.timestamp == this.timestamp; + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java new file mode 100644 index 000000000..3504749ad --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +/** + * Modification represents an UPDATE or DELETE operation on a certain timeseries. + */ +public abstract class Modification { + + protected Type type; + protected String path; + protected long versionNum; + + public Modification(Type type, String path, long versionNum) { + this.type = type; + this.path = path; + this.versionNum = versionNum; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public long getVersionNum() { + return versionNum; + } + + public void setVersionNum(long versionNum) { + this.versionNum = versionNum; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public enum Type { + DELETION + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Modification)) + return false; + Modification mod = (Modification) obj; + return mod.type.equals(this.type) && mod.path.equals(this.path) + && mod.versionNum == this.versionNum; + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java new file mode 100644 index 000000000..c04ff1d12 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java @@ -0,0 +1,101 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor; +import org.apache.iotdb.db.engine.modification.io.ModificationReader; +import org.apache.iotdb.db.engine.modification.io.ModificationWriter; + +/** + * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same + * directory. + * Methods in this class are highly synchronized for concurrency safety. + */ +public class ModificationFile { + + private Collection modifications; + private String filePath; + private ModificationWriter writer; + private ModificationReader reader; + + /** + * Construct a ModificationFile using a file as its storage. + * @param filePath the path of the storage file. + * @throws IOException when IOException raised when + */ + public ModificationFile(String filePath) throws IOException { + this.filePath = filePath; + LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(filePath); + this.writer = accessor; + this.reader = accessor; + } + + private void init() throws IOException { + synchronized (this) { + Collection mods = reader.read(); + if (mods == null) { + mods = new ArrayList<>(); + } + modifications = mods; + } + } + + private void checkInit() throws IOException { + if (modifications == null) { + init(); + } + } + + /** + * Release the resources such as streams and caches. + */ + public void close() throws IOException { + synchronized (this) { + writer.close(); + modifications = null; + } + } + + /** + * Write a modification in this file. The modification will first be written to the persistent + * store then the memory cache. + * @param mod the modification to be written. + * @throws IOException if IOException is thrown when writing the modification to the store. + */ + public void write(Modification mod) throws IOException { + synchronized (this) { + checkInit(); + writer.write(mod); + modifications.add(mod); + } + } + + /** + * Get all modifications stored in this file. + * @return + */ + public Collection getModifications() throws IOException { + synchronized (this) { + checkInit(); + return new ArrayList<>(modifications); + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java new file mode 100644 index 000000000..9fd14414e --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java @@ -0,0 +1,135 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LocalTextModificationAccessor uses a file on local file system to store the modifications + * in text format, and writes modifications by appending to the tail of the file. + */ +public class LocalTextModificationAccessor implements ModificationReader, ModificationWriter { + + private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class); + private static final String SEPARATOR = ","; + + private String filePath; + private BufferedWriter writer; + + /** + * Construct a LocalTextModificationAccessor using a file specified by filePath. Only a writer + * will be created because the reader will be created only if necessary(call of read()). + * + * @param filePath the path of the file that is used for storing modifications. + * @throws IOException if the writer cannot be created. + */ + public LocalTextModificationAccessor(String filePath) throws IOException { + this.filePath = filePath; + writer = new BufferedWriter(new FileWriter(filePath)); + } + + @Override + public Collection read() throws IOException { + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(filePath)); + } catch (FileNotFoundException e) { + return null; + } + String line; + + List modificationList = new ArrayList<>(); + try { + while ((line = reader.readLine()) != null) { + modificationList.add(decodeModification(line)); + } + } catch (IOException e) { + reader.close(); + logger.error("An error occurred when reading modifications, and the remaining modifications " + + "were ignored.", e); + } + return modificationList; + } + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public void write(Modification mod) throws IOException { + writer.write(encodeModification(mod)); + writer.newLine(); + writer.flush(); + } + + private static String encodeModification(Modification mod) { + if (mod instanceof Deletion) + return encodeDeletion((Deletion) mod); + return null; + } + + private static Modification decodeModification(String src) throws IOException { + String[] fields = src.split(SEPARATOR); + if (Modification.Type.DELETION.name().equals(fields[0])) { + return decodeDeletion(fields); + } + throw new IOException("Unknown modification type: " + fields[0]); + } + + private static String encodeDeletion(Deletion del) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPath()) + .append(SEPARATOR).append(del.getVersionNum()).append(SEPARATOR) + .append(del.getTimestamp()); + return stringBuilder.toString(); + } + + private static Deletion decodeDeletion(String[] fields) throws IOException { + if (fields.length != 4) { + throw new IOException("Incorrect deletion fields number: " + fields.length); + } + + String path = fields[1]; + long versionNum, timestamp; + try { + versionNum = Long.parseLong(fields[2]); + } catch (NumberFormatException e) { + throw new IOException("Invalide version number: " + fields[2]); + } + try { + timestamp = Long.parseLong(fields[3]); + } catch (NumberFormatException e) { + throw new IOException("Invalide timestamp: " + fields[3]); + } + + return new Deletion(path, versionNum, timestamp); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java new file mode 100644 index 000000000..d770eccbc --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.iotdb.db.engine.modification.Modification; + +/** + * ModificationReader reads all modifications from a persistent medium like file system. + */ +public interface ModificationReader { + + /** + * Read all modifications from a persistent medium. + * + * @return a list of modifications contained the medium. + */ + Collection read() throws IOException; + + /** + * Release resources like streams. + */ + void close() throws IOException; +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java new file mode 100644 index 000000000..5c3806b3e --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.IOException; + +import org.apache.iotdb.db.engine.modification.Modification; + +/** + * ModificationWriter provides methods for writing a modification to a persistent medium like file + * system. + */ +public interface ModificationWriter { + + /** + * Write a new modification to the persistent medium. + * @param mod the modification to be written. + */ + void write(Modification mod) throws IOException; + + /** + * Release resources like streams. + */ + void close() throws IOException; +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java new file mode 100644 index 000000000..49697e169 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ * modification is the functional module responsible for processing UPDATE and DELETE. + */ + +/** + * modification is the functional module responsible for processing UPDATE and DELETE. + */ +package org.apache.iotdb.db.engine.modification; \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java new file mode 100644 index 000000000..6907a3d8d --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ModificationFileTest { + @Test + public void readMyWrite() { + String tempFileName = "mod.temp"; + Modification[] modifications = new Modification[]{ + new Deletion("p1", 1, 1), + new Deletion("p2", 2, 2), + new Deletion("p3", 3, 3), + new Deletion("p4", 4, 4), + }; + try { + ModificationFile mFile = new ModificationFile(tempFileName); + for (int i = 0; i < 2; i++) { + mFile.write(modifications[i]); + } + List modificationList = (List) mFile.getModifications(); + for (int i = 0; i < 2; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + + for (int i = 2; i < 4; i++) { + mFile.write(modifications[i]); + } + modificationList = (List) mFile.getModifications(); + for (int i = 0; i < 4; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + mFile.close(); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } +} \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java new file mode 100644 index 000000000..125d243e7 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.engine.modification.io; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class LocalTextModificationAccessorTest { + + @Test + public void readMyWrite() { + String tempFileName = "mod.temp"; + Modification[] modifications = new Modification[]{ + new Deletion("p1", 1, 1), + new Deletion("p2", 2, 2), + new Deletion("p3", 3, 3), + new Deletion("p4", 4, 4), + }; + try { + LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName); + for (int i = 0; i < 2; i++) { + accessor.write(modifications[i]); + } + List modificationList = (List) accessor.read(); + for (int i = 0; i < 2; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + + for (int i = 2; i < 4; i++) { + accessor.write(modifications[i]); + } + modificationList = (List) accessor.read(); + for (int i = 0; i < 4; i++) { + assertEquals(modifications[i], modificationList.get(i)); + } + accessor.close(); + } catch (IOException e) { + fail(e.getMessage()); + } finally { + new File(tempFileName).delete(); + } + } + + @Test + public void readNull() throws IOException { + String tempFileName = "mod.temp"; + LocalTextModificationAccessor accessor = null; + try { + accessor = new LocalTextModificationAccessor(tempFileName); + } catch (IOException e) { + fail(e.getMessage()); + } + new File(tempFileName).delete(); + Collection modifications = accessor.read(); + assertNull(modifications); + } +} \ No newline at end of file