from __future__ import division

import struct

from minecraft.networking.types.basic import (
    Type, Byte, Short, Integer, Long, Float, Double,
    ShortPrefixedByteArray, Boolean, VarInt, TrailingByteArray,
    Position, String, UnsignedByte
)
from minecraft.networking.types.utility import Vector


class IntegerPrefixedByteArray(Type):
    """A byte string preceded by its length encoded as an ``Integer``."""

    @staticmethod
    def read(file_object):
        """Read an ``Integer`` length, then that many bytes.

        ``struct.unpack`` is used so that a short read (fewer bytes
        available than the prefix announced) raises ``struct.error``
        instead of silently returning truncated data.
        """
        length = Integer.read(file_object)
        return struct.unpack(str(length) + "s", file_object.read(length))[0]

    @staticmethod
    def send(value, socket):
        """Write ``len(value)`` as an ``Integer`` followed by ``value``."""
        Integer.send(len(value), socket)
        socket.send(value)


# NBT tag type identifiers.
TAG_End = 0
TAG_Byte = 1
TAG_Short = 2
TAG_Int = 3
TAG_Long = 4
TAG_Float = 5
TAG_Double = 6
TAG_Byte_Array = 7
TAG_String = 8
TAG_List = 9
TAG_Compound = 10
TAG_Int_Array = 11
TAG_Long_Array = 12


class Nbt(Type):
    """Reader for NBT data whose root tag is a named compound."""

    # Tags whose payload is a single value read by one basic type.
    _scalar_types = {
        TAG_Byte: Byte,
        TAG_Short: Short,
        TAG_Int: Integer,
        TAG_Long: Long,
        TAG_Float: Float,
        TAG_Double: Double,
    }

    @staticmethod
    def read(file_object):
        """Read a root compound tag and return it as a dict.

        The root tag's own name is stored under the ``'_name'`` key of
        the returned dict.  If the first byte is not ``TAG_Compound``
        (for example a lone ``TAG_End``), ``None`` is returned and
        nothing further is consumed from ``file_object``.
        """
        type_id = Byte.read(file_object)
        if type_id != TAG_Compound:
            # Deliberately lenient: anything other than a compound root
            # is reported as "no NBT" rather than raising.
            return None
        name = ShortPrefixedByteArray.read(file_object).decode('utf-8')
        compound = Nbt.decode_tag(file_object, TAG_Compound)
        compound['_name'] = name
        return compound

    @staticmethod
    def decode_tag(file_object, type_id):
        """Decode the payload of one tag of type ``type_id``.

        Returns a number for scalar tags, ``bytes`` for byte arrays,
        text for strings, a list for list / int-array / long-array
        tags, and a dict keyed by child name for compounds.

        Raises ``Exception`` when ``type_id`` is not a known tag type.
        """
        scalar = Nbt._scalar_types.get(type_id)
        if scalar is not None:
            return scalar.read(file_object)

        if type_id == TAG_Byte_Array:
            # Arbitrary binary payload: must stay bytes.  Decoding it as
            # text would raise on any non-UTF-8 byte sequence.
            return IntegerPrefixedByteArray.read(file_object)

        if type_id == TAG_String:
            # Textual payload.  'replace' is used so that sequences the
            # strict UTF-8 codec rejects do not abort the whole parse.
            raw = ShortPrefixedByteArray.read(file_object)
            return raw.decode('utf-8', 'replace')

        if type_id == TAG_List:
            # Element type comes first, then the element count.
            element_type_id = Byte.read(file_object)
            size = Integer.read(file_object)
            return [Nbt.decode_tag(file_object, element_type_id)
                    for _ in range(size)]

        if type_id == TAG_Compound:
            # Sequence of (type, name, payload) entries ended by TAG_End.
            children = {}
            child_type_id = Byte.read(file_object)
            while child_type_id != TAG_End:
                child_name = ShortPrefixedByteArray.read(
                    file_object).decode('utf-8')
                children[child_name] = Nbt.decode_tag(
                    file_object, child_type_id)
                child_type_id = Byte.read(file_object)
            return children

        if type_id == TAG_Int_Array:
            size = Integer.read(file_object)
            return [Integer.read(file_object) for _ in range(size)]

        if type_id == TAG_Long_Array:
            size = Integer.read(file_object)
            return [Long.read(file_object) for _ in range(size)]

        raise Exception("Invalid NBT tag type")

    @staticmethod
    def send(value, socket):
        """Not implemented: currently writes nothing to ``socket``."""
        # TODO
        pass


class Slot(Type):
    """An inventory slot: presence flag, item id, item count and NBT."""

    def __init__(self, present, item_id, item_count, nbt):
        self.present = present
        self.item_id = item_id
        self.item_count = item_count
        self.nbt = nbt

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return 'Slot(present={}, item_id={}, item_count={}, nbt={})'.format(
            self.present, self.item_id, self.item_count, self.nbt)

    @staticmethod
    def read(file_object):
        """Read a slot; item fields are ``None`` when it is not present."""
        present = Boolean.read(file_object)
        if not present:
            return Slot(present, None, None, None)
        item_id = VarInt.read(file_object)
        item_count = Byte.read(file_object)
        nbt = Nbt.read(file_object)
        return Slot(present, item_id, item_count, nbt)

    @staticmethod
    def send(value, socket):
        """Write ``value`` to ``socket``, mirroring the layout of ``read``.

        An empty slot is just the presence flag.  For a present slot the
        NBT part is always written as a single 0x00 byte (``TAG_End``),
        so ``value.nbt`` is not transmitted; ``Nbt.send`` is not
        implemented yet.
        """
        Boolean.send(value.present, socket)
        if not value.present:
            # item_id / item_count are None for an empty slot and must
            # not be sent; read() does not expect them either.
            return
        VarInt.send(value.item_id, socket)
        Byte.send(value.item_count, socket)
        Byte.send(0x00, socket)


class Entry(Type):
    """One entry of a metadata list: an index, a type id and a value."""

    # Maps a type id to the Type class used to read the entry's value.
    types = {
        0: Byte,
        1: VarInt,
        2: Float,
        3: String,
        5: Boolean,
        6: Slot,
        7: Boolean,
        9: Position,
        18: VarInt,
    }

    def __init__(self, index, type, value):
        self.index = index
        self.type = type
        self.value = value

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return 'Entry(index={}, type={}, value={})'.format(
            self.index, self.type, self.value)

    @staticmethod
    def read(file_object, context):
        """Read one entry, or return ``None``.

        ``None`` is returned both for the terminating index ``0xff`` and
        for a type id missing from ``Entry.types``.  In the second case
        the value bytes are left unread, so the caller cannot continue
        parsing the same stream reliably.
        """
        index = UnsignedByte.read(file_object)
        if index == 0xff:
            return None

        type_id = VarInt.read(file_object)
        value_type = Entry.types.get(type_id)
        if value_type is None:
            return None

        try:
            value = value_type.read(file_object)
        except TypeError:
            # NOTE(review): presumably types that need a context signal
            # this by raising TypeError from read() - confirm against
            # the Type base class.
            value = value_type.read_with_context(file_object, context)
        return Entry(index, type_id, value)