Saturday, March 23, 2019

The following warning appears when this Spark Streaming (DStream) code runs on a single-node test cluster:


lines = ssc.socketTextStream(hostname, port)

Warning output:
19/03/23 22:55:01 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s. 
19/03/23 22:55:01 WARN storage.BlockManager: Block input-0-1553352900800 replicated to only 0 peer(s) instead of 1 peers
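According to the documentation, the default storage level of socketTextStream uses a replication factor of 2 (the same flags as MEMORY_AND_DISK_2). On a single node there is no peer to hold the second copy, hence the warning.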
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
Create an input from TCP source hostname:port. Data is received using a TCP socket, and the received bytes are interpreted as UTF-8 encoded, \n-delimited lines.
Parameters:
hostname – Hostname to connect to for receiving data
port – Port to connect to for receiving data
storageLevel – Storage level to use for storing the received objects
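To see why the default level triggers the warning, here is a simplified plain-Python model. It assumes (as the log message suggests) that one copy of each received block stays on the receiving node and only replication - 1 copies are sent to peers. The Level class and replication_warning function are hypothetical illustrations, not Spark's actual code, and the sketch does not need PySpark installed.

```python
from typing import NamedTuple, Optional


class Level(NamedTuple):
    """Hypothetical stand-in mirroring the argument order of
    pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


def replication_warning(level: Level, peers: int) -> Optional[str]:
    """Simplified model (not Spark's code) of when the replication warning fires."""
    # One copy stays on the node that received the block, so only
    # replication - 1 copies have to be sent to other nodes (peers).
    extra_copies = level.replication - 1
    if extra_copies > peers:
        return f"Expecting {extra_copies} replicas with only {peers} peer/s."
    return None


# Default level of socketTextStream per the docs (same flags as MEMORY_AND_DISK_2)
default_level = Level(True, True, False, False, 2)

print(replication_warning(default_level, peers=0))
# Expecting 1 replicas with only 0 peer/s.
print(replication_warning(Level(True, True, False, False, 1), peers=0))
# None
```

With replication 1 there are no extra copies to place, so a single-node cluster has nothing to complain about.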

How to fix

  • Method 1:
# Create a DStream by reading the data from the host and port provided as input parameters.
# The default storage level uses replication=2; MEMORY_AND_DISK uses replication=1.
from pyspark import StorageLevel
lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_AND_DISK)
  • Method 2:
from pyspark import StorageLevel
# Same flags as the default level, but with replication explicitly set to 1
lines = ssc.socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 1))
  • Method 3:
# Set the log level to ERROR to hide the distracting WARN output.
# Note: this only suppresses the message; replication=2 is still requested.
sc.setLogLevel("ERROR")
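If the same job has to run on both a single-node test cluster and a larger cluster, Method 2 can be parameterized instead of hard-coding the replication factor. The sketch below assumes you know the number of nodes yourself (for example from a command-line argument); storage_args_for and num_nodes are hypothetical names, and only the final, commented-out call uses the real pyspark.StorageLevel constructor shown above.

```python
from typing import Tuple

# (useDisk, useMemory, useOffHeap, deserialized, replication),
# the same argument order as pyspark.StorageLevel
StorageArgs = Tuple[bool, bool, bool, bool, int]


def storage_args_for(num_nodes: int, wanted_replication: int = 2) -> StorageArgs:
    """Hypothetical helper: memory-and-disk storage whose replication never
    exceeds the number of nodes that could hold a copy."""
    if num_nodes < 1:
        raise ValueError("num_nodes must be at least 1")
    replication = min(wanted_replication, num_nodes)
    return (True, True, False, False, replication)


print(storage_args_for(1))  # single-node test cluster: (True, True, False, False, 1)
print(storage_args_for(3))  # multi-node cluster: (True, True, False, False, 2)

# In the streaming job (requires PySpark; hostname, port, ssc and num_nodes
# come from your own setup):
#   from pyspark import StorageLevel
#   lines = ssc.socketTextStream(hostname, port, StorageLevel(*storage_args_for(num_nodes)))
```

Capping replication at the node count keeps the fault tolerance of the default level on a real cluster while avoiding the warning on a single test node.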

Reference on StorageLevel

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory in a Java-specific serialized format, and whether to replicate the RDD partitions on multiple nodes. It also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. Since the data is always serialized on the Python side, all the constants use the serialized formats.
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
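Because the Python side always serializes data, several of the constants above carry exactly the same flags. The plain-Python sketch below copies the flag tuples from the list and groups names that are identical; it does not import PySpark, and LEVELS and identical_groups are illustrative names only.

```python
from collections import defaultdict

# Flag tuples copied from the constant list above, in the order
# (useDisk, useMemory, useOffHeap, deserialized, replication)
LEVELS = {
    "DISK_ONLY": (True, False, False, False, 1),
    "DISK_ONLY_2": (True, False, False, False, 2),
    "MEMORY_AND_DISK": (True, True, False, False, 1),
    "MEMORY_AND_DISK_2": (True, True, False, False, 2),
    "MEMORY_AND_DISK_SER": (True, True, False, False, 1),
    "MEMORY_AND_DISK_SER_2": (True, True, False, False, 2),
    "MEMORY_ONLY": (False, True, False, False, 1),
    "MEMORY_ONLY_2": (False, True, False, False, 2),
    "MEMORY_ONLY_SER": (False, True, False, False, 1),
    "MEMORY_ONLY_SER_2": (False, True, False, False, 2),
    "OFF_HEAP": (True, True, True, False, 1),
}


def identical_groups(levels):
    """Group constant names that share exactly the same flags."""
    groups = defaultdict(list)
    for name, flags in levels.items():
        groups[flags].append(name)
    # Keep only flag tuples that more than one name maps to
    return [sorted(names) for names in groups.values() if len(names) > 1]


for group in identical_groups(LEVELS):
    print(" == ".join(group))
# MEMORY_AND_DISK == MEMORY_AND_DISK_SER
# MEMORY_AND_DISK_2 == MEMORY_AND_DISK_SER_2
# MEMORY_ONLY == MEMORY_ONLY_SER
# MEMORY_ONLY_2 == MEMORY_ONLY_SER_2
```

The same table also confirms that MEMORY_AND_DISK (Method 1) differs from the socketTextStream default only in its replication factor of 1.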
