214 lines
6.3 KiB
Python
214 lines
6.3 KiB
Python
from __future__ import division
|
|
|
|
import struct
|
|
|
|
from minecraft.networking.types.basic import (
|
|
Type, Byte, Short, Integer, Long, Float, Double,
|
|
ShortPrefixedByteArray, Boolean, VarInt, TrailingByteArray,
|
|
Position, String, UnsignedByte
|
|
)
|
|
from minecraft.networking.types.utility import Vector
|
|
|
|
|
|
class IntegerPrefixedByteArray(Type):
|
|
@staticmethod
|
|
def read(file_object):
|
|
length = Integer.read(file_object)
|
|
return struct.unpack(str(length) + "s", file_object.read(length))[0]
|
|
|
|
@staticmethod
|
|
def send(value, socket):
|
|
Integer.send(len(value), socket)
|
|
socket.send(value)
|
|
|
|
|
|
TAG_End = 0
|
|
TAG_Byte = 1
|
|
TAG_Short = 2
|
|
TAG_Int = 3
|
|
TAG_Long = 4
|
|
TAG_Float = 5
|
|
TAG_Double = 6
|
|
TAG_Byte_Array = 7
|
|
TAG_String = 8
|
|
TAG_List = 9
|
|
TAG_Compound = 10
|
|
TAG_Int_Array = 11
|
|
TAG_Long_Array = 12
|
|
|
|
class Nbt(Type):
|
|
@staticmethod
|
|
def read(file_object):
|
|
type_id = Byte.read(file_object)
|
|
if type_id != TAG_Compound:
|
|
#raise Exception("Invalid NBT header")
|
|
return None
|
|
name = ShortPrefixedByteArray.read(file_object).decode('utf-8')
|
|
a = Nbt.decode_tag(file_object, TAG_Compound)
|
|
a['_name'] = name
|
|
return a
|
|
|
|
@staticmethod
|
|
def decode_tag(file_object, type_id):
|
|
if type_id == TAG_Byte:
|
|
return Byte.read(file_object)
|
|
elif type_id == TAG_Short:
|
|
return Short.read(file_object)
|
|
elif type_id == TAG_Int:
|
|
return Integer.read(file_object)
|
|
elif type_id == TAG_Long:
|
|
return Long.read(file_object)
|
|
elif type_id == TAG_Float:
|
|
return Float.read(file_object)
|
|
elif type_id == TAG_Double:
|
|
return Double.read(file_object)
|
|
elif type_id == TAG_Byte_Array:
|
|
return IntegerPrefixedByteArray.read(file_object).decode('utf-8')
|
|
elif type_id == TAG_String:
|
|
return ShortPrefixedByteArray.read(file_object)
|
|
elif type_id == TAG_List:
|
|
list_type_id = Byte.read(file_object)
|
|
size = Integer.read(file_object)
|
|
a = []
|
|
for i in range(size):
|
|
a.append(Nbt.decode_tag(file_object, list_type_id))
|
|
return a
|
|
elif type_id == TAG_Compound:
|
|
c = { }
|
|
child_type_id = Byte.read(file_object)
|
|
while child_type_id != TAG_End:
|
|
child_name = ShortPrefixedByteArray.read(file_object).decode('utf-8')
|
|
c[child_name] = Nbt.decode_tag(file_object, child_type_id)
|
|
child_type_id = Byte.read(file_object)
|
|
return c
|
|
elif type_id == TAG_Int_Array:
|
|
size = Integer.read(file_object)
|
|
a = []
|
|
for i in range(size):
|
|
a.append(Integer.read(file_object))
|
|
return a
|
|
elif type_id == TAG_Long_Array:
|
|
size = Integer.read(file_object)
|
|
a = []
|
|
for i in range(size):
|
|
a.append(Long.read(file_object))
|
|
return a
|
|
else:
|
|
raise Exception("Invalid NBT tag type")
|
|
|
|
@staticmethod
|
|
def send(value, socket):
|
|
# TODO
|
|
pass
|
|
|
|
|
|
class Slot(Type):
|
|
def __init__(self, present, item_id, item_count, nbt):
|
|
self.present = present
|
|
self.item_id = item_id
|
|
self.item_count = item_count
|
|
self.nbt = nbt
|
|
|
|
def __str__(self):
|
|
return str(self.__dict__)
|
|
def __repr__(self):
|
|
return 'Slot(present={}, item_id={}, item_count={}, nbt={}'.format(
|
|
self.present, self.item_id, self.item_count, self.nbt)
|
|
|
|
@staticmethod
|
|
def read(file_object):
|
|
present = Boolean.read(file_object)
|
|
if present:
|
|
item_id = VarInt.read(file_object)
|
|
item_count = Byte.read(file_object)
|
|
nbt = Nbt.read(file_object)
|
|
else:
|
|
item_id = None
|
|
item_count = None
|
|
nbt = None
|
|
return Slot(present, item_id, item_count, nbt)
|
|
|
|
@staticmethod
|
|
def send(value, socket):
|
|
Boolean.send(value.present, socket)
|
|
VarInt.send(value.item_id, socket)
|
|
Byte.send(value.item_count, socket)
|
|
Byte.send(0x00, socket)
|
|
|
|
|
|
class Entry(Type):
|
|
types = {
|
|
0: Byte,
|
|
1: VarInt,
|
|
2: Float,
|
|
3: String,
|
|
5: Boolean,
|
|
6: Slot,
|
|
7: Boolean,
|
|
9: Position,
|
|
18: VarInt,
|
|
}
|
|
|
|
def __init__(self, index, type, value):
|
|
self.index = index
|
|
self.type = type
|
|
self.value = value
|
|
|
|
def __str__(self):
|
|
return str(self.__dict__)
|
|
def __repr__(self):
|
|
return 'Entry(index={}, type={}, value={})'.format(
|
|
self.index, self.type, self.value)
|
|
|
|
@staticmethod
|
|
def read(file_object, context):
|
|
index = UnsignedByte.read(file_object)
|
|
if index == 0xff: return None
|
|
type = VarInt.read(file_object)
|
|
try:
|
|
value = Entry.types[type].read(file_object)
|
|
except TypeError:
|
|
value = Entry.types[type].read_with_context(file_object, context)
|
|
except KeyError:
|
|
return None
|
|
return Entry(index, type, value)
|
|
|
|
|
|
class Trade(Type):
|
|
fields = (
|
|
'input_item_1',
|
|
'output_item',
|
|
'has_second_item',
|
|
'input_item_2',
|
|
'trade_disabled',
|
|
'num_uses',
|
|
'max_num_uses',
|
|
'xp',
|
|
'special_price',
|
|
'price_multiplier',
|
|
'demand',
|
|
)
|
|
|
|
def __str__(self):
|
|
return str(self.__dict__)
|
|
def __repr__(self):
|
|
inner_str = ', '.join('%s=%s' % (a, getattr(self, a, None)) for a in self.fields if hasattr(self, a))
|
|
return 'Trade(%s)' % inner_str
|
|
|
|
@staticmethod
|
|
def read(file_object):
|
|
trade = Trade()
|
|
trade.input_item_1 = Slot.read(file_object)
|
|
trade.output_item = Slot.read(file_object)
|
|
trade.has_second_item = Boolean.read(file_object)
|
|
if trade.has_second_item:
|
|
trade.input_item_2 = Slot.read(file_object)
|
|
trade.trade_disabled = Boolean.read(file_object)
|
|
trade.num_uses = Integer.read(file_object)
|
|
trade.max_num_uses = Integer.read(file_object)
|
|
trade.xp = Integer.read(file_object)
|
|
trade.special_price = Integer.read(file_object)
|
|
trade.price_multiplier = Float.read(file_object)
|
|
trade.demand = Integer.read(file_object)
|
|
return trade
|